Skip to content

Commit

Permalink
Add consumer to msg Metadata
Browse files Browse the repository at this point in the history
Signed-off-by: Waldemar Quevedo <wally@synadia.com>
  • Loading branch information
wallyqs committed Apr 12, 2021
1 parent b8530c7 commit 1645e08
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 1 deletion.
4 changes: 3 additions & 1 deletion example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,14 +688,16 @@ func ExampleMsg_Metadata() {
Subjects: []string{"foo"},
})

js.Publish("foo", []byte("hello"))

sub, _ := js.SubscribeSync("foo")
msg, _ := sub.NextMsg(2 * time.Second)

//
meta, _ := msg.Metadata()

// Stream and Consumer sequences.
fmt.Printf("Stream seq: %d, Consumer seq: %d\n", meta.Sequence.Stream, meta.Sequence.Consumer)
fmt.Printf("Stream seq: %s:%d, Consumer seq: %s:%d\n", meta.Stream, meta.Sequence.Stream, meta.Consumer, meta.Sequence.Consumer)
fmt.Printf("Pending: %d\n", meta.NumPending)
fmt.Printf("Pending: %d\n", meta.NumDelivered)
}
Expand Down
2 changes: 2 additions & 0 deletions js.go
Original file line number Diff line number Diff line change
Expand Up @@ -1734,6 +1734,7 @@ type MsgMetadata struct {
NumPending uint64
Timestamp time.Time
Stream string
Consumer string
}

func getMetadataFields(subject string) ([]string, error) {
Expand Down Expand Up @@ -1772,6 +1773,7 @@ func (m *Msg) Metadata() (*MsgMetadata, error) {
NumPending: uint64(parseNum(tokens[8])),
Timestamp: time.Unix(0, parseNum(tokens[7])),
Stream: tokens[2],
Consumer: tokens[3],
}
meta.Sequence.Stream = uint64(parseNum(tokens[5]))
meta.Sequence.Consumer = uint64(parseNum(tokens[6]))
Expand Down
21 changes: 21 additions & 0 deletions test/js_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,26 @@ func TestJetStreamSubscribe(t *testing.T) {
if info.Config.AckWait != ackWait {
t.Errorf("Expected %v, got %v", ackWait, info.Config.AckWait)
}

// Add Stream and Consumer name to metadata.
sub, err = js.SubscribeSync("bar", nats.Durable("consumer-name"))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
m, err := sub.NextMsg(1 * time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
meta, err := m.Metadata()
if err != nil {
t.Fatal(err)
}
if meta.Stream != "TEST" {
t.Fatalf("Unexpected stream name, got: %v", meta.Stream)
}
if meta.Consumer != "consumer-name" {
t.Fatalf("Unexpected consumer name, got: %v", meta.Consumer)
}
}

func TestJetStreamAckPending_Pull(t *testing.T) {
Expand All @@ -598,6 +618,7 @@ func TestJetStreamAckPending_Pull(t *testing.T) {

if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)

}

nc, err := nats.Connect(s.ClientURL())
Expand Down

0 comments on commit 1645e08

Please sign in to comment.