Skip to content

Commit

Permalink
[ADDED] SkipConsumerLookup option in js.Subscribe() (nats-io#1265)
Browse files Browse the repository at this point in the history
Signed-off-by: Waldemar Quevedo <wally@nats.io>
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
Co-authored-by: Waldemar Quevedo <wally@nats.io>
  • Loading branch information
piotrpio and wallyqs authored May 16, 2023
1 parent 2765665 commit 8faa842
Show file tree
Hide file tree
Showing 3 changed files with 191 additions and 3 deletions.
5 changes: 5 additions & 0 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,11 @@ func ExampleSubOpt() {
js.Subscribe("foo", func(msg *nats.Msg) {
fmt.Printf("Received a message: %s\n", string(msg.Data))
}, nats.Durable("FOO"), nats.ConsumerMemoryStorage())

// Skip consumer lookup when using explicit consumer name
js.Subscribe("foo", func(msg *nats.Msg) {
fmt.Printf("Received a message: %s\n", string(msg.Data))
}, nats.Durable("FOO"), nats.SkipConsumerLookup())
}

func ExampleMaxWait() {
Expand Down
35 changes: 32 additions & 3 deletions js.go
Original file line number Diff line number Diff line change
Expand Up @@ -1468,6 +1468,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
isDurable = o.cfg.Durable != _EMPTY_
consumerBound = o.bound
ctx = o.ctx
skipCInfo = o.skipCInfo
notFoundErr bool
lookupErr bool
nc = js.nc
Expand Down Expand Up @@ -1541,8 +1542,8 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,

// With an explicit durable name, we can lookup the consumer first
// to which it should be attaching to.
// If bind to ordered consumer is true, skip the lookup.
if consumer != _EMPTY_ {
// If SkipConsumerLookup was used, do not call consumer info.
if consumer != _EMPTY_ && !o.skipCInfo {
info, err = js.ConsumerInfo(stream, consumer)
notFoundErr = errors.Is(err, ErrConsumerNotFound)
lookupErr = err == ErrJetStreamNotEnabled || err == ErrTimeout || err == context.DeadlineExceeded
Expand All @@ -1563,6 +1564,19 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
if !(isPullMode && lookupErr && consumerBound) {
return nil, err
}
case skipCInfo:
// When skipping consumer info, need to rely on the manually passed sub options
// to match the expected behavior from the subscription.
hasFC, hbi = o.cfg.FlowControl, o.cfg.Heartbeat
hasHeartbeats = hbi > 0
maxap = o.cfg.MaxAckPending
deliver = o.cfg.DeliverSubject
if consumerBound {
break
}

// When not bound to a consumer already, proceed to create.
fallthrough
default:
// Attempt to create consumer if not found nor using Bind.
shouldCreate = true
Expand All @@ -1572,7 +1586,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
deliver = nc.NewInbox()
cfg.DeliverSubject = deliver
}

// Do filtering always, server will clear as needed.
cfg.FilterSubject = subj

Expand Down Expand Up @@ -2153,6 +2166,22 @@ type subOpts struct {
// For an ordered consumer.
ordered bool
ctx context.Context

// To disable calling ConsumerInfo
skipCInfo bool
}

// SkipConsumerLookup will omit lookipng up consumer when [Bind], [Durable]
// or [ConsumerName] are provided.
//
// NOTE: This setting may cause an existing consumer to be overwritten. Also,
// because consumer lookup is skipped, all consumer options like AckPolicy,
// DeliverSubject etc. need to be provided even if consumer already exists.
func SkipConsumerLookup() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.skipCInfo = true
return nil
})
}

// OrderedConsumer will create a FIFO direct/ephemeral consumer for in order delivery of messages.
Expand Down
154 changes: 154 additions & 0 deletions test/js_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -955,6 +955,160 @@ func TestJetStreamSubscribe(t *testing.T) {
}
}

func TestJetStreamSubscribe_SkipConsumerLookup(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
Name: "cons",
DeliverSubject: "_INBOX.foo",
AckPolicy: nats.AckExplicitPolicy,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// for checking whether subscribe looks up the consumer
infoSub, err := nc.SubscribeSync("$JS.API.CONSUMER.INFO.TEST.*")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer infoSub.Unsubscribe()

// for checking whether subscribe creates the consumer
createConsSub, err := nc.SubscribeSync("$JS.API.CONSUMER.CREATE.>")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer createConsSub.Unsubscribe()
t.Run("use Bind to skip consumer lookup and create", func(t *testing.T) {
sub, err := js.SubscribeSync("", nats.Bind("TEST", "cons"), nats.SkipConsumerLookup(), nats.DeliverSubject("_INBOX.foo"))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer sub.Unsubscribe()
// we should get timeout waiting for msg on CONSUMER.INFO
if msg, err := infoSub.NextMsg(50 * time.Millisecond); err == nil {
t.Fatalf("Expected to skip consumer lookup; got message on %q", msg.Subject)
}

// we should get timeout waiting for msg on CONSUMER.CREATE
if msg, err := createConsSub.NextMsg(50 * time.Millisecond); err == nil {
t.Fatalf("Expected to skip consumer create; got message on %q", msg.Subject)
}
if _, err := js.Publish("foo", []byte("msg")); err != nil {
t.Fatalf("Unexpected error: %s", err)
}
if _, err := sub.NextMsg(100 * time.Millisecond); err != nil {
t.Fatalf("Expected to receive msg; got: %s", err)
}
})
t.Run("use Durable, skip consumer lookup but overwrite the consumer", func(t *testing.T) {
sub, err := js.SubscribeSync("foo", nats.Durable("cons"), nats.SkipConsumerLookup(), nats.DeliverSubject("_INBOX.foo"))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// we should get timeout waiting for msg on CONSUMER.INFO
if msg, err := infoSub.NextMsg(50 * time.Millisecond); err == nil {
t.Fatalf("Expected to skip consumer lookup; got message on %q", msg.Subject)
}

// we should get msg on CONSUMER.CREATE
if _, err := createConsSub.NextMsg(50 * time.Millisecond); err != nil {
t.Fatalf("Expected consumer create; got: %s", err)
}
if _, err := js.Publish("foo", []byte("msg")); err != nil {
t.Fatalf("Unexpected error: %s", err)
}
if _, err := sub.NextMsg(100 * time.Millisecond); err != nil {
t.Fatalf("Expected to receive msg; got: %s", err)
}
})
t.Run("create new consumer with Durable, skip lookup", func(t *testing.T) {
sub, err := js.SubscribeSync("foo", nats.Durable("pp"), nats.SkipConsumerLookup(), nats.DeliverSubject("_INBOX.foo1"))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer sub.Unsubscribe()
// we should get timeout waiting for msg on CONSUMER.INFO
if msg, err := infoSub.NextMsg(50 * time.Millisecond); err == nil {
t.Fatalf("Expected to skip consumer lookup; got message on %q", msg.Subject)
}

// we should get msg on CONSUMER.CREATE
if _, err := createConsSub.NextMsg(50 * time.Millisecond); err != nil {
t.Fatalf("Expected consumer create; got: %s", err)
}
if _, err := js.Publish("foo", []byte("msg")); err != nil {
t.Fatalf("Unexpected error: %s", err)
}
if _, err := sub.NextMsg(100 * time.Millisecond); err != nil {
t.Fatalf("Expected to receive msg; got: %s", err)
}
})
t.Run("create new consumer with ConsumerName, skip lookup", func(t *testing.T) {
sub, err := js.SubscribeSync("foo", nats.ConsumerName("pp"), nats.SkipConsumerLookup(), nats.DeliverSubject("_INBOX.foo1"))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer sub.Unsubscribe()
// we should get timeout waiting for msg on CONSUMER.INFO
if msg, err := infoSub.NextMsg(50 * time.Millisecond); err == nil {
t.Fatalf("Expected to skip consumer lookup; got message on %q", msg.Subject)
}

// we should get msg on CONSUMER.CREATE
if _, err := createConsSub.NextMsg(50 * time.Millisecond); err != nil {
t.Fatalf("Expected consumer create; got: %s", err)
}
if _, err := js.Publish("foo", []byte("msg")); err != nil {
t.Fatalf("Unexpected error: %s", err)
}
if _, err := sub.NextMsg(100 * time.Millisecond); err != nil {
t.Fatalf("Expected to receive msg; got: %s", err)
}
})

t.Run("create ephemeral consumer, SkipConsumerLookup has no effect", func(t *testing.T) {
sub, err := js.SubscribeSync("foo", nats.SkipConsumerLookup(), nats.DeliverSubject("_INBOX.foo2"))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer sub.Unsubscribe()
// we should get timeout waiting for msg on CONSUMER.INFO
if msg, err := infoSub.NextMsg(50 * time.Millisecond); err == nil {
t.Fatalf("Expected to skip consumer lookup; got message on %q", msg.Subject)
}

// we should get msg on CONSUMER.CREATE
if _, err := createConsSub.NextMsg(50 * time.Millisecond); err != nil {
t.Fatalf("Expected consumer create; got: %s", err)
}
if _, err := js.Publish("foo", []byte("msg")); err != nil {
t.Fatalf("Unexpected error: %s", err)
}
if _, err := sub.NextMsg(100 * time.Millisecond); err != nil {
t.Fatalf("Expected to receive msg; got: %s", err)
}
})
t.Run("attempt to update ack policy of existing consumer", func(t *testing.T) {
_, err := js.SubscribeSync("foo", nats.Durable("cons"), nats.SkipConsumerLookup(), nats.DeliverSubject("_INBOX.foo"), nats.AckAll())
if err == nil || !strings.Contains(err.Error(), "ack policy can not be updated") {
t.Fatalf("Expected update consumer error, got: %v", err)
}
})
}

func TestPullSubscribeFetchBatch(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)
Expand Down

0 comments on commit 8faa842

Please sign in to comment.