Skip to content

Commit

Permalink
Revert "Simplify member metadata"
Browse files Browse the repository at this point in the history
This reverts commit fed2c53.
  • Loading branch information
yolken-segment committed Dec 9, 2020
1 parent 1d9a9a4 commit a153940
Showing 1 changed file with 54 additions and 6 deletions.
60 changes: 54 additions & 6 deletions describegroups.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,24 @@ type DescribeGroupsResponseMember struct {
ClientHost string

// MemberMetadata contains metadata about this group member.
MemberMetadata []byte
MemberMetadata DescribeGroupsResponseMemberMetadata

// MemberAssignments contains the topic partitions that this member is assigned to.
MemberAssignments DescribeGroupsResponseAssignments
}

// GroupMemberMetadata stores metadata associated with a group member.
type DescribeGroupsResponseMemberMetadata struct {
// Version is the version of the metadata.
Version int

// Topics is the list of topics that the member is assigned to.
Topics []string

// UserData is the user data for the member.
UserData []byte
}

// GroupMemberAssignmentsInfo stores the topic partition assignment data for a group member.
type DescribeGroupsResponseAssignments struct {
// Version is the version of the assignments data.
Expand Down Expand Up @@ -104,6 +116,10 @@ func (c *Client) DescribeGroups(
}

for _, member := range apiGroup.Members {
decodedMetadata, err := decodeMemberMetadata(member.MemberMetadata)
if err != nil {
return nil, err
}
decodedAssignments, err := decodeMemberAssignments(member.MemberAssignment)
if err != nil {
return nil, err
Expand All @@ -114,7 +130,7 @@ func (c *Client) DescribeGroups(
ClientID: member.ClientID,
ClientHost: member.ClientHost,
MemberAssignments: decodedAssignments,
MemberMetadata: member.MemberMetadata,
MemberMetadata: decodedMetadata,
})
}
resp.Groups = append(resp.Groups, group)
Expand Down Expand Up @@ -298,6 +314,41 @@ func (t *describeGroupsResponseV0) readFrom(r *bufio.Reader, sz int) (remain int
return
}

// decodeMemberMetadata converts raw metadata bytes to a
// DescribeGroupsResponseMemberMetadata struct.
func decodeMemberMetadata(rawMetadata []byte) (DescribeGroupsResponseMemberMetadata, error) {
mm := DescribeGroupsResponseMemberMetadata{}

if len(rawMetadata) == 0 {
return mm, nil
}

buf := bytes.NewBuffer(rawMetadata)
bufReader := bufio.NewReader(buf)
remain := len(rawMetadata)

var err error
var version16 int16

if remain, err = readInt16(bufReader, remain, &version16); err != nil {
return mm, err
}
mm.Version = int(version16)

if remain, err = readStringArray(bufReader, remain, &mm.Topics); err != nil {
return mm, err
}
if remain, err = readBytes(bufReader, remain, &mm.UserData); err != nil {
return mm, err
}

if remain != 0 {
return mm, fmt.Errorf("Got non-zero number of bytes remaining: %d", remain)
}

return mm, nil
}

// decodeMemberAssignments converts raw assignment bytes to a DescribeGroupsResponseAssignments
// struct.
//
Expand Down Expand Up @@ -350,10 +401,7 @@ func decodeMemberAssignments(rawAssignments []byte) (DescribeGroupsResponseAssig
}

if remain != 0 {
return ma, fmt.Errorf(
"Got non-zero number of bytes remaining in member assignments: %d",
remain,
)
return ma, fmt.Errorf("Got non-zero number of bytes remaining: %d", remain)
}

return ma, nil
Expand Down

0 comments on commit a153940

Please sign in to comment.