Skip to content

Commit

Permalink
Major updates + support for config reload of client/cluster advertise
Browse files Browse the repository at this point in the history
  • Loading branch information
kozlovic committed Feb 6, 2018
1 parent 306a3f9 commit acf4a31
Show file tree
Hide file tree
Showing 13 changed files with 547 additions and 105 deletions.
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ Server Options:
-ms,--https_port <port> Use port for https monitoring
-c, --config <file> Configuration file
-sl,--signal <signal>[=<pid>] Send signal to gnatsd process (stop, quit, reopen, reload)
--client_advertise <string> Client URL to advertise to other servers
Logging Options:
-l, --log <file> File to redirect log output
Expand All @@ -178,6 +179,7 @@ Cluster Options:
--routes <rurl-1, rurl-2> Routes to solicit and connect
--cluster <cluster-url> Cluster URL for solicited routes
--no_advertise <bool> Advertise known cluster IPs to clients
--cluster_advertise <string> Cluster URL to advertise to other servers
--connect_retries <number> For implicit routes, number of connect retries
Expand Down Expand Up @@ -286,6 +288,23 @@ The `--routes` flag specifies the NATS URL for one or more servers in the cluste

Previous releases required you to build the complete mesh using the `--routes` flag. To define your cluster in the current release, please follow the "Basic example" as described below.

Suppose that server srvA is connected to server srvB. A bi-directional route exists between srvA and srvB. A new server, srvC, connects to srvA.<br>
When accepting the connection, srvA will gossip the address of srvC to srvB so that srvB connects to srvC, completing the full mesh.<br>
The URL that srvB will use to connect to srvC is the result of the TCP remote address that srvA got from its connection to srvC.

It is possible to advertise with `--cluster_advertise` a different address than the one used in `--cluster`.

In the previous example, if srvC uses a `--cluster_adertise` URL, this is what srvA will gossip to srvB in order to connect to srvC.

NOTE: The advertise address should really result in a connection to srvC. Providing an address that would result in a connection to a different NATS Server would prevent the formation of a full-mesh cluster!

As part of the gossip protocol, a server will also send to the other servers the URL clients should connect to.<br>
The URL is the one defined in the `listen` parameter, or, if 0.0.0.0 or :: is specified, the resolved non-local IP addresses for the "any" interface.

If those addresses are not reacheable from the outside world where the clients are running, the administrator can use the `--no_advertise` option to disable servers gossiping those URLs.<br>
Another option is to provide a `--client_advertise` URL to use instead. If this option is specified (and advertise has not been disabled), then the server will advertise this URL to other servers instead of its `listen` address (or resolved IPs when listen is 0.0.0.0 or ::).


### Basic example

NATS makes building the full mesh easy. Simply designate a server to be a *seed* server. All other servers in the cluster simply specify the *seed* server as its server's routes option as indicated below.
Expand Down
2 changes: 2 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ Server Options:
-ms,--https_port <port> Use port for https monitoring
-c, --config <file> Configuration file
-sl,--signal <signal>[=<pid>] Send signal to gnatsd process (stop, quit, reopen, reload)
--client_advertise <string> Client URL to advertise to other servers
Logging Options:
-l, --log <file> File to redirect log output
Expand All @@ -47,6 +48,7 @@ Cluster Options:
--routes <rurl-1, rurl-2> Routes to solicit and connect
--cluster <cluster-url> Cluster URL for solicited routes
--no_advertise <bool> Advertise known cluster IPs to clients
--cluster_advertise <string> Cluster URL to advertise to other servers
--connect_retries <number> For implicit routes, number of connect retries
Expand Down
20 changes: 17 additions & 3 deletions server/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"runtime"
"strings"
"sync"
"testing"

"github.com/nats-io/gnatsd/logger"
Expand Down Expand Up @@ -64,28 +65,41 @@ func TestSetLogger(t *testing.T) {
}

type DummyLogger struct {
sync.Mutex
msg string
}

func (dl *DummyLogger) checkContent(t *testing.T, expectedStr string) {
if dl.msg != expectedStr {
stackFatalf(t, "Expected log to be: %v, got %v", expectedStr, dl.msg)
func (l *DummyLogger) checkContent(t *testing.T, expectedStr string) {
l.Lock()
defer l.Unlock()
if l.msg != expectedStr {
stackFatalf(t, "Expected log to be: %v, got %v", expectedStr, l.msg)
}
}

func (l *DummyLogger) Noticef(format string, v ...interface{}) {
l.Lock()
defer l.Unlock()
l.msg = fmt.Sprintf(format, v...)
}
func (l *DummyLogger) Errorf(format string, v ...interface{}) {
l.Lock()
defer l.Unlock()
l.msg = fmt.Sprintf(format, v...)
}
func (l *DummyLogger) Fatalf(format string, v ...interface{}) {
l.Lock()
defer l.Unlock()
l.msg = fmt.Sprintf(format, v...)
}
func (l *DummyLogger) Debugf(format string, v ...interface{}) {
l.Lock()
defer l.Unlock()
l.msg = fmt.Sprintf(format, v...)
}
func (l *DummyLogger) Tracef(format string, v ...interface{}) {
l.Lock()
defer l.Unlock()
l.msg = fmt.Sprintf(format, v...)
}

Expand Down
13 changes: 8 additions & 5 deletions server/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type ClusterOpts struct {
TLSTimeout float64 `json:"-"`
TLSConfig *tls.Config `json:"-"`
ListenStr string `json:"-"`
RouteAdvertise string `json:"-"`
Advertise string `json:"-"`
NoAdvertise bool `json:"-"`
ConnectRetries int `json:"-"`
}
Expand Down Expand Up @@ -391,8 +391,8 @@ func parseCluster(cm map[string]interface{}, opts *Options) error {
opts.Cluster.TLSConfig.ClientAuth = tls.RequireAndVerifyClientCert
opts.Cluster.TLSConfig.RootCAs = opts.Cluster.TLSConfig.ClientCAs
opts.Cluster.TLSTimeout = tc.Timeout
case "route_advertise":
opts.Cluster.RouteAdvertise = mv.(string)
case "cluster_advertise", "advertise":
opts.Cluster.Advertise = mv.(string)
case "no_advertise":
opts.Cluster.NoAdvertise = mv.(bool)
case "connect_retries":
Expand Down Expand Up @@ -768,6 +768,9 @@ func MergeOptions(fileOpts, flagOpts *Options) *Options {
if flagOpts.Cluster.ConnectRetries != 0 {
opts.Cluster.ConnectRetries = flagOpts.Cluster.ConnectRetries
}
if flagOpts.Cluster.Advertise != "" {
opts.Cluster.Advertise = flagOpts.Cluster.Advertise
}
if flagOpts.RoutesStr != "" {
mergeRoutes(&opts, flagOpts)
}
Expand Down Expand Up @@ -951,7 +954,7 @@ func ConfigureOptions(fs *flag.FlagSet, args []string, printVersion, printHelp,
fs.StringVar(&opts.Host, "addr", "", "Network host to listen on.")
fs.StringVar(&opts.Host, "a", "", "Network host to listen on.")
fs.StringVar(&opts.Host, "net", "", "Network host to listen on.")
fs.StringVar(&opts.ClientAdvertise, "client_advertise", "", "Client url for discovered servers.")
fs.StringVar(&opts.ClientAdvertise, "client_advertise", "", "Client URL to advertise to other servers.")
fs.BoolVar(&opts.Debug, "D", false, "Enable Debug logging.")
fs.BoolVar(&opts.Debug, "debug", false, "Enable Debug logging.")
fs.BoolVar(&opts.Trace, "V", false, "Enable Trace logging.")
Expand Down Expand Up @@ -984,7 +987,7 @@ func ConfigureOptions(fs *flag.FlagSet, args []string, printVersion, printHelp,
fs.StringVar(&opts.RoutesStr, "routes", "", "Routes to actively solicit a connection.")
fs.StringVar(&opts.Cluster.ListenStr, "cluster", "", "Cluster url from which members can solicit routes.")
fs.StringVar(&opts.Cluster.ListenStr, "cluster_listen", "", "Cluster url from which members can solicit routes.")
fs.StringVar(&opts.Cluster.RouteAdvertise, "route_advertise", "", "Cluster url(s) for discovered servers.")
fs.StringVar(&opts.Cluster.Advertise, "cluster_advertise", "", "Cluster URL to advertise to other servers.")
fs.BoolVar(&opts.Cluster.NoAdvertise, "no_advertise", false, "Advertise known cluster IPs to clients.")
fs.IntVar(&opts.Cluster.ConnectRetries, "connect_retries", 0, "For implicit routes, number of connect retries")
fs.BoolVar(&showTLSHelp, "help_tls", false, "TLS help.")
Expand Down
45 changes: 44 additions & 1 deletion server/reload.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func (c *clusterOption) Apply(server *Server) {
server.routeInfo.SSLRequired = tlsRequired
server.routeInfo.TLSVerify = tlsRequired
server.routeInfo.AuthRequired = c.newValue.Username != ""
server.generateRouteInfoJSON()
server.setRouteInfoHostPortAndIP()
server.mu.Unlock()
server.Noticef("Reloaded: cluster")
}
Expand Down Expand Up @@ -407,6 +407,20 @@ func (w *writeDeadlineOption) Apply(server *Server) {
server.Noticef("Reloaded: write_deadline = %s", w.newValue)
}

// clientAdvertiseOption implements the option interface for the `client_advertise` setting.
type clientAdvertiseOption struct {
noopOption
newValue string
}

// Apply the setting by updating the server info and regenerate the infoJSON byte array.
func (c *clientAdvertiseOption) Apply(server *Server) {
server.mu.Lock()
server.setInfoHostPortAndGenerateJSON()
server.mu.Unlock()
server.Noticef("Reload: client_advertise = %s", c.newValue)
}

// Reload reads the current configuration file and applies any supported
// changes. This returns an error if the server was not started with a config
// file or an option which doesn't support hot-swapping was changed.
Expand All @@ -422,11 +436,25 @@ func (s *Server) Reload() error {
// TODO: Dump previous good config to a .bak file?
return err
}
clientOrgPort := s.clientActualPort
clusterOrgPort := s.clusterActualPort
s.mu.Unlock()

// Apply flags over config file settings.
newOpts = MergeOptions(newOpts, FlagSnapshot)
processOptions(newOpts)

// processOptions sets Port to 0 if set to -1 (RANDOM port)
// If that's the case, set it to the saved value when the accept loop was
// created.
if newOpts.Port == 0 {
newOpts.Port = clientOrgPort
}
// We don't do that for cluster, so check against -1.
if newOpts.Cluster.Port == -1 {
newOpts.Cluster.Port = clusterOrgPort
}

if err := s.reloadOptions(newOpts); err != nil {
return err
}
Expand Down Expand Up @@ -518,6 +546,15 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) {
diffOpts = append(diffOpts, &maxPingsOutOption{newValue: newValue.(int)})
case "writedeadline":
diffOpts = append(diffOpts, &writeDeadlineOption{newValue: newValue.(time.Duration)})
case "clientadvertise":
cliAdv := newValue.(string)
if cliAdv != "" {
// Validate ClientAdvertise syntax
if _, _, err := parseHostPort(cliAdv, 0); err != nil {
return nil, fmt.Errorf("invalid ClientAdvertise value of %s, err=%v", cliAdv, err)
}
}
diffOpts = append(diffOpts, &clientAdvertiseOption{newValue: cliAdv})
case "nolog":
// Ignore NoLog option since it's not parsed and only used in
// testing.
Expand Down Expand Up @@ -612,6 +649,12 @@ func validateClusterOpts(old, new ClusterOpts) error {
return fmt.Errorf("Config reload not supported for cluster port: old=%d, new=%d",
old.Port, new.Port)
}
// Validate Cluster.Advertise syntax
if new.Advertise != "" {
if _, _, err := parseHostPort(new.Advertise, 0); err != nil {
return fmt.Errorf("invalid Cluster.Advertise value of %s, err=%v", new.Advertise, err)
}
}
return nil
}

Expand Down
Loading

0 comments on commit acf4a31

Please sign in to comment.