diff --git a/server/client.go b/server/client.go index 48315bd3a3d..43206336406 100644 --- a/server/client.go +++ b/server/client.go @@ -57,10 +57,9 @@ type client struct { last time.Time parseState - route *route - sendLocalSubs bool - debug bool - trace bool + route *route + debug bool + trace bool } func (c *client) String() (id string) { @@ -307,6 +306,12 @@ func (c *client) maxPayloadViolation(sz int) { c.closeConnection() } +// Assume the lock is held upon entry. +func (c *client) sendInfo(info []byte) { + c.bw.Write(info) + c.bw.Flush() +} + func (c *client) sendErr(err string) { c.mu.Lock() if c.bw != nil { @@ -942,6 +947,11 @@ func (c *client) closeConnection() { subs := c.subs.All() srv := c.srv + retryImplicit := false + if c.route != nil { + retryImplicit = c.route.retry + } + c.mu.Unlock() if srv != nil { @@ -963,19 +973,26 @@ func (c *client) closeConnection() { // Check for a solicited route. If it was, start up a reconnect unless // we are already connected to the other end. - if c.isSolicitedRoute() { + if c.isSolicitedRoute() || retryImplicit { + // Capture these under lock + c.mu.Lock() + rid := c.route.remoteID + rtype := c.route.routeType + rurl := c.route.url + c.mu.Unlock() + srv.mu.Lock() defer srv.mu.Unlock() - rid := c.route.remoteID + if rid != "" && srv.remotes[rid] != nil { Debugf("Not attempting reconnect for solicited route, already connected to \"%s\"", rid) return - } else if c.route.remoteID == srv.info.ID { - Debugf("Detected route to self, ignoring \"%s\"", c.route.url) + } else if rid == srv.info.ID { + Debugf("Detected route to self, ignoring \"%s\"", rurl) return - } else if c.route.routeType != Implicit { - Debugf("Attempting reconnect for solicited route \"%s\"", c.route.url) - go srv.reConnectToRoute(c.route.url) + } else if rtype != Implicit || retryImplicit { + Debugf("Attempting reconnect for solicited route \"%s\"", rurl) + go srv.reConnectToRoute(rurl, rtype) } } } diff --git a/server/configs/seed.conf b/server/configs/seed.conf new file mode 100644 index 00000000000..147ad7a52be --- /dev/null +++ b/server/configs/seed.conf @@ -0,0 +1,13 @@ +# Copyright 2016 Apcera Inc. All rights reserved. + +# Cluster Seed Node + +port: 7222 +net: '127.0.0.1' + +http_port: 9222 + +cluster { + host: '127.0.0.1' + port: 7248 +} diff --git a/server/configs/seed_tls.conf b/server/configs/seed_tls.conf new file mode 100644 index 00000000000..4e2d005e016 --- /dev/null +++ b/server/configs/seed_tls.conf @@ -0,0 +1,26 @@ +# Copyright 2016 Apcera Inc. All rights reserved. + +# Cluster Seed Node + +port: 7222 +net: '127.0.0.1' + +http_port: 9222 + +cluster { + host: '127.0.0.1' + port: 7248 + + tls { + # Route cert + cert_file: "../test/configs/certs/server-cert.pem" + # Private key + key_file: "../test/configs/certs/server-key.pem" + # Specified time for handshake to complete + timeout: 2 + + # Optional certificate authority verifying connected routes + # Required when we have self-signed CA, etc. + ca_file: "../test/configs/certs/ca.pem" + } +} diff --git a/server/route.go b/server/route.go index d3530115636..0857e241c91 100644 --- a/server/route.go +++ b/server/route.go @@ -12,6 +12,7 @@ import ( "net/url" "regexp" "strconv" + "strings" "sync/atomic" "time" ) @@ -30,19 +31,14 @@ const ( type route struct { remoteID string didSolicit bool + retry bool routeType RouteType url *url.URL + ipUrlString string authRequired bool tlsRequired bool } -type RemoteInfo struct { - RemoteID string `json:"id"` - URL string `json:"url"` - AuthRequired bool `json:"auth_required"` - TLSRequired bool `json:"tls_required"` -} - type connectInfo struct { Verbose bool `json:"verbose"` Pedantic bool `json:"pedantic"` @@ -85,8 +81,24 @@ func (c *client) sendConnect(tlsRequired bool) { // Process the info message if we are a route. func (c *client) processRouteInfo(info *Info) { c.mu.Lock() - if c.route == nil { + // Connection can be closed at any time (by auth timeout, etc). + // Does not make sense to continue here if connection is gone. + if c.route == nil || c.nc == nil { + c.mu.Unlock() + return + } + + s := c.srv + remoteID := c.route.remoteID + + // We receive an INFO from a server that informs us about another server, + // so the info.ID in the INFO protocol does not match the ID of this route. + if remoteID != "" && remoteID != info.ID { c.mu.Unlock() + + // Process this implicit route. We will check that it is not an explicit + // route and/or that it has not been connected already. + s.processImplicitRoute(info) return } @@ -95,7 +107,7 @@ func (c *client) processRouteInfo(info *Info) { c.route.remoteID = info.ID // Detect route to self. - if c.route.remoteID == c.srv.info.ID { + if c.route.remoteID == s.info.ID { c.mu.Unlock() c.closeConnection() return @@ -120,17 +132,32 @@ func (c *client) processRouteInfo(info *Info) { c.route.url = url } + // Need to get the remote IP address. Do this outside of the c.route.url + // test because the former can be not nil in case of explicit route, but + // we still want to get the remote IP. + if c.route.ipUrlString == "" { + switch conn := c.nc.(type) { + case *net.TCPConn, *tls.Conn: + addr := conn.RemoteAddr().(*net.TCPAddr) + c.route.ipUrlString = fmt.Sprintf("nats-route://%s/", net.JoinHostPort(addr.IP.String(), strconv.Itoa(info.Port))) + default: + c.route.ipUrlString = fmt.Sprintf("%s", c.route.url) + } + } + + // Set this in case we need to forward the info to other servers. + info.IP = c.route.ipUrlString + // Check to see if we have this remote already registered. // This can happen when both servers have routes to each other. - s := c.srv c.mu.Unlock() - if s.addRoute(c) { + if added, sendInfo := s.addRoute(c, info); added { c.Debugf("Registering remote route %q", info.ID) // Send our local subscriptions to this route. s.sendLocalSubsToRoute(c) - if len(info.Routes) > 0 { - s.processImplicitRoutes(info.Routes) + if sendInfo { + s.forwardNewRouteInfoToKnownServers(info) } } else { c.Debugf("Detected duplicate remote route %q", info.ID) @@ -138,27 +165,69 @@ func (c *client) processRouteInfo(info *Info) { } } -// This will process implicit route information from other servers. +// This will process implicit route information received from another server. // We will check to see if we have configured or are already connected, // and if so we will ignore. Otherwise we will attempt to connect. -func (s *Server) processImplicitRoutes(routes []RemoteInfo) { +func (s *Server) processImplicitRoute(info *Info) { + remoteID := info.ID + s.mu.Lock() defer s.mu.Unlock() - for _, ri := range routes { - if _, exists := s.remotes[ri.RemoteID]; exists { - continue - } - // We have a new route that we do not currently know about. - // Process here and solicit a connection. - r, err := url.Parse(ri.URL) - if err != nil { - Debugf("Error parsing URL from Remote INFO: %v\n", err) - continue + + // Don't connect to ourself + if remoteID == s.info.ID { + return + } + // Check if this route already exists + if _, exists := s.remotes[remoteID]; exists { + return + } + // Check if we have this route as a configured route + if s.hasThisRouteConfigured(info) { + return + } + + // Initiate the connection, using info.IP instead of info.URL here... + r, err := url.Parse(info.IP) + if err != nil { + Debugf("Error parsing URL from INFO: %v\n", err) + return + } + if info.AuthRequired { + r.User = url.UserPassword(s.opts.ClusterUsername, s.opts.ClusterPassword) + } + go s.connectToRoute(r, false) +} + +// hasThisRouteConfigured returns true if info.Host:info.Port is present +// in the server's opts.Routes, false otherwise. +// Server lock is assumed to be held by caller. +func (s *Server) hasThisRouteConfigured(info *Info) bool { + urlToCheckExplicit := net.JoinHostPort(info.Host, strconv.Itoa(info.Port)) + for _, ri := range s.opts.Routes { + if strings.ToLower(ri.Host) == strings.ToLower(urlToCheckExplicit) { + return true } - if ri.AuthRequired { - r.User = url.UserPassword(s.opts.ClusterUsername, s.opts.ClusterPassword) + } + return false +} + +// forwardNewRouteInfoToKnownServers sends the INFO protocol of the new route +// to all routes known by this server. In turn, each server will contact this +// new route. +func (s *Server) forwardNewRouteInfoToKnownServers(info *Info) { + s.mu.Lock() + defer s.mu.Unlock() + + b, _ := json.Marshal(info) + infoJSON := []byte(fmt.Sprintf(InfoProto, b)) + + for _, r := range s.routes { + r.mu.Lock() + if r.route.remoteID != info.ID { + r.sendInfo(infoJSON) } - go s.connectToRoute(r) + r.mu.Unlock() } } @@ -169,15 +238,6 @@ func (s *Server) sendLocalSubsToRoute(route *client) { b := bytes.Buffer{} s.mu.Lock() - if s.routes[route.cid] == nil { - // We are too early, let createRoute call this function. - route.mu.Lock() - route.sendLocalSubs = true - route.mu.Unlock() - s.mu.Unlock() - return - } - for _, client := range s.clients { client.mu.Lock() subs := client.subs.All() @@ -204,57 +264,20 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client { didSolicit := rURL != nil r := &route{didSolicit: didSolicit} for _, route := range s.opts.Routes { - if rURL == route { + if rURL != nil && (strings.ToLower(rURL.Host) == strings.ToLower(route.Host)) { r.routeType = Explicit } } + c := &client{srv: s, nc: conn, opts: clientOpts{}, typ: ROUTER, route: r} - // Grab server variables and clone known routes. + // Grab server variables s.mu.Lock() - // copy - info := s.routeInfo - for _, r := range s.routes { - r.mu.Lock() - // race condition where connection can be closed (r.nc == nil) - // and route still in s.routes[]. - if r.nc == nil { - r.mu.Unlock() - continue - } - - if r.route.url != nil { - var rurl string - - _, rport, err := net.SplitHostPort(r.route.url.Host) - if err == nil { - // We will send the url but based on the route's ip address. - if ip, ok := r.nc.(*net.TCPConn); ok { - addr := ip.RemoteAddr().(*net.TCPAddr) - rurl = fmt.Sprintf("nats-route://%s:%s/", addr.IP.String(), rport) - } - } - - // Error or route not connected? - if rurl == "" { - rurl = fmt.Sprintf("%s", r.route.url) - } - - ri := RemoteInfo{ - RemoteID: r.route.remoteID, - URL: rurl, - AuthRequired: r.route.authRequired, - TLSRequired: r.route.tlsRequired, - } - info.Routes = append(info.Routes, ri) - } - r.mu.Unlock() - } + infoJSON := s.routeInfoJSON + authRequired := s.routeInfo.AuthRequired + tlsRequired := s.routeInfo.TLSRequired s.mu.Unlock() - authRequired := info.AuthRequired - tlsRequired := info.TLSRequired - // Grab lock c.mu.Lock() @@ -263,6 +286,12 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client { c.Debugf("Route connection created") + if didSolicit { + // Do this before the TLS code, otherwise, in case of failure + // and if route is explicit, it would try to reconnect to 'nil'... + r.url = rURL + } + // Check for TLS if tlsRequired { // Copy off the config to add in ServerName if we @@ -300,6 +329,12 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client { // Re-Grab lock c.mu.Lock() + // Verify that the connection did not go away while we released the lock. + if c.nc == nil { + c.mu.Unlock() + return nil + } + // Rewrap bw c.bw = bufio.NewWriterSize(c.nc, s.opts.BufSize) @@ -318,17 +353,12 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client { // Queue Connect proto if we solicited the connection. if didSolicit { - r.url = rURL c.Debugf("Route connect msg sent") c.sendConnect(tlsRequired) } - // Add other routes in that are known to the info payload - b, _ := json.Marshal(info) - infoJSON := []byte(fmt.Sprintf(InfoProto, b)) - // Send our info to the other side. - s.sendInfo(c, infoJSON) + c.sendInfo(infoJSON) // Check for Auth required state for incoming connections. if authRequired && !didSolicit { @@ -336,25 +366,8 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client { c.setAuthTimer(ttl) } - // Unlock to register. - c.mu.Unlock() - - // Register with the server. - s.mu.Lock() - s.routes[c.cid] = c - s.mu.Unlock() - - // Now that the route is registered, we need to make sure that - // the send of the local subs was not done too early (from - // processRouteInfo). If it was, then send again. - c.mu.Lock() - sendLocalSubs := c.sendLocalSubs c.mu.Unlock() - if sendLocalSubs { - s.sendLocalSubsToRoute(c) - } - return c } @@ -417,23 +430,45 @@ func routeSid(sub *subscription) string { return fmt.Sprintf("%s%s:%d:%s", qi, RSID, sub.client.cid, sub.sid) } -func (s *Server) addRoute(c *client) bool { +func (s *Server) addRoute(c *client, info *Info) (bool, bool) { id := c.route.remoteID + sendInfo := false + s.mu.Lock() remote, exists := s.remotes[id] if !exists { + s.routes[c.cid] = c s.remotes[id] = c + + // If this server's ID is (alpha) less than the peer, then we will + // make sure that if we are disconnected, we will try to connect once + // more. This is to mitigate the issue where both sides add the route + // on the opposite connection, and therefore we end-up with both + // being dropped. + if s.info.ID < id { + c.mu.Lock() + // Make this as a retry (otherwise, only explicit are retried). + c.route.retry = true + c.mu.Unlock() + } + + // we don't need to send if the only route is the one we just accepted. + sendInfo = len(s.routes) > 1 } s.mu.Unlock() if exists && c.route.didSolicit { // upgrade to solicited? remote.mu.Lock() + // the existing route (remote) should keep its 'retry' value, and + // not be replaced with c.route.retry. + retry := remote.route.retry remote.route = c.route + remote.route.retry = retry remote.mu.Unlock() } - return !exists + return !exists, sendInfo } func (s *Server) broadcastToRoutes(proto string) { @@ -543,6 +578,8 @@ func (s *Server) StartRouting() { info.AuthRequired = true } s.routeInfo = info + b, _ := json.Marshal(info) + s.routeInfoJSON = []byte(fmt.Sprintf(InfoProto, b)) // Spin up the accept loop ch := make(chan struct{}) @@ -553,12 +590,15 @@ func (s *Server) StartRouting() { s.solicitRoutes() } -func (s *Server) reConnectToRoute(rUrl *url.URL) { - time.Sleep(DEFAULT_ROUTE_RECONNECT) - s.connectToRoute(rUrl) +func (s *Server) reConnectToRoute(rUrl *url.URL, rtype RouteType) { + tryForEver := rtype == Explicit + if tryForEver { + time.Sleep(DEFAULT_ROUTE_RECONNECT) + } + s.connectToRoute(rUrl, tryForEver) } -func (s *Server) connectToRoute(rUrl *url.URL) { +func (s *Server) connectToRoute(rUrl *url.URL, tryForEver bool) { for s.isRunning() && rUrl != nil { Debugf("Trying to connect to route on %s", rUrl.Host) conn, err := net.DialTimeout("tcp", rUrl.Host, DEFAULT_ROUTE_DIAL) @@ -568,6 +608,9 @@ func (s *Server) connectToRoute(rUrl *url.URL) { case <-s.rcQuit: return case <-time.After(DEFAULT_ROUTE_CONNECT): + if !tryForEver { + return + } continue } } @@ -586,7 +629,7 @@ func (c *client) isSolicitedRoute() bool { func (s *Server) solicitRoutes() { for _, r := range s.opts.Routes { - go s.connectToRoute(r) + go s.connectToRoute(r, true) } } diff --git a/server/routes_test.go b/server/routes_test.go index dc2e30c2e62..329a855a86f 100644 --- a/server/routes_test.go +++ b/server/routes_test.go @@ -132,3 +132,251 @@ func TestServerRoutesWithAuthAndBCrypt(t *testing.T) { t.Fatal("Timeout waiting for message across route") } } + +// Helper function to check that a cluster is formed +func checkClusterFormed(t *testing.T, servers ...*Server) { + // Wait for the cluster to form + var err string + expectedNumRoutes := len(servers) - 1 + maxTime := time.Now().Add(5 * time.Second) + for time.Now().Before(maxTime) { + err = "" + for _, s := range servers { + if numRoutes := s.NumRoutes(); numRoutes != expectedNumRoutes { + err = fmt.Sprintf("Expected %d routes for server %q, got %d", expectedNumRoutes, s.Id(), numRoutes) + break + } + } + if err != "" { + time.Sleep(100 * time.Millisecond) + } else { + break + } + } + if err != "" { + t.Fatalf("%s", err) + } +} + +// Helper function to generate next opts to make sure no port conflicts etc. +func nextServerOpts(opts *Options) *Options { + nopts := *opts + nopts.Port += 1 + nopts.ClusterPort += 1 + nopts.HTTPPort += 1 + return &nopts +} + +func TestSeedSolicitWorks(t *testing.T) { + optsSeed, _ := ProcessConfigFile("./configs/seed.conf") + + optsSeed.NoSigs, optsSeed.NoLog = true, true + + srvSeed := RunServer(optsSeed) + defer srvSeed.Shutdown() + + optsA := nextServerOpts(optsSeed) + optsA.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsSeed.ClusterHost, optsSeed.ClusterPort)) + + srvA := RunServer(optsA) + defer srvA.Shutdown() + + urlA := fmt.Sprintf("nats://%s:%d/", optsA.Host, optsA.Port) + + nc1, err := nats.Connect(urlA) + if err != nil { + t.Fatalf("Error creating client: %v\n", err) + } + defer nc1.Close() + + // Test that we are connected. + ch := make(chan bool) + nc1.Subscribe("foo", func(m *nats.Msg) { ch <- true }) + nc1.Flush() + + optsB := nextServerOpts(optsA) + optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsSeed.ClusterHost, optsSeed.ClusterPort)) + + srvB := RunServer(optsB) + defer srvB.Shutdown() + + urlB := fmt.Sprintf("nats://%s:%d/", optsB.Host, optsB.Port) + + nc2, err := nats.Connect(urlB) + if err != nil { + t.Fatalf("Error creating client: %v\n", err) + } + defer nc2.Close() + + checkClusterFormed(t, srvSeed, srvA, srvB) + + nc2.Publish("foo", []byte("Hello")) + + // Wait for message + select { + case <-ch: + case <-time.After(2 * time.Second): + t.Fatal("Timeout waiting for message across route") + } +} + +func TestTLSSeedSolicitWorks(t *testing.T) { + optsSeed, _ := ProcessConfigFile("./configs/seed_tls.conf") + + optsSeed.NoSigs, optsSeed.NoLog = true, true + + srvSeed := RunServer(optsSeed) + defer srvSeed.Shutdown() + + optsA := nextServerOpts(optsSeed) + optsA.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsSeed.ClusterHost, optsSeed.ClusterPort)) + + srvA := RunServer(optsA) + defer srvA.Shutdown() + + urlA := fmt.Sprintf("nats://%s:%d/", optsA.Host, optsA.Port) + + nc1, err := nats.Connect(urlA) + if err != nil { + t.Fatalf("Error creating client: %v\n", err) + } + defer nc1.Close() + + // Test that we are connected. + ch := make(chan bool) + nc1.Subscribe("foo", func(m *nats.Msg) { ch <- true }) + nc1.Flush() + + optsB := nextServerOpts(optsA) + optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsSeed.ClusterHost, optsSeed.ClusterPort)) + + srvB := RunServer(optsB) + defer srvB.Shutdown() + + urlB := fmt.Sprintf("nats://%s:%d/", optsB.Host, optsB.Port) + + nc2, err := nats.Connect(urlB) + if err != nil { + t.Fatalf("Error creating client: %v\n", err) + } + defer nc2.Close() + + checkClusterFormed(t, srvSeed, srvA, srvB) + + nc2.Publish("foo", []byte("Hello")) + + // Wait for message + select { + case <-ch: + case <-time.After(2 * time.Second): + t.Fatal("Timeout waiting for message across route") + } +} + +func TestChainedSolicitWorks(t *testing.T) { + optsSeed, _ := ProcessConfigFile("./configs/seed.conf") + + optsSeed.NoSigs, optsSeed.NoLog = true, true + + srvSeed := RunServer(optsSeed) + defer srvSeed.Shutdown() + + optsA := nextServerOpts(optsSeed) + optsA.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsSeed.ClusterHost, optsSeed.ClusterPort)) + + srvA := RunServer(optsA) + defer srvA.Shutdown() + + urlSeed := fmt.Sprintf("nats://%s:%d/", optsSeed.Host, optsSeed.Port) + + nc1, err := nats.Connect(urlSeed) + if err != nil { + t.Fatalf("Error creating client: %v\n", err) + } + defer nc1.Close() + + // Test that we are connected. + ch := make(chan bool) + nc1.Subscribe("foo", func(m *nats.Msg) { ch <- true }) + nc1.Flush() + + optsB := nextServerOpts(optsA) + // Server B connects to A + optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsA.ClusterHost, optsA.ClusterPort)) + + srvB := RunServer(optsB) + defer srvB.Shutdown() + + urlB := fmt.Sprintf("nats://%s:%d/", optsB.Host, optsB.Port) + + nc2, err := nats.Connect(urlB) + if err != nil { + t.Fatalf("Error creating client: %v\n", err) + } + defer nc2.Close() + + checkClusterFormed(t, srvSeed, srvA, srvB) + + nc2.Publish("foo", []byte("Hello")) + + // Wait for message + select { + case <-ch: + case <-time.After(2 * time.Second): + t.Fatal("Timeout waiting for message across route") + } +} + +func TestTLSChainedSolicitWorks(t *testing.T) { + optsSeed, _ := ProcessConfigFile("./configs/seed_tls.conf") + + optsSeed.NoSigs, optsSeed.NoLog = true, true + + srvSeed := RunServer(optsSeed) + defer srvSeed.Shutdown() + + optsA := nextServerOpts(optsSeed) + optsA.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsSeed.ClusterHost, optsSeed.ClusterPort)) + + srvA := RunServer(optsA) + defer srvA.Shutdown() + + urlSeed := fmt.Sprintf("nats://%s:%d/", optsSeed.Host, optsSeed.Port) + + nc1, err := nats.Connect(urlSeed) + if err != nil { + t.Fatalf("Error creating client: %v\n", err) + } + defer nc1.Close() + + // Test that we are connected. + ch := make(chan bool) + nc1.Subscribe("foo", func(m *nats.Msg) { ch <- true }) + nc1.Flush() + + optsB := nextServerOpts(optsA) + // Server B connects to A + optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsA.ClusterHost, optsA.ClusterPort)) + + srvB := RunServer(optsB) + defer srvB.Shutdown() + + urlB := fmt.Sprintf("nats://%s:%d/", optsB.Host, optsB.Port) + + nc2, err := nats.Connect(urlB) + if err != nil { + t.Fatalf("Error creating client: %v\n", err) + } + defer nc2.Close() + + checkClusterFormed(t, srvSeed, srvA, srvB) + + nc2.Publish("foo", []byte("Hello")) + + // Wait for message + select { + case <-ch: + case <-time.After(2 * time.Second): + t.Fatal("Timeout waiting for message across route") + } +} diff --git a/server/server.go b/server/server.go index 79814b3f528..4a4d920c022 100644 --- a/server/server.go +++ b/server/server.go @@ -26,17 +26,17 @@ import ( // Info is the information sent to clients to help them understand information // about this server. type Info struct { - ID string `json:"server_id"` - Version string `json:"version"` - GoVersion string `json:"go"` - Host string `json:"host"` - Port int `json:"port"` - AuthRequired bool `json:"auth_required"` - SSLRequired bool `json:"ssl_required"` // DEPRECATED: ssl json used for older clients - TLSRequired bool `json:"tls_required"` - TLSVerify bool `json:"tls_verify"` - MaxPayload int `json:"max_payload"` - Routes []RemoteInfo `json:"routes,omitempty"` + ID string `json:"server_id"` + Version string `json:"version"` + GoVersion string `json:"go"` + Host string `json:"host"` + Port int `json:"port"` + AuthRequired bool `json:"auth_required"` + SSLRequired bool `json:"ssl_required"` // DEPRECATED: ssl json used for older clients + TLSRequired bool `json:"tls_required"` + TLSVerify bool `json:"tls_verify"` + MaxPayload int `json:"max_payload"` + IP string `json:"ip,omitempty"` } // Server is our main struct. @@ -65,6 +65,7 @@ type Server struct { httpReqStats map[string]uint64 routeListener net.Listener routeInfo Info + routeInfoJSON []byte rcQuit chan bool } @@ -490,7 +491,7 @@ func (s *Server) createClient(conn net.Conn) *client { } // Send our information. - s.sendInfo(c, info) + c.sendInfo(info) // Unlock to register c.mu.Unlock() @@ -615,11 +616,6 @@ func tlsCipher(cs uint16) string { return fmt.Sprintf("Unknown [%x]", cs) } -// Assume the lock is held upon entry. -func (s *Server) sendInfo(c *client, info []byte) { - c.nc.Write(info) -} - func (s *Server) checkClientAuth(c *client) bool { if s.cAuth == nil { return true @@ -651,6 +647,7 @@ func (s *Server) removeClient(c *client) { c.mu.Lock() cid := c.cid typ := c.typ + r := c.route c.mu.Unlock() s.mu.Lock() @@ -659,11 +656,11 @@ func (s *Server) removeClient(c *client) { delete(s.clients, cid) case ROUTER: delete(s.routes, cid) - if c.route != nil { - rc, ok := s.remotes[c.route.remoteID] + if r != nil { + rc, ok := s.remotes[r.remoteID] // Only delete it if it is us.. if ok && c == rc { - delete(s.remotes, c.route.remoteID) + delete(s.remotes, r.remoteID) } } } diff --git a/test/client_cluster_test.go b/test/client_cluster_test.go index 6644164ae3d..b4a27163469 100644 --- a/test/client_cluster_test.go +++ b/test/client_cluster_test.go @@ -228,7 +228,7 @@ func TestServerRestartAndQueueSubs(t *testing.T) { waitOnReconnect() - time.Sleep(50 * time.Millisecond) + checkClusterFormed(t, srvA, srvB) // Now send another 10 messages, from each client.. sendAndCheckMsgs(10) diff --git a/test/cluster_test.go b/test/cluster_test.go index d0108be7646..8d7ec11f5e7 100644 --- a/test/cluster_test.go +++ b/test/cluster_test.go @@ -11,16 +11,42 @@ import ( "github.com/nats-io/gnatsd/server" ) +// Helper function to check that a cluster is formed +func checkClusterFormed(t *testing.T, servers ...*server.Server) { + // Wait for the cluster to form + var err string + expectedNumRoutes := len(servers) - 1 + maxTime := time.Now().Add(5 * time.Second) + for time.Now().Before(maxTime) { + err = "" + for _, s := range servers { + if numRoutes := s.NumRoutes(); numRoutes != expectedNumRoutes { + err = fmt.Sprintf("Expected %d routes for server %q, got %d", expectedNumRoutes, s.Id(), numRoutes) + break + } + } + if err != "" { + time.Sleep(100 * time.Millisecond) + } else { + break + } + } + if err != "" { + t.Fatalf("%s", err) + } +} + func runServers(t *testing.T) (srvA, srvB *server.Server, optsA, optsB *server.Options) { srvA, optsA = RunServerWithConfig("./configs/srv_a.conf") srvB, optsB = RunServerWithConfig("./configs/srv_b.conf") + + checkClusterFormed(t, srvA, srvB) return } func TestProperServerWithRoutesShutdown(t *testing.T) { before := runtime.NumGoroutine() srvA, srvB, _, _ := runServers(t) - time.Sleep(100 * time.Millisecond) srvA.Shutdown() srvB.Shutdown() time.Sleep(100 * time.Millisecond) @@ -38,16 +64,6 @@ func TestDoubleRouteConfig(t *testing.T) { srvA, srvB, _, _ := runServers(t) defer srvA.Shutdown() defer srvB.Shutdown() - - // Wait for the setup - time.Sleep(1 * time.Second) - - if numRoutesA := srvA.NumRoutes(); numRoutesA != 1 { - t.Fatalf("Expected one route for srvA, got %d\n", numRoutesA) - } - if numRoutesB := srvB.NumRoutes(); numRoutesB != 1 { - t.Fatalf("Expected one route for srvB, got %d\n", numRoutesB) - } } func TestBasicClusterPubSub(t *testing.T) { diff --git a/test/cluster_tls_test.go b/test/cluster_tls_test.go index 3182f771585..e95e91e3319 100644 --- a/test/cluster_tls_test.go +++ b/test/cluster_tls_test.go @@ -4,7 +4,6 @@ package test import ( "testing" - "time" "github.com/nats-io/gnatsd/server" ) @@ -12,6 +11,7 @@ import ( func runTLSServers(t *testing.T) (srvA, srvB *server.Server, optsA, optsB *server.Options) { srvA, optsA = RunServerWithConfig("./configs/srv_a_tls.conf") srvB, optsB = RunServerWithConfig("./configs/srv_b_tls.conf") + checkClusterFormed(t, srvA, srvB) return } @@ -19,16 +19,6 @@ func TestTLSClusterConfig(t *testing.T) { srvA, srvB, _, _ := runTLSServers(t) defer srvA.Shutdown() defer srvB.Shutdown() - - // Wait for the setup - time.Sleep(1 * time.Second) - - if numRoutesA := srvA.NumRoutes(); numRoutesA != 1 { - t.Fatalf("Expected one route for srvA, got %d\n", numRoutesA) - } - if numRoutesB := srvB.NumRoutes(); numRoutesB != 1 { - t.Fatalf("Expected one route for srvB, got %d\n", numRoutesB) - } } func TestBasicTLSClusterPubSub(t *testing.T) { @@ -36,9 +26,6 @@ func TestBasicTLSClusterPubSub(t *testing.T) { defer srvA.Shutdown() defer srvB.Shutdown() - // Wait for the setup - time.Sleep(500 * time.Millisecond) - clientA := createClientConn(t, optsA.Host, optsA.Port) defer clientA.Close() diff --git a/test/route_discovery_test.go b/test/route_discovery_test.go index 900e3cabc44..b26aef92b34 100644 --- a/test/route_discovery_test.go +++ b/test/route_discovery_test.go @@ -4,6 +4,7 @@ package test import ( "encoding/json" + "errors" "fmt" "io/ioutil" "net" @@ -40,8 +41,8 @@ func TestSeedFirstRouteInfo(t *testing.T) { t.Fatalf("Could not unmarshal route info: %v", err) } - if len(info.Routes) != 0 { - t.Fatalf("Expected len of []Routes to be zero vs %d\n", len(info.Routes)) + if info.ID != s.Id() { + t.Fatalf("Expected seed's ID %q, got %q", s.Id(), info.ID) } } @@ -52,14 +53,12 @@ func TestSeedMultipleRouteInfo(t *testing.T) { rc1 := createRouteConn(t, opts.ClusterHost, opts.ClusterPort) defer rc1.Close() - routeSend1, route1Expect := setupRoute(t, rc1, opts) - route1Expect(infoRe) - rc1ID := "2222" rc1Port := 22 rc1Host := "127.0.0.1" - hp1 := fmt.Sprintf("nats-route://%s/", net.JoinHostPort(rc1Host, strconv.Itoa(rc1Port))) + routeSend1, route1Expect := setupRouteEx(t, rc1, opts, rc1ID) + route1Expect(infoRe) // register ourselves via INFO r1Info := server.Info{ID: rc1ID, Host: rc1Host, Port: rc1Port} @@ -72,13 +71,13 @@ func TestSeedMultipleRouteInfo(t *testing.T) { rc2 := createRouteConn(t, opts.ClusterHost, opts.ClusterPort) defer rc2.Close() - routeSend2, route2Expect := setupRoute(t, rc2, opts) - rc2ID := "2224" rc2Port := 24 rc2Host := "127.0.0.1" - // hp2 := net.JoinHostPort(rc2Host, strconv.Itoa(rc2Port)) + routeSend2, route2Expect := setupRouteEx(t, rc2, opts, rc2ID) + + hp2 := fmt.Sprintf("nats-route://%s/", net.JoinHostPort(rc2Host, strconv.Itoa(rc2Port))) // register ourselves via INFO r2Info := server.Info{ID: rc2ID, Host: rc2Host, Port: rc2Port} @@ -86,29 +85,26 @@ func TestSeedMultipleRouteInfo(t *testing.T) { infoJSON = fmt.Sprintf(server.InfoProto, b) routeSend2(infoJSON) - // Now read back out the info from the seed route - buf := route2Expect(infoRe) + // Now read back the second INFO route1 should receive letting + // it know about route2 + buf := route1Expect(infoRe) info := server.Info{} if err := json.Unmarshal(buf[4:], &info); err != nil { t.Fatalf("Could not unmarshal route info: %v", err) } - if len(info.Routes) != 1 { - t.Fatalf("Expected len of []Routes to be 1 vs %d\n", len(info.Routes)) + if info.ID != rc2ID { + t.Fatalf("Expected info.ID to be %q, got %q", rc2ID, info.ID) } - - route := info.Routes[0] - if route.RemoteID != rc1ID { - t.Fatalf("Expected RemoteID of \"22\", got %q\n", route.RemoteID) - } - if route.URL == "" { - t.Fatalf("Expected a URL for the implicit route") + if info.IP == "" { + t.Fatalf("Expected a IP for the implicit route") } - if route.URL != hp1 { - t.Fatalf("Expected URL Host of %s, got %s\n", hp1, route.URL) + if info.IP != hp2 { + t.Fatalf("Expected IP Host of %s, got %s\n", hp2, info.IP) } + route2Expect(infoRe) routeSend2("PING\r\n") route2Expect(pongRe) @@ -116,12 +112,12 @@ func TestSeedMultipleRouteInfo(t *testing.T) { rc3 := createRouteConn(t, opts.ClusterHost, opts.ClusterPort) defer rc3.Close() - routeSend3, route3Expect := setupRoute(t, rc3, opts) - rc3ID := "2226" rc3Port := 26 rc3Host := "127.0.0.1" + routeSend3, _ := setupRouteEx(t, rc3, opts, rc3ID) + // register ourselves via INFO r3Info := server.Info{ID: rc3ID, Host: rc3Host, Port: rc3Port} b, _ = json.Marshal(r3Info) @@ -129,14 +125,27 @@ func TestSeedMultipleRouteInfo(t *testing.T) { routeSend3(infoJSON) // Now read back out the info from the seed route - buf = route3Expect(infoRe) + buf = route1Expect(infoRe) info = server.Info{} if err := json.Unmarshal(buf[4:], &info); err != nil { t.Fatalf("Could not unmarshal route info: %v", err) } - if len(info.Routes) != 2 { - t.Fatalf("Expected len of []Routes to be 2 vs %d\n", len(info.Routes)) + + if info.ID != rc3ID { + t.Fatalf("Expected info.ID to be %q, got %q", rc3ID, info.ID) + } + + // Now read back out the info from the seed route + buf = route2Expect(infoRe) + + info = server.Info{} + if err := json.Unmarshal(buf[4:], &info); err != nil { + t.Fatalf("Could not unmarshal route info: %v", err) + } + + if info.ID != rc3ID { + t.Fatalf("Expected info.ID to be %q, got %q", rc3ID, info.ID) } } @@ -195,6 +204,124 @@ func TestSeedSolicitWorks(t *testing.T) { } } +type serverInfo struct { + server *server.Server + opts *server.Options +} + +func checkConnected(t *testing.T, servers []serverInfo, current int, oneSeed bool) error { + s := servers[current] + + // Grab Routez from monitor ports, make sure we are fully connected + url := fmt.Sprintf("http://%s:%d/", s.opts.Host, s.opts.HTTPPort) + rz := readHttpRoutez(t, url) + total := len(servers) + var ids []string + for i := 0; i < total; i++ { + if i == current { + continue + } + ids = append(ids, servers[i].server.Id()) + } + ris, err := expectRidsNoFatal(t, true, rz, ids) + if err != nil { + return err + } + for i := 0; i < total; i++ { + if i == current { + continue + } + s := servers[i] + if current == 0 || ((oneSeed && i > 0) || (!oneSeed && (i != current-1))) { + if ris[s.server.Id()].IsConfigured != false { + return errors.New(fmt.Sprintf("Expected server %s:%d not to be configured", s.opts.Host, s.opts.Port)) + } + } else if oneSeed || (i == current-1) { + if ris[s.server.Id()].IsConfigured != true { + return errors.New(fmt.Sprintf("Expected server %s:%d to be configured", s.opts.Host, s.opts.Port)) + } + } + } + return nil +} + +func TestStressSeedSolicitWorks(t *testing.T) { + s1, opts := runSeedServer(t) + defer s1.Shutdown() + + // Create the routes string for others to connect to the seed. + routesStr := fmt.Sprintf("nats-route://%s:%d/", opts.ClusterHost, opts.ClusterPort) + + s2Opts := nextServerOpts(opts) + s2Opts.Routes = server.RoutesFromStr(routesStr) + + s3Opts := nextServerOpts(s2Opts) + s4Opts := nextServerOpts(s3Opts) + + for i := 0; i < 10; i++ { + func() { + // Run these servers manually, because we want them to start and + // connect to s1 as fast as possible. + + s2 := server.New(s2Opts) + if s2 == nil { + panic("No NATS Server object returned.") + } + defer s2.Shutdown() + go s2.Start() + + s3 := server.New(s3Opts) + if s3 == nil { + panic("No NATS Server object returned.") + } + defer s3.Shutdown() + go s3.Start() + + s4 := server.New(s4Opts) + if s4 == nil { + panic("No NATS Server object returned.") + } + defer s4.Shutdown() + go s4.Start() + + serversInfo := []serverInfo{{s1, opts}, {s2, s2Opts}, {s3, s3Opts}, {s4, s4Opts}} + + var err error + maxTime := time.Now().Add(5 * time.Second) + for time.Now().Before(maxTime) { + resetPreviousHTTPConnections() + + for j := 0; j < len(serversInfo); j++ { + err = checkConnected(t, serversInfo, j, true) + // If error, start this for loop from beginning + if err != nil { + // Sleep a bit before the next attempt + time.Sleep(100 * time.Millisecond) + break + } + } + // All servers checked ok, we are done, otherwise, try again + // until time is up + if err == nil { + break + } + } + // Report error + if err != nil { + t.Fatalf("Error: %v", err) + } + }() + maxTime := time.Now().Add(2 * time.Second) + for time.Now().Before(maxTime) { + if s1.NumRoutes() > 0 { + time.Sleep(10 * time.Millisecond) + } else { + break + } + } + } +} + func TestChainedSolicitWorks(t *testing.T) { s1, opts := runSeedServer(t) defer s1.Shutdown() @@ -253,6 +380,89 @@ func TestChainedSolicitWorks(t *testing.T) { } } +func TestStressChainedSolicitWorks(t *testing.T) { + s1, opts := runSeedServer(t) + defer s1.Shutdown() + + // Create the routes string for s2 to connect to the seed + routesStr := fmt.Sprintf("nats-route://%s:%d/", opts.ClusterHost, opts.ClusterPort) + s2Opts := nextServerOpts(opts) + s2Opts.Routes = server.RoutesFromStr(routesStr) + + s3Opts := nextServerOpts(s2Opts) + // Create the routes string for s3 to connect to s2 + routesStr = fmt.Sprintf("nats-route://%s:%d/", s2Opts.ClusterHost, s2Opts.ClusterPort) + s3Opts.Routes = server.RoutesFromStr(routesStr) + + s4Opts := nextServerOpts(s3Opts) + // Create the routes string for s4 to connect to s3 + routesStr = fmt.Sprintf("nats-route://%s:%d/", s3Opts.ClusterHost, s3Opts.ClusterPort) + s4Opts.Routes = server.RoutesFromStr(routesStr) + + for i := 0; i < 10; i++ { + func() { + // Run these servers manually, because we want them to start and + // connect to s1 as fast as possible. + + s2 := server.New(s2Opts) + if s2 == nil { + panic("No NATS Server object returned.") + } + defer s2.Shutdown() + go s2.Start() + + s3 := server.New(s3Opts) + if s3 == nil { + panic("No NATS Server object returned.") + } + defer s3.Shutdown() + go s3.Start() + + s4 := server.New(s4Opts) + if s4 == nil { + panic("No NATS Server object returned.") + } + defer s4.Shutdown() + go s4.Start() + + serversInfo := []serverInfo{{s1, opts}, {s2, s2Opts}, {s3, s3Opts}, {s4, s4Opts}} + + var err error + maxTime := time.Now().Add(5 * time.Second) + for time.Now().Before(maxTime) { + resetPreviousHTTPConnections() + + for j := 0; j < len(serversInfo); j++ { + err = checkConnected(t, serversInfo, j, false) + // If error, start this for loop from beginning + if err != nil { + // Sleep a bit before the next attempt + time.Sleep(100 * time.Millisecond) + break + } + } + // All servers checked ok, we are done, otherwise, try again + // until time is up + if err == nil { + break + } + } + // Report error + if err != nil { + t.Fatalf("Error: %v", err) + } + }() + maxTime := time.Now().Add(2 * time.Second) + for time.Now().Before(maxTime) { + if s1.NumRoutes() > 0 { + time.Sleep(10 * time.Millisecond) + } else { + break + } + } + } +} + func TestAuthSeedSolicitWorks(t *testing.T) { s1, opts := runAuthSeedServer(t) defer s1.Shutdown() @@ -310,9 +520,21 @@ func TestAuthSeedSolicitWorks(t *testing.T) { // Helper to check for correct route memberships func expectRids(t *testing.T, rz *server.Routez, rids []string) map[string]*server.RouteInfo { + ri, err := expectRidsNoFatal(t, false, rz, rids) + if err != nil { + t.Fatalf("%v", err) + } + return ri +} + +func expectRidsNoFatal(t *testing.T, direct bool, rz *server.Routez, rids []string) (map[string]*server.RouteInfo, error) { + caller := 1 + if !direct { + caller++ + } if len(rids) != rz.NumRoutes { - _, fn, line, _ := runtime.Caller(1) - t.Fatalf("[%s:%d] Expecting %d routes, got %d\n", fn, line, len(rids), rz.NumRoutes) + _, fn, line, _ := runtime.Caller(caller) + return nil, errors.New(fmt.Sprintf("[%s:%d] Expecting %d routes, got %d\n", fn, line, len(rids), rz.NumRoutes)) } set := make(map[string]bool) for _, v := range rids { @@ -322,12 +544,12 @@ func expectRids(t *testing.T, rz *server.Routez, rids []string) map[string]*serv ri := make(map[string]*server.RouteInfo) for _, r := range rz.Routes { if set[r.RemoteId] != true { - _, fn, line, _ := runtime.Caller(1) - t.Fatalf("[%s:%d] Route with rid %s unexpected, expected %+v\n", fn, line, r.RemoteId, rids) + _, fn, line, _ := runtime.Caller(caller) + return nil, errors.New(fmt.Sprintf("[%s:%d] Route with rid %s unexpected, expected %+v\n", fn, line, r.RemoteId, rids)) } ri[r.RemoteId] = r } - return ri + return ri, nil } // Helper to easily grab routez info. @@ -355,20 +577,20 @@ func readHttpRoutez(t *testing.T, url string) *server.Routez { return &r } -func TestSeedReturnIPInsteadOfURL(t *testing.T) { +func TestSeedReturnIPInInfo(t *testing.T) { s, opts := runSeedServer(t) defer s.Shutdown() rc1 := createRouteConn(t, opts.ClusterHost, opts.ClusterPort) defer rc1.Close() - routeSend1, route1Expect := setupRoute(t, rc1, opts) - route1Expect(infoRe) - rc1ID := "2222" rc1Port := 22 rc1Host := "localhost" + routeSend1, route1Expect := setupRouteEx(t, rc1, opts, rc1ID) + route1Expect(infoRe) + // register ourselves via INFO r1Info := server.Info{ID: rc1ID, Host: rc1Host, Port: rc1Port} b, _ := json.Marshal(r1Info) @@ -380,50 +602,40 @@ func TestSeedReturnIPInsteadOfURL(t *testing.T) { rc2 := createRouteConn(t, opts.ClusterHost, opts.ClusterPort) defer rc2.Close() - routeSend2, route2Expect := setupRoute(t, rc2, opts) - rc2ID := "2224" rc2Port := 24 rc2Host := "localhost" + routeSend2, _ := setupRouteEx(t, rc2, opts, rc2ID) + // register ourselves via INFO r2Info := server.Info{ID: rc2ID, Host: rc2Host, Port: rc2Port} b, _ = json.Marshal(r2Info) infoJSON = fmt.Sprintf(server.InfoProto, b) routeSend2(infoJSON) - // Now read back out the info from the seed route - buf := route2Expect(infoRe) + // Now read info that route1 should have received from the seed + buf := route1Expect(infoRe) info := server.Info{} if err := json.Unmarshal(buf[4:], &info); err != nil { t.Fatalf("Could not unmarshal route info: %v", err) } - if len(info.Routes) != 1 { - t.Fatalf("Expected len of []Routes to be 1 vs %d", len(info.Routes)) - } - - route := info.Routes[0] - if route.RemoteID != rc1ID { - t.Fatalf("Expected RemoteID of \"22\", got %q", route.RemoteID) + if info.IP == "" { + t.Fatal("Expected to have IP in INFO") } - if route.URL == "" { - t.Fatal("Expected a URL for the implicit route") - } - rurl := strings.TrimPrefix(route.URL, "nats-route://") - rhost, _, err := net.SplitHostPort(rurl) + rip, _, err := net.SplitHostPort(strings.TrimPrefix(info.IP, "nats-route://")) if err != nil { - t.Fatalf("Error getting host information from: %v, err=%v", route.URL, err) - } - if rhost == rc1Host { - t.Fatalf("Expected route url to include IP address, got %s", rhost) + t.Fatalf("Error parsing url: %v", err) } addr, ok := rc1.RemoteAddr().(*net.TCPAddr) if !ok { t.Fatal("Unable to get IP address from route") } - if rhost != addr.IP.String() { - t.Fatalf("Expected IP %s, got %s", addr.IP.String(), rhost) + s1 := strings.ToLower(addr.IP.String()) + s2 := strings.ToLower(rip) + if s1 != s2 { + t.Fatalf("Expected IP %s, got %s", s1, s2) } } diff --git a/test/routes_test.go b/test/routes_test.go index 6d4bca11204..e0e39d28db2 100644 --- a/test/routes_test.go +++ b/test/routes_test.go @@ -81,7 +81,7 @@ func TestRouteToSelf(t *testing.T) { rc := createRouteConn(t, opts.ClusterHost, opts.ClusterPort) defer rc.Close() - routeSend, routeExpect := setupRoute(t, rc, opts) + routeSend, routeExpect := setupRouteEx(t, rc, opts, s.Id()) buf := routeExpect(infoRe) info := server.Info{} @@ -121,7 +121,8 @@ func TestSendRouteSubAndUnsub(t *testing.T) { defer rc.Close() expectAuthRequired(t, rc) - setupRoute(t, rc, opts) + routeSend, _ := setupRouteEx(t, rc, opts, "ROUTER:xyz") + routeSend("INFO {\"server_id\":\"ROUTER:xyz\"}\r\n") // Send SUB via client connection send("SUB foo 22\r\n") @@ -216,6 +217,7 @@ func TestRouteForwardsMsgToClients(t *testing.T) { expectMsgs := expectMsgsCommand(t, clientExpect) route := createRouteConn(t, opts.ClusterHost, opts.ClusterPort) + defer route.Close() expectAuthRequired(t, route) routeSend, _ := setupRoute(t, route, opts) @@ -304,7 +306,8 @@ func TestRouteQueueSemantics(t *testing.T) { defer route.Close() expectAuthRequired(t, route) - routeSend, routeExpect := setupRoute(t, route, opts) + routeSend, routeExpect := setupRouteEx(t, route, opts, "ROUTER:xyz") + routeSend("INFO {\"server_id\":\"ROUTER:xyz\"}\r\n") expectMsgs := expectMsgsCommand(t, routeExpect) // Express multiple interest on this route for foo, queue group bar. @@ -482,6 +485,7 @@ func TestRouteResendsLocalSubsOnReconnect(t *testing.T) { clientExpect(pongRe) route := createRouteConn(t, opts.ClusterHost, opts.ClusterPort) + defer route.Close() routeSend, routeExpect := setupRouteEx(t, route, opts, "ROUTE:4222") // Expect to see the local sub echoed through after we send our INFO. @@ -532,7 +536,8 @@ func TestAutoUnsubPropagation(t *testing.T) { defer route.Close() expectAuthRequired(t, route) - _, routeExpect := setupRoute(t, route, opts) + routeSend, routeExpect := setupRouteEx(t, route, opts, "ROUTER:xyz") + routeSend("INFO {\"server_id\":\"ROUTER:xyz\"}\r\n") // Setup a local subscription clientSend("SUB foo 2\r\n")