Skip to content

Commit

Permalink
Fix describeconfigs
Browse files Browse the repository at this point in the history
  • Loading branch information
yolken-segment committed Dec 8, 2020
1 parent 02a183b commit d66b9dd
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 6 deletions.
6 changes: 2 additions & 4 deletions describeconfigs.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,10 @@ type DescribeConfigsRequest struct {
type ResourceType int8

const (
// See https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java#L36
ResourceTypeUnknown ResourceType = 0
ResourceTypeAny ResourceType = 1
ResourceTypeTopic ResourceType = 2
ResourceTypeGroup ResourceType = 3
ResourceTypeCluster ResourceType = 4
ResourceTypeBroker ResourceType = 5
ResourceTypeBroker ResourceType = 4
)

type DescribeConfigRequestResource struct {
Expand Down
71 changes: 69 additions & 2 deletions protocol/describeconfigs/describeconfigs.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
package describeconfigs

import "github.com/segmentio/kafka-go/protocol"
import (
"strconv"

"github.com/segmentio/kafka-go/protocol"
)

const (
resourceTypeBroker int8 = 4
)

func init() {
protocol.Register(&Request{}, &Response{})
Expand All @@ -16,13 +24,55 @@ type Request struct {
func (r *Request) ApiKey() protocol.ApiKey { return protocol.DescribeConfigs }

func (r *Request) Broker(cluster protocol.Cluster) (protocol.Broker, error) {
// Broker metadata requests must be sent to the associated broker
for _, resource := range r.Resources {
if resource.ResourceType == resourceTypeBroker {
brokerID, err := strconv.Atoi(resource.ResourceName)
if err != nil {
return protocol.Broker{}, err
}

return cluster.Brokers[int32(brokerID)], nil
}
}

return cluster.Brokers[cluster.Controller], nil
}

func (r *Request) Split(cluster protocol.Cluster) (
[]protocol.Message,
protocol.Merger,
error,
) {
messages := []protocol.Message{}
topicsMessage := Request{
Resources: []RequestResource{},
}

for _, resource := range r.Resources {
// Split out broker requests to separate brokers
if resource.ResourceType == resourceTypeBroker {
messages = append(messages, &Request{
Resources: []RequestResource{resource},
})
} else {
topicsMessage.Resources = append(
topicsMessage.Resources, resource,
)
}
}

if len(topicsMessage.Resources) > 0 {
messages = append(messages, &topicsMessage)
}

return messages, new(Response), nil
}

type RequestResource struct {
ResourceType int8 `kafka:"min=v0,max=v3"`
ResourceName string `kafka:"min=v0,max=v3"`
ConfigNames []string `kafka:"min=v0,max=v3"`
ConfigNames []string `kafka:"min=v0,max=v3,nullable"`
}

type Response struct {
Expand All @@ -32,6 +82,23 @@ type Response struct {

func (r *Response) ApiKey() protocol.ApiKey { return protocol.DescribeConfigs }

func (r *Response) Merge(requests []protocol.Message, results []interface{}) (
protocol.Message,
error,
) {
response := &Response{}

for _, result := range results {
brokerResp := result.(*Response)
response.Resources = append(
response.Resources,
brokerResp.Resources...,
)
}

return response, nil
}

type ResponseResource struct {
ErrorCode int16 `kafka:"min=v0,max=v3"`
ErrorMessage string `kafka:"min=v0,max=v3,nullable"`
Expand Down

0 comments on commit d66b9dd

Please sign in to comment.