Skip to content

Commit

Permalink
Updated Writer and Reader docs to show use of multiple brokers. (segm…
Browse files Browse the repository at this point in the history
…entio#959)

Based on the discussion [here](segmentio#894 (comment))
  • Loading branch information
lakamsani authored Jul 30, 2022
1 parent f5c0e30 commit 174188e
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 33 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,8 @@ _testmain.go
# Goland
.idea

#IntelliJ
*.iml

# govendor
/vendor/*/
63 changes: 32 additions & 31 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ process shutdown.
```go
// make a new reader that consumes from topic-A, partition 0, at offset 42
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
Brokers: []string{"localhost:9092","localhost:9093", "localhost:9094"},
Topic: "topic-A",
Partition: 0,
MinBytes: 10e3, // 10KB
Expand Down Expand Up @@ -253,7 +253,7 @@ ReadMessage automatically commits offsets when using consumer groups.
```go
// make a new reader that consumes from topic-A
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"},
GroupID: "consumer-group-id",
Topic: "topic-A",
MinBytes: 10e3, // 10KB
Expand Down Expand Up @@ -317,7 +317,7 @@ by setting CommitInterval on the ReaderConfig.
```go
// make a new reader that consumes from topic-A
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"},
GroupID: "consumer-group-id",
Topic: "topic-A",
MinBytes: 10e3, // 10KB
Expand All @@ -342,7 +342,7 @@ to use in most cases as it provides additional features:
```go
// make a writer that produces to topic-A, using the least-bytes distribution
w := &kafka.Writer{
Addr: kafka.TCP("localhost:9092"),
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
Topic: "topic-A",
Balancer: &kafka.LeastBytes{},
}
Expand Down Expand Up @@ -376,7 +376,7 @@ if err := w.Close(); err != nil {
// Make a writer that publishes messages to topic-A.
// The topic will be created if it is missing.
w := &Writer{
Addr: TCP("localhost:9092"),
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
Topic: "topic-A",
AllowAutoTopicCreation: true,
}
Expand Down Expand Up @@ -427,7 +427,7 @@ the topic on a per-message basis by setting `Message.Topic`.

```go
w := &kafka.Writer{
Addr: kafka.TCP("localhost:9092"),
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
// NOTE: When Topic is not defined here, each Message must define it instead.
Balancer: &kafka.LeastBytes{},
}
Expand Down Expand Up @@ -478,7 +478,7 @@ aforementioned Sarama partitioners would route them to.

```go
w := &kafka.Writer{
Addr: kafka.TCP("localhost:9092"),
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
Topic: "topic-A",
Balancer: &kafka.Hash{},
}
Expand All @@ -491,7 +491,7 @@ default ```consistent_random``` partition strategy.

```go
w := &kafka.Writer{
Addr: kafka.TCP("localhost:9092"),
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
Topic: "topic-A",
Balancer: kafka.CRC32Balancer{},
}
Expand All @@ -505,7 +505,7 @@ the partition which is not permitted.

```go
w := &kafka.Writer{
Addr: kafka.TCP("localhost:9092"),
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
Topic: "topic-A",
Balancer: kafka.Murmur2Balancer{},
}
Expand All @@ -517,7 +517,7 @@ Compression can be enabled on the `Writer` by setting the `Compression` field:

```go
w := &kafka.Writer{
Addr: kafka.TCP("localhost:9092"),
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
Topic: "topic-A",
Compression: kafka.Snappy,
}
Expand Down Expand Up @@ -559,7 +559,7 @@ dialer := &kafka.Dialer{
}

r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9093"},
Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"},
GroupID: "consumer-group-id",
Topic: "topic-A",
Dialer: dialer,
Expand All @@ -568,6 +568,20 @@ r := kafka.NewReader(kafka.ReaderConfig{

### Writer


Direct Writer creation

```go
w := kafka.Writer{
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
Topic: "topic-A",
Balancer: &kafka.Hash{},
Transport: &kafka.Transport{
TLS: &tls.Config{},
},
}
```

Using `kafka.NewWriter`

```go
Expand All @@ -578,26 +592,13 @@ dialer := &kafka.Dialer{
}

w := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{"localhost:9093"},
Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"},
Topic: "topic-A",
Balancer: &kafka.Hash{},
Dialer: dialer,
})
```

Direct Writer creation

```go
w := kafka.Writer{
Addr: kafka.TCP("localhost:9093"),
Topic: "topic-A",
Balancer: &kafka.Hash{},
Transport: &kafka.Transport{
TLS: &tls.Config{},
},
}

```
Note that `kafka.NewWriter` and `kafka.WriterConfig` are deprecated and will be removed in a future release.

## SASL Support

Expand Down Expand Up @@ -654,7 +655,7 @@ dialer := &kafka.Dialer{
}

r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9093"},
Brokers: []string{"localhost:9092","localhost:9093", "localhost:9094"},
GroupID: "consumer-group-id",
Topic: "topic-A",
Dialer: dialer,
Expand All @@ -677,7 +678,7 @@ sharedTransport := &kafka.Transport{
}

w := kafka.Writer{
Addr: kafka.TCP("localhost:9092"),
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
Topic: "topic-A",
Balancer: &kafka.Hash{},
Transport: sharedTransport,
Expand All @@ -700,7 +701,7 @@ sharedTransport := &kafka.Transport{
}

client := &kafka.Client{
Addr: kafka.TCP("localhost:9092"),
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
Timeout: 10 * time.Second,
Transport: sharedTransport,
}
Expand All @@ -714,7 +715,7 @@ endTime := time.Now()
batchSize := int(10e6) // 10MB

r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"},
Topic: "my-topic1",
Partition: 0,
MinBytes: batchSize,
Expand Down Expand Up @@ -756,7 +757,7 @@ func logf(msg string, a ...interface{}) {
}

r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"},
Topic: "my-topic1",
Partition: 0,
Logger: kafka.LoggerFunc(logf),
Expand Down
4 changes: 2 additions & 2 deletions writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
//
// // Construct a synchronous writer (the default mode).
// w := &kafka.Writer{
// Addr: kafka.TCP("localhost:9092"),
// Addr: Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
// Topic: "topic-A",
// RequiredAcks: kafka.RequireAll,
// }
Expand All @@ -55,7 +55,7 @@ import (
// writer to receive notifications of messages being written to kafka:
//
// w := &kafka.Writer{
// Addr: kafka.TCP("localhost:9092"),
// Addr: Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
// Topic: "topic-A",
// RequiredAcks: kafka.RequireAll,
// Async: true, // make the writer asynchronous
Expand Down

0 comments on commit 174188e

Please sign in to comment.