Skip to content

Commit

Permalink
JetStream: lot of changes
Browse files Browse the repository at this point in the history
They will be described in the release notes, but gist:

Added:
- `DeliverSubject()` option to configure the deliver subject of a JetStream consumer created by the `js.Subscribe()` call (and variants)
- `BindDeliverSubject()` option to subscribe directly to a JetStream consumer deliver subject (bypassing any lookup or JetStream consumer creation)
- Fields `DeliverGroup` in `ConsumerConfig`, `PushBound` in `ConsumerInfo`. They help making prevent incorrect subscriptions to JetStream consumers
- Field `Last` in `SequencePair`

Changed:
- With a `PullSubscription`, calling `NextMsg()` or `NextMsgWithContext()` will now return `ErrTypeSubscription`. You must use the `Fetch()` API
- If the library created internally a JetStream consumer, the consumer will be deleted on `Unsubscribe()` or when the `Drain()` completes
- Fail multiple instances of a subscription on the same durable push consumer (only one active at a time). Also, consumers now have the concept of `DeliverGroup`, which is the queue group name they are created for. Only queue member from the same group can attach to this consumer, and a non queue subscription cannot attach to it. Note that this requires server v2.3.5
- Attempting to create a queue subscription with a consumer configuration that has idle heartbeats and/or flow control will now result in an error

Fixed:
- Possible lock inversion
- JetStream consumers could be incorrectly deleted on subscription's `Unsubscribe()`

Resolves nats-io#785
Resolves nats-io#776
Resolves nats-io#775
Resolves nats-io#748
Resolves nats-io#747

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Aug 15, 2021
1 parent 9c00d13 commit 45b7e77
Show file tree
Hide file tree
Showing 11 changed files with 1,829 additions and 1,301 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@ before_script:
- find . -type f -name "*.go" | xargs misspell -error -locale US
- staticcheck ./...
script:
- go test -modfile=go_test.mod -v -run=TestNoRace -p=1 ./... --failfast
- if [[ "$TRAVIS_GO_VERSION" =~ 1.16 ]]; then ./scripts/cov.sh TRAVIS; else go test -modfile=go_test.mod -race -v -p=1 ./... --failfast; fi
- go test -modfile=go_test.mod -v -run=TestNoRace -p=1 ./... --failfast -vet=off
- if [[ "$TRAVIS_GO_VERSION" =~ 1.16 ]]; then ./scripts/cov.sh TRAVIS; else go test -modfile=go_test.mod -race -v -p=1 ./... --failfast -vet=off; fi
7 changes: 1 addition & 6 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,7 @@ func (s *Subscription) nextMsgWithContext(ctx context.Context, pullSubInternal,
}

s.mu.Lock()
err := s.validateNextMsgState()
// Unless this is from an internal call, reject use of this API.
// Users should use Fetch() instead.
if err == nil && !pullSubInternal && s.jsi != nil && s.jsi.pull {
err = ErrTypeSubscription
}
err := s.validateNextMsgState(pullSubInternal)
if err != nil {
s.mu.Unlock()
return nil, err
Expand Down
8 changes: 6 additions & 2 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,8 @@ func ExampleJetStream() {
}, nats.ManualAck())

// Async queue subscription where members load balance the
// received messages together.
// received messages together. Since no consumer name is specified,
// the queue name will be used as a durable name.
js.QueueSubscribe("foo", "group", func(msg *nats.Msg) {
msg.Ack()
}, nats.ManualAck())
Expand All @@ -333,7 +334,10 @@ func ExampleJetStream() {
msg, _ := sub.NextMsg(2 * time.Second)
msg.Ack()

// QueueSubscribe with group or load balancing.
// We can add a member to the group, with this member using
// the synchronous version of the QueueSubscribe.
// Since no consumer name is specified, the queue name will be
// used as a durable name.
sub, _ = js.QueueSubscribeSync("foo", "group")
msg, _ = sub.NextMsg(2 * time.Second)
msg.Ack()
Expand Down
2 changes: 1 addition & 1 deletion go_test.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.16

require (
github.com/golang/protobuf v1.4.2
github.com/nats-io/nats-server/v2 v2.3.4
github.com/nats-io/nats-server/v2 v2.3.5-0.20210815013007-eb8aeb217178
github.com/nats-io/nkeys v0.3.0
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.23.0
Expand Down
6 changes: 3 additions & 3 deletions go_test.sum
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU=
github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q=
github.com/nats-io/jwt/v2 v2.0.3 h1:i/O6cmIsjpcQyWDYNcq2JyZ3/VTF8SJ4JWluI5OhpvI=
github.com/nats-io/jwt/v2 v2.0.3/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY=
github.com/nats-io/nats-server/v2 v2.3.4 h1:WcNa6HDFX8gjZPHb8CJ9wxRHEjJSlhWUb/MKb6/mlUY=
github.com/nats-io/nats-server/v2 v2.3.4/go.mod h1:3mtbaN5GkCo/Z5T3nNj0I0/W1fPkKzLiDC6jjWJKp98=
github.com/nats-io/nats.go v1.11.1-0.20210623165838-4b75fc59ae30/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats-server/v2 v2.3.5-0.20210815013007-eb8aeb217178 h1:6/bt9zMGA1D/i3ROeq8GjF8Tig5BVFh4V3gI+qpoWIs=
github.com/nats-io/nats-server/v2 v2.3.5-0.20210815013007-eb8aeb217178/go.mod h1:7mTh0KSxKc63xAVop97cFCIGRkWCv6HoX9lMXRSNOhU=
github.com/nats-io/nats.go v1.11.1-0.20210813172934-9c00d13a8a19/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
Expand Down
Loading

0 comments on commit 45b7e77

Please sign in to comment.