Skip to content

Commit

Permalink
add consumer group apis to client (segmentio#943)
Browse files Browse the repository at this point in the history
* add consumer group apis to client

JoinGroup,SyncGroup,LeaveGroup
  • Loading branch information
rhansen2 authored Jul 4, 2022
1 parent da91759 commit f0d5443
Show file tree
Hide file tree
Showing 17 changed files with 1,441 additions and 5 deletions.
6 changes: 5 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,8 @@ linters:
disable:
# Temporarily disabling so it can be addressed in a dedicated PR.
- errcheck
- goerr113
- goerr113

linters-settings:
goconst:
ignore-tests: true
4 changes: 4 additions & 0 deletions error.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,8 @@ func (e Error) Title() string {
return "Unknown Leader Epoch"
case UnsupportedCompressionType:
return "Unsupported Compression Type"
case MemberIDRequired:
return "Member ID Required"
case EligibleLeadersNotAvailable:
return "Eligible Leader Not Available"
case ElectionNotNeeded:
Expand Down Expand Up @@ -534,6 +536,8 @@ func (e Error) Description() string {
return "the leader epoch in the request is newer than the epoch on the broker"
case UnsupportedCompressionType:
return "the requesting client does not support the compression type of given partition"
case MemberIDRequired:
return "the group member needs to have a valid member id before actually entering a consumer group"
case EligibleLeadersNotAvailable:
return "eligible topic partition leaders are not available"
case ElectionNotNeeded:
Expand Down
3 changes: 0 additions & 3 deletions findcoordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"bytes"
"context"
"errors"
"fmt"
"reflect"
"strings"
"testing"
Expand Down Expand Up @@ -53,7 +52,6 @@ func TestClientFindCoordinator(t *testing.T) {
Key: "TransactionalID-1",
KeyType: CoordinatorKeyTypeTransaction,
})

if err != nil {
t.Fatal(err)
}
Expand All @@ -65,7 +63,6 @@ func TestClientFindCoordinator(t *testing.T) {

// WaitForCoordinatorIndefinitely is a blocking call till a coordinator is found.
func waitForCoordinatorIndefinitely(ctx context.Context, c *Client, req *FindCoordinatorRequest) (*FindCoordinatorResponse, error) {
fmt.Println("Trying to find Coordinator.")
resp, err := c.FindCoordinator(ctx, req)

for shouldRetryfindingCoordinator(resp, err) && ctx.Err() == nil {
Expand Down
184 changes: 184 additions & 0 deletions joingroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,192 @@ package kafka
import (
"bufio"
"bytes"
"context"
"fmt"
"net"
"time"

"github.com/segmentio/kafka-go/protocol"
"github.com/segmentio/kafka-go/protocol/consumer"
"github.com/segmentio/kafka-go/protocol/joingroup"
)

// JoinGroupRequest is the request structure for the JoinGroup function.
type JoinGroupRequest struct {
// Address of the kafka broker to send the request to.
Addr net.Addr

// GroupID of the group to join.
GroupID string

// The duration after which the coordinator considers the consumer dead
// if it has not received a heartbeat.
SessionTimeout time.Duration

// The duration the coordination will wait for each member to rejoin when rebalancing the group.
RebalanceTimeout time.Duration

// The ID assigned by the group coordinator.
MemberID string

// The unique identifier for the consumer instance.
GroupInstanceID string

// The name for the class of protocols implemented by the group being joined.
ProtocolType string

// The list of protocols the member supports.
Protocols []GroupProtocol
}

// GroupProtocol represents a consumer group protocol.
type GroupProtocol struct {
// The protocol name.
Name string

// The protocol metadata.
Metadata GroupProtocolSubscription
}

type GroupProtocolSubscription struct {
// The Topics to subscribe to.
Topics []string

// UserData assosiated with the subscription for the given protocol
UserData []byte

// Partitions owned by this consumer.
OwnedPartitions map[string][]int
}

// JoinGroupResponse is the response structure for the JoinGroup function.
type JoinGroupResponse struct {
// An error that may have occurred when attempting to join the group.
//
// The errors contain the kafka error code. Programs may use the standard
// errors.Is function to test the error against kafka error codes.
Error error

// The amount of time that the broker throttled the request.
Throttle time.Duration

// The generation ID of the group.
GenerationID int

// The group protocol selected by the coordinatior.
ProtocolName string

// The group protocol name.
ProtocolType string

// The leader of the group.
LeaderID string

// The group member ID.
MemberID string

// The members of the group.
Members []JoinGroupResponseMember
}

// JoinGroupResponseMember represents a group memmber in a reponse to a JoinGroup request.
type JoinGroupResponseMember struct {
// The group memmber ID.
ID string

// The unique identifier of the consumer instance.
GroupInstanceID string

// The group member metadata.
Metadata GroupProtocolSubscription
}

// JoinGroup sends a join group request to the coordinator and returns the response.
func (c *Client) JoinGroup(ctx context.Context, req *JoinGroupRequest) (*JoinGroupResponse, error) {
joinGroup := joingroup.Request{
GroupID: req.GroupID,
SessionTimeoutMS: int32(req.SessionTimeout.Milliseconds()),
RebalanceTimeoutMS: int32(req.RebalanceTimeout.Milliseconds()),
MemberID: req.MemberID,
GroupInstanceID: req.GroupInstanceID,
ProtocolType: req.ProtocolType,
Protocols: make([]joingroup.RequestProtocol, 0, len(req.Protocols)),
}

for _, proto := range req.Protocols {
protoMeta := consumer.Subscription{
Version: consumer.MaxVersionSupported,
Topics: proto.Metadata.Topics,
UserData: proto.Metadata.UserData,
OwnedPartitions: make([]consumer.TopicPartition, 0, len(proto.Metadata.OwnedPartitions)),
}
for topic, partitions := range proto.Metadata.OwnedPartitions {
tp := consumer.TopicPartition{
Topic: topic,
Partitions: make([]int32, 0, len(partitions)),
}
for _, partition := range partitions {
tp.Partitions = append(tp.Partitions, int32(partition))
}
protoMeta.OwnedPartitions = append(protoMeta.OwnedPartitions, tp)
}

metaBytes, err := protocol.Marshal(consumer.MaxVersionSupported, protoMeta)
if err != nil {
return nil, fmt.Errorf("kafka.(*Client).JoinGroup: %w", err)
}

joinGroup.Protocols = append(joinGroup.Protocols, joingroup.RequestProtocol{
Name: proto.Name,
Metadata: metaBytes,
})
}

m, err := c.roundTrip(ctx, req.Addr, &joinGroup)
if err != nil {
return nil, fmt.Errorf("kafka.(*Client).JoinGroup: %w", err)
}

r := m.(*joingroup.Response)

res := &JoinGroupResponse{
Error: makeError(r.ErrorCode, ""),
Throttle: makeDuration(r.ThrottleTimeMS),
GenerationID: int(r.GenerationID),
ProtocolName: r.ProtocolName,
ProtocolType: r.ProtocolType,
LeaderID: r.LeaderID,
MemberID: r.MemberID,
Members: make([]JoinGroupResponseMember, 0, len(r.Members)),
}

for _, member := range r.Members {
var meta consumer.Subscription
err = protocol.Unmarshal(member.Metadata, consumer.MaxVersionSupported, &meta)
if err != nil {
return nil, fmt.Errorf("kafka.(*Client).JoinGroup: %w", err)
}
subscription := GroupProtocolSubscription{
Topics: meta.Topics,
UserData: meta.UserData,
OwnedPartitions: make(map[string][]int, len(meta.OwnedPartitions)),
}
for _, owned := range meta.OwnedPartitions {
subscription.OwnedPartitions[owned.Topic] = make([]int, 0, len(owned.Partitions))
for _, partition := range owned.Partitions {
subscription.OwnedPartitions[owned.Topic] = append(subscription.OwnedPartitions[owned.Topic], int(partition))
}
}
res.Members = append(res.Members, JoinGroupResponseMember{
ID: member.MemberID,
GroupInstanceID: member.GroupInstanceID,
Metadata: subscription,
})
}

return res, nil
}

type groupMetadata struct {
Version int16
Topics []string
Expand Down
117 changes: 117 additions & 0 deletions joingroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,127 @@ package kafka
import (
"bufio"
"bytes"
"context"
"errors"
"reflect"
"testing"
"time"

ktesting "github.com/segmentio/kafka-go/testing"
)

func TestClientJoinGroup(t *testing.T) {
topic := makeTopic()
client, shutdown := newLocalClient()
defer shutdown()

err := clientCreateTopic(client, topic, 3)
if err != nil {
t.Fatal(err)
}

groupID := makeGroupID()

ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
respc, err := waitForCoordinatorIndefinitely(ctx, client, &FindCoordinatorRequest{
Addr: client.Addr,
Key: groupID,
KeyType: CoordinatorKeyTypeConsumer,
})
if err != nil {
t.Fatal(err)
}

if respc.Error != nil {
t.Fatal(err)
}

groupInstanceID := "group-instance-id"
if !ktesting.KafkaIsAtLeast("2.4.1") {
groupInstanceID = ""
}
const userData = "user-data"

req := &JoinGroupRequest{
GroupID: groupID,
GroupInstanceID: groupInstanceID,
ProtocolType: "consumer",
SessionTimeout: time.Minute,
RebalanceTimeout: time.Minute,
Protocols: []GroupProtocol{
{
Name: RoundRobinGroupBalancer{}.ProtocolName(),
Metadata: GroupProtocolSubscription{
Topics: []string{topic},
UserData: []byte(userData),
OwnedPartitions: map[string][]int{
topic: {0, 1, 2},
},
},
},
},
}

var resp *JoinGroupResponse

for {
resp, err = client.JoinGroup(ctx, req)
if err != nil {
t.Fatal(err)
}

if errors.Is(resp.Error, MemberIDRequired) {
req.MemberID = resp.MemberID
time.Sleep(time.Second)
continue
}

if resp.Error != nil {
t.Fatal(resp.Error)
}
break
}

if resp.GenerationID != 1 {
t.Fatalf("expected generation ID to be 1 but got %v", resp.GenerationID)
}

if resp.MemberID == "" {
t.Fatal("expected a member ID in response")
}

if resp.LeaderID != resp.MemberID {
t.Fatalf("expected to be group leader but got %v", resp.LeaderID)
}

if len(resp.Members) != 1 {
t.Fatalf("expected 1 member got %v", resp.Members)
}

member := resp.Members[0]

if member.ID != resp.MemberID {
t.Fatal("expected to be the only group memmber")
}

if member.GroupInstanceID != groupInstanceID {
t.Fatalf("expected the group instance ID to be %v, got %v", groupInstanceID, member.GroupInstanceID)
}

expectedMetadata := GroupProtocolSubscription{
Topics: []string{topic},
UserData: []byte(userData),
OwnedPartitions: map[string][]int{
topic: {0, 1, 2},
},
}

if !reflect.DeepEqual(member.Metadata, expectedMetadata) {
t.Fatalf("\nexpected assignment to be \n%v\nbut got\n%v", expectedMetadata, member.Metadata)
}
}

func TestSaramaCompatibility(t *testing.T) {
var (
// sample data from github.com/Shopify/sarama
Expand Down
Loading

0 comments on commit f0d5443

Please sign in to comment.