Skip to content

Commit

Permalink
support custom inbox prefixes
Browse files Browse the repository at this point in the history
Signed-off-by: R.I.Pienaar <rip@devco.net>
  • Loading branch information
ripienaar committed Jun 18, 2021
1 parent e4b051a commit 559051c
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 16 deletions.
64 changes: 48 additions & 16 deletions nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,9 @@ type Options struct {
// For websocket connections, indicates to the server that the connection
// supports compression. If the server does too, then data will be compressed.
Compression bool

// InboxPrefix allows the default _INBOX prefix to be customized
InboxPrefix string
}

const (
Expand Down Expand Up @@ -494,11 +497,13 @@ type Conn struct {
ws bool // true if a websocket connection

// New style response handler
respSub string // The wildcard subject
respScanf string // The scanf template to extract mux token
respMux *Subscription // A single response subscription
respMap map[string]chan *Msg // Request map for the response msg channels
respRand *rand.Rand // Used for generating suffix
respSub string // The wildcard subject
respSubPrefix string // the wildcard prefix including trailing .
respSubLen int // the length of the wildcard prefix excluding trailing .
respScanf string // The scanf template to extract mux token
respMux *Subscription // A single response subscription
respMap map[string]chan *Msg // Request map for the response msg channels
respRand *rand.Rand // Used for generating suffix
}

type natsReader struct {
Expand Down Expand Up @@ -1101,6 +1106,17 @@ func Compression(enabled bool) Option {
}
}

// CustomInboxPrefix configures the request + reply inbox prefix
func CustomInboxPrefix(p string) Option {
return func(o *Options) error {
if p == "" || strings.HasSuffix(p, ">") || strings.HasSuffix(p, "*") || strings.HasSuffix(p, ".") || strings.HasPrefix(p, ">") || strings.HasPrefix(p, "*") {
return fmt.Errorf("nats: invald custom prefix")
}
o.InboxPrefix = p
return nil
}
}

// Handler processing

// SetDisconnectHandler will set the disconnect event handler.
Expand Down Expand Up @@ -3343,7 +3359,8 @@ func (nc *Conn) createNewRequestAndSend(subj string, hdr, data []byte) (chan *Ms
// Create new literal Inbox and map to a chan msg.
mch := make(chan *Msg, RequestChanLen)
respInbox := nc.newRespInbox()
token := respInbox[respInboxPrefixLen:]
token := respInbox[nc.respSubLen+1:]

nc.respMap[token] = mch
if nc.respMux == nil {
// Create the response subscription we will use for all new style responses.
Expand Down Expand Up @@ -3450,7 +3467,7 @@ func (nc *Conn) newRequest(subj string, hdr, data []byte, timeout time.Duration)
// with the Inbox reply and return the first reply received.
// This is optimized for the case of multiple responses.
func (nc *Conn) oldRequest(subj string, hdr, data []byte, timeout time.Duration) (*Msg, error) {
inbox := NewInbox()
inbox := nc.newInbox()
ch := make(chan *Msg, RequestChanLen)

s, err := nc.subscribe(inbox, _EMPTY_, nil, ch, true, nil)
Expand Down Expand Up @@ -3490,10 +3507,23 @@ func NewInbox() string {
return string(b[:])
}

func (nc *Conn) newInbox() string {
if nc.Opts.InboxPrefix == _EMPTY_ {
return NewInbox()
}

var sb strings.Builder
sb.WriteString(nc.Opts.InboxPrefix)
sb.WriteByte('.')
sb.WriteString(nuid.Next())
return sb.String()
}

// Function to init new response structures.
func (nc *Conn) initNewResp() {
// _INBOX wildcard
nc.respSub = fmt.Sprintf("%s.*", NewInbox())
nc.respSubPrefix = fmt.Sprintf("%s.", nc.newInbox())
nc.respSubLen = len(nc.respSubPrefix) - 1
nc.respSub = fmt.Sprintf("%s*", nc.respSubPrefix)
nc.respMap = make(map[string]chan *Msg)
nc.respRand = rand.New(rand.NewSource(time.Now().UnixNano()))
}
Expand All @@ -3505,15 +3535,17 @@ func (nc *Conn) newRespInbox() string {
if nc.respMap == nil {
nc.initNewResp()
}
var b [respInboxPrefixLen + replySuffixLen]byte
pres := b[:respInboxPrefixLen]
copy(pres, nc.respSub)

var sb strings.Builder
sb.WriteString(nc.respSubPrefix)

rn := nc.respRand.Int63()
for i, l := respInboxPrefixLen, rn; i < len(b); i++ {
b[i] = rdigits[l%base]
l /= base
for i := 0; i < nuidSize; i++ {
sb.WriteByte(rdigits[rn%base])
rn /= base
}
return string(b[:])

return sb.String()
}

// NewRespInbox is the new format used for _INBOX.
Expand Down
43 changes: 43 additions & 0 deletions nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2659,3 +2659,46 @@ func TestMsg_RespondMsg(t *testing.T) {
t.Fatalf("did not get correct response: %q", resp.Data)
}
}

func TestCustomInboxPrefix(t *testing.T) {
opts := &Options{}
for _, p := range []string{"$BOB.", "$BOB.*", "$BOB.>", ">", ".", ""} {
err := CustomInboxPrefix(p)(opts)
if err == nil {
t.Fatalf("Expeted error for %q", p)
}
}

s := RunServerOnPort(-1)
defer s.Shutdown()

nc, err := Connect(s.ClientURL(), CustomInboxPrefix("$BOB"))
if err != nil {
t.Fatalf("Expected to connect to server, got %v", err)
}
defer nc.Close()

sub, err := nc.Subscribe(NewInbox(), func(msg *Msg) {
if !strings.HasPrefix(msg.Reply, "$BOB.") {
t.Fatalf("invalid inbox subject %q received", msg.Reply)
}

if len(strings.Split(msg.Reply, ".")) != 3 {
t.Fatalf("invalid number tokens in %s", msg.Reply)
}

msg.Respond([]byte("ok"))
})
if err != nil {
t.Fatalf("subscribe failed: %s", err)
}

resp, err := nc.Request(sub.Subject, nil, time.Second)
if err != nil {
t.Fatalf("request failed: %s", err)
}

if !bytes.Equal(resp.Data, []byte("ok")) {
t.Fatalf("did not receive ok: %q", resp.Data)
}
}

0 comments on commit 559051c

Please sign in to comment.