Skip to content

Commit

Permalink
Simplify client APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
yolken-segment committed Nov 25, 2020
1 parent 7b6f290 commit 63239f1
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 47 deletions.
25 changes: 15 additions & 10 deletions alterpartitionreassignments.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,20 @@ type AlterPartitionReassignmentsRequest struct {
}

type AlterPartitionReassignmentsRequestAssignment struct {
PartitionID int32
BrokerIDs []int32
PartitionID int
BrokerIDs []int
}

type AlterPartitionReassignmentsResponse struct {
ErrorCode int16
ErrorCode int
ErrorMessage string

PartitionResults []AlterPartitionReassignmentsResponsePartitionResult
}

type AlterPartitionReassignmentsResponsePartitionResult struct {
PartitionID int32
ErrorCode int16
PartitionID int
ErrorCode int
ErrorMessage string
}

Expand All @@ -42,11 +42,16 @@ func (c *Client) AlterPartitionReassignments(
apiPartitions := []alterpartitionreassignments.RequestPartition{}

for _, assignment := range req.Assignments {
replicas := []int32{}
for _, brokerID := range assignment.BrokerIDs {
replicas = append(replicas, int32(brokerID))
}

apiPartitions = append(
apiPartitions,
alterpartitionreassignments.RequestPartition{
PartitionIndex: assignment.PartitionID,
Replicas: assignment.BrokerIDs,
PartitionIndex: int32(assignment.PartitionID),
Replicas: replicas,
},
)
}
Expand All @@ -72,7 +77,7 @@ func (c *Client) AlterPartitionReassignments(
apiResp := protoResp.(*alterpartitionreassignments.Response)

resp := &AlterPartitionReassignmentsResponse{
ErrorCode: apiResp.ErrorCode,
ErrorCode: int(apiResp.ErrorCode),
ErrorMessage: apiResp.ErrorMessage,
}

Expand All @@ -81,8 +86,8 @@ func (c *Client) AlterPartitionReassignments(
resp.PartitionResults = append(
resp.PartitionResults,
AlterPartitionReassignmentsResponsePartitionResult{
PartitionID: partitionResult.PartitionIndex,
ErrorCode: partitionResult.ErrorCode,
PartitionID: int(partitionResult.PartitionIndex),
ErrorCode: int(partitionResult.ErrorCode),
ErrorMessage: partitionResult.ErrorMessage,
},
)
Expand Down
19 changes: 11 additions & 8 deletions apiversions.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ type ApiVersionsRequest struct {
}

type ApiVersionsResponse struct {
ErrorCode int16
ErrorCode int
ApiKeys []ApiVersionsResponseApiKey
}

type ApiVersionsResponseApiKey struct {
ApiKey int16
ApiKey int
ApiName string
MinVersion int16
MaxVersion int16
MinVersion int
MaxVersion int
}

func (c *Client) ApiVersions(
Expand All @@ -35,19 +35,22 @@ func (c *Client) ApiVersions(
req.Addr,
apiReq,
)
if err != nil {
return nil, err
}
apiResp := protoResp.(*apiversions.Response)

resp := &ApiVersionsResponse{
ErrorCode: apiResp.ErrorCode,
ErrorCode: int(apiResp.ErrorCode),
}
for _, apiKey := range apiResp.ApiKeys {
resp.ApiKeys = append(
resp.ApiKeys,
ApiVersionsResponseApiKey{
ApiKey: apiKey.ApiKey,
ApiKey: int(apiKey.ApiKey),
ApiName: protocol.ApiKey(apiKey.ApiKey).String(),
MinVersion: apiKey.MinVersion,
MaxVersion: apiKey.MaxVersion,
MinVersion: int(apiKey.MinVersion),
MaxVersion: int(apiKey.MaxVersion),
},
)
}
Expand Down
17 changes: 11 additions & 6 deletions createpartitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@ type CreatePartitionsRequest struct {

Topic string
NewPartitions []CreatePartitionsRequestPartition
TotalCount int32
TotalCount int
Timeout time.Duration
}

type CreatePartitionsRequestPartition struct {
BrokerIDs []int32
BrokerIDs []int
}

type CreatePartitionsResponse struct {
ErrorCode int16
ErrorCode int
ErrorMessage string
}

Expand All @@ -34,16 +34,21 @@ func (c *Client) CreatePartitions(
) (*CreatePartitionsResponse, error) {
assignments := []createpartitions.RequestAssignment{}
for _, partition := range req.NewPartitions {
brokerIDs32 := []int32{}
for _, brokerID := range partition.BrokerIDs {
brokerIDs32 = append(brokerIDs32, int32(brokerID))
}

assignments = append(assignments, createpartitions.RequestAssignment{
BrokerIDs: partition.BrokerIDs,
BrokerIDs: brokerIDs32,
})
}

apiReq := &createpartitions.Request{
Topics: []createpartitions.RequestTopic{
{
Name: req.Topic,
Count: req.TotalCount,
Count: int32(req.TotalCount),
Assignments: assignments,
},
},
Expand All @@ -65,7 +70,7 @@ func (c *Client) CreatePartitions(
}

return &CreatePartitionsResponse{
ErrorCode: apiResp.Results[0].ErrorCode,
ErrorCode: int(apiResp.Results[0].ErrorCode),
ErrorMessage: apiResp.Results[0].ErrorMessage,
}, nil
}
26 changes: 17 additions & 9 deletions describegroups.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ type DescribeGroupsResponseMember struct {

// GroupMemberMetadata stores metadata associated with a group member.
type DescribeGroupsResponseMemberMetadata struct {
Version int16
Version int
Topics []string
UserData []byte
}

// GroupMemberAssignmentsInfo stores the topic partition assignment data for a group member.
type DescribeGroupsResponseAssignments struct {
Version int16
Version int
Topics []GroupMemberTopic
UserData []byte
}
Expand All @@ -53,7 +53,7 @@ type DescribeGroupsResponseAssignments struct {
// to represent the topic partitions that have been assigned to a group member.
type GroupMemberTopic struct {
Topic string
Partitions []int32
Partitions []int
}

func (c *Client) DescribeGroup(
Expand Down Expand Up @@ -295,10 +295,13 @@ func decodeMemberMetadata(rawMetadata []byte) (DescribeGroupsResponseMemberMetad
remain := len(rawMetadata)

var err error
var version16 int16

if remain, err = readInt16(bufReader, remain, &mm.Version); err != nil {
if remain, err = readInt16(bufReader, remain, &version16); err != nil {
return mm, err
}
mm.Version = int(version16)

if remain, err = readStringArray(bufReader, remain, &mm.Topics); err != nil {
return mm, err
}
Expand Down Expand Up @@ -327,23 +330,28 @@ func decodeMemberAssignments(rawAssignments []byte) (DescribeGroupsResponseAssig
remain := len(rawAssignments)

var err error
var version16 int16

if remain, err = readInt16(bufReader, remain, &ma.Version); err != nil {
if remain, err = readInt16(bufReader, remain, &version16); err != nil {
return ma, err
}
ma.Version = int(version16)

fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
item := GroupMemberTopic{
Partitions: []int32{},
}
item := GroupMemberTopic{}

if fnRemain, fnErr = readString(r, size, &item.Topic); fnErr != nil {
return
}

if fnRemain, fnErr = readInt32Array(r, fnRemain, &item.Partitions); fnErr != nil {
partitions := []int32{}

if fnRemain, fnErr = readInt32Array(r, fnRemain, &partitions); fnErr != nil {
return
}
for _, partition := range partitions {
item.Partitions = append(item.Partitions, int(partition))
}

ma.Topics = append(ma.Topics, item)
return
Expand Down
21 changes: 13 additions & 8 deletions electleaders.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,34 +13,39 @@ type ElectLeadersRequest struct {
Addr net.Addr

Topic string
Partitions []int32
Partitions []int

Timeout time.Duration
}

type ElectLeadersResponse struct {
ErrorCode int16
ErrorCode int
PartitionResults []ElectLeadersResponsePartitionResult
}

type ElectLeadersResponsePartitionResult struct {
Partition int32
ErrorCode int16
Partition int
ErrorCode int
ErrorMessage string
}

func (c *Client) ElectLeaders(
ctx context.Context,
req ElectLeadersRequest,
) (*ElectLeadersResponse, error) {
partitions32 := []int32{}
for _, partition := range req.Partitions {
partitions32 = append(partitions32, int32(partition))
}

protoResp, err := c.roundTrip(
ctx,
req.Addr,
&electleaders.Request{
TopicPartitions: []electleaders.RequestTopicPartitions{
{
Topic: req.Topic,
PartitionIDs: req.Partitions,
PartitionIDs: partitions32,
},
},
TimeoutMs: int32(req.Timeout.Milliseconds()),
Expand All @@ -52,16 +57,16 @@ func (c *Client) ElectLeaders(
apiResp := protoResp.(*electleaders.Response)

resp := &ElectLeadersResponse{
ErrorCode: apiResp.ErrorCode,
ErrorCode: int(apiResp.ErrorCode),
}

for _, topicResult := range apiResp.ReplicaElectionResults {
for _, partitionResult := range topicResult.PartitionResults {
resp.PartitionResults = append(
resp.PartitionResults,
ElectLeadersResponsePartitionResult{
Partition: partitionResult.PartitionID,
ErrorCode: partitionResult.ErrorCode,
Partition: int(partitionResult.PartitionID),
ErrorCode: int(partitionResult.ErrorCode),
ErrorMessage: partitionResult.ErrorMessage,
},
)
Expand Down
8 changes: 4 additions & 4 deletions incrementalalterconfigs.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type AlterBrokerConfigsRequest struct {
}

type AlterBrokerConfigsResponse struct {
ErrorCode int16
ErrorCode int
ErrorMessage string
}

Expand All @@ -30,7 +30,7 @@ type AlterTopicConfigsRequest struct {
}

type AlterTopicConfigsResponse struct {
ErrorCode int16
ErrorCode int
ErrorMessage string
}

Expand Down Expand Up @@ -63,7 +63,7 @@ func (c *Client) AlterBrokerConfigs(
}

return &AlterBrokerConfigsResponse{
ErrorCode: apiResp.Responses[0].ErrorCode,
ErrorCode: int(apiResp.Responses[0].ErrorCode),
ErrorMessage: apiResp.Responses[0].ErrorMessage,
}, nil
}
Expand Down Expand Up @@ -98,7 +98,7 @@ func (c *Client) AlterTopicConfigs(
}

return &AlterTopicConfigsResponse{
ErrorCode: apiResp.Responses[0].ErrorCode,
ErrorCode: int(apiResp.Responses[0].ErrorCode),
ErrorMessage: apiResp.Responses[0].ErrorMessage,
}, nil
}
Expand Down
4 changes: 2 additions & 2 deletions listgroups.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type ListGroupsResponse struct {

type ListGroupsResponseGroup struct {
GroupID string
Coordinator int32
Coordinator int
}

func (c *Client) ListGroups(
Expand All @@ -36,7 +36,7 @@ func (c *Client) ListGroups(
for _, apiGroupInfo := range apiResp.Groups {
resp.Groups = append(resp.Groups, ListGroupsResponseGroup{
GroupID: apiGroupInfo.GroupID,
Coordinator: apiGroupInfo.BrokerID,
Coordinator: int(apiGroupInfo.BrokerID),
})
}

Expand Down

0 comments on commit 63239f1

Please sign in to comment.