Skip to content

Commit

Permalink
Fixed race condition on reconnects that prevented reconnectCBs
Browse files Browse the repository at this point in the history
  • Loading branch information
derekcollison committed Oct 15, 2013
1 parent 3e58567 commit 1e360f4
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 12 deletions.
2 changes: 1 addition & 1 deletion example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func ExampleEncodedConn_BindRecvChan() {
c.Publish("hello", me)

// Receive the publish directly on a channel
who := <- ch
who := <-ch

fmt.Printf("%v says hello!\n", who)
}
1 change: 0 additions & 1 deletion gob_enc.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ type GobEncoder struct {

// FIXME(dlc) - This could probably be more efficient.


func (ge *GobEncoder) Encode(subject string, v interface{}) ([]byte, error) {
b := new(bytes.Buffer)
enc := gob.NewEncoder(b)
Expand Down
1 change: 0 additions & 1 deletion gob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,3 @@ func TestGobMarshalStruct(t *testing.T) {
t.Fatal("Did not receive the message")
}
}

24 changes: 18 additions & 6 deletions nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
)

const (
Version = "0.86"
Version = "0.88"
DefaultURL = "nats://localhost:4222"
DefaultPort = 4222
DefaultMaxReconnect = 10
Expand Down Expand Up @@ -415,6 +415,8 @@ func (nc *Conn) makeTLSConn() {
// We also use a WaitGroup to make sure we only start them on a
// reconnect when the previous ones have exited.
func (nc *Conn) spinUpSocketWatchers() {
// Kick old flusher forcefully.
nc.fch <- true
// Wait for any previous ones.
nc.wg.Wait()
// We will wait on both.
Expand All @@ -424,6 +426,17 @@ func (nc *Conn) spinUpSocketWatchers() {
go nc.flusher()
}

// reSpinUpSocketWatchers restarts the socket watchers after a reconnect
// and then marks the connection as CONNECTED. doReconnect launches it in
// its own Go routine, because spinUpSocketWatchers waits on the
// connection's WaitGroup before starting new watchers.
func (nc *Conn) reSpinUpSocketWatchers() {
	// Bring the watchers back up before touching the status.
	nc.spinUpSocketWatchers()

	// The status is only written while holding the connection mutex.
	nc.mu.Lock()
	defer nc.mu.Unlock()

	// Assume the best for status.
	nc.status = CONNECTED
}

// Report the connected server's Url
func (nc *Conn) ConnectedUrl() string {
nc.mu.Lock()
Expand Down Expand Up @@ -705,7 +718,6 @@ func (nc *Conn) flushReconnectPendingItems() {
// Try to reconnect using the option parameters.
// This function assumes we are allowed to reconnect.
func (nc *Conn) doReconnect() {

// Hold the lock manually and release where needed below,
// can't do defer here.
nc.mu.Lock()
Expand Down Expand Up @@ -750,10 +762,9 @@ func (nc *Conn) doReconnect() {

// Process Connect logic
if nc.err = nc.processExpectedInfo(); nc.err == nil {
// Assume the best
nc.status = CONNECTED
// Spin up socket watchers again
go nc.spinUpSocketWatchers()
go nc.reSpinUpSocketWatchers()

// Send our connect info as normal
cProto, _ := nc.connectProto()
nc.bw.WriteString(cProto)
Expand Down Expand Up @@ -839,6 +850,7 @@ func (nc *Conn) readLoop() {
if sb || conn == nil {
break
}

n, err := conn.Read(b)
if err != nil {
nc.processOpErr(err) // FIXME.
Expand Down Expand Up @@ -957,7 +969,6 @@ func (nc *Conn) processSlowConsumer(s *Subscription) {
// flusher is a separate Go routine that will process flush requests for the write
// bufio. This allows coalescing of writes to the underlying socket.
func (nc *Conn) flusher() {

// Release the wait group
defer nc.wg.Done()

Expand All @@ -966,6 +977,7 @@ func (nc *Conn) flusher() {
return
}
nc.mu.Lock()

// Check for closed or reconnecting
if nc.IsClosed() || nc.isReconnecting() {
nc.mu.Unlock()
Expand Down
4 changes: 2 additions & 2 deletions netchan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ func TestSimpleRecvChan(t *testing.T) {

// Receive from 'foo'
select {
case num := <- ch:
case num := <-ch:
if num != numSent {
t.Fatalf("Failed to receive correct value: %d vs %d\n", num, numSent)
}
case <-time.After(1*time.Second):
case <-time.After(1 * time.Second):
t.Fatalf("Failed to receive a value, timed-out\n")
}
close(ch)
Expand Down
1 change: 0 additions & 1 deletion reconnect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ func TestReconnectAllowedFlags(t *testing.T) {
// Clear the ClosedCB since ch will block.
nc.Opts.ClosedCB = nil
nc.Close()

}

var reconnectOpts = Options{
Expand Down

0 comments on commit 1e360f4

Please sign in to comment.