Skip to content

Commit

Permalink
Allow setting consumer replicas though options (nats-io#1019)
Browse files Browse the repository at this point in the history
[ADDED] Allow setting consumer replicas though options
  • Loading branch information
Deepak Sah authored Jul 29, 2022
1 parent 8b04057 commit fcc7c44
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 0 deletions.
5 changes: 5 additions & 0 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,11 @@ func ExampleSubOpt() {
js.Subscribe("foo", func(msg *nats.Msg) {
fmt.Printf("Received a message: %s\n", string(msg.Data))
}, nats.ManualAck(), nats.MaxDeliver(2), nats.BackOff([]time.Duration{50 * time.Millisecond, 250 * time.Millisecond}))

// Set consumer replicas count for a durable while subscribing.
js.Subscribe("foo", func(msg *nats.Msg) {
fmt.Printf("Received a message: %s\n", string(msg.Data))
}, nats.Durable("FOO"), nats.ConsumerReplicas(1))
}

func ExampleMaxWait() {
Expand Down
14 changes: 14 additions & 0 deletions js.go
Original file line number Diff line number Diff line change
Expand Up @@ -1332,6 +1332,9 @@ func checkConfig(s, u *ConsumerConfig) error {
if u.Heartbeat > 0 && u.Heartbeat != s.Heartbeat {
return makeErr("heartbeat", u.Heartbeat, s.Heartbeat)
}
if u.Replicas > 0 && u.Replicas != s.Replicas {
return makeErr("replicas", u.Replicas, s.Replicas)
}
return nil
}

Expand Down Expand Up @@ -2449,6 +2452,17 @@ func InactiveThreshold(threshold time.Duration) SubOpt {
})
}

// ConsumerReplicas sets the number of replica count for a consumer.
func ConsumerReplicas(replicas int) SubOpt {
return subOptFn(func(opts *subOpts) error {
if replicas < 1 {
return fmt.Errorf("invalid ConsumerReplicas value (%v), needs to be greater than 0", replicas)
}
opts.cfg.Replicas = replicas
return nil
})
}

func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) {
sub.mu.Lock()
// TODO(dlc) - Better way to mark especially if we attach.
Expand Down
33 changes: 33 additions & 0 deletions test/js_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7272,3 +7272,36 @@ func TestJetStreamDirectGetMsg(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}
}

func TestJetStreamConsumerReplicasOption(t *testing.T) {
withJSCluster(t, "CR", 3, func(t *testing.T, nodes ...*jsServer) {
nc, js := jsClient(t, nodes[0].Server)
defer nc.Close()

if _, err := js.AddStream(&nats.StreamConfig{
Name: "ConsumerReplicasTest",
Subjects: []string{"foo"},
Replicas: 3,
}); err != nil {
t.Fatalf("Error adding stream: %v", err)
}

// Subscribe to the stream with a durable consumer "bar" and replica set to 1.
cb := func(msg *nats.Msg) {}
_, err := js.Subscribe("foo", cb, nats.Durable("bar"), nats.ConsumerReplicas(1))
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}

// Get consumer info
consInfo, err := js.ConsumerInfo("ConsumerReplicasTest", "bar")
if err != nil {
t.Fatalf("Error getting consumer info: %v", err)
}

// Check if the number of replicas is the same as we provided.
if consInfo.Config.Replicas != 1 {
t.Fatalf("Expected consumer replica to be %v, got %+v", 1, consInfo.Config.Replicas)
}
})
}

0 comments on commit fcc7c44

Please sign in to comment.