Skip to content

Commit

Permalink
Merge pull request segmentio#752 from segmentio/yolken-fix-group-prot…
Browse files Browse the repository at this point in the history
…ocol

Fix member metadata decoding and remove deprecated describeGroups API
  • Loading branch information
yolken-segment authored Oct 1, 2021
2 parents 1ca9e66 + 4d75f82 commit c03923d
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 268 deletions.
28 changes: 0 additions & 28 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,34 +315,6 @@ func (c *Conn) DeleteTopics(topics ...string) error {
return err
}

// describeGroups retrieves the specified groups
//
// See http://kafka.apache.org/protocol.html#The_Messages_DescribeGroups
func (c *Conn) describeGroups(request describeGroupsRequestV0) (describeGroupsResponseV0, error) {
var response describeGroupsResponseV0

err := c.readOperation(
func(deadline time.Time, id int32) error {
return c.writeRequest(describeGroups, v0, id, request)
},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (remain int, err error) {
return (&response).readFrom(&c.rbuf, size)
}())
},
)
if err != nil {
return describeGroupsResponseV0{}, err
}
for _, group := range response.Groups {
if group.ErrorCode != 0 {
return describeGroupsResponseV0{}, Error(group.ErrorCode)
}
}

return response, nil
}

// findCoordinator finds the coordinator for the specified group or transaction
//
// See http://kafka.apache.org/protocol.html#The_Messages_FindCoordinator
Expand Down
26 changes: 0 additions & 26 deletions conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,12 +201,6 @@ func TestConn(t *testing.T) {
function: testConnReadBatchWithMaxWait,
},

{
scenario: "describe groups retrieves all groups when no groupID specified",
function: testConnDescribeGroupRetrievesAllGroups,
minVersion: "0.11.0",
},

{
scenario: "find the group coordinator",
function: testConnFindCoordinator,
Expand Down Expand Up @@ -735,26 +729,6 @@ func createGroup(t *testing.T, conn *Conn, groupID string) (generationID int32,
return
}

func testConnDescribeGroupRetrievesAllGroups(t *testing.T, conn *Conn) {
groupID := makeGroupID()
_, _, stop1 := createGroup(t, conn, groupID)
defer stop1()

out, err := conn.describeGroups(describeGroupsRequestV0{
GroupIDs: []string{groupID},
})
if err != nil {
t.Fatalf("bad describeGroups: %s", err)
}

if v := len(out.Groups); v != 1 {
t.Fatalf("expected 1 group, got %v", v)
}
if id := out.Groups[0].GroupID; id != groupID {
t.Errorf("bad group: got %v, expected %v", id, groupID)
}
}

func testConnFindCoordinator(t *testing.T, conn *Conn) {
groupID := makeGroupID()

Expand Down
212 changes: 47 additions & 165 deletions describegroups.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,18 @@ type DescribeGroupsResponseMemberMetadata struct {

// UserData is the user data for the member.
UserData []byte

// OwnedPartitions contains the partitions owned by this group member; only set if
// consumers are using a cooperative rebalancing assignor protocol.
OwnedPartitions []DescribeGroupsResponseMemberMetadataOwnedPartition
}

type DescribeGroupsResponseMemberMetadataOwnedPartition struct {
// Topic is the name of the topic.
Topic string

// Partitions is the partitions that are owned by the group in the topic.
Partitions []int
}

// GroupMemberAssignmentsInfo stores the topic partition assignment data for a group member.
Expand All @@ -93,6 +105,9 @@ type GroupMemberTopic struct {
Partitions []int
}

// DescribeGroups calls the Kafka DescribeGroups API to get information about one or more
// consumer groups. See https://kafka.apache.org/protocol#The_Messages_DescribeGroups for
// more information.
func (c *Client) DescribeGroups(
ctx context.Context,
req *DescribeGroupsRequest,
Expand Down Expand Up @@ -141,176 +156,18 @@ func (c *Client) DescribeGroups(
return resp, nil
}

// See http://kafka.apache.org/protocol.html#The_Messages_DescribeGroups
//
// TODO: Remove everything below and use protocol-based version above everywhere.
type describeGroupsRequestV0 struct {
// List of groupIds to request metadata for (an empty groupId array
// will return empty group metadata).
GroupIDs []string
}

func (t describeGroupsRequestV0) size() int32 {
return sizeofStringArray(t.GroupIDs)
}

func (t describeGroupsRequestV0) writeTo(wb *writeBuffer) {
wb.writeStringArray(t.GroupIDs)
}

type describeGroupsResponseMemberV0 struct {
// MemberID assigned by the group coordinator
MemberID string

// ClientID used in the member's latest join group request
ClientID string

// ClientHost used in the request session corresponding to the member's
// join group.
ClientHost string

// MemberMetadata the metadata corresponding to the current group protocol
// in use (will only be present if the group is stable).
MemberMetadata []byte

// MemberAssignments provided by the group leader (will only be present if
// the group is stable).
//
// See consumer groups section of https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
MemberAssignments []byte
}

func (t describeGroupsResponseMemberV0) size() int32 {
return sizeofString(t.MemberID) +
sizeofString(t.ClientID) +
sizeofString(t.ClientHost) +
sizeofBytes(t.MemberMetadata) +
sizeofBytes(t.MemberAssignments)
}

func (t describeGroupsResponseMemberV0) writeTo(wb *writeBuffer) {
wb.writeString(t.MemberID)
wb.writeString(t.ClientID)
wb.writeString(t.ClientHost)
wb.writeBytes(t.MemberMetadata)
wb.writeBytes(t.MemberAssignments)
}

func (t *describeGroupsResponseMemberV0) readFrom(r *bufio.Reader, size int) (remain int, err error) {
if remain, err = readString(r, size, &t.MemberID); err != nil {
return
}
if remain, err = readString(r, remain, &t.ClientID); err != nil {
return
}
if remain, err = readString(r, remain, &t.ClientHost); err != nil {
return
}
if remain, err = readBytes(r, remain, &t.MemberMetadata); err != nil {
return
}
if remain, err = readBytes(r, remain, &t.MemberAssignments); err != nil {
// readFrom decodes an owned partition item from the member metadata.
func (t *DescribeGroupsResponseMemberMetadataOwnedPartition) readFrom(r *bufio.Reader, size int) (remain int, err error) {
if remain, err = readString(r, size, &t.Topic); err != nil {
return
}
return
}

type describeGroupsResponseGroupV0 struct {
// ErrorCode holds response error code
ErrorCode int16

// GroupID holds the unique group identifier
GroupID string

// State holds current state of the group (one of: Dead, Stable, AwaitingSync,
// PreparingRebalance, or empty if there is no active group)
State string

// ProtocolType holds the current group protocol type (will be empty if there is
// no active group)
ProtocolType string

// Protocol holds the current group protocol (only provided if the group is Stable)
Protocol string

// Members contains the current group members (only provided if the group is not Dead)
Members []describeGroupsResponseMemberV0
}

func (t describeGroupsResponseGroupV0) size() int32 {
return sizeofInt16(t.ErrorCode) +
sizeofString(t.GroupID) +
sizeofString(t.State) +
sizeofString(t.ProtocolType) +
sizeofString(t.Protocol) +
sizeofArray(len(t.Members), func(i int) int32 { return t.Members[i].size() })
}
partitions := []int32{}

func (t describeGroupsResponseGroupV0) writeTo(wb *writeBuffer) {
wb.writeInt16(t.ErrorCode)
wb.writeString(t.GroupID)
wb.writeString(t.State)
wb.writeString(t.ProtocolType)
wb.writeString(t.Protocol)
wb.writeArray(len(t.Members), func(i int) { t.Members[i].writeTo(wb) })
}

func (t *describeGroupsResponseGroupV0) readFrom(r *bufio.Reader, size int) (remain int, err error) {
if remain, err = readInt16(r, size, &t.ErrorCode); err != nil {
return
}
if remain, err = readString(r, remain, &t.GroupID); err != nil {
return
}
if remain, err = readString(r, remain, &t.State); err != nil {
return
}
if remain, err = readString(r, remain, &t.ProtocolType); err != nil {
return
}
if remain, err = readString(r, remain, &t.Protocol); err != nil {
return
}

fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
item := describeGroupsResponseMemberV0{}
if fnRemain, fnErr = (&item).readFrom(r, size); err != nil {
return
}
t.Members = append(t.Members, item)
if remain, err = readInt32Array(r, remain, &partitions); err != nil {
return
}
if remain, err = readArrayWith(r, remain, fn); err != nil {
return
}

return
}

type describeGroupsResponseV0 struct {
// Groups holds selected group information
Groups []describeGroupsResponseGroupV0
}

func (t describeGroupsResponseV0) size() int32 {
return sizeofArray(len(t.Groups), func(i int) int32 { return t.Groups[i].size() })
}

func (t describeGroupsResponseV0) writeTo(wb *writeBuffer) {
wb.writeArray(len(t.Groups), func(i int) { t.Groups[i].writeTo(wb) })
}

func (t *describeGroupsResponseV0) readFrom(r *bufio.Reader, sz int) (remain int, err error) {
fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
item := describeGroupsResponseGroupV0{}
if fnRemain, fnErr = (&item).readFrom(r, size); fnErr != nil {
return
}
t.Groups = append(t.Groups, item)
return
}
if remain, err = readArrayWith(r, sz, fn); err != nil {
return
for _, partition := range partitions {
t.Partitions = append(t.Partitions, int(partition))
}

return
Expand Down Expand Up @@ -347,6 +204,31 @@ func decodeMemberMetadata(rawMetadata []byte) (DescribeGroupsResponseMemberMetad
return mm, err
}

if mm.Version == 1 && remain > 0 {
fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
op := DescribeGroupsResponseMemberMetadataOwnedPartition{}
if fnRemain, fnErr = readString(r, size, &op.Topic); fnErr != nil {
return
}

ps := []int32{}
if fnRemain, fnErr = readInt32Array(r, fnRemain, &ps); fnErr != nil {
return
}

for _, p := range ps {
op.Partitions = append(op.Partitions, int(p))
}

mm.OwnedPartitions = append(mm.OwnedPartitions, op)
return
}

if remain, err = readArrayWith(bufReader, remain, fn); err != nil {
return mm, err
}
}

if remain != 0 {
return mm, fmt.Errorf("Got non-zero number of bytes remaining: %d", remain)
}
Expand Down
44 changes: 0 additions & 44 deletions describegroups_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package kafka

import (
"bufio"
"bytes"
"context"
"fmt"
"os"
Expand All @@ -12,48 +10,6 @@ import (
"time"
)

func TestDescribeGroupsResponseV0(t *testing.T) {
item := describeGroupsResponseV0{
Groups: []describeGroupsResponseGroupV0{
{
ErrorCode: 2,
GroupID: "a",
State: "b",
ProtocolType: "c",
Protocol: "d",
Members: []describeGroupsResponseMemberV0{
{
MemberID: "e",
ClientID: "f",
ClientHost: "g",
MemberMetadata: []byte("h"),
MemberAssignments: []byte("i"),
},
},
},
},
}

b := bytes.NewBuffer(nil)
w := &writeBuffer{w: b}
item.writeTo(w)

var found describeGroupsResponseV0
remain, err := (&found).readFrom(bufio.NewReader(b), b.Len())
if err != nil {
t.Error(err)
t.FailNow()
}
if remain != 0 {
t.Errorf("expected 0 remain, got %v", remain)
t.FailNow()
}
if !reflect.DeepEqual(item, found) {
t.Error("expected item and found to be the same")
t.FailNow()
}
}

func TestClientDescribeGroups(t *testing.T) {
if os.Getenv("KAFKA_VERSION") == "2.3.1" {
// There's a bug in 2.3.1 that causes the MemberMetadata to be in the wrong format and thus
Expand Down
Loading

0 comments on commit c03923d

Please sign in to comment.