Skip to content

Commit

Permalink
Add protocol versions of listgroups and describegroups
Browse files Browse the repository at this point in the history
  • Loading branch information
yolken-segment committed Nov 12, 2020
1 parent 9069f0e commit bc7cd50
Show file tree
Hide file tree
Showing 4 changed files with 368 additions and 1 deletion.
196 changes: 195 additions & 1 deletion describegroups.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,100 @@
package kafka

import "bufio"
import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"net"

"github.com/segmentio/kafka-go/protocol/describegroups"
)

type DescribeGroupRequest struct {
// Address of the kafka broker to send the request to.
Addr net.Addr
GroupID string
}

type DescribeGroupResponse struct {
Members []MemberInfo
}

// MemberInfo represents the membership information for a single group member.
type MemberInfo struct {
MemberID string
ClientID string
ClientHost string
MemberMetadata GroupMemberMetadata
MemberAssignments GroupMemberAssignmentsInfo
}

// GroupMemberMetadata stores metadata associated with a group member.
type GroupMemberMetadata struct {
Version int16
Topics []string
UserData []byte
}

// GroupMemberAssignmentsInfo stores the topic partition assignment data for a group member.
type GroupMemberAssignmentsInfo struct {
Version int16
Topics []GroupMemberTopic
UserData []byte
}

// GroupMemberTopic is a mapping from a topic to a list of partitions in the topic. It is used
// to represent the topic partitions that have been assigned to a group member.
type GroupMemberTopic struct {
Topic string
Partitions []int32
}

func (c *Client) DescribeGroup(
req DescribeGroupRequest,
ctx context.Context,
) (*DescribeGroupResponse, error) {
protocolResp, err := c.roundTrip(
ctx,
req.Addr,
&describegroups.Request{
Groups: []string{req.GroupID},
},
)
if err != nil {
return nil, err
}
apiResp, ok := protocolResp.(*describegroups.Response)
if !ok {
return nil, errors.New("Unexpected response type")
}

resp := &DescribeGroupResponse{}

for _, apiGroupInfo := range apiResp.Groups {
for _, member := range apiGroupInfo.Members {
decodedMetadata, err := decodeMemberMetadata(member.MemberMetadata)
if err != nil {
return nil, err
}
decodedAssignments, err := decodeMemberAssignments(member.MemberAssignment)
if err != nil {
return nil, err
}

resp.Members = append(resp.Members, MemberInfo{
MemberID: member.MemberID,
ClientID: member.ClientID,
ClientHost: member.ClientHost,
MemberAssignments: decodedAssignments,
MemberMetadata: decodedMetadata,
})
}
}

return resp, nil
}

// See http://kafka.apache.org/protocol.html#The_Messages_DescribeGroups
type describeGroupsRequestV0 struct {
Expand Down Expand Up @@ -174,3 +268,103 @@ func (t *describeGroupsResponseV0) readFrom(r *bufio.Reader, sz int) (remain int

return
}

// decodeMemberMetadata converts raw metadata bytes to a GroupMemberMetadata struct.
func decodeMemberMetadata(rawMetadata []byte) (GroupMemberMetadata, error) {
mm := GroupMemberMetadata{}

if len(rawMetadata) == 0 {
return mm, nil
}

buf := bytes.NewBuffer(rawMetadata)
bufReader := bufio.NewReader(buf)
remain := len(rawMetadata)

var err error

if remain, err = readInt16(bufReader, remain, &mm.Version); err != nil {
return mm, err
}
if remain, err = readStringArray(bufReader, remain, &mm.Topics); err != nil {
return mm, err
}
if remain, err = readBytes(bufReader, remain, &mm.UserData); err != nil {
return mm, err
}

if remain != 0 {
return mm, fmt.Errorf("Got non-zero number of bytes remaining: %d", remain)
}

return mm, nil
}

// decodeMemberAssignments converts raw assignment bytes to a GroupMemberAssignmentsInfo struct.
func decodeMemberAssignments(rawAssignments []byte) (GroupMemberAssignmentsInfo, error) {
ma := GroupMemberAssignmentsInfo{}

if len(rawAssignments) == 0 {
return ma, nil
}

buf := bytes.NewBuffer(rawAssignments)
bufReader := bufio.NewReader(buf)
remain := len(rawAssignments)

var err error

if remain, err = readInt16(bufReader, remain, &ma.Version); err != nil {
return ma, err
}

fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
item := GroupMemberTopic{
Partitions: []int32{},
}

if fnRemain, fnErr = readString(r, size, &item.Topic); fnErr != nil {
return
}

if fnRemain, fnErr = readInt32Array(r, fnRemain, &item.Partitions); fnErr != nil {
return
}

ma.Topics = append(ma.Topics, item)
return
}
if remain, err = readArrayWith(bufReader, remain, fn); err != nil {
return ma, err
}

if remain, err = readBytes(bufReader, remain, &ma.UserData); err != nil {
return ma, err
}

if remain != 0 {
return ma, fmt.Errorf("Got non-zero number of bytes remaining: %d", remain)
}

return ma, nil
}

// readInt32Array reads an array of int32s. It's adapted from the implementation of
// readStringArray.
func readInt32Array(r *bufio.Reader, sz int, v *[]int32) (remain int, err error) {
var content []int32
fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
var value int32
if fnRemain, fnErr = readInt32(r, size, &value); fnErr != nil {
return
}
content = append(content, value)
return
}
if remain, err = readArrayWith(r, sz, fn); err != nil {
return
}

*v = content
return
}
44 changes: 44 additions & 0 deletions listgroups.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,52 @@ package kafka

import (
"bufio"
"context"
"errors"
"net"

"github.com/segmentio/kafka-go/protocol/listgroups"
)

type ListGroupsRequest struct {
// Address of the kafka broker to send the request to.
Addr net.Addr
}

type ListGroupsResponse struct {
Groups []ConsumerGroupInfo
}

type ConsumerGroupInfo struct {
GroupID string
Coordinator int32
}

func (c *Client) ListGroups(
req ListGroupsRequest,
ctx context.Context,
) (*ListGroupsResponse, error) {
protocolResp, err := c.roundTrip(ctx, req.Addr, &listgroups.Request{})
if err != nil {
return nil, err
}
apiResp, ok := protocolResp.(*listgroups.Response)
if !ok {
return nil, errors.New("Unexpected response type")
}

resp := &ListGroupsResponse{}

for _, apiGroupInfo := range apiResp.Groups {
resp.Groups = append(resp.Groups, ConsumerGroupInfo{
GroupID: apiGroupInfo.GroupID,
Coordinator: apiGroupInfo.BrokerID,
})
}

return resp, nil
}

type listGroupsRequestV1 struct {
}

Expand Down
47 changes: 47 additions & 0 deletions protocol/describegroups/describegroups.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package describegroups

import (
"github.com/segmentio/kafka-go/protocol"
)

func init() {
protocol.Register(&Request{}, &Response{})
}

type Request struct {
Groups []string `kafka:"min=v0,max=v4"`
IncludeAuthorizedOperations bool `kafka:"min=v3,max=v4"`
}

func (r *Request) ApiKey() protocol.ApiKey { return protocol.DescribeGroups }

func (r *Request) Group() string {
// TODO: Handle other groups too (via splitter).
return r.Groups[0]
}

type Response struct {
ThrottleTimeMs int32 `kafka:"min=v1,max=v4"`
Groups []ResponseGroup `kafka:"min=v0,max=v4"`
}

type ResponseGroup struct {
ErrorCode int16 `kafka:"min=v0,max=v4"`
GroupID string `kafka:"min=v0,max=v4"`
GroupState string `kafka:"min=v0,max=v4"`
ProtocolType string `kafka:"min=v0,max=v4"`
ProtocolData string `kafka:"min=v0,max=v4"`
Members []ResponseGroupMember `kafka:"min=v0,max=v4"`
AuthorizedOperations int32 `kafka:"min=v3,max=v4"`
}

type ResponseGroupMember struct {
MemberID string `kafka:"min=v0,max=v4"`
GroupInstanceID string `kafka:"min=v4,max=v4,nullable"`
ClientID string `kafka:"min=v0,max=v4"`
ClientHost string `kafka:"min=v0,max=v4"`
MemberMetadata []byte `kafka:"min=v0,max=v4"`
MemberAssignment []byte `kafka:"min=v0,max=v4"`
}

func (r *Response) ApiKey() protocol.ApiKey { return protocol.DescribeGroups }
82 changes: 82 additions & 0 deletions protocol/listgroups/listgroups.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package listgroups

import (
"errors"

"github.com/segmentio/kafka-go/protocol"
)

func init() {
protocol.Register(&Request{}, &Response{})
}

type Request struct {
brokerID int32
}

func (r *Request) ApiKey() protocol.ApiKey { return protocol.ListGroups }

func (r *Request) Broker(cluster protocol.Cluster) (protocol.Broker, error) {
return cluster.Brokers[r.brokerID], nil
}

func (r *Request) Split(cluster protocol.Cluster) (
[]protocol.Message,
protocol.Merger,
error,
) {
messages := []protocol.Message{}

for _, broker := range cluster.Brokers {
messages = append(messages, &Request{broker.ID})
}

return nil, new(Response), nil
}

type Response struct {
ThrottleTimeMs int32 `kafka:"min=v1,max=v2"`
ErrorCode int16 `kafka:"min=v0,max=v2"`
Groups []ResponseGroup `kafka:"min=v0,max=v2"`
}

type ResponseGroup struct {
GroupID string `kafka:"min=v0,max=v2"`
ProtocolType string `kafka:"min=v0,max=v2"`

// Use this to store which broker returned the response
BrokerID int32 `kafka:"-"`
}

func (r *Response) ApiKey() protocol.ApiKey { return protocol.ListGroups }

func (r *Response) Merge(requests []protocol.Message, results []interface{}) (
protocol.Message,
error,
) {
response := &Response{}

for r, result := range results {
brokerResp, ok := result.(*Response)
if !ok {
return nil, errors.New("Unexpected response type")
}

respGroups := []ResponseGroup{}

for _, brokerResp := range brokerResp.Groups {
respGroups = append(
respGroups,
ResponseGroup{
GroupID: brokerResp.GroupID,
ProtocolType: brokerResp.ProtocolType,
BrokerID: requests[r].(*Request).brokerID,
},
)
}

response.Groups = append(response.Groups, brokerResp.Groups...)
}

return response, nil
}

0 comments on commit bc7cd50

Please sign in to comment.