Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Some Leafnode issues #1106

Merged
merged 1 commit into from
Aug 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 39 additions & 15 deletions server/leafnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,11 @@ type leaf struct {
type leafNodeCfg struct {
sync.RWMutex
*RemoteLeafOpts
urls []*url.URL
curURL *url.URL
tlsName string
urls []*url.URL
curURL *url.URL
tlsName string
username string
password string
}

// Check to see if this is a solicited leafnode. We do special processing for solicited.
Expand Down Expand Up @@ -117,8 +119,11 @@ func newLeafNodeCfg(remote *RemoteLeafOpts) *leafNodeCfg {
// array when receiving async leafnode INFOs.
cfg.urls = append(cfg.urls, cfg.URLs...)
// If we are TLS make sure we save off a proper servername if possible.
// Do same for user/password since we may need them to connect to
// a bare URL that we get from INFO protocol.
for _, u := range cfg.urls {
cfg.saveTLSHostname(u)
cfg.saveUserPassword(u)
}
return cfg
}
Expand Down Expand Up @@ -225,10 +230,19 @@ func (s *Server) connectToRemoteLeafNode(remote *leafNodeCfg, firstConnect bool)

// Save off the tlsName for when we use TLS and mix hostnames and IPs. IPs usually
// come from the server we connect to.
func (lcfg *leafNodeCfg) saveTLSHostname(u *url.URL) {
isTLS := lcfg.TLSConfig != nil || u.Scheme == "tls"
if isTLS && lcfg.tlsName == "" && net.ParseIP(u.Hostname()) == nil {
lcfg.tlsName = u.Hostname()
func (cfg *leafNodeCfg) saveTLSHostname(u *url.URL) {
isTLS := cfg.TLSConfig != nil || u.Scheme == "tls"
if isTLS && cfg.tlsName == "" && net.ParseIP(u.Hostname()) == nil {
cfg.tlsName = u.Hostname()
}
}

// Save off the username/password for when we connect using a bare URL
// that we get from the INFO protocol.
func (cfg *leafNodeCfg) saveUserPassword(u *url.URL) {
if cfg.username == _EMPTY_ && u.User != nil {
cfg.username = u.User.Username()
cfg.password, _ = u.User.Password()
}
}

Expand Down Expand Up @@ -384,8 +398,10 @@ func (c *client) sendLeafConnect(tlsRequired bool) {
cinfo.Sig = sig
} else if userInfo := c.leaf.remote.curURL.User; userInfo != nil {
cinfo.User = userInfo.Username()
pass, _ := userInfo.Password()
cinfo.Pass = pass
cinfo.Pass, _ = userInfo.Password()
} else if c.leaf.remote.username != _EMPTY_ {
cinfo.User = c.leaf.remote.username
cinfo.Pass = c.leaf.remote.password
}

b, err := json.Marshal(cinfo)
Expand Down Expand Up @@ -489,23 +505,30 @@ func (s *Server) createLeafNode(conn net.Conn, remote *leafNodeCfg) *client {
// Determines if we are soliciting the connection or not.
var solicited bool

c.mu.Lock()
c.initClient()
if remote != nil {
solicited = true
// Users can bind to any local account, if its empty
// we will assume the $G account.
if remote.LocalAccount == "" {
remote.LocalAccount = globalAccountName
}
// FIXME(dlc) - Make this resolve at startup.
c.leaf.remote = remote
c.mu.Unlock()
// TODO: Decide what should be the optimal behavior here.
// For now, if lookup fails, we will constantly try
// to recreate this LN connection.
acc, err := s.LookupAccount(remote.LocalAccount)
if err != nil {
c.Debugf("No local account %q for leafnode", remote.LocalAccount)
c.Errorf("No local account %q for leafnode: %v", remote.LocalAccount, err)
c.closeConnection(MissingAccount)
return nil
}
c.mu.Lock()
c.acc = acc
c.leaf.remote = remote
}
c.mu.Unlock()

var nonce [nonceLen]byte

Expand All @@ -520,8 +543,6 @@ func (s *Server) createLeafNode(conn net.Conn, remote *leafNodeCfg) *client {
// Grab lock
c.mu.Lock()

c.initClient()

if solicited {
// We need to wait here for the info, but not for too long.
c.nc.SetReadDeadline(time.Now().Add(DEFAULT_LEAFNODE_INFO_WAIT))
Expand Down Expand Up @@ -725,7 +746,10 @@ func (c *client) updateLeafNodeURLs(info *Info) {
// Do not add if it's the same as what we already have configured.
var dup bool
for _, u := range cfg.URLs {
if urlsAreEqual(url, u) {
// URLs that we receive never have user info, but the
// ones that were configured may have. Simply compare
// host and port to decide if they are equal or not.
if url.Host == u.Host && url.Port() == u.Port() {
dup = true
break
}
Expand Down
138 changes: 138 additions & 0 deletions server/leafnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"net/url"
"os"
"strings"
"sync/atomic"
"testing"
"time"
)
Expand Down Expand Up @@ -261,3 +262,140 @@ func TestLeafNodeTLSRemoteWithNoCerts(t *testing.T) {
t.Fatalf("Expected %v, got: %v", expected, got)
}
}

type captureErrorLogger struct {
DummyLogger
errCh chan string
}

func (l *captureErrorLogger) Errorf(format string, v ...interface{}) {
select {
case l.errCh <- fmt.Sprintf(format, v...):
default:
}
}

func TestLeafNodeAccountNotFound(t *testing.T) {
ob := DefaultOptions()
ob.LeafNode.Host = "127.0.0.1"
ob.LeafNode.Port = -1
sb := RunServer(ob)
defer sb.Shutdown()

u, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", ob.LeafNode.Port))

logFileName := createConfFile(t, []byte(""))
defer os.Remove(logFileName)

oa := DefaultOptions()
oa.LeafNode.ReconnectInterval = 15 * time.Millisecond
oa.LeafNode.Remotes = []*RemoteLeafOpts{
{
LocalAccount: "foo",
URLs: []*url.URL{u},
},
}
// Expected to fail
if _, err := NewServer(oa); err == nil || !strings.Contains(err.Error(), "local account") {
t.Fatalf("Expected server to fail with error about no local account, got %v", err)
}
oa.Accounts = []*Account{NewAccount("foo")}
sa := RunServer(oa)
defer sa.Shutdown()

l := &captureErrorLogger{errCh: make(chan string, 1)}
sa.SetLogger(l, false, false)

checkLeafNodeConnected(t, sa)

// Now simulate account is removed with config reload, or it expires.
sa.accounts.Delete("foo")

// Restart B (with same Port)
sb.Shutdown()
sb = RunServer(ob)
defer sb.Shutdown()

// Wait for report of error
select {
case e := <-l.errCh:
if !strings.Contains(e, "No local account") {
t.Fatalf("Expected error about no local account, got %s", e)
}
case <-time.After(2 * time.Second):
t.Fatalf("Did not get the error")
}

// For now, sa would try to recreate the connection for ever.
// Check that lid is increasing...
time.Sleep(100 * time.Millisecond)
lid := atomic.LoadUint64(&sa.gcid)
if lid < 4 {
t.Fatalf("Seems like connection was not retried")
}
}

// This test ensures that we can connect using proper user/password
// to a LN URL that was discovered through the INFO protocol.
func TestLeafNodeBasicAuthFailover(t *testing.T) {
content := `
listen: "127.0.0.1:-1"
cluster {
listen: "127.0.0.1:-1"
%s
}
leafnodes {
listen: "127.0.0.1:-1"
authorization {
user: foo
password: pwd
timeout: 1
}
}
`
conf := createConfFile(t, []byte(fmt.Sprintf(content, "")))
defer os.Remove(conf)

sb1, ob1 := RunServerWithConfig(conf)
defer sb1.Shutdown()

conf = createConfFile(t, []byte(fmt.Sprintf(content, fmt.Sprintf("routes: [nats://127.0.0.1:%d]", ob1.Cluster.Port))))
defer os.Remove(conf)

sb2, _ := RunServerWithConfig(conf)
defer sb2.Shutdown()

checkClusterFormed(t, sb1, sb2)

content = `
port: -1
accounts {
foo {}
}
leafnodes {
listen: "127.0.0.1:-1"
remotes [
{
account: "foo"
url: "nats://foo:pwd@127.0.0.1:%d"
}
]
}
`
conf = createConfFile(t, []byte(fmt.Sprintf(content, ob1.LeafNode.Port)))
defer os.Remove(conf)

sa, _ := RunServerWithConfig(conf)
defer sa.Shutdown()

checkLeafNodeConnected(t, sa)

// Shutdown sb1, sa should reconnect to sb2
sb1.Shutdown()

// Wait a bit to make sure there was a disconnect and attempt to reconnect.
time.Sleep(250 * time.Millisecond)

// Should be able to reconnect
checkLeafNodeConnected(t, sa)
}
1 change: 1 addition & 0 deletions server/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2488,6 +2488,7 @@ func TestMonitorLeafNode(t *testing.T) {
opts.LeafNode.Port = -1
opts.LeafNode.AuthTimeout = 1
opts.LeafNode.TLSTimeout = 1
opts.Accounts = []*Account{NewAccount("acc")}
u, _ := url.Parse("nats://ivan:pwd@localhost:1234")
opts.LeafNode.Remotes = []*RemoteLeafOpts{
&RemoteLeafOpts{
Expand Down
94 changes: 58 additions & 36 deletions server/routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1083,38 +1083,6 @@ func TestRouteNoCrashOnAddingSubToRoute(t *testing.T) {
}

func TestRouteRTT(t *testing.T) {
checkRTT := func(t *testing.T, s *Server, checkForUpdate bool) {
t.Helper()
var route *client
s.mu.Lock()
for _, r := range s.routes {
route = r
break
}
s.mu.Unlock()

prev := time.Duration(0)
check := func(t *testing.T) {
t.Helper()
checkFor(t, time.Second, 15*time.Millisecond, func() error {
route.mu.Lock()
rtt := route.rtt
route.mu.Unlock()
if rtt == 0 || rtt == prev {
return fmt.Errorf("RTT probably not tracked")
}
prev = rtt
return nil
})
}
check(t)
if checkForUpdate {
// Wait a bit and check that rtt is updated
time.Sleep(30 * time.Millisecond)
check(t)
}
}

ob := DefaultOptions()
ob.PingInterval = 15 * time.Millisecond
sb := RunServer(ob)
Expand All @@ -1127,8 +1095,62 @@ func TestRouteRTT(t *testing.T) {
defer sa.Shutdown()

checkClusterFormed(t, sa, sb)
checkRTT(t, sa, true)
checkRTT(t, sb, true)

checkRTT := func(t *testing.T, s *Server) time.Duration {
t.Helper()
var route *client
s.mu.Lock()
for _, r := range s.routes {
route = r
break
}
s.mu.Unlock()

var rtt time.Duration
checkFor(t, time.Second, 15*time.Millisecond, func() error {
route.mu.Lock()
rtt = route.rtt
route.mu.Unlock()
if rtt == 0 {
return fmt.Errorf("RTT not tracked")
}
return nil
})
return rtt
}

prevA := checkRTT(t, sa)
prevB := checkRTT(t, sb)

checkUpdated := func(t *testing.T, s *Server, prev time.Duration) {
t.Helper()
attempts := 0
timeout := time.Now().Add(time.Second)
for time.Now().Before(timeout) {
if rtt := checkRTT(t, s); rtt != 0 {
return
}
attempts++
if attempts == 5 {
// If could be that we are very unlucky
// and the RTT is constant. So override
// the route's RTT to 0 to see if it gets
// updated.
s.mu.Lock()
for _, r := range s.routes {
r.mu.Lock()
r.rtt = 0
r.mu.Unlock()
break
}
s.mu.Unlock()
}
time.Sleep(15 * time.Millisecond)
}
t.Fatalf("RTT probably not updated")
}
checkUpdated(t, sa, prevA)
checkUpdated(t, sb, prevB)

sa.Shutdown()
sb.Shutdown()
Expand All @@ -1147,6 +1169,6 @@ func TestRouteRTT(t *testing.T) {
defer sa.Shutdown()

checkClusterFormed(t, sa, sb)
checkRTT(t, sa, false)
checkRTT(t, sb, false)
checkRTT(t, sa)
checkRTT(t, sb)
}
Loading