Skip to content

Commit

Permalink
Moved tests over, reconnect and test cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
derekcollison committed Sep 17, 2015
1 parent 75a84ac commit e52a3ae
Show file tree
Hide file tree
Showing 11 changed files with 605 additions and 408 deletions.
30 changes: 0 additions & 30 deletions auth_test.go

This file was deleted.

111 changes: 84 additions & 27 deletions nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ const (
CONNECTED
CLOSED
RECONNECTING
CONNECTING
)

// ConnHandlers are used for asynchronous events such as
Expand Down Expand Up @@ -257,6 +258,7 @@ func (o Options) Connect() (*Conn, error) {
if nc.Opts.SubChanLen == 0 {
nc.Opts.SubChanLen = DefaultMaxChanLen
}

if err := nc.setupServerPool(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -400,13 +402,24 @@ func (nc *Conn) setupServerPool() error {
s := &srv{url: u}
nc.srvPool = append(nc.srvPool, s)
}

// Place default URL if pool is empty.
if len(nc.srvPool) <= 0 {
u, err := url.Parse(DefaultURL)
if err != nil {
return err
}
s := &srv{url: u}
nc.srvPool = append(nc.srvPool, s)
}

return nc.pickServer()
}

// createConn will connect to the server and wrap the appropriate
// bufio structures. It will do the right thing when an existing
// connection is in place.
func (nc *Conn) createConn() error {
func (nc *Conn) createConn() (err error) {
if nc.Opts.Timeout < 0 {
return ErrBadTimeout
}
Expand All @@ -415,9 +428,9 @@ func (nc *Conn) createConn() error {
} else {
cur.lastAttempt = time.Now()
}
nc.conn, nc.err = net.DialTimeout("tcp", nc.url.Host, nc.Opts.Timeout)
if nc.err != nil {
return nc.err
nc.conn, err = net.DialTimeout("tcp", nc.url.Host, nc.Opts.Timeout)
if err != nil {
return err
}

// No clue why, but this stalls and kills performance on Mac (Mavericks).
Expand Down Expand Up @@ -445,6 +458,8 @@ func (nc *Conn) makeTLSConn() {
func (nc *Conn) waitForExits() {
// Kick old flusher forcefully.
nc.fch <- true

// nc.fch <- true
// Wait for any previous go routines.
nc.wg.Wait()
}
Expand Down Expand Up @@ -516,8 +531,8 @@ func (nc *Conn) processConnectInit() error {
nc.mu.Lock()
nc.setup()

// Set our status to connected.
nc.status = CONNECTED
// Set our status to connecting.
nc.status = CONNECTING

// Make sure to process the INFO inline here.
if nc.err = nc.processExpectedInfo(); nc.err != nil {
Expand All @@ -526,7 +541,7 @@ func (nc *Conn) processConnectInit() error {
}
nc.mu.Unlock()

// We need these to process the sendConnect.
// We need these to process the sendConnect response.
go nc.spinUpSocketWatchers()

return nc.sendConnect()
Expand Down Expand Up @@ -652,7 +667,6 @@ func (nc *Conn) connectProto() (string, error) {
// applicable. Will wait for a flush to return from the server for error
// processing. The lock should not be held entering this function.
func (nc *Conn) sendConnect() error {

nc.mu.Lock()
cProto, err := nc.connectProto()
if err != nil {
Expand All @@ -664,7 +678,6 @@ func (nc *Conn) sendConnect() error {
nc.sendProto(cProto)

if err := nc.FlushTimeout(DefaultTimeout); err != nil {
nc.err = err
return err
}

Expand All @@ -674,7 +687,9 @@ func (nc *Conn) sendConnect() error {
if nc.isClosed() {
return nc.err
}
// This is where we are truly connected.
nc.status = CONNECTED

return nil
}

Expand Down Expand Up @@ -837,6 +852,9 @@ func (nc *Conn) doReconnect() {
cur.didConnect = true
cur.reconnects = 0

// Set our status to connecting.
nc.status = CONNECTING

// Process Connect logic
if nc.err = nc.processExpectedInfo(); nc.err == nil {
// Send our connect info as normal
Expand All @@ -845,14 +863,13 @@ func (nc *Conn) doReconnect() {
continue
}

// Set our status to connected.
nc.status = CONNECTED

nc.bw.WriteString(cProto)
// Send existing subscription state
nc.resendSubscriptions()
// Now send off and clear pending buffer
nc.flushReconnectPendingItems()
// This is where we are truly connected.
nc.status = CONNECTED

// Spin up socket watchers again
go nc.spinUpSocketWatchers()
Expand Down Expand Up @@ -944,6 +961,7 @@ func (nc *Conn) readLoop() {
nc.processOpErr(err)
break
}

if err := nc.parse(b[:n]); err != nil {
nc.processOpErr(err)
break
Expand Down Expand Up @@ -1051,7 +1069,7 @@ func (nc *Conn) processMsg(msg []byte) {
func (nc *Conn) processSlowConsumer(s *Subscription) {
nc.err = ErrSlowConsumer
if nc.Opts.AsyncErrorCB != nil && !s.sc {
go nc.Opts.AsyncErrorCB(nc, s, nc.err)
go nc.Opts.AsyncErrorCB(nc, s, ErrSlowConsumer)
}
s.sc = true
}
Expand All @@ -1062,20 +1080,27 @@ func (nc *Conn) flusher() {
// Release the wait group
defer nc.wg.Done()

// snapshot the bw and conn since they can change from underneath of us.
bw := nc.bw
conn := nc.conn

if conn == nil || bw == nil {
return
}

for {
if _, ok := <-nc.fch; !ok {
return
}
nc.mu.Lock()

// Check for closed or reconnecting
if nc.isClosed() || nc.isReconnecting() {
// Check to see if we should bail out.
if !nc.isConnected() || bw != nc.bw || conn != nc.conn {
nc.mu.Unlock()
return
}
b := nc.bw.Buffered()
if b > 0 && nc.conn != nil {
nc.err = nc.bw.Flush()
if bw.Buffered() > 0 {
nc.err = bw.Flush()
}
nc.mu.Unlock()
}
Expand Down Expand Up @@ -1140,8 +1165,11 @@ func (nc *Conn) processErr(e string) {
// kickFlusher will send a bool on a channel to kick the
// flush Go routine to flush data to the server.
func (nc *Conn) kickFlusher() {
if len(nc.fch) == 0 && nc.bw != nil {
nc.fch <- true
if nc.bw != nil {
select {
case nc.fch <- true:
default:
}
}
}

Expand Down Expand Up @@ -1204,18 +1232,21 @@ func (nc *Conn) publish(subj, reply string, data []byte) error {
msgh = append(msgh, _CRLF_...)

// FIXME, do deadlines here
if _, nc.err = nc.bw.Write(msgh); nc.err != nil {
if _, err := nc.bw.Write(msgh); err != nil {
defer nc.mu.Unlock()
return nc.err
nc.err = err
return err
}
if _, nc.err = nc.bw.Write(data); nc.err != nil {
if _, err := nc.bw.Write(data); err != nil {
defer nc.mu.Unlock()
return nc.err
nc.err = err
return err
}

if _, nc.err = nc.bw.WriteString(_CRLF_); nc.err != nil {
if _, err := nc.bw.WriteString(_CRLF_); err != nil {
defer nc.mu.Unlock()
return nc.err
nc.err = err
return err
}

nc.OutMsgs += 1
Expand Down Expand Up @@ -1456,6 +1487,16 @@ func (s *Subscription) NextMsg(timeout time.Duration) (msg *Msg, err error) {
return
}

// Queued returns the number of queued messages in the client for this subscription.
func (s *Subscription) QueuedMsgs() (int, error) {
s.mu.Lock()
defer s.mu.Unlock()
if s.conn == nil {
return -1, ErrBadSubscription
}
return len(s.mch), nil
}

// FIXME: This is a hack
// removeFlushEntry is needed when we need to discard queued up responses
// for our pings as part of a flush call. This happens when we have a flush
Expand All @@ -1479,7 +1520,8 @@ func (nc *Conn) removeFlushEntry(ch chan bool) bool {
func (nc *Conn) sendPing(ch chan bool) {
nc.pongs = append(nc.pongs, ch)
nc.bw.WriteString(pingProto)
nc.kickFlusher()
// Flush in place.
nc.bw.Flush()
}

func (nc *Conn) processPingTimer() {
Expand Down Expand Up @@ -1547,6 +1589,16 @@ func (nc *Conn) Flush() error {
return nc.FlushTimeout(60 * time.Second)
}

// Buffered will return the number of bytes buffered to be sent to the server.
func (nc *Conn) Buffered() (int, error) {
nc.mu.Lock()
defer nc.mu.Unlock()
if nc.isClosed() || nc.bw == nil {
return -1, ErrConnectionClosed
}
return nc.bw.Buffered(), nil
}

// resendSubscriptions will send our subscription state back to the
// server. Used in reconnects
func (nc *Conn) resendSubscriptions() {
Expand Down Expand Up @@ -1678,6 +1730,11 @@ func (nc *Conn) isReconnecting() bool {
return nc.status == RECONNECTING
}

// Test if Conn is connected or connecting.
func (nc *Conn) isConnected() bool {
return nc.status == CONNECTING || nc.status == CONNECTED
}

// Stats will return a race safe copy of the Statistics section for the connection.
func (nc *Conn) Stats() Statistics {
nc.mu.Lock()
Expand Down
35 changes: 35 additions & 0 deletions test/auth_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package test

import (
"testing"

"github.com/nats-io/gnatsd/auth"
gnatsd "github.com/nats-io/gnatsd/test"
"github.com/nats-io/nats"
)

func TestAuth(t *testing.T) {
opts := gnatsd.DefaultTestOptions
opts.Port = 8232
s := RunServerWithOptions(opts)

// Auth is pluggable, so need to set here..
auth := &auth.Plain{
Username: "derek",
Password: "foo",
}
s.SetAuthMethod(auth)

defer s.Shutdown()

_, err := nats.Connect("nats://localhost:8232")
if err == nil {
t.Fatal("Should have received an error while trying to connect")
}

nc, err := nats.Connect("nats://derek:foo@localhost:8232")
if err != nil {
t.Fatal("Should have connected successfully")
}
nc.Close()
}
Loading

0 comments on commit e52a3ae

Please sign in to comment.