Skip to content

Commit

Permalink
Fix cluster formation when servers connect quickly
Browse files Browse the repository at this point in the history
Both seed and chained cases are now handled properly when servers
connect quickly and concurrently to one another.
When accepting a route, the server will forward the new route INFO
protocol to its known routes. In turn those routes will connect
to the new server (if not already connected).
A retry for implicit routes was introduced to mitigate the issue
of two servers connecting to each other and each electing the opposite
connection as the winner, resulting in both connections being dropped.
The server with the smaller ID will try once to reconnect.
Some tests were fixed to handle a possible extra INFO protocol message.
New tests added.

Fix issue: nats-io#206
  • Loading branch information
kozlovic committed Feb 25, 2016
1 parent 7a7cbfa commit 7c0a3b4
Show file tree
Hide file tree
Showing 11 changed files with 793 additions and 229 deletions.
39 changes: 28 additions & 11 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,9 @@ type client struct {
last time.Time
parseState

route *route
sendLocalSubs bool
debug bool
trace bool
route *route
debug bool
trace bool
}

func (c *client) String() (id string) {
Expand Down Expand Up @@ -307,6 +306,12 @@ func (c *client) maxPayloadViolation(sz int) {
c.closeConnection()
}

// sendInfo writes the given INFO protocol bytes to the client's buffered
// writer and flushes them immediately.
//
// Assume the lock is held upon entry.
//
// Like sendErr, this is a no-op when the buffered writer is nil, so a
// missing writer cannot trigger a nil pointer dereference here. The results
// of Write and Flush are not inspected; the send is best-effort.
func (c *client) sendInfo(info []byte) {
	if c.bw == nil {
		return
	}
	c.bw.Write(info)
	c.bw.Flush()
}

func (c *client) sendErr(err string) {
c.mu.Lock()
if c.bw != nil {
Expand Down Expand Up @@ -942,6 +947,11 @@ func (c *client) closeConnection() {
subs := c.subs.All()
srv := c.srv

retryImplicit := false
if c.route != nil {
retryImplicit = c.route.retry
}

c.mu.Unlock()

if srv != nil {
Expand All @@ -963,19 +973,26 @@ func (c *client) closeConnection() {

// Check for a solicited route. If it was, start up a reconnect unless
// we are already connected to the other end.
if c.isSolicitedRoute() {
if c.isSolicitedRoute() || retryImplicit {
// Capture these under lock
c.mu.Lock()
rid := c.route.remoteID
rtype := c.route.routeType
rurl := c.route.url
c.mu.Unlock()

srv.mu.Lock()
defer srv.mu.Unlock()
rid := c.route.remoteID

if rid != "" && srv.remotes[rid] != nil {
Debugf("Not attempting reconnect for solicited route, already connected to \"%s\"", rid)
return
} else if c.route.remoteID == srv.info.ID {
Debugf("Detected route to self, ignoring \"%s\"", c.route.url)
} else if rid == srv.info.ID {
Debugf("Detected route to self, ignoring \"%s\"", rurl)
return
} else if c.route.routeType != Implicit {
Debugf("Attempting reconnect for solicited route \"%s\"", c.route.url)
go srv.reConnectToRoute(c.route.url)
} else if rtype != Implicit || retryImplicit {
Debugf("Attempting reconnect for solicited route \"%s\"", rurl)
go srv.reConnectToRoute(rurl, rtype)
}
}
}
Expand Down
13 changes: 13 additions & 0 deletions server/configs/seed.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright 2016 Apcera Inc. All rights reserved.

# Cluster Seed Node

port: 7222
net: '127.0.0.1'

http_port: 9222

cluster {
host: '127.0.0.1'
port: 7248
}
26 changes: 26 additions & 0 deletions server/configs/seed_tls.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Copyright 2016 Apcera Inc. All rights reserved.

# Cluster Seed Node

port: 7222
net: '127.0.0.1'

http_port: 9222

cluster {
host: '127.0.0.1'
port: 7248

tls {
# Route cert
cert_file: "../test/configs/certs/server-cert.pem"
# Private key
key_file: "../test/configs/certs/server-key.pem"
# Specified time for handshake to complete
timeout: 2

# Optional certificate authority verifying connected routes
# Required when we have self-signed CA, etc.
ca_file: "../test/configs/certs/ca.pem"
}
}
Loading

0 comments on commit 7c0a3b4

Please sign in to comment.