Skip to content

Commit

Permalink
lint fixups
Browse files Browse the repository at this point in the history
  • Loading branch information
derekcollison committed Jan 16, 2016
1 parent 48bb165 commit 9e99c31
Show file tree
Hide file tree
Showing 15 changed files with 65 additions and 43 deletions.
7 changes: 4 additions & 3 deletions enc.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type Encoder interface {
var encMap map[string]Encoder
var encLock sync.Mutex

// Indexe names into the Registered Encoders.
const (
JSON_ENCODER = "json"
GOB_ENCODER = "gob"
Expand All @@ -31,9 +32,9 @@ const (
func init() {
encMap = make(map[string]Encoder)
// Register json, gob and default encoder
RegisterEncoder("json", &JsonEncoder{})
RegisterEncoder("gob", &GobEncoder{})
RegisterEncoder("default", &DefaultEncoder{})
RegisterEncoder(JSON_ENCODER, &JsonEncoder{})
RegisterEncoder(GOB_ENCODER, &GobEncoder{})
RegisterEncoder(DEFAULT_ENCODER, &DefaultEncoder{})
}

// EncodedConn are the preferred way to interface with NATS. They wrap a bare connection to
Expand Down
4 changes: 3 additions & 1 deletion encoders/builtin/default_enc.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"unsafe"
)

// A Default Encoder implementation for EncodedConn.
// DefaultEncoder implementation for EncodedConn.
// This encoder will leave []byte and string untouched, but will attempt to
// turn numbers into appropriate strings that can be decoded. It will also
// propely encoded and decode bools. If will encode a struct, but if you want
Expand All @@ -23,6 +23,7 @@ var trueB = []byte("true")
var falseB = []byte("false")
var nilB = []byte("")

// Encode
func (je *DefaultEncoder) Encode(subject string, v interface{}) ([]byte, error) {
switch arg := v.(type) {
case string:
Expand All @@ -45,6 +46,7 @@ func (je *DefaultEncoder) Encode(subject string, v interface{}) ([]byte, error)
}
}

// Decode
func (je *DefaultEncoder) Decode(subject string, data []byte, vPtr interface{}) error {
// Figure out what it's pointing to...
sData := *(*string)(unsafe.Pointer(&data))
Expand Down
4 changes: 3 additions & 1 deletion encoders/builtin/gob_enc.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"encoding/gob"
)

// A Go specific GOB Encoder implementation for EncodedConn
// GobEncoder is a Go specific GOB Encoder implementation for EncodedConn.
// This encoder will use the builtin encoding/gob to Marshal
// and Unmarshal most types, including structs.
type GobEncoder struct {
Expand All @@ -16,6 +16,7 @@ type GobEncoder struct {

// FIXME(dlc) - This could probably be more efficient.

// Encode
func (ge *GobEncoder) Encode(subject string, v interface{}) ([]byte, error) {
b := new(bytes.Buffer)
enc := gob.NewEncoder(b)
Expand All @@ -25,6 +26,7 @@ func (ge *GobEncoder) Encode(subject string, v interface{}) ([]byte, error) {
return b.Bytes(), nil
}

// Decode
func (ge *GobEncoder) Decode(subject string, data []byte, vPtr interface{}) (err error) {
dec := gob.NewDecoder(bytes.NewBuffer(data))
err = dec.Decode(vPtr)
Expand Down
4 changes: 3 additions & 1 deletion encoders/builtin/json_enc.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ import (
"strings"
)

// A JSON Encoder implementation for EncodedConn
// JsonEncoder is a JSON Encoder implementation for EncodedConn.
// This encoder will use the builtin encoding/json to Marshal
// and Unmarshal most types, including structs.
type JsonEncoder struct {
// Empty
}

// Encode
func (je *JsonEncoder) Encode(subject string, v interface{}) ([]byte, error) {
b, err := json.Marshal(v)
if err != nil {
Expand All @@ -22,6 +23,7 @@ func (je *JsonEncoder) Encode(subject string, v interface{}) ([]byte, error) {
return b, nil
}

// Decode
func (je *JsonEncoder) Decode(subject string, data []byte, vPtr interface{}) (err error) {
switch arg := vPtr.(type) {
case *string:
Expand Down
5 changes: 4 additions & 1 deletion encoders/protobuf/protobuf_enc.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/nats-io/nats"
)

// Additional index for registered Encoders.
const (
PROTOBUF_ENCODER = "protobuf"
)
Expand All @@ -18,7 +19,7 @@ func init() {
nats.RegisterEncoder(PROTOBUF_ENCODER, &ProtobufEncoder{})
}

// A protobuf Encoder implementation for EncodedConn
// ProtobufEncoder is a protobuf implementation for EncodedConn
// This encoder will use the builtin protobuf lib to Marshal
// and Unmarshal structs.
type ProtobufEncoder struct {
Expand All @@ -30,6 +31,7 @@ var (
ErrInvalidProtoMsgDecode = errors.New("nats: Invalid protobuf proto.Message object passed to decode")
)

// Encode
func (pb *ProtobufEncoder) Encode(subject string, v interface{}) ([]byte, error) {
i, found := v.(proto.Message)
if !found {
Expand All @@ -43,6 +45,7 @@ func (pb *ProtobufEncoder) Encode(subject string, v interface{}) ([]byte, error)
return b, nil
}

// Decode
func (pb *ProtobufEncoder) Decode(subject string, data []byte, vPtr interface{}) error {
i, found := vPtr.(proto.Message)
if !found {
Expand Down
4 changes: 2 additions & 2 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func ExampleConn_QueueSubscribe() {
received := 0

nc.QueueSubscribe("foo", "worker_group", func(_ *nats.Msg) {
received += 1
received++
})
}

Expand All @@ -145,7 +145,7 @@ func ExampleSubscription_AutoUnsubscribe() {
received, wanted, total := 0, 10, 100

sub, _ := nc.Subscribe("foo", func(_ *nats.Msg) {
received += 1
received++
})
sub.AutoUnsubscribe(wanted)

Expand Down
3 changes: 2 additions & 1 deletion examples/nats-bench.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/nats-io/nats"
)

// Some sane defaults
const (
DefaultNumMsgs = 100000
DefaultNumPubs = 1
Expand Down Expand Up @@ -115,7 +116,7 @@ func runSubscriber(startwg, donewg *sync.WaitGroup, opts nats.Options, numMsgs i

received := 0
nc.Subscribe(subj, func(msg *nats.Msg) {
received += 1
received++
if received%HashModulo == 0 {
fmt.Fprintf(os.Stderr, "*")
}
Expand Down
2 changes: 1 addition & 1 deletion examples/nats-qsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func main() {
subj, queue, i := args[0], args[1], 0

nc.QueueSubscribe(subj, queue, func(msg *nats.Msg) {
i += 1
i++
printMsg(msg, i)
})

Expand Down
2 changes: 1 addition & 1 deletion examples/nats-rply.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func main() {
subj, reply, i := args[0], args[1], 0

nc.Subscribe(subj, func(msg *nats.Msg) {
i += 1
i++
printMsg(msg, i)
nc.Publish(msg.Reply, []byte(reply))
})
Expand Down
40 changes: 21 additions & 19 deletions nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
mrand "math/rand"
)

// Defaults
const (
Version = "1.1.6"
DefaultURL = "nats://localhost:4222"
Expand All @@ -39,9 +40,10 @@ const (
LangString = "go"
)

// For detection and proper handling of a Stale Connection
// STALE_CONNECTION is for detection and proper handling of stale connections.
const STALE_CONNECTION = "Stale Connection"

// Errors
var (
ErrConnectionClosed = errors.New("nats: Connection Closed")
ErrSecureConnRequired = errors.New("nats: Secure Connection required")
Expand Down Expand Up @@ -71,6 +73,7 @@ var DefaultOptions = Options{
SubChanLen: DefaultMaxChanLen,
}

// Status represents the state of the connection.
type Status int

const (
Expand All @@ -81,15 +84,15 @@ const (
CONNECTING
)

// ConnHandlers are used for asynchronous events such as
// ConnHandler is used for asynchronous events such as
// disconnected and closed connections.
type ConnHandler func(*Conn)

// ErrHandlers are used to process asynchronous errors encountered
// ErrHandler is used to process asynchronous errors encountered
// while processing inbound messages.
type ErrHandler func(*Conn, *Subscription, error)

// Options can be used to create a customized Connection.
// Options can be used to create a customized connection.
type Options struct {
Url string
Servers []string
Expand Down Expand Up @@ -356,8 +359,8 @@ func (nc *Conn) selectNextServer() (*srv, error) {
sp := nc.srvPool
num := len(sp)
copy(sp[i:num-1], sp[i+1:num])
max_reconnect := nc.Opts.MaxReconnect
if max_reconnect < 0 || s.reconnects < max_reconnect {
maxReconnect := nc.Opts.MaxReconnect
if maxReconnect < 0 || s.reconnects < maxReconnect {
nc.srvPool[num-1] = s
} else {
nc.srvPool = sp[0 : num-1]
Expand Down Expand Up @@ -866,7 +869,7 @@ func (nc *Conn) doReconnect() {
}

// Mark that we tried a reconnect
cur.reconnects += 1
cur.reconnects++

// Try to create a new connection
err = nc.createConn()
Expand All @@ -879,7 +882,7 @@ func (nc *Conn) doReconnect() {
}

// We are reconnected
nc.Reconnects += 1
nc.Reconnects++

// Clear out server stats for the server we connected to..
cur.didConnect = true
Expand Down Expand Up @@ -947,7 +950,6 @@ func (nc *Conn) processOpErr(err error) {
if nc.Opts.AllowReconnect && nc.status == CONNECTED {
// Set our new status
nc.status = RECONNECTING

if nc.ptmr != nil {
nc.ptmr.Stop()
}
Expand All @@ -957,15 +959,14 @@ func (nc *Conn) processOpErr(err error) {
nc.conn = nil
}
go nc.doReconnect()

nc.mu.Unlock()
return
} else {
nc.processDisconnect()
nc.err = err
nc.mu.Unlock()
nc.Close()
}

nc.processDisconnect()
nc.err = err
nc.mu.Unlock()
nc.Close()
}

// readLoop() will sit on the socket reading and processing the
Expand Down Expand Up @@ -1074,7 +1075,7 @@ func (nc *Conn) processMsg(data []byte) {
nc.mu.Lock()

// Stats
nc.InMsgs += 1
nc.InMsgs++
nc.InBytes += uint64(len(data))

sub := nc.subs[nc.ps.ma.sid]
Expand Down Expand Up @@ -1108,7 +1109,7 @@ func (nc *Conn) processMsg(data []byte) {
}

// Sub internal stats
sub.msgs += 1
sub.msgs++
sub.bytes += uint64(len(data))

if sub.mch != nil {
Expand Down Expand Up @@ -1317,7 +1318,7 @@ func (nc *Conn) publish(subj, reply string, data []byte) error {
return err
}

nc.OutMsgs += 1
nc.OutMsgs++
nc.OutBytes += uint64(len(data))

if len(nc.fch) == 0 {
Expand Down Expand Up @@ -1365,6 +1366,7 @@ func (nc *Conn) Request(subj string, data []byte, timeout time.Duration) (m *Msg
return
}

// InboxPrefix is the prefix for all inbox subjects.
const InboxPrefix = "_INBOX."

// NewInbox will return an inbox string which can be used for directed replies from
Expand Down Expand Up @@ -1631,7 +1633,7 @@ func (nc *Conn) processPingTimer() {
}

// Check for violation
nc.pout += 1
nc.pout++
if nc.pout > nc.Opts.MaxPingsOut {
nc.mu.Unlock()
nc.processOpErr(ErrStaleConnection)
Expand Down
6 changes: 3 additions & 3 deletions netchan.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
// to subjects and optionally queue groups.
// Data will be encoded and decoded via the EncodedConn and its associated encoders.

// Bind a channel for send operations to nats.
// BindSendChan binds a channel for send operations to NATS.
func (c *EncodedConn) BindSendChan(subject string, channel interface{}) error {
chVal := reflect.ValueOf(channel)
if chVal.Kind() != reflect.Chan {
Expand Down Expand Up @@ -40,12 +40,12 @@ func chPublish(c *EncodedConn, chVal reflect.Value, subject string) {
}
}

// Bind a channel for receive operations from nats.
// BindRecvChan binds a channel for receive operations from NATS.
func (c *EncodedConn) BindRecvChan(subject string, channel interface{}) (*Subscription, error) {
return c.bindRecvChan(subject, _EMPTY_, channel)
}

// Bind a channel for queue-based receive operations from nats.
// BindRecvQueueChan binds a channel for queue-based receive operations from NATS.
func (c *EncodedConn) BindRecvQueueChan(subject, queue string, channel interface{}) (*Subscription, error) {
return c.bindRecvChan(subject, queue, channel)
}
Expand Down
2 changes: 1 addition & 1 deletion test/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func TestHotSpotReconnect(t *testing.T) {
// Walk the clients and calculate how many of each..
cs := make(map[string]int)
for _, nc := range clients {
cs[nc.ConnectedUrl()] += 1
cs[nc.ConnectedUrl()]++
nc.Close()
}
if len(cs) != numServers {
Expand Down
Loading

0 comments on commit 9e99c31

Please sign in to comment.