Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs(readme): add error-handling to example code #513

Merged
merged 1 commit into from
Sep 19, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
docs(readme): add error-handling to example code
The current README includes quite a few code snippets that include no
error-handling. If someone is trying to learn this library for the first
time, it can make understanding why things fail much more difficult
without first adding this error-handling to their own code.

It's just good hygiene in Go to handle errors, so I think our examples
should reflect that, rather than try so hard to keep the code shorter.
  • Loading branch information
dominicbarnes committed Sep 18, 2020
commit a5185a8e38141eb3f8ba34e282fced373e20028b
55 changes: 41 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,23 +50,34 @@ Here are some examples showing typical use of a connection object:
topic := "my-topic"
partition := 0

conn, _ := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
if err != nil {
log.Fatal("failed to dial leader:", err)
}

conn.SetWriteDeadline(time.Now().Add(10*time.Second))
conn.WriteMessages(
err := conn.WriteMessages(
kafka.Message{Value: []byte("one!")},
kafka.Message{Value: []byte("two!")},
kafka.Message{Value: []byte("three!")},
)
if err != nil {
log.Fatal("failed to write messages:", err)
}

conn.Close()
if err := conn.Close(); err != nil {
log.Fatal("failed to close writer:", err)
}
```
```go
// to consume messages
topic := "my-topic"
partition := 0

conn, _ := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
if err != nil {
log.Fatal("failed to dial leader:", err)
}

conn.SetReadDeadline(time.Now().Add(10*time.Second))
batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max
Expand All @@ -80,8 +91,13 @@ for {
fmt.Println(string(b))
}

batch.Close()
conn.Close()
if err := batch.Close(); err != nil {
log.Fatal("failed to close batch:", err)
}

if err := conn.Close(); err != nil {
log.Fatal("failed to close connection:", err)
}
```

Because it is low level, the `Conn` type turns out to be a great building block
Expand Down Expand Up @@ -115,7 +131,9 @@ for {
fmt.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
}

r.Close()
if err := r.Close(); err != nil {
log.Fatal("failed to close reader:", err)
}
```

### Consumer Groups
Expand Down Expand Up @@ -143,7 +161,9 @@ for {
fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
}

r.Close()
if err := r.Close(); err != nil {
log.Fatal("failed to close reader:", err)
}
```

There are a number of limitations when using consumer groups:
Expand All @@ -167,7 +187,9 @@ for {
break
}
fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
r.CommitMessages(ctx, m)
if err := r.CommitMessages(ctx, m); err != nil {
log.Fatal("failed to commit messages:", err)
}
}
```

Expand Down Expand Up @@ -210,7 +232,7 @@ w := kafka.NewWriter(kafka.WriterConfig{
Balancer: &kafka.LeastBytes{},
})

w.WriteMessages(context.Background(),
err := w.WriteMessages(context.Background(),
kafka.Message{
Key: []byte("Key-A"),
Value: []byte("Hello World!"),
Expand All @@ -224,8 +246,13 @@ w.WriteMessages(context.Background(),
Value: []byte("Two!"),
},
)
if err != nil {
log.Fatal("failed to write messages:", err)
}

w.Close()
if err := w.Close(); err != nil {
log.Fatal("failed to close writer:", err)
}
```

**Note:** Even though kafka.Message contain ```Topic``` and ```Partition``` fields, they **MUST NOT** be
Expand Down Expand Up @@ -286,9 +313,9 @@ w := kafka.NewWriter(kafka.WriterConfig{
})
```

The `Reader` will by determine if the consumed messages are compressed by
examining the message attributes. However, the package(s) for all expected
codecs must be imported so that they get loaded correctly. For example, if you
The `Reader` will by determine if the consumed messages are compressed by
examining the message attributes. However, the package(s) for all expected
codecs must be imported so that they get loaded correctly. For example, if you
are going to be receiving messages compressed with Snappy, add the following
import:

Expand Down