Skip to content

Commit

Permalink
[ADDED] Option to set memory storage to true for a consumer (nats-io#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Deepak Sah authored Sep 15, 2022
1 parent caf5af2 commit 866ce08
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 0 deletions.
5 changes: 5 additions & 0 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,11 @@ func ExampleSubOpt() {
js.Subscribe("foo", func(msg *nats.Msg) {
fmt.Printf("Received a message: %s\n", string(msg.Data))
}, nats.Durable("FOO"), nats.ConsumerReplicas(1))

// Force memory storage while subscribing.
js.Subscribe("foo", func(msg *nats.Msg) {
fmt.Printf("Received a message: %s\n", string(msg.Data))
}, nats.Durable("FOO"), nats.ConsumerMemoryStorage())
}

func ExampleMaxWait() {
Expand Down
11 changes: 11 additions & 0 deletions js.go
Original file line number Diff line number Diff line change
Expand Up @@ -1357,6 +1357,9 @@ func checkConfig(s, u *ConsumerConfig) error {
if u.Replicas > 0 && u.Replicas != s.Replicas {
return makeErr("replicas", u.Replicas, s.Replicas)
}
if u.MemoryStorage && !s.MemoryStorage {
return makeErr("memory storage", u.MemoryStorage, s.MemoryStorage)
}
return nil
}

Expand Down Expand Up @@ -2485,6 +2488,14 @@ func ConsumerReplicas(replicas int) SubOpt {
})
}

// ConsumerMemoryStorage sets the memory storage to true for a consumer.
func ConsumerMemoryStorage() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.MemoryStorage = true
return nil
})
}

func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) {
sub.mu.Lock()
// TODO(dlc) - Better way to mark especially if we attach.
Expand Down
61 changes: 61 additions & 0 deletions js_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1096,3 +1096,64 @@ func TestJetStreamConvertDirectMsgResponseToMsg(t *testing.T) {
t.Fatalf("Wrong header: %v", r.Header)
}
}

func TestJetStreamConsumerMemoryStorage(t *testing.T) {
opts := natsserver.DefaultTestOptions
opts.Port = -1
opts.JetStream = true
s := natsserver.RunServer(&opts)
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

if _, err := js.AddStream(&StreamConfig{Name: "STR", Subjects: []string{"foo"}}); err != nil {
t.Fatalf("Error adding stream: %v", err)
}

// Pull ephemeral consumer with memory storage.
sub, err := js.PullSubscribe("foo", "", ConsumerMemoryStorage())
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}

consInfo, err := sub.ConsumerInfo()
if err != nil {
t.Fatalf("Error getting consumer info: %v", err)
}

if !consInfo.Config.MemoryStorage {
t.Fatalf("Expected memory storage to be %v, got %+v", true, consInfo.Config.MemoryStorage)
}

// Create a sync subscription with an in-memory ephemeral consumer.
sub, err = js.SubscribeSync("foo", ConsumerMemoryStorage())
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}

consInfo, err = sub.ConsumerInfo()
if err != nil {
t.Fatalf("Error getting consumer info: %v", err)
}

if !consInfo.Config.MemoryStorage {
t.Fatalf("Expected memory storage to be %v, got %+v", true, consInfo.Config.MemoryStorage)
}

// Async subscription with an in-memory ephemeral consumer.
cb := func(msg *Msg) {}
sub, err = js.Subscribe("foo", cb, ConsumerMemoryStorage())
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}

consInfo, err = sub.ConsumerInfo()
if err != nil {
t.Fatalf("Error getting consumer info: %v", err)
}

if !consInfo.Config.MemoryStorage {
t.Fatalf("Expected memory storage to be %v, got %+v", true, consInfo.Config.MemoryStorage)
}
}

0 comments on commit 866ce08

Please sign in to comment.