Skip to content

Commit

Permalink
[ADDED] Support for multiple subject filters on consumers (nats-io#1214)
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
  • Loading branch information
piotrpio committed Sep 20, 2023
1 parent f1e011b commit 8ca661b
Show file tree
Hide file tree
Showing 8 changed files with 229 additions and 11 deletions.
5 changes: 5 additions & 0 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,11 @@ func ExampleSubOpt() {
js.Subscribe("foo", func(msg *nats.Msg) {
fmt.Printf("Received a message: %s\n", string(msg.Data))
}, nats.Durable("FOO"), nats.SkipConsumerLookup())

// Use multiple subject filters.
js.Subscribe("", func(msg *nats.Msg) {
fmt.Printf("Received a message: %s\n", string(msg.Data))
}, nats.Durable("FOO"), nats.ConsumerFilterSubjects("foo", "bar"), nats.BindStream("test_stream"))
}

func ExampleMaxWait() {
Expand Down
7 changes: 6 additions & 1 deletion jetstream/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func upsertConsumer(ctx context.Context, js *jetStream, stream string, cfg Consu
}

var ccSubj string
if cfg.FilterSubject != "" {
if cfg.FilterSubject != "" && len(cfg.FilterSubjects) == 0 {
ccSubj = apiSubj(js.apiPrefix, fmt.Sprintf(apiConsumerCreateWithFilterSubjectT, stream, consumerName, cfg.FilterSubject))
} else {
ccSubj = apiSubj(js.apiPrefix, fmt.Sprintf(apiConsumerCreateT, stream, consumerName))
Expand All @@ -128,6 +128,11 @@ func upsertConsumer(ctx context.Context, js *jetStream, stream string, cfg Consu
return nil, resp.Error
}

// check whether multiple filter subjects (if used) are reflected in the returned ConsumerInfo
if len(cfg.FilterSubjects) != 0 && len(resp.Config.FilterSubjects) == 0 {
return nil, ErrConsumerMultipleFilterSubjectsNotSupported
}

return &pullConsumer{
jetStream: js,
stream: stream,
Expand Down
25 changes: 21 additions & 4 deletions jetstream/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,13 @@ const (
JSErrCodeStreamNotFound ErrorCode = 10059
JSErrCodeStreamNameInUse ErrorCode = 10058

JSErrCodeConsumerCreate ErrorCode = 10012
JSErrCodeConsumerNotFound ErrorCode = 10014
JSErrCodeConsumerNameExists ErrorCode = 10013
JSErrCodeConsumerAlreadyExists ErrorCode = 10105
JSErrCodeConsumerCreate ErrorCode = 10012
JSErrCodeConsumerNotFound ErrorCode = 10014
JSErrCodeConsumerNameExists ErrorCode = 10013
JSErrCodeConsumerAlreadyExists ErrorCode = 10105
JSErrCodeDuplicateFilterSubjects ErrorCode = 10136
JSErrCodeOverlappingFilterSubjects ErrorCode = 10138
JSErrCodeConsumerEmptyFilter ErrorCode = 10139

JSErrCodeMessageNotFound ErrorCode = 10037

Expand Down Expand Up @@ -88,8 +91,22 @@ var (
// ErrConsumerCreate is returned when nats-server reports error when creating consumer (e.g. illegal update).
ErrConsumerCreate JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerCreate, Description: "could not create consumer", Code: 500}}

// ErrDuplicateFilterSubjects is returned when both FilterSubject and FilterSubjects are specified when creating consumer.
ErrDuplicateFilterSubjects JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeDuplicateFilterSubjects, Description: "consumer cannot have both FilterSubject and FilterSubjects specified", Code: 500}}

// ErrDuplicateFilterSubjects is returned when filter subjects overlap when creating consumer.
ErrOverlappingFilterSubjects JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeOverlappingFilterSubjects, Description: "consumer subject filters cannot overlap", Code: 500}}

// ErrEmptyFilter is returned when a filter in FilterSubjects is empty.
ErrEmptyFilter JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerEmptyFilter, Description: "consumer filter in FilterSubjects cannot be empty", Code: 500}}

// Client errors

// ErrConsumerMultipleFilterSubjectsNotSupported is returned when the connected nats-server version does not support setting
// multiple filter subjects with filter_subjects field. If this error is returned when executing AddConsumer(), the consumer with invalid
// configuration was already created in the server.
ErrConsumerMultipleFilterSubjectsNotSupported JetStreamError = &jsError{message: "multiple consumer filter subjects not supported by nats-server"}

// ErrConsumerNotFound is an error returned when consumer with given name does not exist.
ErrConsumerNameAlreadyInUse JetStreamError = &jsError{message: "consumer name already in use"}

Expand Down
18 changes: 18 additions & 0 deletions jetstream/test/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,21 @@ func TestCreateOrUpdateConsumer(t *testing.T) {
consumerConfig: jetstream.ConsumerConfig{FilterSubjects: []string{"FOO.A", "FOO.B"}},
shouldCreate: true,
},
{
name: "with multiple filter subjects, overlapping subjects",
consumerConfig: jetstream.ConsumerConfig{FilterSubjects: []string{"FOO.*", "FOO.B"}},
withError: jetstream.ErrOverlappingFilterSubjects,
},
{
name: "with multiple filter subjects and filter subject provided",
consumerConfig: jetstream.ConsumerConfig{FilterSubjects: []string{"FOO.A", "FOO.B"}, FilterSubject: "FOO.C"},
withError: jetstream.ErrDuplicateFilterSubjects,
},
{
name: "with empty subject in FilterSubjects",
consumerConfig: jetstream.ConsumerConfig{FilterSubjects: []string{"FOO.A", ""}},
withError: jetstream.ErrEmptyFilter,
},
{
name: "consumer already exists, update",
consumerConfig: jetstream.ConsumerConfig{Durable: "dur", Description: "test consumer"},
Expand Down Expand Up @@ -119,6 +134,9 @@ func TestCreateOrUpdateConsumer(t *testing.T) {
if ci.CachedInfo().Config.AckPolicy != test.consumerConfig.AckPolicy {
t.Fatalf("Invalid ack policy; want: %s; got: %s", test.consumerConfig.AckPolicy, ci.CachedInfo().Config.AckPolicy)
}
if !reflect.DeepEqual(test.consumerConfig.FilterSubjects, ci.CachedInfo().Config.FilterSubjects) {
t.Fatalf("Invalid filter subjects; want: %v; got: %v", test.consumerConfig.FilterSubjects, ci.CachedInfo().Config.FilterSubjects)
}
})
}
}
Expand Down
11 changes: 11 additions & 0 deletions js.go
Original file line number Diff line number Diff line change
Expand Up @@ -1118,6 +1118,7 @@ type ConsumerConfig struct {
MaxDeliver int `json:"max_deliver,omitempty"`
BackOff []time.Duration `json:"backoff,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
FilterSubjects []string `json:"filter_subjects,omitempty"`
ReplayPolicy ReplayPolicy `json:"replay_policy"`
RateLimit uint64 `json:"rate_limit_bps,omitempty"` // Bits per sec
SampleFrequency string `json:"sample_freq,omitempty"`
Expand Down Expand Up @@ -2570,6 +2571,16 @@ func ConsumerName(name string) SubOpt {
})
}

// ConsumerFilterSubjects can be used to set multiple subject filters on the consumer.
// It has to be used in conjunction with [nats.BindStream] and
// with empty 'subject' parameter.
func ConsumerFilterSubjects(subjects ...string) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.FilterSubjects = subjects
return nil
})
}

func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) {
sub.mu.Lock()
// TODO(dlc) - Better way to mark especially if we attach.
Expand Down
23 changes: 20 additions & 3 deletions jserrors.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,15 @@ var (
// ErrBadRequest is returned when invalid request is sent to JetStream API.
ErrBadRequest JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeBadRequest, Description: "bad request", Code: 400}}

// ErrDuplicateFilterSubjects is returned when both FilterSubject and FilterSubjects are specified when creating consumer.
ErrDuplicateFilterSubjects JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeDuplicateFilterSubjects, Description: "consumer cannot have both FilterSubject and FilterSubjects specified", Code: 500}}

// ErrDuplicateFilterSubjects is returned when filter subjects overlap when creating consumer.
ErrOverlappingFilterSubjects JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeOverlappingFilterSubjects, Description: "consumer subject filters cannot overlap", Code: 500}}

// ErrEmptyFilter is returned when a filter in FilterSubjects is empty.
ErrEmptyFilter JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerEmptyFilter, Description: "consumer filter in FilterSubjects cannot be empty", Code: 500}}

// Client errors

// ErrConsumerNameAlreadyInUse is an error returned when consumer with given name already exists.
Expand All @@ -62,6 +71,11 @@ var (
// ErrConsumerNameRequired is returned when the provided consumer durable name is empty.
ErrConsumerNameRequired JetStreamError = &jsError{message: "consumer name is required"}

// ErrConsumerMultipleFilterSubjectsNotSupported is returned when the connected nats-server version does not support setting
// multiple filter subjects with filter_subjects field. If this error is returned when executing AddConsumer(), the consumer with invalid
// configuration was already created in the server.
ErrConsumerMultipleFilterSubjectsNotSupported JetStreamError = &jsError{message: "multiple consumer filter subjects not supported by nats-server"}

// ErrConsumerConfigRequired is returned when empty consumer consuguration is supplied to add/update consumer.
ErrConsumerConfigRequired JetStreamError = &jsError{message: "consumer configuration is required"}

Expand Down Expand Up @@ -123,9 +137,12 @@ const (
JSErrCodeStreamNotFound ErrorCode = 10059
JSErrCodeStreamNameInUse ErrorCode = 10058

JSErrCodeConsumerNotFound ErrorCode = 10014
JSErrCodeConsumerNameExists ErrorCode = 10013
JSErrCodeConsumerAlreadyExists ErrorCode = 10105
JSErrCodeConsumerNotFound ErrorCode = 10014
JSErrCodeConsumerNameExists ErrorCode = 10013
JSErrCodeConsumerAlreadyExists ErrorCode = 10105
JSErrCodeDuplicateFilterSubjects ErrorCode = 10136
JSErrCodeOverlappingFilterSubjects ErrorCode = 10138
JSErrCodeConsumerEmptyFilter ErrorCode = 10139

JSErrCodeMessageNotFound ErrorCode = 10037

Expand Down
5 changes: 5 additions & 0 deletions jsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,11 @@ func (js *js) upsertConsumer(stream, consumerName string, cfg *ConsumerConfig, o
}
return nil, info.Error
}

// check whether multiple filter subjects (if used) are reflected in the returned ConsumerInfo
if len(cfg.FilterSubjects) != 0 && len(info.Config.FilterSubjects) == 0 {
return nil, ErrConsumerMultipleFilterSubjectsNotSupported
}
return info.ConsumerInfo, nil
}

Expand Down
146 changes: 143 additions & 3 deletions test/js_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2070,7 +2070,7 @@ func TestJetStreamManagement(t *testing.T) {
// Create the stream using our client API.
var si *nats.StreamInfo
t.Run("create stream", func(t *testing.T) {
si, err := js.AddStream(&nats.StreamConfig{Name: "foo", Subjects: []string{"foo", "bar"}})
si, err := js.AddStream(&nats.StreamConfig{Name: "foo", Subjects: []string{"foo", "bar", "baz"}})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand Down Expand Up @@ -2294,6 +2294,82 @@ func TestJetStreamManagement(t *testing.T) {
}
})

t.Run("durable consumer with multiple filter subjects", func(t *testing.T) {
sub, err := nc.SubscribeSync("$JS.API.CONSUMER.CREATE.foo.dlc-5")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer sub.Unsubscribe()
ci, err := js.AddConsumer("foo", &nats.ConsumerConfig{
Durable: "dlc-5",
AckPolicy: nats.AckExplicitPolicy,
FilterSubjects: []string{"foo", "bar"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
msg, err := sub.NextMsg(1 * time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !strings.Contains(string(msg.Data), `"durable_name":"dlc-5"`) {
t.Fatalf("create consumer message is not correct: %q", string(msg.Data))
}
if ci == nil || ci.Config.Durable != "dlc-5" || !reflect.DeepEqual(ci.Config.FilterSubjects, []string{"foo", "bar"}) {
t.Fatalf("ConsumerInfo is not correct %+v", ci)
}
})

t.Run("ephemeral consumer with multiple filter subjects", func(t *testing.T) {
sub, err := nc.SubscribeSync("$JS.API.CONSUMER.CREATE.foo")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer sub.Unsubscribe()
ci, err := js.AddConsumer("foo", &nats.ConsumerConfig{
AckPolicy: nats.AckExplicitPolicy,
FilterSubjects: []string{"foo", "bar"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
_, err = sub.NextMsg(1 * time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if ci == nil || !reflect.DeepEqual(ci.Config.FilterSubjects, []string{"foo", "bar"}) {
t.Fatalf("ConsumerInfo is not correct %+v", ci)
}
})

t.Run("multiple filter subjects errors", func(t *testing.T) {
// both filter subject and filter subjects provided
_, err := js.AddConsumer("foo", &nats.ConsumerConfig{
AckPolicy: nats.AckExplicitPolicy,
FilterSubjects: []string{"foo", "bar"},
FilterSubject: "baz",
})
if !errors.Is(err, nats.ErrDuplicateFilterSubjects) {
t.Fatalf("Expected: %v; got: %v", nats.ErrDuplicateFilterSubjects, err)
}
// overlapping filter subjects
_, err = js.AddConsumer("foo", &nats.ConsumerConfig{
AckPolicy: nats.AckExplicitPolicy,
FilterSubjects: []string{"foo.*", "foo.A"},
})
if !errors.Is(err, nats.ErrOverlappingFilterSubjects) {
t.Fatalf("Expected: %v; got: %v", nats.ErrOverlappingFilterSubjects, err)
}
// empty filter subject in filter subjects
_, err = js.AddConsumer("foo", &nats.ConsumerConfig{
AckPolicy: nats.AckExplicitPolicy,
FilterSubjects: []string{"foo", ""},
})
if !errors.Is(err, nats.ErrEmptyFilter) {
t.Fatalf("Expected: %v; got: %v", nats.ErrEmptyFilter, err)
}
})

t.Run("with invalid consumer name", func(t *testing.T) {
if _, err = js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "test.durable"}); err != nats.ErrInvalidConsumerName {
t.Fatalf("Expected: %v; got: %v", nats.ErrInvalidConsumerName, err)
Expand Down Expand Up @@ -2400,7 +2476,7 @@ func TestJetStreamManagement(t *testing.T) {
for info := range js.Consumers("foo") {
infos = append(infos, info)
}
if len(infos) != 6 || infos[0].Stream != "foo" {
if len(infos) != 8 || infos[0].Stream != "foo" {
t.Fatalf("ConsumerInfo is not correct %+v", infos)
}
})
Expand All @@ -2412,7 +2488,7 @@ func TestJetStreamManagement(t *testing.T) {
for name := range js.ConsumerNames("foo", nats.Context(ctx)) {
names = append(names, name)
}
if got, want := len(names), 6; got != want {
if got, want := len(names), 8; got != want {
t.Fatalf("Unexpected names, got=%d, want=%d", got, want)
}
})
Expand Down Expand Up @@ -5571,6 +5647,70 @@ func TestJetStreamSubscribe_RateLimit(t *testing.T) {
}
}

func TestJetStreamSubscribe_FilterSubjects(t *testing.T) {
tests := []struct {
name string
durable string
}{
{
name: "ephemeral consumer",
},
{
name: "durable consumer",
durable: "cons",
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

var err error

_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo", "bar", "baz"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
for i := 0; i < 5; i++ {
js.Publish("foo", []byte("msg"))
}
for i := 0; i < 5; i++ {
js.Publish("bar", []byte("msg"))
}
for i := 0; i < 5; i++ {
js.Publish("baz", []byte("msg"))
}

opts := []nats.SubOpt{nats.BindStream("TEST"), nats.ConsumerFilterSubjects("foo", "baz")}
if test.durable != "" {
opts = append(opts, nats.Durable(test.durable))
}
sub, err := js.SubscribeSync("", opts...)
if err != nil {
t.Fatalf("Unexpected error: %s", err)
}

for i := 0; i < 10; i++ {
msg, err := sub.NextMsg(500 * time.Millisecond)
if err != nil {
t.Fatalf("Unexpected error: %s", err)
}
if msg.Subject != "foo" && msg.Subject != "baz" {
t.Fatalf("Unexpected message subject: %s", msg.Subject)
}
}
})
}

}

func TestJetStreamSubscribe_ConfigCantChange(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)
Expand Down

0 comments on commit 8ca661b

Please sign in to comment.