Skip to content

Commit

Permalink
Simplify request, allow for mux subject to change
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Oct 8, 2019
1 parent e16cf3a commit d5ed91c
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 58 deletions.
24 changes: 1 addition & 23 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,29 +43,7 @@ func (nc *Conn) RequestWithContext(ctx context.Context, subj string, data []byte
return nc.oldRequestWithContext(ctx, subj, data)
}

// Do setup for the new style.
if nc.respMap == nil {
nc.initNewResp()
}
// Create literal Inbox and map to a chan msg.
mch := make(chan *Msg, RequestChanLen)
respInbox := nc.newRespInbox()
token := respToken(respInbox)
nc.respMap[token] = mch
createSub := nc.respMux == nil
ginbox := nc.respSub
nc.mu.Unlock()

if createSub {
// Make sure scoped subscription is setup only once.
var err error
nc.respSetup.Do(func() { err = nc.createRespMux(ginbox) })
if err != nil {
return nil, err
}
}

err := nc.PublishRequest(subj, respInbox, data)
mch, token, err := nc.createNewRequestAndSend(subj, data)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module github.com/nats-io/nats.go

require (
github.com/nats-io/jwt v0.2.12
github.com/nats-io/jwt v0.3.0
github.com/nats-io/nkeys v0.1.0
github.com/nats-io/nuid v1.0.1
)
11 changes: 4 additions & 7 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
github.com/nats-io/jwt v0.2.12 h1:Y3YLoJey+Q/yMk/1Ig3xhWxYXE7vNSefozkArIcnSlU=
github.com/nats-io/jwt v0.2.12/go.mod h1:mQxQ0uHQ9FhEVPIcTSKwx2lqZEpXWWcCgA7R6NrWvvY=
github.com/nats-io/nkeys v0.0.2/go.mod h1:dab7URMsZm6Z/jp9Z5UGa87Uutgc2mVpXLC4B7TDb/4=
github.com/nats-io/jwt v0.3.0 h1:xdnzwFETV++jNc4W1mw//qFyJGb2ABOombmZJQS4+Qo=
github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg=
github.com/nats-io/nkeys v0.1.0 h1:qMd4+pRHgdr1nAClu+2h/2a5F2TmKcCzjCDazVgRoX4=
github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 h1:HuIa8hRrWRSrqYzx1qI49NNxhdi2PrY7gxVSq1JjLDc=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e h1:D5TXcfTk7xF7hvieo4QErS3qqCB4teTffacDWr7CI+0=
golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
78 changes: 52 additions & 26 deletions nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,10 +412,11 @@ type Conn struct {

// New style response handler
respSub string // The wildcard subject
respScanf string // The scanf template to extract mux token
respMux *Subscription // A single response subscription
respMap map[string]chan *Msg // Request map for the response msg channels
respSetup sync.Once // Ensures response subscription occurs once
respRand *rand.Rand // Used for generating suffix.
respRand *rand.Rand // Used for generating suffix
}

// A Subscription represents interest in a given subject.
Expand Down Expand Up @@ -2647,21 +2648,28 @@ func (nc *Conn) publish(subj, reply string, data []byte) error {
// the appropriate channel based on the last token and place
// the message on the channel if possible.
func (nc *Conn) respHandler(m *Msg) {
rt := respToken(m.Subject)

nc.mu.Lock()

// Just return if closed.
if nc.isClosed() {
nc.mu.Unlock()
return
}

// Grab mch
rt := nc.respToken(m.Subject)
mch := nc.respMap[rt]
// Delete the key regardless, one response only.
// FIXME(dlc) - should we track responses past 1
// just statistics wise?
delete(nc.respMap, rt)
// If something went wrong and we only have one entry, use that.
// This can happen if the system rewrites the subject, e.g. js.
if mch == nil && len(nc.respMap) == 1 {
for k, v := range nc.respMap {
mch = v
delete(nc.respMap, k)
break
}
}
nc.mu.Unlock()

// Don't block, let Request timeout instead, mch is
Expand All @@ -2685,33 +2693,22 @@ func (nc *Conn) createRespMux(respSub string) error {
return err
}
nc.mu.Lock()
nc.respScanf = strings.Replace(respSub, "*", "%s", -1)
nc.respMux = s
nc.mu.Unlock()
return nil
}

// Request will send a request payload and deliver the response message,
// or an error, including a timeout if no message was received properly.
func (nc *Conn) Request(subj string, data []byte, timeout time.Duration) (*Msg, error) {
if nc == nil {
return nil, ErrInvalidConnection
}

nc.mu.Lock()
// If user wants the old style.
if nc.Opts.UseOldRequestStyle {
nc.mu.Unlock()
return nc.oldRequest(subj, data, timeout)
}

// Do setup for the new style.
// Helper to setup and send new request style requests. Return the chan to receive the response.
func (nc *Conn) createNewRequestAndSend(subj string, data []byte) (chan *Msg, string, error) {
// Do setup for the new style if needed.
if nc.respMap == nil {
nc.initNewResp()
}
// Create literal Inbox and map to a chan msg.
// Create new literal Inbox and map to a chan msg.
mch := make(chan *Msg, RequestChanLen)
respInbox := nc.newRespInbox()
token := respToken(respInbox)
token := respInbox[respInboxPrefixLen:]
nc.respMap[token] = mch
createSub := nc.respMux == nil
ginbox := nc.respSub
Expand All @@ -2722,11 +2719,33 @@ func (nc *Conn) Request(subj string, data []byte, timeout time.Duration) (*Msg,
var err error
nc.respSetup.Do(func() { err = nc.createRespMux(ginbox) })
if err != nil {
return nil, err
return nil, token, err
}
}

if err := nc.PublishRequest(subj, respInbox, data); err != nil {
return nil, token, err
}

return mch, token, nil
}

// Request will send a request payload and deliver the response message,
// or an error, including a timeout if no message was received properly.
func (nc *Conn) Request(subj string, data []byte, timeout time.Duration) (*Msg, error) {
if nc == nil {
return nil, ErrInvalidConnection
}

nc.mu.Lock()
// If user wants the old style.
if nc.Opts.UseOldRequestStyle {
nc.mu.Unlock()
return nc.oldRequest(subj, data, timeout)
}

mch, token, err := nc.createNewRequestAndSend(subj, data)
if err != nil {
return nil, err
}

Expand Down Expand Up @@ -2829,9 +2848,16 @@ func (nc *Conn) NewRespInbox() string {
}

// respToken will return the last token of a literal response inbox
// which we use for the message channel lookup.
func respToken(respInbox string) string {
return respInbox[respInboxPrefixLen:]
// which we use for the message channel lookup. This needs to do a
// scan to protect itself against the server changing the subject.
// Lock should be held.
func (nc *Conn) respToken(respInbox string) string {
var token string
n, err := fmt.Sscanf(respInbox, nc.respScanf, &token)
if err != nil || n != 1 {
return ""
}
return token
}

// Subscribe will express interest in the given subject. The subject
Expand Down
34 changes: 34 additions & 0 deletions nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2217,3 +2217,37 @@ func TestStatsRace(t *testing.T) {
close(ch)
wg.Wait()
}

func TestRequestLeaksMapEntries(t *testing.T) {
o := natsserver.DefaultTestOptions
o.Port = -1
s := RunServerWithOptions(&o)
defer s.Shutdown()

nc, err := Connect(fmt.Sprintf("nats://%s:%d", o.Host, o.Port))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc.Close()

response := []byte("I will help you")
nc.Subscribe("foo", func(m *Msg) {
nc.Publish(m.Reply, response)
})

for i := 0; i < 100; i++ {
msg, err := nc.Request("foo", nil, 500*time.Millisecond)
if err != nil {
t.Fatalf("Received an error on Request test: %s", err)
}
if !bytes.Equal(msg.Data, response) {
t.Fatalf("Received invalid response")
}
}
nc.mu.Lock()
num := len(nc.respMap)
nc.mu.Unlock()
if num != 0 {
t.Fatalf("Expected 0 entries in response map, got %d", num)
}
}
2 changes: 1 addition & 1 deletion test/basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ func TestRequestTimeout(t *testing.T) {
nc := NewDefaultConnection(t)
defer nc.Close()

if _, err := nc.Request("foo", []byte("help"), 10*time.Millisecond); err == nil {
if _, err := nc.Request("foo", []byte("help"), 10*time.Millisecond); err != nats.ErrTimeout {
t.Fatalf("Expected to receive a timeout error")
}
}
Expand Down

0 comments on commit d5ed91c

Please sign in to comment.