added consumer group support to Reader (segmentio#55)
* added consumer group support to Reader

* simplified ready logic around NewReader

Previously, if consumer groups were enabled, we waited until after rebalancing and the first subscriptions were complete before returning from NewReader.

Instead, we now advance Reader.version to 1 to ensure readers are only spawned from the rebalance func

* fixed typo

* exposing RebalanceTimeout to ReaderConfig

* fixed validation error with SessionTimeout

* renamed hbLoop to heartbeatLoop for clarity

* removed backoff on rebalance.  the backoff is likely to cause more harm than good; the broker naturally slows down the rate of rebalancing through join group and sync group.

* pulled loop logic for run out into separate func

* introduced runGroup to simplify management of rebalance goroutines

* renamed variable subs to assignments for consistency

* renamed newJoinGroupRequestV2 to makeJoinGroupRequestV2 to follow Go conventions when returning a value (and not a pointer)

* removing unnecessary Partition definition for consumer groups

* corrected docs

* making var declaration style consistent

* updated readme example as per recommendation

* pushing withErrorLogger up from runOnce to run

* embedding withErrorLogger messages into error

* continuing to convert withErrorLogger to errors

* moving unsubscribe call to the top of the runOnce loop

* converted withErrorLogger to withLogger for semantic correctness

* converting time.After to time.NewTimer as per request

* added comment to clarify reasoning for size == 0 check in (*groupAssignment).readFrom

* renamed funcs that return values to makeXXX rather than newXXX

* extracted the r.config.GroupID != "" logic into a useConsumerGroup func as per suggestion

* replaced a few additional cases of r.config.GroupID == "" with !r.useConsumerGroup()

* added leaveGroup for more graceful interaction with broker

* cleaned up noisy error handling in heartbeat.  intentionally stopped handling InvalidMemberId in heartbeat

* updated docs and log messages

* renamed (*Reader).broker and (*Reader).connect to connect and coordinator respectively to clarify responsibilities

* leaveGroup now called on Close when using consumer groups

* ReadMessage now automatically commits offsets.  Introduced FetchMessage to handle explicit commits (in conjunction with CommitMessage)

* refactored shape of CommitMessage to CommitMessages to accept a context and multiple messages

* renamed ProtocolMetadata to GroupMetadata and updated associated docs for clarity

* created type offsetStash to manage a temporary cache of offsets (see the sketch after this list)

* promoted lifecycle of offsetStash to match the Reader (previously it was scoped to a single consumer group generation)

* commit will now retry up to defaultCommitRetries times before failing

* added test case to verify consumer group rebalancing across many partitions

* resolves edge case where rebalance happens during call to (*Reader).Close
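
The offsetStash type and the commit-retry behavior described above live in the reader implementation, which is not part of the diff excerpt shown below. A minimal sketch, assuming a map keyed by topic and partition (the field layout and the ```merge``` and ```commitWithRetry``` names are illustrative, not the actual kafka-go code; ```Message``` is the package's message type):

```go
// Illustrative sketch only: a temporary cache of the highest offset seen per
// topic/partition, merged from fetched messages and flushed with retries.
type offsetStash map[string]map[int]int64 // topic -> partition -> highest offset

// merge records the offsets of fetched messages, keeping the largest per partition.
func (o offsetStash) merge(msgs ...Message) {
	for _, m := range msgs {
		byPartition, ok := o[m.Topic]
		if !ok {
			byPartition = map[int]int64{}
			o[m.Topic] = byPartition
		}
		if cur, ok := byPartition[m.Partition]; !ok || m.Offset > cur {
			byPartition[m.Partition] = m.Offset
		}
	}
}

// commitWithRetry gives up only after the configured number of attempts,
// mirroring the "retry up to defaultCommitRetries times" behavior above.
func commitWithRetry(commit func(offsetStash) error, offsets offsetStash, retries int) (err error) {
	for attempt := 0; attempt < retries; attempt++ {
		if err = commit(offsets); err == nil {
			return nil
		}
	}
	return err
}
```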
savaki authored and achille-roussel committed Jan 8, 2018
1 parent 17c1389 commit 949ee27
Showing 14 changed files with 2,742 additions and 50 deletions.
72 changes: 72 additions & 0 deletions README.md
@@ -109,6 +109,78 @@ for {
r.Close()
```

### Consumer Groups

```kafka-go``` also supports Kafka consumer groups, including broker-managed offsets.
To enable consumer groups, simply specify the GroupID in the ReaderConfig.

ReadMessage automatically commits offsets when using consumer groups.

```go
// make a new reader that consumes from topic-A
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
GroupID: "consumer-group-id",
Topic: "topic-A",
MinBytes: 10e3, // 10KB
MaxBytes: 10e6, // 10MB
})

for {
m, err := r.ReadMessage(context.Background())
if err != nil {
break
}
fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
}

r.Close()
```

There are a number of limitations when using consumer groups (see the sketch after this list):

* ```(*Reader).SetOffset``` will return an error when GroupID is set
* ```(*Reader).Offset``` will always return ```-1``` when GroupID is set
* ```(*Reader).Lag``` will always return ```-1``` when GroupID is set
* ```(*Reader).ReadLag``` will return an error when GroupID is set
* ```(*Reader).Stats``` will return a partition of ```-1``` when GroupID is set
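
A short sketch of how these limitations surface at the call site, reusing the consumer-group reader ```r``` from the example above (the offset value is arbitrary and the output is illustrative only):

```go
// SetOffset is expected to fail whenever GroupID is set, so check the error.
if err := r.SetOffset(42); err != nil {
	fmt.Println("cannot seek while part of a consumer group:", err)
}

// Offset and Lag report -1 when GroupID is set, because positions are
// tracked by the group coordinator rather than by the Reader itself.
fmt.Println(r.Offset(), r.Lag())
```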

### Explicit Commits

```kafka-go``` also supports explicit commits. Instead of calling ```ReadMessage```,
call ```FetchMessage``` followed by ```CommitMessages```.

```go
ctx := context.Background()
for {
m, err := r.FetchMessage(ctx)
if err != nil {
break
}
fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
r.CommitMessages(ctx, m)
}
```

### Managing Commits

By default, CommitMessages will synchronously commit offsets to Kafka. For
improved performance, you can instead periodically commit offsets to Kafka
by setting CommitInterval on the ReaderConfig.


```go
// make a new reader that consumes from topic-A
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
GroupID: "consumer-group-id",
Topic: "topic-A",
MinBytes: 10e3, // 10KB
MaxBytes: 10e6, // 10MB
CommitInterval: time.Second, // flushes commits to Kafka every second
})
```

## Writer [![GoDoc](https://godoc.org/github.com/segmentio/kafka-go?status.svg)](https://godoc.org/github.com/segmentio/kafka-go#Writer)

To produce messages to Kafka, a program may use the low-level `Conn` API, but
7 changes: 6 additions & 1 deletion dialer_test.go
@@ -245,7 +245,8 @@ func TestDialerTLS(t *testing.T) {

type MockConn struct {
net.Conn
done chan struct{}
done chan struct{}
partitions []Partition
}

func (m *MockConn) Read(b []byte) (n int, err error) {
@@ -277,6 +278,10 @@ func (m *MockConn) Close() error {
return nil
}

func (m *MockConn) ReadPartitions(topics ...string) (partitions []Partition, err error) {
return m.partitions, err
}

func TestDialerConnectTLSHonorsContext(t *testing.T) {
config := tlsConfig(t)
d := &Dialer{
47 changes: 47 additions & 0 deletions joingroup.go
@@ -2,8 +2,55 @@ package kafka

import (
"bufio"
"bytes"
)

type memberGroupMetadata struct {
// MemberID assigned by the group coordinator or null if joining for the
// first time.
MemberID string
Metadata groupMetadata
}

type groupMetadata struct {
Version int16
Topics []string
UserData []byte
}

func (t groupMetadata) size() int32 {
return sizeofInt16(t.Version) +
sizeofStringArray(t.Topics) +
sizeofBytes(t.UserData)
}

func (t groupMetadata) writeTo(w *bufio.Writer) {
writeInt16(w, t.Version)
writeStringArray(w, t.Topics)
writeBytes(w, t.UserData)
}

func (t groupMetadata) bytes() []byte {
buf := bytes.NewBuffer(nil)
w := bufio.NewWriter(buf)
t.writeTo(w)
w.Flush()
return buf.Bytes()
}

func (t *groupMetadata) readFrom(r *bufio.Reader, size int) (remain int, err error) {
if remain, err = readInt16(r, size, &t.Version); err != nil {
return
}
if remain, err = readStringArray(r, remain, &t.Topics); err != nil {
return
}
if remain, err = readBytes(r, remain, &t.UserData); err != nil {
return
}
return
}

type joinGroupRequestGroupProtocolV2 struct {
ProtocolName string
ProtocolMetadata []byte
94 changes: 94 additions & 0 deletions joingroup_test.go
@@ -7,6 +7,100 @@ import (
"testing"
)

func TestSaramaCompatibility(t *testing.T) {
var (
// sample data from github.com/Shopify/sarama
//
// See consumer_group_members_test.go
//
groupMemberMetadata = []byte{
0, 1, // Version
0, 0, 0, 2, // Topic array length
0, 3, 'o', 'n', 'e', // Topic one
0, 3, 't', 'w', 'o', // Topic two
0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata
}
groupMemberAssignment = []byte{
0, 1, // Version
0, 0, 0, 1, // Topic array length
0, 3, 'o', 'n', 'e', // Topic one
0, 0, 0, 3, // Topic one, partition array length
0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 4, // 0, 2, 4
0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata
}
)

t.Run("verify metadata", func(t *testing.T) {
var item groupMetadata
remain, err := (&item).readFrom(bufio.NewReader(bytes.NewReader(groupMemberMetadata)), len(groupMemberMetadata))
if err != nil {
t.Fatalf("bad err: %v", err)
}
if remain != 0 {
t.Fatalf("expected 0; got %v", remain)
}

if v := item.Version; v != 1 {
t.Errorf("expected Version 1; got %v", v)
}
if v := item.Topics; !reflect.DeepEqual([]string{"one", "two"}, v) {
t.Errorf(`expected {"one", "two"}; got %v`, v)
}
if v := item.UserData; !reflect.DeepEqual([]byte{0x01, 0x02, 0x03}, v) {
t.Errorf("expected []byte{0x01, 0x02, 0x03}; got %v", v)
}
})

t.Run("verify assignments", func(t *testing.T) {
var item groupAssignment
remain, err := (&item).readFrom(bufio.NewReader(bytes.NewReader(groupMemberAssignment)), len(groupMemberAssignment))
if err != nil {
t.Fatalf("bad err: %v", err)
}
if remain != 0 {
t.Fatalf("expected 0; got %v", remain)
}

if v := item.Version; v != 1 {
t.Errorf("expected Version 1; got %v", v)
}
if v := item.Topics; !reflect.DeepEqual(map[string][]int32{"one": {0, 2, 4}}, v) {
t.Errorf(`expected map[string][]int32{"one": {0, 2, 4}}; got %v`, v)
}
if v := item.UserData; !reflect.DeepEqual([]byte{0x01, 0x02, 0x03}, v) {
t.Errorf("expected []byte{0x01, 0x02, 0x03}; got %v", v)
}
})
}

func TestMemberMetadata(t *testing.T) {
item := groupMetadata{
Version: 1,
Topics: []string{"a", "b"},
UserData: []byte(`blah`),
}

buf := bytes.NewBuffer(nil)
w := bufio.NewWriter(buf)
item.writeTo(w)
w.Flush()

var found groupMetadata
remain, err := (&found).readFrom(bufio.NewReader(buf), buf.Len())
if err != nil {
t.Error(err)
t.FailNow()
}
if remain != 0 {
t.Errorf("expected 0 remain, got %v", remain)
t.FailNow()
}
if !reflect.DeepEqual(item, found) {
t.Error("expected item and found to be the same")
t.FailNow()
}
}

func TestJoinGroupResponseV1(t *testing.T) {
item := joinGroupResponseV2{
ThrottleTimeMS: 1,
4 changes: 2 additions & 2 deletions read.go
@@ -143,8 +143,8 @@ func readStringArray(r *bufio.Reader, sz int, v *[]string) (remain int, err error) {
}

func readMapStringInt32(r *bufio.Reader, sz int, v *map[string][]int32) (remain int, err error) {
var len int16
if remain, err = readInt16(r, sz, &len); err != nil {
var len int32
if remain, err = readInt32(r, sz, &len); err != nil {
return
}

2 changes: 1 addition & 1 deletion read_test.go
@@ -55,7 +55,7 @@ func TestReadMapStringInt32(t *testing.T) {
buf := bytes.NewBuffer(nil)

w := bufio.NewWriter(buf)
writeInt16(w, int16(len(test.Data)))
writeInt32(w, int32(len(test.Data)))
for key, values := range test.Data {
writeString(w, key)
writeInt32Array(w, values)
