Skip to content

Commit

Permalink
Make consumer create idempotent
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrpio committed May 23, 2023
1 parent cae5dbe commit 8d9ad1b
Show file tree
Hide file tree
Showing 24 changed files with 1,990 additions and 605 deletions.
8 changes: 8 additions & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
issues:
max-issues-per-linter: 0
max-same-issues: 0
exclude-rules:
- linters:
- errcheck
text: "Unsubscribe"
path: jsv2/jetstream/consumer.go
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go:
go_import_path: github.com/nats-io/nats.go
install:
- go get -t ./...
- curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin
- if [[ "$TRAVIS_GO_VERSION" =~ 1.20 ]]; then
go install github.com/mattn/goveralls@latest;
go install github.com/wadey/gocovmerge@latest;
Expand All @@ -18,6 +19,7 @@ before_script:
find . -type f -name "*.go" | xargs misspell -error -locale US;
GOFLAGS="-mod=mod -modfile=go_test.mod" staticcheck ./...;
fi
- golangci-lint run ./jsv2/...
script:
- go test -modfile=go_test.mod -v -run=TestNoRace -p=1 ./... --failfast -vet=off
- if [[ "$TRAVIS_GO_VERSION" =~ 1.20 ]]; then ./scripts/cov.sh TRAVIS; else go test -modfile=go_test.mod -race -v -p=1 ./... --failfast -vet=off; fi
Expand Down
1 change: 1 addition & 0 deletions go_test.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/nats-io/nats-server/v2 v2.9.6
github.com/nats-io/nkeys v0.4.4
github.com/nats-io/nuid v1.0.1
golang.org/x/text v0.3.6
google.golang.org/protobuf v1.23.0
)

Expand Down
2 changes: 2 additions & 0 deletions go_test.sum
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.0.0-20220922220347-f3bd1da661af h1:Yx9k8YCG3dvF87UAn2tu2HQLf2dt/eR1bXxpLMWeH+Y=
golang.org/x/time v0.0.0-20220922220347-f3bd1da661af/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
Expand Down
86 changes: 0 additions & 86 deletions headers/headers.go

This file was deleted.

2 changes: 1 addition & 1 deletion jsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -1110,7 +1110,7 @@ func (js *js) getMsg(name string, mreq *apiMsgGetRequest, opts ...JSOpt) (*RawSt

var hdr Header
if len(msg.Header) > 0 {
hdr, err = decodeHeadersMsg(msg.Header)
hdr, err = DecodeHeadersMsg(msg.Header)
if err != nil {
return nil, err
}
Expand Down
109 changes: 106 additions & 3 deletions jsv2/jetstream/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type (
Consumer interface {
// Next is used to retrieve a single message from the stream
Next(context.Context, ...ConsumerNextOpt) (JetStreamMsg, error)
// Stream can be used to continously receive messages and handle them with the provided callback function
// Stream can be used to continuously receive messages and handle them with the provided callback function
Stream(context.Context, MessageHandler, ...ConsumerStreamOpt) error

// Info returns Consumer details
Expand Down Expand Up @@ -83,8 +83,8 @@ type (
// WithStreamHeartbeat() - sets an idle heartbeat setting for a pull request
func (p *pullConsumer) Next(ctx context.Context, opts ...ConsumerNextOpt) (JetStreamMsg, error) {
p.Lock()
defer p.Unlock()
if atomic.LoadUint32(&p.isStreaming) == 1 {
p.Unlock()
return nil, ErrConsumerHasActiveSubscription
}
timeout := 30 * time.Second
Expand All @@ -101,6 +101,7 @@ func (p *pullConsumer) Next(ctx context.Context, opts ...ConsumerNextOpt) (JetSt
}
for _, opt := range opts {
if err := opt(req); err != nil {
p.Unlock()
return nil, err
}
}
Expand All @@ -110,6 +111,7 @@ func (p *pullConsumer) Next(ctx context.Context, opts ...ConsumerNextOpt) (JetSt
msgChan := make(chan *jetStreamMsg, 1)
p.heartbeat = make(chan struct{})
errs := make(chan error)
p.Unlock()

go func() {
err := p.fetch(ctx, *req, msgChan)
Expand All @@ -127,11 +129,16 @@ func (p *pullConsumer) Next(ctx context.Context, opts ...ConsumerNextOpt) (JetSt
case msg := <-msgChan:
return msg, nil
case err := <-errs:
if errors.Is(err, ErrNoMessages) {
return nil, nil
}
return nil, err
case <-p.heartbeat:
case <-time.After(2 * req.Heartbeat):
p.Lock()
p.subscription.Unsubscribe()
p.subscription = nil
p.Unlock()
return nil, ErrNoHeartbeat
}
continue
Expand All @@ -145,7 +152,7 @@ func (p *pullConsumer) Next(ctx context.Context, opts ...ConsumerNextOpt) (JetSt
}
}

// Stream continously receives messages from a consumer and handles them with the provided callback function
// Stream continuously receives messages from a consumer and handles them with the provided callback function
// ctx is used to handle the whole operation, not individual messages batch, so to avoid cancellation, an empty context should be provided
//
// Available options:
Expand Down Expand Up @@ -203,13 +210,17 @@ func (p *pullConsumer) Stream(ctx context.Context, handler MessageHandler, opts
case <-time.After(2 * req.Heartbeat):
handler(nil, ErrNoHeartbeat)
cancel()
p.Lock()
p.subscription.Unsubscribe()
p.subscription = nil
p.Unlock()
atomic.StoreUint32(&p.isStreaming, 0)
return
case <-ctx.Done():
p.Lock()
p.subscription.Unsubscribe()
p.subscription = nil
p.Unlock()
atomic.StoreUint32(&p.isStreaming, 0)
return
}
Expand All @@ -221,8 +232,10 @@ func (p *pullConsumer) Stream(ctx context.Context, handler MessageHandler, opts
case err := <-errs:
handler(nil, err)
case <-ctx.Done():
p.Lock()
p.subscription.Unsubscribe()
p.subscription = nil
p.Unlock()
atomic.StoreUint32(&p.isStreaming, 0)
return
}
Expand All @@ -238,6 +251,8 @@ func (c *pullConsumer) fetch(ctx context.Context, req pullRequest, target chan<-
if req.Batch < 1 {
return fmt.Errorf("%w: batch size must be at least 1", nats.ErrInvalidArg)
}
c.Lock()
defer c.Unlock()
// if there is no subscription for this consumer, create new inbox subject and subscribe
if c.subscription == nil {
inbox := nats.NewInbox()
Expand Down Expand Up @@ -403,3 +418,91 @@ func validateDurableName(dur string) error {
}
return nil
}

func compareConsumerConfig(s, u *ConsumerConfig) error {
makeErr := func(fieldName string, usrVal, srvVal interface{}) error {
return fmt.Errorf("configuration requests %s to be %v, but consumer's value is %v", fieldName, usrVal, srvVal)
}

if u.Durable != s.Durable {
return makeErr("durable", u.Durable, s.Durable)
}
if u.Description != s.Description {
return makeErr("description", u.Description, s.Description)
}
if u.DeliverPolicy != s.DeliverPolicy {
return makeErr("deliver policy", u.DeliverPolicy, s.DeliverPolicy)
}
if u.OptStartSeq != s.OptStartSeq {
return makeErr("optional start sequence", u.OptStartSeq, s.OptStartSeq)
}
if u.OptStartTime != nil && !u.OptStartTime.IsZero() && !(*u.OptStartTime).Equal(*s.OptStartTime) {
return makeErr("optional start time", u.OptStartTime, s.OptStartTime)
}
if u.AckPolicy != s.AckPolicy {
return makeErr("ack policy", u.AckPolicy, s.AckPolicy)
}
if u.AckWait != 0 && u.AckWait != s.AckWait {
return makeErr("ack wait", u.AckWait.String(), s.AckWait.String())
}
if !(u.MaxDeliver == 0 && s.MaxDeliver == -1) && u.MaxDeliver != s.MaxDeliver {
return makeErr("max deliver", u.MaxDeliver, s.MaxDeliver)
}
if len(u.BackOff) != len(s.BackOff) {
return makeErr("backoff", u.BackOff, s.BackOff)
}
for i, val := range u.BackOff {
if val != s.BackOff[i] {
return makeErr("backoff", u.BackOff, s.BackOff)
}
}
if u.FilterSubject != s.FilterSubject {
return makeErr("filter subject", u.FilterSubject, s.FilterSubject)
}
if u.ReplayPolicy != s.ReplayPolicy {
return makeErr("replay policy", u.ReplayPolicy, s.ReplayPolicy)
}
if u.RateLimit != s.RateLimit {
return makeErr("rate limit", u.RateLimit, s.RateLimit)
}
if u.SampleFrequency != s.SampleFrequency {
return makeErr("sample frequency", u.SampleFrequency, s.SampleFrequency)
}
if u.MaxWaiting != 0 && u.MaxWaiting != s.MaxWaiting {
return makeErr("max waiting", u.MaxWaiting, s.MaxWaiting)
}
if u.MaxAckPending != 0 && u.MaxAckPending != s.MaxAckPending {
return makeErr("max ack pending", u.MaxAckPending, s.MaxAckPending)
}
if u.FlowControl != s.FlowControl {
return makeErr("flow control", u.FlowControl, s.FlowControl)
}
if u.Heartbeat != s.Heartbeat {
return makeErr("heartbeat", u.Heartbeat, s.Heartbeat)
}
if u.HeadersOnly != s.HeadersOnly {
return makeErr("headers only", u.HeadersOnly, s.HeadersOnly)
}
if u.MaxRequestBatch != s.MaxRequestBatch {
return makeErr("max request batch", u.MaxRequestBatch, s.MaxRequestBatch)
}
if u.MaxRequestExpires != s.MaxRequestExpires {
return makeErr("max request expires", u.MaxRequestExpires.String(), s.MaxRequestExpires.String())
}
if u.DeliverSubject != s.DeliverSubject {
return makeErr("deliver subject", u.DeliverSubject, s.DeliverSubject)
}
if u.DeliverGroup != s.DeliverGroup {
return makeErr("deliver group", u.DeliverSubject, s.DeliverSubject)
}
if u.InactiveThreshold != s.InactiveThreshold {
return makeErr("inactive threshhold", u.InactiveThreshold.String(), s.InactiveThreshold.String())
}
if u.Replicas != s.Replicas {
return makeErr("replicas", u.Replicas, s.Replicas)
}
if u.MemoryStorage != s.MemoryStorage {
return makeErr("memory storage", u.MemoryStorage, s.MemoryStorage)
}
return nil
}
13 changes: 13 additions & 0 deletions jsv2/jetstream/consumer_config.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
// Copyright 2020-2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package jetstream

import (
Expand Down
16 changes: 11 additions & 5 deletions jsv2/jetstream/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ func TestPullConsumerNext(t *testing.T) {
msgs := make([]JetStreamMsg, 0)
var msg JetStreamMsg
errs := make(chan error)
done := make(chan struct{})
go func() {
for {
nextCtx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
Expand All @@ -399,18 +400,23 @@ func TestPullConsumerNext(t *testing.T) {
errs <- err
break
}
if msg == nil {
close(done)
break
}
msgs = append(msgs, msg)
}
}()

time.Sleep(20 * time.Millisecond)
publishTestMsgs(t, nc)
err = <-errs
if !errors.Is(err, ErrNoMessages) {
select {
case err := <-errs:
t.Fatalf("Unexpected error: %v", err)
}
if len(msgs) != len(testMsgs) {
t.Fatalf("Unexpected received message count; want %d; got %d", len(testMsgs), len(msgs))
case <-done:
if len(msgs) != len(testMsgs) {
t.Fatalf("Unexpected received message count; want %d; got %d", len(testMsgs), len(msgs))
}
}
for i, msg := range msgs {
if string(msg.Data()) != testMsgs[i] {
Expand Down
Loading

0 comments on commit 8d9ad1b

Please sign in to comment.