Skip to content

Commit

Permalink
Addresses comments change requests based on code review
Browse files Browse the repository at this point in the history
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Aug 16, 2021
1 parent f730c02 commit e077154
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 8 deletions.
15 changes: 11 additions & 4 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,8 +323,17 @@ func ExampleJetStream() {
}, nats.ManualAck())

// Async queue subscription where members load balance the
// received messages together. Since no consumer name is specified,
// the queue name will be used as a durable name.
// received messages together.
// If no consumer name is specified, either with nats.Bind()
// or nats.Durable() options, the queue name is used as the
// durable name (that is, as if you were passing the
// nats.Durable(<queue group name>) option.
// It is recommended to use nats.Bind() or nats.Durable()
// and preferably create the JetStream consumer beforehand
// (using js.AddConsumer) so that the JS consumer is not
// deleted on an Unsubscribe() or Drain() when the member
// that created the consumer goes away first.
// Check Godoc for the QueueSubscribe() API for more details.
js.QueueSubscribe("foo", "group", func(msg *nats.Msg) {
msg.Ack()
}, nats.ManualAck())
Expand All @@ -336,8 +345,6 @@ func ExampleJetStream() {

// We can add a member to the group, with this member using
// the synchronous version of the QueueSubscribe.
// Since no consumer name is specified, the queue name will be
// used as a durable name.
sub, _ = js.QueueSubscribeSync("foo", "group")
msg, _ = sub.NextMsg(2 * time.Second)
msg.Ack()
Expand Down
14 changes: 11 additions & 3 deletions js.go
Original file line number Diff line number Diff line change
Expand Up @@ -879,6 +879,12 @@ func (opt subOptFn) configureSubscribe(opts *subOpts) error {

// Subscribe will create a subscription to the appropriate stream and consumer.
//
// The stream and consumer names can be provided with the nats.Bind() option.
// For creating an ephemeral (where the consumer name is picked by the server),
// you can provide the stream name with nats.BindStream().
// If no stream name is specified, the library will attempt to figure out which
// stream the subscription is for. See important notes below for more details.
//
// IMPORTANT NOTES:
// * If Bind() and Durable() options are not specified, the library will
// send a request to the server to create an ephemeral JetStream consumer,
Expand Down Expand Up @@ -1868,9 +1874,11 @@ func IdleHeartbeat(duration time.Duration) SubOpt {
}

// DeliverSubject specifies the JetStream consumer deliver subject.
// This applies only in cases where the no consumer exists and it will be
// created by the library by the subscribe API.
// If a consumer exists, then the NATS subscription will be created to
//
// This option is used only in situations where the consumer does not exist
// and a creation request is sent to the server. If not provided, an inbox
// will be selected.
// If a consumer exists, then the NATS subscription will be created on
// the JetStream consumer's DeliverSubject, not necessarily this subject.
func DeliverSubject(subject string) SubOpt {
return subOptFn(func(opts *subOpts) error {
Expand Down
3 changes: 2 additions & 1 deletion nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -3908,7 +3908,8 @@ func (s *Subscription) Drain() error {
// error will be returned.
// If you do not wish the JetStream consumer to be automatically deleted,
// ensure that the consumer is not created by the library, which means
// create the consumer with AddConsumer and bind to this consumer.
// create the consumer with AddConsumer and bind to this consumer (using
// the nats.Bind() option).
func (s *Subscription) Unsubscribe() error {
if s == nil {
return ErrBadSubscription
Expand Down

0 comments on commit e077154

Please sign in to comment.