From 6f01c1e17252ef0582845658b78508040fc5145d Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Thu, 1 Apr 2021 09:58:49 -0700 Subject: [PATCH] js: Add godoc examples for JetStream context Signed-off-by: Waldemar Quevedo --- example_test.go | 503 +++++++++++++++++++++++++++++++++++++++++++++++- js.go | 4 +- nats.go | 17 ++ norace_test.go | 2 +- 4 files changed, 520 insertions(+), 6 deletions(-) diff --git a/example_test.go b/example_test.go index f23f69073..992629683 100644 --- a/example_test.go +++ b/example_test.go @@ -1,4 +1,4 @@ -// Copyright 2012-2019 The NATS Authors +// Copyright 2012-2021 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -14,16 +14,17 @@ package nats_test import ( + "context" "fmt" + "log" "time" "github.com/nats-io/nats.go" ) -// Shows different ways to create a Conn +// Shows different ways to create a Conn. func ExampleConnect() { - - nc, _ := nats.Connect(nats.DefaultURL) + nc, _ := nats.Connect("demo.nats.io") nc.Close() nc, _ = nats.Connect("nats://derek:secretpassword@demo.nats.io:4222") @@ -277,3 +278,497 @@ func ExampleEncodedConn_BindRecvChan() { fmt.Printf("%v says hello!\n", who) } + +func ExampleJetStream() { + nc, err := nats.Connect("localhost") + if err != nil { + log.Fatal(err) + } + + // Use the JetStream context to produce and consumer messages + // that have been persisted. + js, err := nc.JetStream(nats.PublishAsyncMaxPending(256)) + if err != nil { + log.Fatal(err) + } + + js.AddStream(&nats.StreamConfig{ + Name: "FOO", + Subjects: []string{"foo"}, + }) + + js.Publish("foo", []byte("Hello JS!")) + + // Publish messages asynchronously. + for i := 0; i < 500; i++ { + js.PublishAsync("foo", []byte("Hello JS Async!")) + } + select { + case <-js.PublishAsyncComplete(): + case <-time.After(5 * time.Second): + fmt.Println("Did not resolve in time") + } + + // Create async consumer on subject 'foo'. Async subscribers + // ack a message once exiting the callback. + js.Subscribe("foo", func(msg *nats.Msg) { + meta, _ := msg.Metadata() + fmt.Printf("Stream Sequence : %v\n", meta.Sequence.Stream) + fmt.Printf("Consumer Sequence: %v\n", meta.Sequence.Consumer) + }) + + // Async subscriber with manual acks. + js.Subscribe("foo", func(msg *nats.Msg) { + msg.Ack() + }, nats.ManualAck()) + + // Async queue subscription where members load balance the + // received messages together. + js.QueueSubscribe("foo", "group", func(msg *nats.Msg) { + msg.Ack() + }, nats.ManualAck()) + + // Subscriber to consume messages synchronously. + sub, _ := js.SubscribeSync("foo") + msg, _ := sub.NextMsg(2 * time.Second) + msg.Ack() + + // QueueSubscribe with group or load balancing. + sub, _ = js.QueueSubscribeSync("foo", "group") + msg, _ = sub.NextMsg(2 * time.Second) + msg.Ack() + + // ChanSubscribe + msgCh := make(chan *nats.Msg, 8192) + sub, _ = js.ChanSubscribe("foo", msgCh) + + select { + case msg := <-msgCh: + fmt.Println("[Received]", msg) + case <-time.After(1 * time.Second): + } + + // Create Pull based consumer with maximum 128 inflight. + sub, _ = js.PullSubscribe("foo", "wq", nats.PullMaxWaiting(128)) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + for { + select { + case <-ctx.Done(): + return + default: + } + + msgs, _ := sub.Fetch(10, nats.Context(ctx)) + for _, msg := range msgs { + msg.Ack() + } + } +} + +// A JetStream context can be configured with a default timeout using nats.MaxWait +// or with a custom API prefix in case of using an imported JetStream from another account. +func ExampleJSOpt() { + nc, err := nats.Connect("localhost") + if err != nil { + log.Fatal(err) + } + + // Use the JetStream context to manage streams and consumers (with nats.APIPrefix JSOpt) + js, err := nc.JetStream(nats.APIPrefix("dlc"), nats.MaxWait(5*time.Second)) + if err != nil { + log.Fatal(err) + } + sub, _ := js.SubscribeSync("foo") + js.Publish("foo", []byte("Hello JS!")) + sub.NextMsg(2 * time.Second) +} + +func ExampleJetStreamManager() { + nc, _ := nats.Connect("localhost") + + js, _ := nc.JetStream() + + // Create a stream + js.AddStream(&nats.StreamConfig{ + Name: "FOO", + Subjects: []string{"foo"}, + MaxBytes: 1024, + }) + + // Update a stream + js.UpdateStream(&nats.StreamConfig{ + Name: "FOO", + MaxBytes: 2048, + }) + + // Create a druable consumer + js.AddConsumer("FOO", &nats.ConsumerConfig{ + Durable: "BAR", + }) + + // Get information about all streams (with Context JSOpt) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + for info := range js.StreamsInfo(nats.Context(ctx)) { + fmt.Println("stream name:", info.Config.Name) + } + + // Get information about all consumers (with MaxWait JSOpt) + for info := range js.ConsumersInfo("FOO", nats.MaxWait(10*time.Second)) { + fmt.Println("consumer name:", info.Name) + } + + // Delete a consumer + js.DeleteConsumer("FOO", "BAR") + + // Delete a stream + js.DeleteStream("FOO") +} + +// A JetStreamContext is the composition of a JetStream and JetStreamManagement interfaces. +// In case of only requiring publishing/consuming messages, can create a context that +// only uses the JetStream interface. +func ExampleJetStreamContext() { + nc, _ := nats.Connect("localhost") + + var js nats.JetStream + var jsm nats.JetStreamManager + var jsctx nats.JetStreamContext + + // JetStream that can publish/subscribe but cannot manage streams. + js, _ = nc.JetStream() + js.Publish("foo", []byte("hello")) + + // JetStream context that can manage streams/consumers but cannot produce messages. + jsm, _ = nc.JetStream() + jsm.AddStream(&nats.StreamConfig{Name: "FOO"}) + + // JetStream context that can both manage streams/consumers + // as well as publish/subscribe. + jsctx, _ = nc.JetStream() + jsctx.AddStream(&nats.StreamConfig{Name: "BAR"}) + jsctx.Publish("bar", []byte("hello world")) +} + +func ExamplePubOpt() { + nc, err := nats.Connect("localhost") + if err != nil { + log.Fatal(err) + } + + // Create JetStream context to produce/consumer messages that will be persisted. + js, err := nc.JetStream() + if err != nil { + log.Fatal(err) + } + + // Create stream to persist messages published on 'foo'. + js.AddStream(&nats.StreamConfig{ + Name: "FOO", + Subjects: []string{"foo"}, + }) + + // Publish is synchronous by default, and waits for a PubAck response. + js.Publish("foo", []byte("Hello JS!")) + + // Publish with a custom timeout. + js.Publish("foo", []byte("Hello JS!"), nats.AckWait(500*time.Millisecond)) + + // Publish with a context. + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + js.Publish("foo", []byte("Hello JS!"), nats.Context(ctx)) + + // Publish and assert the expected stream name. + js.Publish("foo", []byte("Hello JS!"), nats.ExpectStream("FOO")) + + // Publish and assert the last sequence. + js.Publish("foo", []byte("Hello JS!"), nats.ExpectLastSequence(5)) + + // Publish and tag the message with an ID. + js.Publish("foo", []byte("Hello JS!"), nats.MsgId("foo:6")) + + // Publish and assert the last msg ID. + js.Publish("foo", []byte("Hello JS!"), nats.ExpectLastMsgId("foo:6")) +} + +func ExampleSubOpt() { + nc, err := nats.Connect("localhost") + if err != nil { + log.Fatal(err) + } + + // Create JetStream context to produce/consumer messages that will be persisted. + js, err := nc.JetStream() + if err != nil { + log.Fatal(err) + } + + // Auto-ack each individual message. + js.Subscribe("foo", func(msg *nats.Msg) { + fmt.Printf("Received a message: %s\n", string(msg.Data)) + }) + + // Auto-ack current sequence and all below. + js.Subscribe("foo", func(msg *nats.Msg) { + fmt.Printf("Received a message: %s\n", string(msg.Data)) + }, nats.AckAll()) + + // Auto-ack each individual message. + js.Subscribe("foo", func(msg *nats.Msg) { + fmt.Printf("Received a message: %s\n", string(msg.Data)) + }, nats.AckExplicit()) + + // Acks are not required. + js.Subscribe("foo", func(msg *nats.Msg) { + fmt.Printf("Received a message: %s\n", string(msg.Data)) + }, nats.AckNone()) + + // Manually acknowledge messages. + js.Subscribe("foo", func(msg *nats.Msg) { + msg.Ack() + }, nats.ManualAck()) + + // Bind to an existing stream. + sub, _ := js.SubscribeSync("origin", nats.BindStream("m1")) + msg, _ := sub.NextMsg(2 * time.Second) + msg.Ack() + + // Deliver all messages from the beginning. + js.Subscribe("foo", func(msg *nats.Msg) { + fmt.Printf("Received a message: %s\n", string(msg.Data)) + }, nats.DeliverAll()) + + // Deliver messages starting from the last one. + js.Subscribe("foo", func(msg *nats.Msg) { + fmt.Printf("Received a message: %s\n", string(msg.Data)) + }, nats.DeliverLast()) + + // Deliver only new messages that arrive after subscription. + js.Subscribe("foo", func(msg *nats.Msg) { + fmt.Printf("Received a message: %s\n", string(msg.Data)) + }, nats.DeliverNew()) + + // Create durable consumer FOO, if it doesn't exist. + js.Subscribe("foo", func(msg *nats.Msg) { + fmt.Printf("Received a message: %s\n", string(msg.Data)) + }, nats.Durable("FOO")) + + // Create consumer on Foo with flow control and heartbeats. + js.SubscribeSync("foo", + // Redeliver after 30s + nats.AckWait(30*time.Second), + // Redeliver only once + nats.MaxDeliver(1), + // Activate Flow control algorithm from the server. + nats.EnableFlowControl(), + // Track heartbeats from the server fro missed sequences. + nats.IdleHeartbeat(500*time.Millisecond), + ) + + // Set the allowable number of outstanding acks. + js.Subscribe("foo", func(msg *nats.Msg) { + fmt.Printf("Received a message: %s\n", string(msg.Data)) + }, nats.MaxAckPending(5)) + + // Set the number of redeliveries for a message. + js.Subscribe("foo", func(msg *nats.Msg) { + fmt.Printf("Received a message: %s\n", string(msg.Data)) + }, nats.MaxDeliver(5)) + + // Set the number the max inflight pull requests. + js.Subscribe("foo", func(msg *nats.Msg) { + fmt.Printf("Received a message: %s\n", string(msg.Data)) + }, nats.PullMaxWaiting(5)) + + // Set the number the max inflight pull requests. + js.Subscribe("foo", func(msg *nats.Msg) { + fmt.Printf("Received a message: %s\n", string(msg.Data)) + }, nats.PullMaxWaiting(5)) + + // Set the rate limit on a push consumer. + js.Subscribe("foo", func(msg *nats.Msg) { + fmt.Printf("Received a message: %s\n", string(msg.Data)) + }, nats.RateLimit(1024)) + + // Replay messages at original speed, instead of as fast as possible. + js.Subscribe("foo", func(msg *nats.Msg) { + fmt.Printf("Received a message: %s\n", string(msg.Data)) + }, nats.ReplayOriginal()) + + // Start delivering messages at a given sequence. + js.Subscribe("foo", func(msg *nats.Msg) { + fmt.Printf("Received a message: %s\n", string(msg.Data)) + }, nats.StartSequence(10)) + + // Start delivering messages at a given time. + js.Subscribe("foo", func(msg *nats.Msg) { + fmt.Printf("Received a message: %s\n", string(msg.Data)) + }, nats.StartTime(time.Now().Add(-2*time.Hour))) +} + +func ExampleMaxWait() { + nc, _ := nats.Connect("localhost") + + // Set default timeout for JetStream API requests, + // following requests will inherit this timeout. + js, _ := nc.JetStream(nats.MaxWait(3 * time.Second)) + + // Set custom timeout for a JetStream API request. + js.AddStream(&nats.StreamConfig{ + Name: "FOO", + Subjects: []string{"foo"}, + }, nats.MaxWait(2*time.Second)) + + sub, _ := js.PullSubscribe("foo", "my-durable-name") + + // Fetch using the default timeout of 3 seconds. + msgs, _ := sub.Fetch(1) + + // Set custom timeout for a pull batch request. + msgs, _ = sub.Fetch(1, nats.MaxWait(2*time.Second)) + + for _, msg := range msgs { + msg.Ack() + } +} + +func ExampleAckWait() { + nc, _ := nats.Connect("localhost") + js, _ := nc.JetStream() + + // Set custom timeout for a JetStream API request. + js.AddStream(&nats.StreamConfig{ + Name: "FOO", + Subjects: []string{"foo"}, + }) + + // Wait for an ack response for 2 seconds. + js.Publish("foo", []byte("Hello JS!"), nats.AckWait(2*time.Second)) + + // Create consumer on 'foo' subject that waits for an ack for 10s, + // after which the message will be delivered. + sub, _ := js.SubscribeSync("foo", nats.AckWait(10*time.Second)) + msg, _ := sub.NextMsg(2 * time.Second) + + // Wait for ack of ack for 2s. + msg.AckSync(nats.AckWait(2 * time.Second)) +} + +func ExampleMsg_AckSync() { + nc, _ := nats.Connect("localhost") + js, _ := nc.JetStream() + + // Set custom timeout for a JetStream API request. + js.AddStream(&nats.StreamConfig{ + Name: "FOO", + Subjects: []string{"foo"}, + }) + + sub, _ := js.SubscribeSync("foo") + msg, _ := sub.NextMsg(2 * time.Second) + + // Wait for ack of an ack. + msg.AckSync() +} + +// When a message has been delivered by JetStream, it will be possible +// to access some of its metadata such as sequence numbers. +func ExampleMsg_Metadata() { + nc, _ := nats.Connect("localhost") + js, _ := nc.JetStream() + + // Set custom timeout for a JetStream API request. + js.AddStream(&nats.StreamConfig{ + Name: "FOO", + Subjects: []string{"foo"}, + }) + + sub, _ := js.SubscribeSync("foo") + msg, _ := sub.NextMsg(2 * time.Second) + + // + meta, _ := msg.Metadata() + + // Stream and Consumer sequences. + fmt.Printf("Stream seq: %d, Consumer seq: %d\n", meta.Sequence.Stream, meta.Sequence.Consumer) + fmt.Printf("Pending: %d\n", meta.NumPending) + fmt.Printf("Pending: %d\n", meta.NumDelivered) +} + +// AckOpt are the options that can be passed when acknowledge a message. +func ExampleAckOpt() { + nc, err := nats.Connect("localhost") + if err != nil { + log.Fatal(err) + } + + // Create JetStream context to produce/consumer messages that will be persisted. + js, err := nc.JetStream() + if err != nil { + log.Fatal(err) + } + + // Create stream to persist messages published on 'foo'. + js.AddStream(&nats.StreamConfig{ + Name: "FOO", + Subjects: []string{"foo"}, + }) + + // Publish is synchronous by default, and waits for a PubAck response. + js.Publish("foo", []byte("Hello JS!")) + + sub, _ := js.SubscribeSync("foo") + msg, _ := sub.NextMsg(2 * time.Second) + + // Ack and wait for 2 seconds + msg.InProgress(nats.AckWait(2)) + + // Using a context. + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + msg.Ack(nats.Context(ctx)) +} + +func ExamplePullOpt() { + nc, err := nats.Connect("localhost") + if err != nil { + log.Fatal(err) + } + + // Create JetStream context to produce/consumer messages that will be persisted. + js, err := nc.JetStream() + if err != nil { + log.Fatal(err) + } + + // Create stream to persist messages published on 'foo'. + js.AddStream(&nats.StreamConfig{ + Name: "FOO", + Subjects: []string{"foo"}, + }) + + // Publish is synchronous by default, and waits for a PubAck response. + js.Publish("foo", []byte("Hello JS!")) + + sub, _ := js.PullSubscribe("foo", "wq") + + // Pull one message, + msgs, _ := sub.Fetch(1, nats.MaxWait(2*time.Second)) + for _, msg := range msgs { + msg.Ack() + } + + // Using a context to timeout waiting for a message. + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + msgs, _ = sub.Fetch(1, nats.Context(ctx)) + for _, msg := range msgs { + msg.Ack() + } +} diff --git a/js.go b/js.go index 4d39cd7a3..4e1b23273 100644 --- a/js.go +++ b/js.go @@ -693,6 +693,7 @@ type ackOpts struct { ctx context.Context } +// AckOpt are the options that can be passed when acknowledge a message. type AckOpt interface { configureAck(opts *ackOpts) error } @@ -1304,11 +1305,12 @@ type pullOpts struct { ctx context.Context } +// PullOpt are the options that can be passed when pulling a batch of messages. type PullOpt interface { configurePull(opts *pullOpts) error } -// PullMaxWaiting defines the max inflight pull requests to be delivered more messages. +// PullMaxWaiting defines the max inflight pull requests. func PullMaxWaiting(n int) SubOpt { return subOptFn(func(opts *subOpts) error { opts.cfg.MaxWaiting = n diff --git a/nats.go b/nats.go index b9037db8c..bcec733ae 100644 --- a/nats.go +++ b/nats.go @@ -542,6 +542,23 @@ type Subscription struct { // Msg represents a message delivered by NATS. This structure is used // by Subscribers and PublishMsg(). +// +// Types of Acknowledgements +// +// In case using JetStream, there are multiple ways to ack a Msg: +// +// // Acknowledgement that a message has been processed. +// msg.Ack() +// +// // Negatively acknowledges a message. +// msg.Nak() +// +// // Terminate a message so that it is not redelivered further. +// msg.Term() +// +// // Signal the server that the message is being worked on and reset redelivery timer. +// msg.InProgress() +// type Msg struct { Subject string Reply string diff --git a/norace_test.go b/norace_test.go index 512a359fb..f704de832 100644 --- a/norace_test.go +++ b/norace_test.go @@ -135,7 +135,7 @@ func TestNoRaceJetStreamConsumerSlowConsumer(t *testing.T) { nc.SetErrorHandler(func(_ *Conn, _ *Subscription, _ error) {}) // Queue up 1M small messages. - toSend := uint64(1_000_000) + toSend := uint64(1000000) for i := uint64(0); i < toSend; i++ { nc.Publish("js.p", []byte("ok")) }