Skip to content

Commit

Permalink
docs(readme): add error-handling to example code (segmentio#513)
Browse files Browse the repository at this point in the history
  • Loading branch information
dominicbarnes authored Sep 19, 2020
1 parent 7bc2404 commit 418f1fd
Showing 1 changed file with 41 additions and 14 deletions.
55 changes: 41 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,23 +50,34 @@ Here are some examples showing typical use of a connection object:
topic := "my-topic"
partition := 0

conn, _ := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
if err != nil {
log.Fatal("failed to dial leader:", err)
}

conn.SetWriteDeadline(time.Now().Add(10*time.Second))
conn.WriteMessages(
err := conn.WriteMessages(
kafka.Message{Value: []byte("one!")},
kafka.Message{Value: []byte("two!")},
kafka.Message{Value: []byte("three!")},
)
if err != nil {
log.Fatal("failed to write messages:", err)
}

conn.Close()
if err := conn.Close(); err != nil {
log.Fatal("failed to close writer:", err)
}
```
```go
// to consume messages
topic := "my-topic"
partition := 0

conn, _ := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
if err != nil {
log.Fatal("failed to dial leader:", err)
}

conn.SetReadDeadline(time.Now().Add(10*time.Second))
batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max
Expand All @@ -80,8 +91,13 @@ for {
fmt.Println(string(b))
}

batch.Close()
conn.Close()
if err := batch.Close(); err != nil {
log.Fatal("failed to close batch:", err)
}

if err := conn.Close(); err != nil {
log.Fatal("failed to close connection:", err)
}
```

Because it is low level, the `Conn` type turns out to be a great building block
Expand Down Expand Up @@ -115,7 +131,9 @@ for {
fmt.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
}

r.Close()
if err := r.Close(); err != nil {
log.Fatal("failed to close reader:", err)
}
```

### Consumer Groups
Expand Down Expand Up @@ -143,7 +161,9 @@ for {
fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
}

r.Close()
if err := r.Close(); err != nil {
log.Fatal("failed to close reader:", err)
}
```

There are a number of limitations when using consumer groups:
Expand All @@ -167,7 +187,9 @@ for {
break
}
fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
r.CommitMessages(ctx, m)
if err := r.CommitMessages(ctx, m); err != nil {
log.Fatal("failed to commit messages:", err)
}
}
```

Expand Down Expand Up @@ -210,7 +232,7 @@ w := kafka.NewWriter(kafka.WriterConfig{
Balancer: &kafka.LeastBytes{},
})

w.WriteMessages(context.Background(),
err := w.WriteMessages(context.Background(),
kafka.Message{
Key: []byte("Key-A"),
Value: []byte("Hello World!"),
Expand All @@ -224,8 +246,13 @@ w.WriteMessages(context.Background(),
Value: []byte("Two!"),
},
)
if err != nil {
log.Fatal("failed to write messages:", err)
}

w.Close()
if err := w.Close(); err != nil {
log.Fatal("failed to close writer:", err)
}
```

**Note:** Even though kafka.Message contain ```Topic``` and ```Partition``` fields, they **MUST NOT** be
Expand Down Expand Up @@ -286,9 +313,9 @@ w := kafka.NewWriter(kafka.WriterConfig{
})
```

The `Reader` will by determine if the consumed messages are compressed by
examining the message attributes. However, the package(s) for all expected
codecs must be imported so that they get loaded correctly. For example, if you
The `Reader` will by determine if the consumed messages are compressed by
examining the message attributes. However, the package(s) for all expected
codecs must be imported so that they get loaded correctly. For example, if you
are going to be receiving messages compressed with Snappy, add the following
import:

Expand Down

0 comments on commit 418f1fd

Please sign in to comment.