Skip to content

Commit

Permalink
Introduced top-level ConsumerGroup construct (segmentio#277)
Browse files Browse the repository at this point in the history
This is a lower-level API that unlocks use cases such as consuming
each assigned partition using a separate Reader or to overwrite
offsets for a group.

Refactored logic out of Reader and into ConsumerGroup so that
Reader uses the ConsumerGroup when GroupID is set.
  • Loading branch information
Steve van Loben Sels authored Aug 9, 2019
1 parent 18548e9 commit 11670d5
Show file tree
Hide file tree
Showing 9 changed files with 2,064 additions and 1,271 deletions.
5 changes: 2 additions & 3 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,10 +496,10 @@ func (c *Conn) offsetFetch(request offsetFetchRequestV1) (offsetFetchResponseV1,
return response, nil
}

// syncGroups completes the handshake to join a consumer group
// syncGroup completes the handshake to join a consumer group
//
// See http://kafka.apache.org/protocol.html#The_Messages_SyncGroup
func (c *Conn) syncGroups(request syncGroupRequestV0) (syncGroupResponseV0, error) {
func (c *Conn) syncGroup(request syncGroupRequestV0) (syncGroupResponseV0, error) {
var response syncGroupResponseV0

err := c.readOperation(
Expand Down Expand Up @@ -767,7 +767,6 @@ func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch {
id, err := c.doRequest(&c.rdeadline, func(deadline time.Time, id int32) error {
now := time.Now()
deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
adjustedDeadline = deadline
switch c.fetchVersion {
case v10:
return c.wb.writeFetchRequestV10(
Expand Down
9 changes: 5 additions & 4 deletions conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ func createGroup(t *testing.T, conn *Conn, groupID string) (generationID int32,
joinGroup := join()

// sync the group
_, err := conn.syncGroups(syncGroupRequestV0{
_, err := conn.syncGroup(syncGroupRequestV0{
GroupID: groupID,
GenerationID: joinGroup.GenerationID,
MemberID: joinGroup.MemberID,
Expand All @@ -609,7 +609,7 @@ func createGroup(t *testing.T, conn *Conn, groupID string) (generationID int32,
},
})
if err != nil {
t.Fatalf("bad syncGroups: %s", err)
t.Fatalf("bad syncGroup: %s", err)
}

generationID = joinGroup.GenerationID
Expand Down Expand Up @@ -710,7 +710,7 @@ func testConnHeartbeatErr(t *testing.T, conn *Conn) {
groupID := makeGroupID()
createGroup(t, conn, groupID)

_, err := conn.syncGroups(syncGroupRequestV0{
_, err := conn.syncGroup(syncGroupRequestV0{
GroupID: groupID,
})
if err != UnknownMemberId && err != NotCoordinatorForGroup {
Expand All @@ -734,7 +734,7 @@ func testConnSyncGroupErr(t *testing.T, conn *Conn) {
groupID := makeGroupID()
waitForCoordinator(t, conn, groupID)

_, err := conn.syncGroups(syncGroupRequestV0{
_, err := conn.syncGroup(syncGroupRequestV0{
GroupID: groupID,
})
if err != UnknownMemberId && err != NotCoordinatorForGroup {
Expand Down Expand Up @@ -844,6 +844,7 @@ func testConnFetchAndCommitOffsets(t *testing.T, conn *Conn) {
}

func testConnWriteReadConcurrently(t *testing.T, conn *Conn) {

const N = 1000
var msgs = make([]string, N)
var done = make(chan struct{})
Expand Down
Loading

0 comments on commit 11670d5

Please sign in to comment.