Skip to content

Commit

Permalink
Randomize distribution to subscribers
Browse files Browse the repository at this point in the history
Better performance for queue subscriber distribution.
  • Loading branch information
derekcollison committed Feb 5, 2016
1 parent 809f88a commit 853b4d7
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 34 deletions.
2 changes: 1 addition & 1 deletion TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@
- [ ] Buffer pools/sync pools?
- [ ] IOVec pools and writev for high fanout?
- [ ] Add ability to reload config on signal
- [ ] NewSource on Rand to lower lock contention on QueueSubs, or redesign!
- [ ] Add ENV and variable support to dconf
- [ ] Modify cluster support for single message across routes between pub/sub and d-queue
- [ ] Memory limits/warnings?
- [ ] Limit number of subscriptions a client can have, total memory usage etc.
- [ ] Info updates contain other implicit route servers
- [ ] Multi-tenant accounts with isolation of subject space
- [ ] Pedantic state
- [X] NewSource on Rand to lower lock contention on QueueSubs, or redesign!
- [X] Default sort by cid on connz
- [X] Track last activity time per connection?
- [X] Add total connections to varz so we won't miss spikes, etc.
Expand Down
43 changes: 14 additions & 29 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ import (
"github.com/nats-io/gnatsd/sublist"
)

func init() {
rand.Seed(time.Now().UnixNano())
}

const (
// Scratch buffer size for the processMsg() calls.
msgScratchSize = 512
Expand Down Expand Up @@ -750,8 +754,7 @@ func (c *client) processMsg(msg []byte) {
msgh = append(msgh, ' ')
si := len(msgh)

var qmap map[string][]*subscription
var qsubs []*subscription
seenQ := map[string]struct{}{}

isRoute := c.typ == ROUTER
var rmap map[string]struct{}
Expand All @@ -770,37 +773,27 @@ func (c *client) processMsg(msg []byte) {
}

// Loop over all subscriptions that match.
for _, v := range r {
sub := v.(*subscription)
// We randomize/shuffle the list to optimize a bit on queue subscribers.
indexes := rand.Perm(len(r))
for _, i := range indexes {
sub := r[i].(*subscription)

// Process queue group subscriptions by gathering them all up
// here. We will pick the winners when we are done processing
// all of the subscriptions.
if sub.queue != nil {
// Queue subscriptions handled from routes directly above.
if isRoute {
continue
}
// FIXME(dlc), this can be more efficient
if qmap == nil {
qmap = make(map[string][]*subscription)
}
qname := string(sub.queue)
qsubs = qmap[qname]
if qsubs == nil {
qsubs = make([]*subscription, 0, 4)
if _, ok := seenQ[qname]; ok {
continue
}
qsubs = append(qsubs, sub)
qmap[qname] = qsubs
continue
seenQ[qname] = struct{}{}
}

// Process normal, non-queue group subscriptions.

// If this is a send to a ROUTER, make sure we only send it
// once. The other side will handle the appropriate re-processing.
// Also enforce 1-Hop.
if sub.client.typ == ROUTER {
if sub.client.typ == ROUTER && sub.queue == nil {
// Skip if sourced from a ROUTER and going to another ROUTER.
// This is 1-Hop semantics for ROUTERs.
if isRoute {
Expand All @@ -826,18 +819,10 @@ func (c *client) processMsg(msg []byte) {
sub.client.mu.Unlock()
}

// Process subscription.
mh := c.msgHeader(msgh[:si], sub)
c.deliverMsg(sub, mh, msg)
}

if qmap != nil {
for _, qsubs := range qmap {
index := rand.Int() % len(qsubs)
sub := qsubs[index]
mh := c.msgHeader(msgh[:si], sub)
c.deliverMsg(sub, mh, msg)
}
}
}

func (c *client) processPingTimer() {
Expand Down
5 changes: 3 additions & 2 deletions test/proto_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ func TestProtoBasics(t *testing.T) {
// 2 Messages
send("SUB * 2\r\nPUB foo 2\r\nok\r\n")
matches = expectMsgs(2)
checkMsg(t, matches[0], "foo", "1", "", "2", "ok")
checkMsg(t, matches[1], "foo", "2", "", "2", "ok")
// Could arrive in any order
checkMsg(t, matches[0], "foo", "", "", "2", "ok")
checkMsg(t, matches[1], "foo", "", "", "2", "ok")
}

func TestProtoErr(t *testing.T) {
Expand Down
13 changes: 11 additions & 2 deletions test/routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,11 +304,20 @@ func TestRouteQueueSemantics(t *testing.T) {
matches = expectMsgs(2)

// Expect first to be the normal subscriber, next will be the queue one.
checkMsg(t, matches[0], "foo", "RSID:2:4", "", "2", "ok")
if string(matches[0][SID_INDEX]) != "RSID:2:4" &&
string(matches[1][SID_INDEX]) != "RSID:2:4" {
t.Fatalf("Did not received routed sid\n")
}
checkMsg(t, matches[0], "foo", "", "", "2", "ok")
checkMsg(t, matches[1], "foo", "", "", "2", "ok")

// Check the rsid to verify it is one of the queue group subscribers.
rsid := string(matches[1][SID_INDEX])
var rsid string
if matches[0][SID_INDEX][0] == 'Q' {
rsid = string(matches[0][SID_INDEX])
} else {
rsid = string(matches[1][SID_INDEX])
}
if rsid != qrsid1 && rsid != qrsid2 {
t.Fatalf("Expected a queue group rsid, got %s\n", rsid)
}
Expand Down

0 comments on commit 853b4d7

Please sign in to comment.