Skip to content

Commit

Permalink
Ensure Shutdown() waits for outstanding routes go routines
Browse files Browse the repository at this point in the history
We need to make sure that when Shutdown() returns, routes go routines
that try to connect or reconnect have returned. Otherwise, this may
affect tests running one after the other (a server from one test
may connect to a server in the next test).
  • Loading branch information
kozlovic committed Apr 21, 2016
1 parent f81d57c commit 3aa09ec
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 1 deletion.
9 changes: 9 additions & 0 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1054,6 +1054,12 @@ func (c *client) closeConnection() {
srv.mu.Lock()
defer srv.mu.Unlock()

// It is possible that the server is being shutdown.
// If so, don't try to reconnect
if !srv.running {
return
}

if rid != "" && srv.remotes[rid] != nil {
Debugf("Not attempting reconnect for solicited route, already connected to \"%s\"", rid)
return
Expand All @@ -1062,6 +1068,9 @@ func (c *client) closeConnection() {
return
} else if rtype != Implicit || retryImplicit {
Debugf("Attempting reconnect for solicited route \"%s\"", rurl)
// Keep track of this go-routine so we can wait for it on
// server shutdown.
srv.routeWG.Add(1)
go srv.reConnectToRoute(rurl, rtype)
}
}
Expand Down
5 changes: 5 additions & 0 deletions server/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ func (s *Server) processImplicitRoute(info *Info) {
if info.AuthRequired {
r.User = url.UserPassword(s.opts.ClusterUsername, s.opts.ClusterPassword)
}
s.routeWG.Add(1)
go s.connectToRoute(r, false)
}

Expand Down Expand Up @@ -598,6 +599,7 @@ func (s *Server) reConnectToRoute(rURL *url.URL, rtype RouteType) {
}

func (s *Server) connectToRoute(rURL *url.URL, tryForEver bool) {
defer s.routeWG.Done()
for s.isRunning() && rURL != nil {
Debugf("Trying to connect to route on %s", rURL.Host)
conn, err := net.DialTimeout("tcp", rURL.Host, DEFAULT_ROUTE_DIAL)
Expand Down Expand Up @@ -627,7 +629,10 @@ func (c *client) isSolicitedRoute() bool {
}

func (s *Server) solicitRoutes() {
s.mu.Lock()
defer s.mu.Unlock()
for _, r := range s.opts.Routes {
s.routeWG.Add(1)
go s.connectToRoute(r, true)
}
}
Expand Down
6 changes: 5 additions & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type Server struct {
routeListener net.Listener
routeInfo Info
routeInfoJSON []byte
routeWG sync.WaitGroup // to wait on (re)connect go routines, etc..
rcQuit chan bool
}

Expand Down Expand Up @@ -302,6 +303,9 @@ func (s *Server) Shutdown() {
<-s.done
doneExpected--
}

// Wait for route (re)connect go routines to be done.
s.routeWG.Wait()
}

// AcceptLoop is exported for easier testing.
Expand Down Expand Up @@ -757,7 +761,7 @@ func (s *Server) GetRouteListenEndpoint() string {
return net.JoinHostPort(host, strconv.Itoa(s.opts.ClusterPort))
}

// Id returns the server's ID
// ID returns the server's ID
func (s *Server) ID() string {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down

0 comments on commit 3aa09ec

Please sign in to comment.