Skip to content

Commit

Permalink
Fixes after rebase
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
  • Loading branch information
piotrpio committed May 23, 2023
1 parent ccd3a0d commit d85ef5d
Show file tree
Hide file tree
Showing 16 changed files with 188 additions and 150 deletions.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
2 changes: 1 addition & 1 deletion jetstream/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func main() {
fmt.Printf("Received a JetStream message: %s\n", string(msg.Data()))
}
if msgs.Error() {
fmt.Println("Error duting Fetch(): ", msgs.Error())
fmt.Println("Error during Fetch(): ", msgs.Error())
}

// Receive messages continuously in a callback
Expand Down
47 changes: 28 additions & 19 deletions jetstream/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,16 +97,13 @@ var (
// ErrStreamNameRequired is returned when the provided stream name is empty.
ErrStreamNameRequired JetStreamError = &jsError{message: "stream name is required"}

// ErrConsumerNameRequired is returned when the provided consumer durable name is empty,
ErrConsumerNameRequired JetStreamError = &jsError{message: "consumer name is required"}

// ErrMsgAlreadyAckd is returned when attempting to acknowledge message more than once.
ErrMsgAlreadyAckd JetStreamError = &jsError{message: "message was already acknowledged"}

// ErrNoStreamResponse is returned when there is no response from stream (e.g. no responders error).
ErrNoStreamResponse JetStreamError = &jsError{message: "no response from stream"}

// ErrNotJSMessage is returned when attempting to get metadata from non JetStream message .
// ErrNotJSMessage is returned when attempting to get metadata from non JetStream message.
ErrNotJSMessage JetStreamError = &jsError{message: "not a jetstream message"}

// ErrInvalidStreamName is returned when the provided stream name is invalid (contains '.').
Expand All @@ -121,47 +118,59 @@ var (
// ErrMaxBytesExceeded is returned when a message would exceed MaxBytes set on a pull request.
ErrMaxBytesExceeded = &jsError{message: "message size exceeds max bytes"}

// ErrConsumerDeleted is returned when attempting to send pull request to a consumer which does not exist
// ErrConsumerDeleted is returned when attempting to send pull request to a consumer which does not exist.
ErrConsumerDeleted JetStreamError = &jsError{message: "consumer deleted"}

// ErrConsumerLeadershipChanged is returned when pending requests are no longer valid after leadership has changed
// ErrConsumerLeadershipChanged is returned when pending requests are no longer valid after leadership has changed.
ErrConsumerLeadershipChanged JetStreamError = &jsError{message: "leadership change"}

// ErrHandlerRequired is returned when no handler func is provided in Stream()
// ErrHandlerRequired is returned when no handler func is provided in Stream().
ErrHandlerRequired = &jsError{message: "handler cannot be empty"}

// ErrEndOfData is returned when iterating over paged API from JetStream reaches end of data
// ErrEndOfData is returned when iterating over paged API from JetStream reaches end of data.
ErrEndOfData = errors.New("nats: end of data reached")

// ErrNoHeartbeat is received when no message is received in IdleHeartbeat time (if set)
// ErrNoHeartbeat is received when no message is received in IdleHeartbeat time (if set).
ErrNoHeartbeat = &jsError{message: "no heartbeat received"}

// ErrConsumerHasActiveSubscription is returned when a consumer is already subscribed to a stream
// ErrConsumerHasActiveSubscription is returned when a consumer is already subscribed to a stream.
ErrConsumerHasActiveSubscription = &jsError{message: "consumer has active subscription"}

// ErrMsgNotBound is returned when given message is not bound to any subscription
// ErrMsgNotBound is returned when given message is not bound to any subscription.
ErrMsgNotBound = &jsError{message: "message is not bound to subscription/connection"}

// ErrMsgNoReply is returned when attempting to reply to a message without a reply subject
// ErrMsgNoReply is returned when attempting to reply to a message without a reply subject.
ErrMsgNoReply = &jsError{message: "message does not have a reply"}

// ErrMsgDeleteUnsuccessful is returned when an attempt to delete a message is unsuccessful
// ErrMsgDeleteUnsuccessful is returned when an attempt to delete a message is unsuccessful.
ErrMsgDeleteUnsuccessful = &jsError{message: "message deletion unsuccessful"}

// ErrAsyncPublishReplySubjectSet is returned when reply subject is set on async message publish
// ErrAsyncPublishReplySubjectSet is returned when reply subject is set on async message publish.
ErrAsyncPublishReplySubjectSet = &jsError{message: "reply subject should be empty"}

// ErrTooManyStalledMsgs is returned when too many outstanding async messages are waiting for ack
// ErrTooManyStalledMsgs is returned when too many outstanding async messages are waiting for ack.
ErrTooManyStalledMsgs = &jsError{message: "stalled with too many outstanding async published messages"}

// ErrInvalidOption is returned when there is a collision between options
// ErrInvalidOption is returned when there is a collision between options.
ErrInvalidOption = &jsError{message: "invalid jetstream option"}

// ErrMsgIteratorClosed is returned when attempting to get message from a closed iterator
// ErrMsgIteratorClosed is returned when attempting to get message from a closed iterator.
ErrMsgIteratorClosed = &jsError{message: "messages iterator closed"}

ErrOrderedConsumerReset = &jsError{message: "recreating ordered consumer"}
ErrOrderedSequenceMismatch = &jsError{message: "sequence mismatch"}
// ErrOrderedConsumerReset is returned when resetting ordered consumer fails due to too many attempts.
ErrOrderedConsumerReset = &jsError{message: "recreating ordered consumer"}

// ErrOrderConsumerUsedAsFetch is returned when ordered consumer was already used to process
// messages using Fetch (or FetchBytes).
ErrOrderConsumerUsedAsFetch = &jsError{message: "ordered consumer initialized as fetch"}

// ErrOrderConsumerUsedAsFetch is returned when ordered consumer was already used to process
// messages using Consume or Messages.
ErrOrderConsumerUsedAsConsume = &jsError{message: "ordered consumer initialized as consume"}

// ErrOrderedConsumerConcurrentRequests is returned when attempting to run concurrent operations
// on ordered consumers.
ErrOrderedConsumerConcurrentRequests = &jsError{message: "cannot run concurrent processing using ordered consumer"}
)

// Error prints the JetStream API error code and description
Expand Down
33 changes: 17 additions & 16 deletions jetstream/ordered.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ const (
consumerTypeFetch
)

var errOrderedSequenceMismatch = errors.New("sequence mismatch")

// Consume can be used to continuously receive messages and handle them with the provided callback function
func (c *orderedConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt) (ConsumeContext, error) {
if c.consumerType == consumerTypeNotSet || c.consumerType == consumerTypeConsume && c.currentConsumer == nil {
Expand All @@ -68,9 +70,11 @@ func (c *orderedConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt
if err != nil {
return nil, err
}
} else if c.consumerType == consumerTypeConsume && c.currentConsumer != nil {
return nil, ErrOrderedConsumerConcurrentRequests
}
if c.consumerType == consumerTypeFetch {
return nil, fmt.Errorf("ordered consumer initialized as fetch")
return nil, ErrOrderConsumerUsedAsFetch
}
consumeOpts, err := parseConsumeOpts(opts...)
if err != nil {
Expand All @@ -91,7 +95,7 @@ func (c *orderedConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt
}
dseq := meta.Sequence.Consumer
if dseq != c.cursor.deliverSeq+1 {
c.errHandler(serial)(c.currentConsumer.subscriptions[""], ErrOrderedSequenceMismatch)
c.errHandler(serial)(c.currentConsumer.subscriptions[""], errOrderedSequenceMismatch)
return
}
c.cursor.deliverSeq = dseq
Expand Down Expand Up @@ -131,11 +135,11 @@ func (c *orderedConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt

func (c *orderedConsumer) errHandler(serial int) func(cc ConsumeContext, err error) {
return func(cc ConsumeContext, err error) {
if c.userErrHandler != nil {
if c.userErrHandler != nil && !errors.Is(err, errOrderedSequenceMismatch) {
c.userErrHandler(cc, err)
}
if errors.Is(err, ErrNoHeartbeat) ||
errors.Is(err, ErrOrderedSequenceMismatch) ||
errors.Is(err, errOrderedSequenceMismatch) ||
errors.Is(err, ErrConsumerDeleted) {
// only reset if serial matches the currect consumer serial and there is no reset in progress
if serial == c.serial && atomic.LoadUint32(&c.resetInProgress) == 0 {
Expand All @@ -159,7 +163,7 @@ func (c *orderedConsumer) Messages(opts ...PullMessagesOpt) (MessagesContext, er
return nil, ErrOrderedConsumerConcurrentRequests
}
if c.consumerType == consumerTypeFetch {
return nil, fmt.Errorf("ordered consumer initialized as fetch")
return nil, ErrOrderConsumerUsedAsFetch
}
consumeOpts, err := parseMessagesOpts(opts...)
if err != nil {
Expand Down Expand Up @@ -204,7 +208,7 @@ func (s *orderedSubscription) Next() (Msg, error) {
serial := serialNumberFromConsumer(meta.Consumer)
dseq := meta.Sequence.Consumer
if dseq != s.consumer.cursor.deliverSeq+1 {
s.consumer.errHandler(serial)(currentConsumer.subscriptions[""], ErrOrderedSequenceMismatch)
s.consumer.errHandler(serial)(currentConsumer.subscriptions[""], errOrderedSequenceMismatch)
continue
}
s.consumer.cursor.deliverSeq = dseq
Expand All @@ -228,11 +232,11 @@ func (s *orderedSubscription) Stop() {
// or context reaches its deadline.
func (c *orderedConsumer) Fetch(batch int, opts ...FetchOpt) (MessageBatch, error) {
if c.consumerType == consumerTypeConsume {
return nil, fmt.Errorf("ordered consumer initialized as consume")
return nil, ErrOrderConsumerUsedAsConsume
}
if c.runningFetch != nil {
if !c.runningFetch.done {
return nil, fmt.Errorf("cannot run concurrent ordered Fetch requests")
return nil, ErrOrderedConsumerConcurrentRequests
}
c.cursor.streamSeq = c.runningFetch.sseq
}
Expand All @@ -254,11 +258,11 @@ func (c *orderedConsumer) Fetch(batch int, opts ...FetchOpt) (MessageBatch, erro
// exceeded or request times out.
func (c *orderedConsumer) FetchBytes(maxBytes int, opts ...FetchOpt) (MessageBatch, error) {
if c.consumerType == consumerTypeConsume {
return nil, fmt.Errorf("ordered consumer initialized as consume")
return nil, ErrOrderConsumerUsedAsConsume
}
if c.runningFetch != nil {
if !c.runningFetch.done {
return nil, fmt.Errorf("cannot run concurrent ordered Fetch requests")
return nil, ErrOrderedConsumerConcurrentRequests
}
c.cursor.streamSeq = c.runningFetch.sseq
}
Expand All @@ -279,10 +283,10 @@ func (c *orderedConsumer) FetchBytes(maxBytes int, opts ...FetchOpt) (MessageBat
// This method will always send a single request and immediately return up to a provided number of messages
func (c *orderedConsumer) FetchNoWait(batch int) (MessageBatch, error) {
if c.consumerType == consumerTypeConsume {
return nil, fmt.Errorf("ordered consumer initialized as consume")
return nil, ErrOrderConsumerUsedAsConsume
}
if c.runningFetch != nil && !c.runningFetch.done {
return nil, fmt.Errorf("cannot run concurrent ordered Fetch requests")
return nil, ErrOrderedConsumerConcurrentRequests
}
c.consumerType = consumerTypeFetch
err := c.reset()
Expand Down Expand Up @@ -326,19 +330,16 @@ func (c *orderedConsumer) reset() error {
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
err = c.jetStream.DeleteConsumer(ctx, c.stream, c.currentConsumer.CachedInfo().Name)
cancel()
if err != nil {
if errors.Is(err, ErrConsumerNotFound) {
cancel()
break
}
if errors.Is(err, nats.ErrTimeout) || errors.Is(err, context.DeadlineExceeded) {
cancel()
continue
}
cancel()
return err
}
cancel()
break
}
}
Expand Down
Loading

0 comments on commit d85ef5d

Please sign in to comment.