Skip to content

Commit

Permalink
Downgraded DescribeGroups to v0 for compatibility with 0.10.0 (segme…
Browse files Browse the repository at this point in the history
  • Loading branch information
stevevls authored Feb 6, 2019
1 parent abc7b5c commit 6009fd3
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 45 deletions.
10 changes: 5 additions & 5 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,12 +160,12 @@ func (c *Conn) DeleteTopics(topics ...string) error {
// describeGroups retrieves the specified groups
//
// See http://kafka.apache.org/protocol.html#The_Messages_DescribeGroups
func (c *Conn) describeGroups(request describeGroupsRequestV1) (describeGroupsResponseV1, error) {
var response describeGroupsResponseV1
func (c *Conn) describeGroups(request describeGroupsRequestV0) (describeGroupsResponseV0, error) {
var response describeGroupsResponseV0

err := c.readOperation(
func(deadline time.Time, id int32) error {
return c.writeRequest(describeGroupsRequest, v1, id, request)
return c.writeRequest(describeGroupsRequest, v0, id, request)
},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (remain int, err error) {
Expand All @@ -174,11 +174,11 @@ func (c *Conn) describeGroups(request describeGroupsRequestV1) (describeGroupsRe
},
)
if err != nil {
return describeGroupsResponseV1{}, err
return describeGroupsResponseV0{}, err
}
for _, group := range response.Groups {
if group.ErrorCode != 0 {
return describeGroupsResponseV1{}, Error(group.ErrorCode)
return describeGroupsResponseV0{}, Error(group.ErrorCode)
}
}

Expand Down
2 changes: 1 addition & 1 deletion conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@ func testConnDescribeGroupRetrievesAllGroups(t *testing.T, conn *Conn) {
_, _, stop1 := createGroup(t, conn, groupID)
defer stop1()

out, err := conn.describeGroups(describeGroupsRequestV1{
out, err := conn.describeGroups(describeGroupsRequestV0{
GroupIDs: []string{groupID},
})
if err != nil {
Expand Down
52 changes: 21 additions & 31 deletions describegroups.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,21 @@ package kafka
import "bufio"

// See http://kafka.apache.org/protocol.html#The_Messages_DescribeGroups
type describeGroupsRequestV1 struct {
type describeGroupsRequestV0 struct {
// List of groupIds to request metadata for (an empty groupId array
// will return empty group metadata).
GroupIDs []string
}

func (t describeGroupsRequestV1) size() int32 {
func (t describeGroupsRequestV0) size() int32 {
return sizeofStringArray(t.GroupIDs)
}

func (t describeGroupsRequestV1) writeTo(w *bufio.Writer) {
func (t describeGroupsRequestV0) writeTo(w *bufio.Writer) {
writeStringArray(w, t.GroupIDs)
}

type describeGroupsResponseMemberV1 struct {
type describeGroupsResponseMemberV0 struct {
// MemberID assigned by the group coordinator
MemberID string

Expand All @@ -39,23 +39,23 @@ type describeGroupsResponseMemberV1 struct {
MemberAssignments []byte
}

func (t describeGroupsResponseMemberV1) size() int32 {
func (t describeGroupsResponseMemberV0) size() int32 {
return sizeofString(t.MemberID) +
sizeofString(t.ClientID) +
sizeofString(t.ClientHost) +
sizeofBytes(t.MemberMetadata) +
sizeofBytes(t.MemberAssignments)
}

func (t describeGroupsResponseMemberV1) writeTo(w *bufio.Writer) {
func (t describeGroupsResponseMemberV0) writeTo(w *bufio.Writer) {
writeString(w, t.MemberID)
writeString(w, t.ClientID)
writeString(w, t.ClientHost)
writeBytes(w, t.MemberMetadata)
writeBytes(w, t.MemberAssignments)
}

func (t *describeGroupsResponseMemberV1) readFrom(r *bufio.Reader, size int) (remain int, err error) {
func (t *describeGroupsResponseMemberV0) readFrom(r *bufio.Reader, size int) (remain int, err error) {
if remain, err = readString(r, size, &t.MemberID); err != nil {
return
}
Expand All @@ -74,7 +74,7 @@ func (t *describeGroupsResponseMemberV1) readFrom(r *bufio.Reader, size int) (re
return
}

type describeGroupsResponseGroupV1 struct {
type describeGroupsResponseGroupV0 struct {
// ErrorCode holds response error code
ErrorCode int16

Expand All @@ -93,10 +93,10 @@ type describeGroupsResponseGroupV1 struct {
Protocol string

// Members contains the current group members (only provided if the group is not Dead)
Members []describeGroupsResponseMemberV1
Members []describeGroupsResponseMemberV0
}

func (t describeGroupsResponseGroupV1) size() int32 {
func (t describeGroupsResponseGroupV0) size() int32 {
return sizeofInt16(t.ErrorCode) +
sizeofString(t.GroupID) +
sizeofString(t.State) +
Expand All @@ -105,7 +105,7 @@ func (t describeGroupsResponseGroupV1) size() int32 {
sizeofArray(len(t.Members), func(i int) int32 { return t.Members[i].size() })
}

func (t describeGroupsResponseGroupV1) writeTo(w *bufio.Writer) {
func (t describeGroupsResponseGroupV0) writeTo(w *bufio.Writer) {
writeInt16(w, t.ErrorCode)
writeString(w, t.GroupID)
writeString(w, t.State)
Expand All @@ -114,7 +114,7 @@ func (t describeGroupsResponseGroupV1) writeTo(w *bufio.Writer) {
writeArray(w, len(t.Members), func(i int) { t.Members[i].writeTo(w) })
}

func (t *describeGroupsResponseGroupV1) readFrom(r *bufio.Reader, size int) (remain int, err error) {
func (t *describeGroupsResponseGroupV0) readFrom(r *bufio.Reader, size int) (remain int, err error) {
if remain, err = readInt16(r, size, &t.ErrorCode); err != nil {
return
}
Expand All @@ -132,7 +132,7 @@ func (t *describeGroupsResponseGroupV1) readFrom(r *bufio.Reader, size int) (rem
}

fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
item := describeGroupsResponseMemberV1{}
item := describeGroupsResponseMemberV0{}
if fnRemain, fnErr = (&item).readFrom(r, size); err != nil {
return
}
Expand All @@ -146,39 +146,29 @@ func (t *describeGroupsResponseGroupV1) readFrom(r *bufio.Reader, size int) (rem
return
}

type describeGroupsResponseV1 struct {
// Duration in milliseconds for which the request was throttled due
// to quota violation (Zero if the request did not violate any quota)
ThrottleTimeMS int32

type describeGroupsResponseV0 struct {
// Groups holds selected group information
Groups []describeGroupsResponseGroupV1
Groups []describeGroupsResponseGroupV0
}

func (t describeGroupsResponseV1) size() int32 {
return sizeofInt32(t.ThrottleTimeMS) +
sizeofArray(len(t.Groups), func(i int) int32 { return t.Groups[i].size() })
func (t describeGroupsResponseV0) size() int32 {
return sizeofArray(len(t.Groups), func(i int) int32 { return t.Groups[i].size() })
}

func (t describeGroupsResponseV1) writeTo(w *bufio.Writer) {
writeInt32(w, t.ThrottleTimeMS)
func (t describeGroupsResponseV0) writeTo(w *bufio.Writer) {
writeArray(w, len(t.Groups), func(i int) { t.Groups[i].writeTo(w) })
}

func (t *describeGroupsResponseV1) readFrom(r *bufio.Reader, size int) (remain int, err error) {
if remain, err = readInt32(r, size, &t.ThrottleTimeMS); err != nil {
return
}

func (t *describeGroupsResponseV0) readFrom(r *bufio.Reader, sz int) (remain int, err error) {
fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
item := describeGroupsResponseGroupV1{}
item := describeGroupsResponseGroupV0{}
if fnRemain, fnErr = (&item).readFrom(r, size); fnErr != nil {
return
}
t.Groups = append(t.Groups, item)
return
}
if remain, err = readArrayWith(r, remain, fn); err != nil {
if remain, err = readArrayWith(r, sz, fn); err != nil {
return
}

Expand Down
11 changes: 5 additions & 6 deletions describegroups_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,16 @@ import (
"testing"
)

func TestDescribeGroupsResponseV1(t *testing.T) {
item := describeGroupsResponseV1{
ThrottleTimeMS: 1,
Groups: []describeGroupsResponseGroupV1{
func TestDescribeGroupsResponseV0(t *testing.T) {
item := describeGroupsResponseV0{
Groups: []describeGroupsResponseGroupV0{
{
ErrorCode: 2,
GroupID: "a",
State: "b",
ProtocolType: "c",
Protocol: "d",
Members: []describeGroupsResponseMemberV1{
Members: []describeGroupsResponseMemberV0{
{
MemberID: "e",
ClientID: "f",
Expand All @@ -35,7 +34,7 @@ func TestDescribeGroupsResponseV1(t *testing.T) {
item.writeTo(w)
w.Flush()

var found describeGroupsResponseV1
var found describeGroupsResponseV0
remain, err := (&found).readFrom(bufio.NewReader(buf), buf.Len())
if err != nil {
t.Error(err)
Expand Down
4 changes: 2 additions & 2 deletions reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ func TestCloseLeavesGroup(t *testing.T) {
groupID := r.Config().GroupID

// wait for generationID > 0 so we know our reader has joined the group
membershipTimer := time.After(1 * time.Second)
membershipTimer := time.After(5 * time.Second)
for {
done := false
select {
Expand All @@ -552,7 +552,7 @@ func TestCloseLeavesGroup(t *testing.T) {
if err != nil {
t.Fatalf("error dialing: %v", err)
}
resp, err := conn.describeGroups(describeGroupsRequestV1{
resp, err := conn.describeGroups(describeGroupsRequestV0{
GroupIDs: []string{groupID},
})
if err != nil {
Expand Down

0 comments on commit 6009fd3

Please sign in to comment.