Skip to content

Commit

Permalink
Misc. fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
yolken-segment committed Nov 18, 2020
1 parent cca95af commit 00f7486
Show file tree
Hide file tree
Showing 12 changed files with 265 additions and 85 deletions.
70 changes: 70 additions & 0 deletions alterpartitionreassignments.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package kafka

import (
"context"
"net"
"time"

"github.com/segmentio/kafka-go/protocol/alterpartitionreassignments"
)

type AlterPartitionReassignmentsRequest struct {
// Address of the kafka broker to send the request to.
Addr net.Addr

Topic string
Assignments []AlterPartitionReassignmentsRequestAssignment
Timeout time.Duration
}

type AlterPartitionReassignmentsRequestAssignment struct {
PartitionID int32
BrokerIDs []int32
}

type AlterPartitionReassignmentsResponse struct {
ErrorCode int16
ErrorMessage string
}

func (c *Client) AlterPartitionReassignments(
ctx context.Context,
req AlterPartitionReassignmentsRequest,
) (*AlterPartitionReassignmentsResponse, error) {
apiPartitions := []alterpartitionreassignments.RequestPartition{}

for _, assignment := range req.Assignments {
apiPartitions = append(
apiPartitions,
alterpartitionreassignments.RequestPartition{
PartitionIndex: assignment.PartitionID,
Replicas: assignment.BrokerIDs,
},
)
}

apiReq := &alterpartitionreassignments.Request{
TimeoutMs: int32(req.Timeout.Milliseconds()),
Topics: []alterpartitionreassignments.RequestTopic{
{
Name: req.Topic,
Partitions: apiPartitions,
},
},
}

protoResp, err := c.roundTrip(
ctx,
req.Addr,
apiReq,
)
if err != nil {
return nil, err
}
apiResp := protoResp.(*alterpartitionreassignments.Response)

return &AlterPartitionReassignmentsResponse{
ErrorCode: apiResp.ErrorCode,
ErrorMessage: apiResp.ErrorMessage,
}, nil
}
9 changes: 5 additions & 4 deletions createpartitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net"
"time"

"github.com/segmentio/kafka-go/protocol/createpartitions"
)
Expand All @@ -15,6 +16,7 @@ type CreatePartitionsRequest struct {
Topic string
NewPartitions []CreatePartitionsRequestPartition
TotalCount int32
Timeout time.Duration
}

type CreatePartitionsRequestPartition struct {
Expand Down Expand Up @@ -45,20 +47,19 @@ func (c *Client) CreatePartitions(
Assignments: assignments,
},
},
TimeoutMs: 5000,
TimeoutMs: int32(req.Timeout.Milliseconds()),
}

protocolResp, err := c.roundTrip(
protoResp, err := c.roundTrip(
ctx,
req.Addr,
apiReq,
)
if err != nil {
return nil, err
}
apiResp := protocolResp.(*createpartitions.Response)
apiResp := protoResp.(*createpartitions.Response)

fmt.Printf("Response: %+v", *apiResp)
if len(apiResp.Results) == 0 {
return nil, fmt.Errorf("Empty results")
}
Expand Down
9 changes: 2 additions & 7 deletions describeconfigs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"net"
"strconv"
"strings"

"github.com/segmentio/kafka-go/protocol/describeconfigs"
)
Expand Down Expand Up @@ -60,7 +59,7 @@ func (c *Client) DescribeConfigs(
)
}

protocolResp, err := c.roundTrip(
protoResp, err := c.roundTrip(
ctx,
req.Addr,
&describeconfigs.Request{
Expand All @@ -70,7 +69,7 @@ func (c *Client) DescribeConfigs(
if err != nil {
return nil, err
}
apiResp := protocolResp.(*describeconfigs.Response)
apiResp := protoResp.(*describeconfigs.Response)

resp := &DescribeConfigsResponse{}

Expand All @@ -79,10 +78,6 @@ func (c *Client) DescribeConfigs(
case describeconfigs.ResourceTypeBroker:
configs := map[string]string{}
for _, entry := range resource.ConfigEntries {
if resource.ResourceName == "2" && strings.Contains(entry.ConfigName, "throttled") {
fmt.Printf("Entry: %+v\n", entry)
}

if !req.IncludeDefaults && (entry.IsDefault ||
entry.ConfigSource == describeconfigs.ConfigSourceDefaultConfig ||
entry.ConfigSource == describeconfigs.ConfigSourceStaticBrokerConfig ||
Expand Down
4 changes: 2 additions & 2 deletions describegroups.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (c *Client) DescribeGroup(
ctx context.Context,
req DescribeGroupsRequest,
) (*DescribeGroupsResponse, error) {
protocolResp, err := c.roundTrip(
protoResp, err := c.roundTrip(
ctx,
req.Addr,
&describegroups.Request{
Expand All @@ -70,7 +70,7 @@ func (c *Client) DescribeGroup(
if err != nil {
return nil, err
}
apiResp := protocolResp.(*describegroups.Response)
apiResp := protoResp.(*describegroups.Response)
resp := &DescribeGroupsResponse{
Groups: []DescribeGroupsResponseGroup{},
}
Expand Down
10 changes: 6 additions & 4 deletions electleaders.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kafka
import (
"context"
"net"
"time"

"github.com/segmentio/kafka-go/protocol/electleaders"
)
Expand All @@ -13,7 +14,8 @@ type ElectLeadersRequest struct {

Topic string
Partitions []int32
TimeoutMs int32

Timeout time.Duration
}

type ElectLeadersResponse struct {
Expand All @@ -36,18 +38,18 @@ func (c *Client) ElectLeaders(
)
}

protocolResp, err := c.roundTrip(
protoResp, err := c.roundTrip(
ctx,
req.Addr,
&electleaders.Request{
TopicPartitions: topicPartitions,
TimeoutMs: req.TimeoutMs,
TimeoutMs: int32(req.Timeout.Milliseconds()),
},
)
if err != nil {
return nil, err
}
apiResp := protocolResp.(*electleaders.Response)
apiResp := protoResp.(*electleaders.Response)

return &ElectLeadersResponse{
ErrorCode: apiResp.ErrorCode,
Expand Down
8 changes: 4 additions & 4 deletions incrementalalterconfigs.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,15 @@ func (c *Client) AlterBrokerConfigs(
},
}

protocolResp, err := c.roundTrip(
protoResp, err := c.roundTrip(
ctx,
req.Addr,
apiReq,
)
if err != nil {
return nil, err
}
apiResp := protocolResp.(*incrementalalterconfigs.Response)
apiResp := protoResp.(*incrementalalterconfigs.Response)
if len(apiResp.Responses) == 0 {
return nil, fmt.Errorf("Empty response")
}
Expand All @@ -83,15 +83,15 @@ func (c *Client) AlterTopicConfigs(
},
}

protocolResp, err := c.roundTrip(
protoResp, err := c.roundTrip(
ctx,
req.Addr,
apiReq,
)
if err != nil {
return nil, err
}
apiResp := protocolResp.(*incrementalalterconfigs.Response)
apiResp := protoResp.(*incrementalalterconfigs.Response)

if len(apiResp.Responses) == 0 {
return nil, fmt.Errorf("Empty response")
Expand Down
4 changes: 2 additions & 2 deletions listgroups.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ func (c *Client) ListGroups(
ctx context.Context,
req ListGroupsRequest,
) (*ListGroupsResponse, error) {
protocolResp, err := c.roundTrip(ctx, req.Addr, &listgroups.Request{})
protoResp, err := c.roundTrip(ctx, req.Addr, &listgroups.Request{})
if err != nil {
return nil, err
}
apiResp := protocolResp.(*listgroups.Response)
apiResp := protoResp.(*listgroups.Response)
resp := &ListGroupsResponse{}

for _, apiGroupInfo := range apiResp.Groups {
Expand Down
25 changes: 13 additions & 12 deletions protocol/alterpartitionreassignments/alterpartitionreassignments.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ type Request struct {
}

type RequestTopic struct {
Name string `kafka:"min=v0,max=v0,compact"`
Partitions []RequestPartitions `kafka:"min=v0,max=v0"`
Name string `kafka:"min=v0,max=v0"`
Partitions []RequestPartition `kafka:"min=v0,max=v0"`
}

type RequestPartitions struct {
PartitionIndex int32 `kafka:"min=v0,max=v0"`
Replicas int32 `kafka:"min=v0,max=v0"`
type RequestPartition struct {
PartitionIndex int32 `kafka:"min=v0,max=v0"`
Replicas []int32 `kafka:"min=v0,max=v0"`
}

func (r *Request) ApiKey() protocol.ApiKey {
Expand All @@ -34,24 +34,25 @@ func (r *Request) Broker(cluster protocol.Cluster) (protocol.Broker, error) {
}

type Response struct {
ThrottleTimeMs int32 `kafka:"min=v0,max=v0"`
ErrorCode int16 `kafka:"min=v0,max=v0"`
ErrorMessage string `kafka:"min=v0,max=v0,compact,nullable"`
ThrottleTimeMs int32 `kafka:"min=v0,max=v0"`
ErrorCode int16 `kafka:"min=v0,max=v0"`
ErrorMessage string `kafka:"min=v0,max=v0,nullable"`
Results []ResponseResult `kafka:"min=v0,max=v0"`

// We need at least one tagged field to indicate that this is a "flexible" message
// type.
_ struct{} `kafka:"min=v0,max=v0,tagId=-1"`
}

type ResponseResult struct {
Name string `kafka:"min=v0,max=v0,compact"`
Name string `kafka:"min=v0,max=v0"`
Partitions []ResponsePartition `kafka:"min=v0,max=v0"`
}

type ResponsePartition struct {
PartitionIndex int32
ErrorCode int16
ErrorMessage string `kafka:"min=v0,max=v0,compact,nullable"`
PartitionIndex int32 `kafka:"min=v0,max=v0"`
ErrorCode int16 `kafka:"min=v0,max=v0"`
ErrorMessage string `kafka:"min=v0,max=v0,nullable"`
}

func (r *Response) ApiKey() protocol.ApiKey {
Expand Down
Loading

0 comments on commit 00f7486

Please sign in to comment.