Skip to content

Commit

Permalink
Merge pull request nats-io#314 from nats-io/server_send_async_info
Browse files Browse the repository at this point in the history
[ADDED] Server sends INFO with cluster URLs to clients with support
  • Loading branch information
derekcollison authored Aug 1, 2016
2 parents 1ac073d + 6f9d542 commit e0bb798
Show file tree
Hide file tree
Showing 9 changed files with 552 additions and 43 deletions.
1 change: 1 addition & 0 deletions TODO.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@

# General

- [ ] Auth for queue groups?
- [ ] Blacklist or ERR escalation to close connection for auth/permissions
- [ ] Protocol updates, MAP, MPUB, etc
- [ ] Multiple listen endpoints
Expand Down
119 changes: 111 additions & 8 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,23 @@ import (
"time"
)

// Type of client connection.
const (
// CLIENT is an end user.
CLIENT = iota
// ROUTER is another router in the cluster.
ROUTER
)

const (
// Original Client protocol from 2009.
// http://nats.io/documentation/internals/nats-protocol/
ClientProtoZero = iota
// This signals a client can receive more then the original INFO block.
// This can be used to update clients on other cluster members, etc.
ClientProtoInfo
)

func init() {
rand.Seed(time.Now().UnixNano())
}
Expand All @@ -30,14 +47,42 @@ const (
maxBufSize = 65536
)

// Type of client
// Represent client booleans with a bitmask
type clientFlag byte

// Some client state represented as flags
const (
// CLIENT is an end user.
CLIENT = iota
// ROUTER is another router in the cluster.
ROUTER
connectReceived clientFlag = 1 << iota // The CONNECT proto has been received
firstPongSent // The first PONG has been sent
infoUpdated // The server's Info object has changed before first PONG was sent
)

// set the flag (would be equivalent to set the boolean to true)
func (cf *clientFlag) set(c clientFlag) {
*cf |= c
}

// isSet returns true if the flag is set, false otherwise
func (cf clientFlag) isSet(c clientFlag) bool {
return cf&c != 0
}

// setIfNotSet will set the flag `c` only if that flag was not already
// set and return true to indicate that the flag has been set. Returns
// false otherwise.
func (cf *clientFlag) setIfNotSet(c clientFlag) bool {
if *cf&c == 0 {
*cf |= c
return true
}
return false
}

// clear unset the flag (would be equivalent to set the boolean to false)
func (cf *clientFlag) clear(c clientFlag) {
*cf &= ^c
}

type client struct {
// Here first because of use of atomics, and memory alignment.
stats
Expand Down Expand Up @@ -67,6 +112,8 @@ type client struct {
route *route
debug bool
trace bool

flags clientFlag // Compact booleans into a single field. Size will be increased when needed.
}

type permissions struct {
Expand Down Expand Up @@ -118,6 +165,7 @@ type clientOpts struct {
Name string `json:"name"`
Lang string `json:"lang"`
Version string `json:"version"`
Protocol int `json:"protocol"`
}

var defaultOpts = clientOpts{Verbose: true, Pedantic: true}
Expand Down Expand Up @@ -371,28 +419,54 @@ func (c *client) processConnect(arg []byte) error {
typ := c.typ
r := c.route
srv := c.srv
c.mu.Unlock()

// Moved unmarshalling of clients' Options under the lock.
// The client has already been added to the server map, so it is possible
// that other routines lookup the client, and access its options under
// the client's lock, so unmarshalling the options outside of the lock
// would cause data RACEs.
if err := json.Unmarshal(arg, &c.opts); err != nil {
c.mu.Unlock()
return err
}
// Indicate that the CONNECT protocol has been received, and that the
// server now knows which protocol this client supports.
c.flags.set(connectReceived)
// Capture these under lock
proto := c.opts.Protocol
verbose := c.opts.Verbose
c.mu.Unlock()

if srv != nil {
// As soon as c.opts is unmarshalled and if the proto is at
// least ClientProtoInfo, we need to increment the following counter.
// This is decremented when client is removed from the server's
// clients map.
if proto >= ClientProtoInfo {
srv.mu.Lock()
srv.cproto++
srv.mu.Unlock()
}

// Check for Auth
if ok := srv.checkAuth(c); !ok {
c.authViolation()
return ErrAuthorization
}
}

// Check client protocol request if it exists.
if typ == CLIENT && (proto < ClientProtoZero || proto > ClientProtoInfo) {
return ErrBadClientProtocol
}

// Grab connection name of remote route.
if typ == ROUTER && r != nil {
c.mu.Lock()
c.route.remoteID = c.opts.Name
c.mu.Unlock()
}

if c.opts.Verbose {
if verbose {
c.sendOK()
}
return nil
Expand Down Expand Up @@ -449,12 +523,15 @@ func (c *client) sendInfo(info []byte) {

func (c *client) sendErr(err string) {
c.mu.Lock()
c.traceOutOp("-ERR", []byte(err))
c.sendProto([]byte(fmt.Sprintf("-ERR '%s'\r\n", err)), true)
c.mu.Unlock()
}

func (c *client) sendOK() {
c.mu.Lock()
c.traceOutOp("OK", nil)
// Can not autoflush this one, needs to be async.
c.sendProto([]byte("+OK\r\n"), false)
c.pcd[c] = needFlush
c.mu.Unlock()
Expand All @@ -473,7 +550,33 @@ func (c *client) processPing() {
c.clearConnection()
c.Debugf("Error on Flush, error %s", err.Error())
}
srv := c.srv
sendUpdateINFO := false
// Check if this is the first PONG, if so...
if c.flags.setIfNotSet(firstPongSent) {
// Check if server should send an async INFO protocol to the client
if c.opts.Protocol >= ClientProtoInfo &&
srv != nil && c.flags.isSet(infoUpdated) {
sendUpdateINFO = true
}
// We can now clear the flag
c.flags.clear(infoUpdated)
}
c.mu.Unlock()

// Some clients send an initial PING as part of the synchronous connect process.
// They can't be receiving anything until the first PONG is received.
// So we delay the possible updated INFO after this point.
if sendUpdateINFO {
srv.mu.Lock()
// Use the cached protocol
proto := srv.infoJSON
srv.mu.Unlock()

c.mu.Lock()
c.sendInfo(proto)
c.mu.Unlock()
}
}

func (c *client) processPong() {
Expand Down
44 changes: 44 additions & 0 deletions server/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"time"

"crypto/tls"

"github.com/nats-io/nats"
)

Expand Down Expand Up @@ -167,6 +168,49 @@ func TestClientConnect(t *testing.T) {
}
}

func TestClientConnectProto(t *testing.T) {
_, c, _ := setupClient()

// Basic Connect setting flags, proto should be zero (original proto)
connectOp := []byte("CONNECT {\"verbose\":true,\"pedantic\":true,\"ssl_required\":false}\r\n")
err := c.parse(connectOp)
if err != nil {
t.Fatalf("Received error: %v\n", err)
}
if c.state != OP_START {
t.Fatalf("Expected state of OP_START vs %d\n", c.state)
}
if !reflect.DeepEqual(c.opts, clientOpts{Verbose: true, Pedantic: true, Protocol: ClientProtoZero}) {
t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts)
}

// ProtoInfo
connectOp = []byte(fmt.Sprintf("CONNECT {\"verbose\":true,\"pedantic\":true,\"ssl_required\":false,\"protocol\":%d}\r\n", ClientProtoInfo))
err = c.parse(connectOp)
if err != nil {
t.Fatalf("Received error: %v\n", err)
}
if c.state != OP_START {
t.Fatalf("Expected state of OP_START vs %d\n", c.state)
}
if !reflect.DeepEqual(c.opts, clientOpts{Verbose: true, Pedantic: true, Protocol: ClientProtoInfo}) {
t.Fatalf("Did not parse connect options correctly: %+v\n", c.opts)
}
if c.opts.Protocol != ClientProtoInfo {
t.Fatalf("Protocol should have been set to %v, but is set to %v", ClientProtoInfo, c.opts.Protocol)
}

// Illegal Option
connectOp = []byte("CONNECT {\"protocol\":22}\r\n")
err = c.parse(connectOp)
if err == nil {
t.Fatalf("Expected to receive an error\n")
}
if err != ErrBadClientProtocol {
t.Fatalf("Expected err of %q, got %q\n", ErrBadClientProtocol, err)
}
}

func TestClientPing(t *testing.T) {
_, c, cr := setupClient()

Expand Down
3 changes: 3 additions & 0 deletions server/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,7 @@ var (

// ErrReservedPublishSubject represents an error condition when sending to a reserved subject, e.g. _SYS.>
ErrReservedPublishSubject = errors.New("Reserved Internal Subject")

// ErrBadClientProtocol signals a client requested an invalud client protocol.
ErrBadClientProtocol = errors.New("Invalid Client Protocol")
)
83 changes: 73 additions & 10 deletions server/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,63 @@ func (c *client) processRouteInfo(info *Info) {
// Now let the known servers know about this new route
s.forwardNewRouteInfoToKnownServers(info)
}
// If the server Info did not have these URLs, update and send an INFO
// protocol to all clients that support it.
if s.updateServerINFO(info.ClientConnectURLs) {
s.sendAsyncInfoToClients()
}
} else {
c.Debugf("Detected duplicate remote route %q", info.ID)
c.closeConnection()
}
}

// sendAsyncInfoToClients sends an INFO protocol to all
// connected clients that accept async INFO updates.
func (s *Server) sendAsyncInfoToClients() {
s.mu.Lock()
// If there are no clients supporting async INFO protocols, we are done.
if s.cproto == 0 {
s.mu.Unlock()
return
}

// Capture under lock
proto := s.infoJSON

// Make a copy of ALL clients so we can release server lock while
// sending the protocol to clients. We could check the conditions
// (proto support, first PONG sent) here and so have potentially
// a limited number of clients, but that would mean grabbing the
// client's lock here, which we don't want since we would still
// need it in the second loop.
clients := make([]*client, 0, len(s.clients))
for _, c := range s.clients {
clients = append(clients, c)
}
s.mu.Unlock()

for _, c := range clients {
c.mu.Lock()
// If server did not yet receive the CONNECT protocol, check later
// when sending the first PONG.
if !c.flags.isSet(connectReceived) {
c.flags.set(infoUpdated)
} else if c.opts.Protocol >= ClientProtoInfo {
// Send only if first PONG was sent
if c.flags.isSet(firstPongSent) {
// sendInfo takes care of checking if the connection is still
// valid or not, so don't duplicate tests here.
c.sendInfo(proto)
} else {
// Otherwise, notify that INFO has changed and check later.
c.flags.set(infoUpdated)
}
}
c.mu.Unlock()
}
}

// This will process implicit route information received from another server.
// We will check to see if we have configured or are already connected,
// and if so we will ignore. Otherwise we will attempt to connect.
Expand Down Expand Up @@ -579,19 +630,31 @@ func (s *Server) routeAcceptLoop(ch chan struct{}) {

// StartRouting will start the accept loop on the cluster host:port
// and will actively try to connect to listed routes.
func (s *Server) StartRouting() {
func (s *Server) StartRouting(clientListenReady chan struct{}) {
defer s.grWG.Done()

// Wait for the client listen port to be opened, and
// the possible ephemeral port to be selected.
<-clientListenReady

// Get all possible URLs (when server listens to 0.0.0.0).
// This is going to be sent to other Servers, so that they can let their
// clients know about us.
clientConnectURLs := s.getClientConnectURLs()

// Check for TLSConfig
tlsReq := s.opts.ClusterTLSConfig != nil
info := Info{
ID: s.info.ID,
Version: s.info.Version,
Host: s.opts.ClusterHost,
Port: s.opts.ClusterPort,
AuthRequired: false,
TLSRequired: tlsReq,
SSLRequired: tlsReq,
TLSVerify: tlsReq,
MaxPayload: s.info.MaxPayload,
ID: s.info.ID,
Version: s.info.Version,
Host: s.opts.ClusterHost,
Port: s.opts.ClusterPort,
AuthRequired: false,
TLSRequired: tlsReq,
SSLRequired: tlsReq,
TLSVerify: tlsReq,
MaxPayload: s.info.MaxPayload,
ClientConnectURLs: clientConnectURLs,
}
// Check for Auth items
if s.opts.ClusterUsername != "" {
Expand Down
Loading

0 comments on commit e0bb798

Please sign in to comment.