From 6009fd3f78ad9fc1a078b6e6dc38bdacedc479a6 Mon Sep 17 00:00:00 2001 From: Steve van Loben Sels Date: Tue, 5 Feb 2019 21:06:12 -0800 Subject: [PATCH] Downgraded DescribeGroups to v0 for compatibility with 0.10.0 (#194) --- conn.go | 10 ++++---- conn_test.go | 2 +- describegroups.go | 52 +++++++++++++++++------------------------- describegroups_test.go | 11 ++++----- reader_test.go | 4 ++-- 5 files changed, 34 insertions(+), 45 deletions(-) diff --git a/conn.go b/conn.go index 89ae52a38..c4fc042a7 100644 --- a/conn.go +++ b/conn.go @@ -160,12 +160,12 @@ func (c *Conn) DeleteTopics(topics ...string) error { // describeGroups retrieves the specified groups // // See http://kafka.apache.org/protocol.html#The_Messages_DescribeGroups -func (c *Conn) describeGroups(request describeGroupsRequestV1) (describeGroupsResponseV1, error) { - var response describeGroupsResponseV1 +func (c *Conn) describeGroups(request describeGroupsRequestV0) (describeGroupsResponseV0, error) { + var response describeGroupsResponseV0 err := c.readOperation( func(deadline time.Time, id int32) error { - return c.writeRequest(describeGroupsRequest, v1, id, request) + return c.writeRequest(describeGroupsRequest, v0, id, request) }, func(deadline time.Time, size int) error { return expectZeroSize(func() (remain int, err error) { @@ -174,11 +174,11 @@ func (c *Conn) describeGroups(request describeGroupsRequestV1) (describeGroupsRe }, ) if err != nil { - return describeGroupsResponseV1{}, err + return describeGroupsResponseV0{}, err } for _, group := range response.Groups { if group.ErrorCode != 0 { - return describeGroupsResponseV1{}, Error(group.ErrorCode) + return describeGroupsResponseV0{}, Error(group.ErrorCode) } } diff --git a/conn_test.go b/conn_test.go index a4ec416bd..31bb41fb2 100644 --- a/conn_test.go +++ b/conn_test.go @@ -590,7 +590,7 @@ func testConnDescribeGroupRetrievesAllGroups(t *testing.T, conn *Conn) { _, _, stop1 := createGroup(t, conn, groupID) defer stop1() - out, err := conn.describeGroups(describeGroupsRequestV1{ + out, err := conn.describeGroups(describeGroupsRequestV0{ GroupIDs: []string{groupID}, }) if err != nil { diff --git a/describegroups.go b/describegroups.go index a7b7982ac..bdb1c744a 100644 --- a/describegroups.go +++ b/describegroups.go @@ -3,21 +3,21 @@ package kafka import "bufio" // See http://kafka.apache.org/protocol.html#The_Messages_DescribeGroups -type describeGroupsRequestV1 struct { +type describeGroupsRequestV0 struct { // List of groupIds to request metadata for (an empty groupId array // will return empty group metadata). GroupIDs []string } -func (t describeGroupsRequestV1) size() int32 { +func (t describeGroupsRequestV0) size() int32 { return sizeofStringArray(t.GroupIDs) } -func (t describeGroupsRequestV1) writeTo(w *bufio.Writer) { +func (t describeGroupsRequestV0) writeTo(w *bufio.Writer) { writeStringArray(w, t.GroupIDs) } -type describeGroupsResponseMemberV1 struct { +type describeGroupsResponseMemberV0 struct { // MemberID assigned by the group coordinator MemberID string @@ -39,7 +39,7 @@ type describeGroupsResponseMemberV1 struct { MemberAssignments []byte } -func (t describeGroupsResponseMemberV1) size() int32 { +func (t describeGroupsResponseMemberV0) size() int32 { return sizeofString(t.MemberID) + sizeofString(t.ClientID) + sizeofString(t.ClientHost) + @@ -47,7 +47,7 @@ func (t describeGroupsResponseMemberV1) size() int32 { sizeofBytes(t.MemberAssignments) } -func (t describeGroupsResponseMemberV1) writeTo(w *bufio.Writer) { +func (t describeGroupsResponseMemberV0) writeTo(w *bufio.Writer) { writeString(w, t.MemberID) writeString(w, t.ClientID) writeString(w, t.ClientHost) @@ -55,7 +55,7 @@ func (t describeGroupsResponseMemberV1) writeTo(w *bufio.Writer) { writeBytes(w, t.MemberAssignments) } -func (t *describeGroupsResponseMemberV1) readFrom(r *bufio.Reader, size int) (remain int, err error) { +func (t *describeGroupsResponseMemberV0) readFrom(r *bufio.Reader, size int) (remain int, err error) { if remain, err = readString(r, size, &t.MemberID); err != nil { return } @@ -74,7 +74,7 @@ func (t *describeGroupsResponseMemberV1) readFrom(r *bufio.Reader, size int) (re return } -type describeGroupsResponseGroupV1 struct { +type describeGroupsResponseGroupV0 struct { // ErrorCode holds response error code ErrorCode int16 @@ -93,10 +93,10 @@ type describeGroupsResponseGroupV1 struct { Protocol string // Members contains the current group members (only provided if the group is not Dead) - Members []describeGroupsResponseMemberV1 + Members []describeGroupsResponseMemberV0 } -func (t describeGroupsResponseGroupV1) size() int32 { +func (t describeGroupsResponseGroupV0) size() int32 { return sizeofInt16(t.ErrorCode) + sizeofString(t.GroupID) + sizeofString(t.State) + @@ -105,7 +105,7 @@ func (t describeGroupsResponseGroupV1) size() int32 { sizeofArray(len(t.Members), func(i int) int32 { return t.Members[i].size() }) } -func (t describeGroupsResponseGroupV1) writeTo(w *bufio.Writer) { +func (t describeGroupsResponseGroupV0) writeTo(w *bufio.Writer) { writeInt16(w, t.ErrorCode) writeString(w, t.GroupID) writeString(w, t.State) @@ -114,7 +114,7 @@ func (t describeGroupsResponseGroupV1) writeTo(w *bufio.Writer) { writeArray(w, len(t.Members), func(i int) { t.Members[i].writeTo(w) }) } -func (t *describeGroupsResponseGroupV1) readFrom(r *bufio.Reader, size int) (remain int, err error) { +func (t *describeGroupsResponseGroupV0) readFrom(r *bufio.Reader, size int) (remain int, err error) { if remain, err = readInt16(r, size, &t.ErrorCode); err != nil { return } @@ -132,7 +132,7 @@ func (t *describeGroupsResponseGroupV1) readFrom(r *bufio.Reader, size int) (rem } fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) { - item := describeGroupsResponseMemberV1{} + item := describeGroupsResponseMemberV0{} if fnRemain, fnErr = (&item).readFrom(r, size); err != nil { return } @@ -146,39 +146,29 @@ func (t *describeGroupsResponseGroupV1) readFrom(r *bufio.Reader, size int) (rem return } -type describeGroupsResponseV1 struct { - // Duration in milliseconds for which the request was throttled due - // to quota violation (Zero if the request did not violate any quota) - ThrottleTimeMS int32 - +type describeGroupsResponseV0 struct { // Groups holds selected group information - Groups []describeGroupsResponseGroupV1 + Groups []describeGroupsResponseGroupV0 } -func (t describeGroupsResponseV1) size() int32 { - return sizeofInt32(t.ThrottleTimeMS) + - sizeofArray(len(t.Groups), func(i int) int32 { return t.Groups[i].size() }) +func (t describeGroupsResponseV0) size() int32 { + return sizeofArray(len(t.Groups), func(i int) int32 { return t.Groups[i].size() }) } -func (t describeGroupsResponseV1) writeTo(w *bufio.Writer) { - writeInt32(w, t.ThrottleTimeMS) +func (t describeGroupsResponseV0) writeTo(w *bufio.Writer) { writeArray(w, len(t.Groups), func(i int) { t.Groups[i].writeTo(w) }) } -func (t *describeGroupsResponseV1) readFrom(r *bufio.Reader, size int) (remain int, err error) { - if remain, err = readInt32(r, size, &t.ThrottleTimeMS); err != nil { - return - } - +func (t *describeGroupsResponseV0) readFrom(r *bufio.Reader, sz int) (remain int, err error) { fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) { - item := describeGroupsResponseGroupV1{} + item := describeGroupsResponseGroupV0{} if fnRemain, fnErr = (&item).readFrom(r, size); fnErr != nil { return } t.Groups = append(t.Groups, item) return } - if remain, err = readArrayWith(r, remain, fn); err != nil { + if remain, err = readArrayWith(r, sz, fn); err != nil { return } diff --git a/describegroups_test.go b/describegroups_test.go index cbb86b078..679d69ab7 100644 --- a/describegroups_test.go +++ b/describegroups_test.go @@ -7,17 +7,16 @@ import ( "testing" ) -func TestDescribeGroupsResponseV1(t *testing.T) { - item := describeGroupsResponseV1{ - ThrottleTimeMS: 1, - Groups: []describeGroupsResponseGroupV1{ +func TestDescribeGroupsResponseV0(t *testing.T) { + item := describeGroupsResponseV0{ + Groups: []describeGroupsResponseGroupV0{ { ErrorCode: 2, GroupID: "a", State: "b", ProtocolType: "c", Protocol: "d", - Members: []describeGroupsResponseMemberV1{ + Members: []describeGroupsResponseMemberV0{ { MemberID: "e", ClientID: "f", @@ -35,7 +34,7 @@ func TestDescribeGroupsResponseV1(t *testing.T) { item.writeTo(w) w.Flush() - var found describeGroupsResponseV1 + var found describeGroupsResponseV0 remain, err := (&found).readFrom(bufio.NewReader(buf), buf.Len()) if err != nil { t.Error(err) diff --git a/reader_test.go b/reader_test.go index 5d372028a..5f517f83a 100644 --- a/reader_test.go +++ b/reader_test.go @@ -526,7 +526,7 @@ func TestCloseLeavesGroup(t *testing.T) { groupID := r.Config().GroupID // wait for generationID > 0 so we know our reader has joined the group - membershipTimer := time.After(1 * time.Second) + membershipTimer := time.After(5 * time.Second) for { done := false select { @@ -552,7 +552,7 @@ func TestCloseLeavesGroup(t *testing.T) { if err != nil { t.Fatalf("error dialing: %v", err) } - resp, err := conn.describeGroups(describeGroupsRequestV1{ + resp, err := conn.describeGroups(describeGroupsRequestV0{ GroupIDs: []string{groupID}, }) if err != nil {