Skip to content

Commit

Permalink
Clean up APIs and add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
yolken-segment committed Dec 8, 2020
1 parent d66b9dd commit 9c690e1
Show file tree
Hide file tree
Showing 13 changed files with 621 additions and 131 deletions.
37 changes: 30 additions & 7 deletions alterpartitionreassignments.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,36 +8,59 @@ import (
"github.com/segmentio/kafka-go/protocol/alterpartitionreassignments"
)

// AlterPartitionReassignmentsRequest is a request to the AlterPartitionReassignments API.
type AlterPartitionReassignmentsRequest struct {
// Address of the kafka broker to send the request to.
Addr net.Addr

Topic string
// Topic is the name of the topic to alter partitions in.
Topic string

// Assignments is the list of partition reassignments to submit to the API.
Assignments []AlterPartitionReassignmentsRequestAssignment
Timeout time.Duration

// Timeout is the amount of time to wait for the request to complete.
Timeout time.Duration
}

// AlterPartitionReassignmentsRequestAssignment contains the requested reassignments for a single
// partition.
type AlterPartitionReassignmentsRequestAssignment struct {
// PartitionID is the ID of the partition to make the reassignments in.
PartitionID int
BrokerIDs []int

// BrokerIDs is a slice of brokers to set the partition replicas to.
BrokerIDs []int
}

// AlterPartitionReassignmentsResponse is a response from the AlterPartitionReassignments API.
type AlterPartitionReassignmentsResponse struct {
ErrorCode int
// ErrorCode is set to a non-zero value if a top-level error was encountered.
ErrorCode int

// ErrorMessage describes the top-level error that occured.
ErrorMessage string

// PartitionResults contains the specific results for each partition.
PartitionResults []AlterPartitionReassignmentsResponsePartitionResult
}

// AlterPartitionReassignmentsResponsePartitionResult contains the detailed result of
// doing reassignments for a single partition.
type AlterPartitionReassignmentsResponsePartitionResult struct {
PartitionID int
ErrorCode int
// PartitionID is the ID of the partition that was altered.
PartitionID int

// ErrorCode is set to a non-zero value if an error was encountered during the update.
ErrorCode int

// ErrorMessage describes the partition-specific error that occurred.
ErrorMessage string
}

func (c *Client) AlterPartitionReassignments(
ctx context.Context,
req AlterPartitionReassignmentsRequest,
req *AlterPartitionReassignmentsRequest,
) (*AlterPartitionReassignmentsResponse, error) {
apiPartitions := []alterpartitionreassignments.RequestPartition{}

Expand Down
58 changes: 58 additions & 0 deletions alterpartitionreassignments_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package kafka

import (
"context"
"testing"

ktesting "github.com/segmentio/kafka-go/testing"
)

func TestClientAlterPartitionReassignments(t *testing.T) {
if !ktesting.KafkaIsAtLeast("2.4.0") {
return
}

ctx := context.Background()
client, shutdown := newLocalClient()
defer shutdown()

topic := makeTopic()
createTopic(t, topic, 2)
defer deleteTopic(t, topic)

// Local kafka only has 1 broker, so any partition reassignments are really no-ops.
resp, err := client.AlterPartitionReassignments(
ctx,
&AlterPartitionReassignmentsRequest{
Topic: topic,
Assignments: []AlterPartitionReassignmentsRequestAssignment{
{
PartitionID: 0,
BrokerIDs: []int{1},
},
{
PartitionID: 1,
BrokerIDs: []int{1},
},
},
},
)

if err != nil {
t.Fatal(err)
}
if resp.ErrorCode != 0 {
t.Error(
"Bad error code in response",
"expected", 0,
"got", resp.ErrorCode,
)
}
if len(resp.PartitionResults) != 2 {
t.Error(
"Unexpected length of partition results",
"expected", 2,
"got", len(resp.PartitionResults),
)
}
}
21 changes: 17 additions & 4 deletions apiversions.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,39 @@ import (
"github.com/segmentio/kafka-go/protocol/apiversions"
)

// ApiVersionsRequest is a request to the ApiVersions API.
type ApiVersionsRequest struct {
// Address of the kafka broker to send the request to.
Addr net.Addr
}

// ApiVersionsResponse is a response from the ApiVersions API.
type ApiVersionsResponse struct {
// ErrorCode is set to a non-zero value if an error was encountered.
ErrorCode int
ApiKeys []ApiVersionsResponseApiKey

// ApiKeys contains the specific details of each supported API.
ApiKeys []ApiVersionsResponseApiKey
}

// ApiVersionsResponseApiKey includes the details of which versions are supported for a single API.
type ApiVersionsResponseApiKey struct {
ApiKey int
ApiName string
// ApiKey is the ID of the API.
ApiKey int

// ApiName is a human-friendly description of the API.
ApiName string

// MinVersion is the minimum API version supported by the broker.
MinVersion int

// MaxVersion is the maximum API version supported by the broker.
MaxVersion int
}

func (c *Client) ApiVersions(
ctx context.Context,
req ApiVersionsRequest,
req *ApiVersionsRequest,
) (*ApiVersionsResponse, error) {
apiReq := &apiversions.Request{}
protoResp, err := c.roundTrip(
Expand Down
34 changes: 34 additions & 0 deletions apiversions_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package kafka

import (
"context"
"testing"
)

func TestClientApiVersions(t *testing.T) {
ctx := context.Background()

client, shutdown := newLocalClient()
defer shutdown()

resp, err := client.ApiVersions(ctx, &ApiVersionsRequest{})
if err != nil {
t.Fatal(err)
}

if resp.ErrorCode != 0 {
t.Error(
"Unexpected error code",
"expected", 0,
"got", resp.ErrorCode,
)
}

if len(resp.ApiKeys) == 0 {
t.Error(
"Unexpected apiKeys length",
"expected greater than", 0,
"got", 0,
)
}
}
63 changes: 48 additions & 15 deletions describegroups.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,55 +10,88 @@ import (
"github.com/segmentio/kafka-go/protocol/describegroups"
)

// DescribeGroupsRequest is a request to the DescribeGroups API.
type DescribeGroupsRequest struct {
// Address of the kafka broker to send the request to.
Addr net.Addr
// Addr is the address of the kafka broker to send the request to.
Addr net.Addr

// GroupIDs is a slice of groups to get details for.
GroupIDs []string
}

// DescribeGroupsResponse is a response from the DescribeGroups API.
type DescribeGroupsResponse struct {
// Groups is a slice of details for the requested groups.
Groups []DescribeGroupsResponseGroup
}

// DescribeGroupsResponseGroup contains the response details for a single group.
type DescribeGroupsResponseGroup struct {
GroupID string
// GroupID is the ID of the group.
GroupID string

// GroupState is a description of the group state.
GroupState string
Members []DescribeGroupsResponseMember

// Members contains details about each member of the group.
Members []DescribeGroupsResponseMember
}

// MemberInfo represents the membership information for a single group member.
type DescribeGroupsResponseMember struct {
MemberID string
ClientID string
ClientHost string
MemberMetadata DescribeGroupsResponseMemberMetadata
// MemberID is the ID of the group member.
MemberID string

// ClientID is the ID of the client that the group member is using.
ClientID string

// ClientHost is the host of the client that the group member is connecting from.
ClientHost string

// MemberMetadata contains metadata about this group member.
MemberMetadata DescribeGroupsResponseMemberMetadata

// MemberAssignments contains the topic partitions that this member is assigned to.
MemberAssignments DescribeGroupsResponseAssignments
}

// GroupMemberMetadata stores metadata associated with a group member.
type DescribeGroupsResponseMemberMetadata struct {
Version int
Topics []string
// Version is the version of the metadata.
Version int

// Topics is the list of topics that the member is assigned to.
Topics []string

// UserData is the user data for the member.
UserData []byte
}

// GroupMemberAssignmentsInfo stores the topic partition assignment data for a group member.
type DescribeGroupsResponseAssignments struct {
Version int
Topics []GroupMemberTopic
// Version is the version of the assignments data.
Version int

// Topics contains the details of the partition assignments for each topic.
Topics []GroupMemberTopic

// UserData is the user data for the member.
UserData []byte
}

// GroupMemberTopic is a mapping from a topic to a list of partitions in the topic. It is used
// to represent the topic partitions that have been assigned to a group member.
type GroupMemberTopic struct {
Topic string
// Topic is the name of the topic.
Topic string

// Partitions is a slice of partition IDs that this member is assigned to in the topic.
Partitions []int
}

func (c *Client) DescribeGroup(
func (c *Client) DescribeGroups(
ctx context.Context,
req DescribeGroupsRequest,
req *DescribeGroupsRequest,
) (*DescribeGroupsResponse, error) {
protoResp, err := c.roundTrip(
ctx,
Expand Down
Loading

0 comments on commit 9c690e1

Please sign in to comment.