Skip to content

Commit

Permalink
Add TxnOffsetCommit to Client (segmentio#741)
Browse files Browse the repository at this point in the history
  • Loading branch information
rhansen2 authored Sep 20, 2021
1 parent e6f0ae9 commit bedcf4a
Show file tree
Hide file tree
Showing 4 changed files with 632 additions and 0 deletions.
73 changes: 73 additions & 0 deletions protocol/txnoffsetcommit/txnoffsetcommit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package txnoffsetcommit

import "github.com/segmentio/kafka-go/protocol"

func init() {
protocol.Register(&Request{}, &Response{})
}

type Request struct {
// We need at least one tagged field to indicate that this is a "flexible" message
// type.
_ struct{} `kafka:"min=v3,max=v3,tag"`

TransactionalID string `kafka:"min=v0,max=v2|min=v3,max=v3,compact"`
GroupID string `kafka:"min=v0,max=v2|min=v3,max=v3,compact"`
ProducerID int64 `kafka:"min=v0,max=v3"`
ProducerEpoch int16 `kafka:"min=v0,max=v3"`
GenerationID int32 `kafka:"min=v3,max=v3"`
MemberID string `kafka:"min=v3,max=v3,compact"`
GroupInstanceID string `kafka:"min=v3,max=v3,compact,nullable"`
Topics []RequestTopic `kafka:"min=v0,max=v3"`
}

type RequestTopic struct {
// We need at least one tagged field to indicate that this is a "flexible" message
// type.
_ struct{} `kafka:"min=v3,max=v3,tag"`

Name string `kafka:"min=v0,max=v2|min=v3,max=v3,compact"`
Partitions []RequestPartition `kafka:"min=v0,max=v3"`
}

type RequestPartition struct {
// We need at least one tagged field to indicate that this is a "flexible" message
// type.
_ struct{} `kafka:"min=v3,max=v3,tag"`

Partition int32 `kafka:"min=v0,max=v3"`
CommittedOffset int64 `kafka:"min=v0,max=v3"`
CommittedLeaderEpoch int32 `kafka:"min=v2,max=v3"`
CommittedMetadata string `kafka:"min=v0,max=v2|min=v3,max=v3,nullable,compact"`
}

func (r *Request) ApiKey() protocol.ApiKey { return protocol.TxnOffsetCommit }

type Response struct {
// We need at least one tagged field to indicate that this is a "flexible" message
// type.
_ struct{} `kafka:"min=v3,max=v3,tag"`

ThrottleTimeMs int32 `kafka:"min=v0,max=v3"`
Topics []ResponseTopic `kafka:"min=v0,max=v3"`
}

type ResponseTopic struct {
// We need at least one tagged field to indicate that this is a "flexible" message
// type.
_ struct{} `kafka:"min=v3,max=v3,tag"`

Name string `kafka:"min=v0,max=v2|min=v3,max=v3,compact"`
Partitions []ResponsePartition `kafka:"min=v0,max=v3"`
}

type ResponsePartition struct {
// We need at least one tagged field to indicate that this is a "flexible" message
// type.
_ struct{} `kafka:"min=v3,max=v3,tag"`

Partition int32 `kafka:"min=v0,max=v3"`
ErrorCode int16 `kafka:"min=v0,max=v3"`
}

func (r *Response) ApiKey() protocol.ApiKey { return protocol.TxnOffsetCommit }
186 changes: 186 additions & 0 deletions protocol/txnoffsetcommit/txnoffsetcommit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
package txnoffsetcommit_test

import (
"testing"

"github.com/segmentio/kafka-go/protocol/prototest"
"github.com/segmentio/kafka-go/protocol/txnoffsetcommit"
)

func TestTxnOffsetCommitRequest(t *testing.T) {
for _, version := range []int16{0, 1} {
prototest.TestRequest(t, version, &txnoffsetcommit.Request{
TransactionalID: "transactional-id-0",
GroupID: "group-0",
ProducerID: 10,
ProducerEpoch: 100,
Topics: []txnoffsetcommit.RequestTopic{
{
Name: "topic-0",
Partitions: []txnoffsetcommit.RequestPartition{
{
Partition: 0,
CommittedOffset: 10,
CommittedMetadata: "meta-0-0",
},
{
Partition: 1,
CommittedOffset: 10,
CommittedMetadata: "meta-0-1",
},
},
},
{
Name: "topic-1",
Partitions: []txnoffsetcommit.RequestPartition{
{
Partition: 0,
CommittedOffset: 10,
CommittedMetadata: "meta-1-0",
},
{
Partition: 1,
CommittedOffset: 10,
CommittedMetadata: "meta-1-1",
},
},
},
},
})
}

// Version 2 added:
// Topics.RequestTopic.Partitions.CommittedLeaderEpoch
for _, version := range []int16{2} {
prototest.TestRequest(t, version, &txnoffsetcommit.Request{
TransactionalID: "transactional-id-0",
GroupID: "group-0",
ProducerID: 10,
ProducerEpoch: 100,
Topics: []txnoffsetcommit.RequestTopic{
{
Name: "topic-0",
Partitions: []txnoffsetcommit.RequestPartition{
{
Partition: 0,
CommittedOffset: 10,
CommittedLeaderEpoch: 100,
CommittedMetadata: "meta-0-0",
},
{
Partition: 1,
CommittedOffset: 10,
CommittedLeaderEpoch: 100,
CommittedMetadata: "meta-0-1",
},
},
},
{
Name: "topic-1",
Partitions: []txnoffsetcommit.RequestPartition{
{
Partition: 0,
CommittedOffset: 10,
CommittedLeaderEpoch: 100,
CommittedMetadata: "meta-1-0",
},
{
Partition: 1,
CommittedOffset: 10,
CommittedLeaderEpoch: 100,
CommittedMetadata: "meta-1-1",
},
},
},
},
})
}

// Version 3 added:
// GenerationID
// MemberID
// GroupInstanceID
for _, version := range []int16{3} {
prototest.TestRequest(t, version, &txnoffsetcommit.Request{
TransactionalID: "transactional-id-0",
GroupID: "group-0",
ProducerID: 10,
ProducerEpoch: 100,
GenerationID: 2,
MemberID: "member-0",
GroupInstanceID: "group-instance-id-0",
Topics: []txnoffsetcommit.RequestTopic{
{
Name: "topic-0",
Partitions: []txnoffsetcommit.RequestPartition{
{
Partition: 0,
CommittedOffset: 10,
CommittedLeaderEpoch: 100,
CommittedMetadata: "meta-0-0",
},
{
Partition: 1,
CommittedOffset: 10,
CommittedLeaderEpoch: 100,
CommittedMetadata: "meta-0-1",
},
},
},
{
Name: "topic-1",
Partitions: []txnoffsetcommit.RequestPartition{
{
Partition: 0,
CommittedOffset: 10,
CommittedLeaderEpoch: 100,
CommittedMetadata: "meta-1-0",
},
{
Partition: 1,
CommittedOffset: 10,
CommittedLeaderEpoch: 100,
CommittedMetadata: "meta-1-1",
},
},
},
},
})
}
}

func TestTxnOffsetCommitResponse(t *testing.T) {
for _, version := range []int16{0, 1, 2, 3} {
prototest.TestResponse(t, version, &txnoffsetcommit.Response{
ThrottleTimeMs: 10,
Topics: []txnoffsetcommit.ResponseTopic{
{
Name: "topic-0",
Partitions: []txnoffsetcommit.ResponsePartition{
{
Partition: 0,
ErrorCode: 0,
},
{
Partition: 1,
ErrorCode: 10,
},
},
},
{
Name: "topic-1",
Partitions: []txnoffsetcommit.ResponsePartition{
{
Partition: 0,
ErrorCode: 0,
},
{
Partition: 1,
ErrorCode: 10,
},
},
},
},
})
}
}
Loading

0 comments on commit bedcf4a

Please sign in to comment.