Skip to content

Commit

Permalink
[FIXED] Cluster's listener with IPv6
Browse files Browse the repository at this point in the history
Trying to use IPv6 address for the cluster host would fail.
Also, there were some unclosed channels in case of accept loop
setup failures.

Resolves nats-io#323
  • Loading branch information
kozlovic committed Aug 12, 2016
1 parent 62923be commit 3b84120
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 2 deletions.
6 changes: 4 additions & 2 deletions server/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,11 +584,13 @@ func (s *Server) broadcastUnSubscribe(sub *subscription) {
}

func (s *Server) routeAcceptLoop(ch chan struct{}) {
hp := fmt.Sprintf("%s:%d", s.opts.ClusterHost, s.opts.ClusterPort)
hp := net.JoinHostPort(s.opts.ClusterHost, strconv.Itoa(s.opts.ClusterPort))
Noticef("Listening for route connections on %s", hp)
l, e := net.Listen("tcp", hp)
if e != nil {
Fatalf("Error listening on router port: %d - %v", s.opts.Port, e)
// We need to close this channel to avoid a deadlock
close(ch)
Fatalf("Error listening on router port: %d - %v", s.opts.ClusterPort, e)
return
}

Expand Down
57 changes: 57 additions & 0 deletions server/routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ package server

import (
"fmt"
"net"
"net/url"
"reflect"
"testing"
"time"

"github.com/nats-io/nats"
"strconv"
)

func TestRouteConfig(t *testing.T) {
Expand Down Expand Up @@ -406,3 +408,58 @@ func TestRouteTLSHandshakeError(t *testing.T) {
t.Fatal("Route should have failed")
}
}

func TestBlockedShutdownOnRouteAcceptLoopFailure(t *testing.T) {
opts := DefaultOptions
opts.ClusterHost = "x.x.x.x"
opts.ClusterPort = 7222

s := New(&opts)
go s.Start()
// Wait a second
time.Sleep(time.Second)
ch := make(chan bool)
go func() {
s.Shutdown()
ch <- true
}()

timeout := time.NewTimer(5 * time.Second)
select {
case <-ch:
return
case <-timeout.C:
t.Fatal("Shutdown did not complete")
}
}

func TestRouteUseIPv6(t *testing.T) {
opts := DefaultOptions
opts.ClusterHost = "::"
opts.ClusterPort = 6222

// I believe that there is no IPv6 support on Travis...
// Regardless, cannot have this test fail simply because IPv6 is disabled
// on the host.
hp := net.JoinHostPort(opts.ClusterHost, strconv.Itoa(opts.ClusterPort))
_, err := net.ResolveTCPAddr("tcp", hp)
if err != nil {
t.Skipf("Skipping this test since there is no IPv6 support on this host: %v", err)
}

s := RunServer(&opts)
defer s.Shutdown()

routeUp := false
timeout := time.Now().Add(5 * time.Second)
for time.Now().Before(timeout) && !routeUp {
if s.GetRouteListenEndpoint() == "" {
time.Sleep(time.Second)
continue
}
routeUp = true
}
if !routeUp {
t.Fatal("Server failed to start route accept loop")
}
}
13 changes: 13 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,11 @@ func (s *Server) Start() {
Noticef("Starting nats-server version %s", VERSION)
Debugf("Go build version %s", s.info.GoVersion)

// Avoid RACE between Start() and Shutdown()
s.mu.Lock()
s.running = true
s.mu.Unlock()

s.grMu.Lock()
s.grRunning = true
s.grMu.Unlock()
Expand Down Expand Up @@ -342,6 +346,14 @@ func (s *Server) Shutdown() {

// AcceptLoop is exported for easier testing.
func (s *Server) AcceptLoop(clr chan struct{}) {
// If we were to exit before the listener is setup properly,
// make sure we close the channel.
defer func() {
if clr != nil {
close(clr)
}
}()

hp := net.JoinHostPort(s.opts.Host, strconv.Itoa(s.opts.Port))
Noticef("Listening for client connections on %s", hp)
l, e := net.Listen("tcp", hp)
Expand Down Expand Up @@ -384,6 +396,7 @@ func (s *Server) AcceptLoop(clr chan struct{}) {

// Let the caller know that we are ready
close(clr)
clr = nil

tmpDelay := ACCEPT_MIN_SLEEP

Expand Down
15 changes: 15 additions & 0 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,3 +171,18 @@ func TestGetConnectURLs(t *testing.T) {
t.Fatalf("Expected to get %v, got %v", expectedURL, urls[0])
}
}

func TestNoDeadlockOnStartFailure(t *testing.T) {
opts := DefaultOptions
opts.Host = "x.x.x.x" // bad host
opts.Port = 4222
opts.ClusterHost = "localhost"
opts.ClusterPort = 6222

s := New(&opts)
// This should return since it should fail to start a listener
// on x.x.x.x:4222
s.Start()
// We should be able to shutdown
s.Shutdown()
}

0 comments on commit 3b84120

Please sign in to comment.