diff --git a/describeconfigs.go b/describeconfigs.go index b1fd254e3..0b48e8ae4 100644 --- a/describeconfigs.go +++ b/describeconfigs.go @@ -27,12 +27,10 @@ type DescribeConfigsRequest struct { type ResourceType int8 const ( + // See https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java#L36 ResourceTypeUnknown ResourceType = 0 - ResourceTypeAny ResourceType = 1 ResourceTypeTopic ResourceType = 2 - ResourceTypeGroup ResourceType = 3 - ResourceTypeCluster ResourceType = 4 - ResourceTypeBroker ResourceType = 5 + ResourceTypeBroker ResourceType = 4 ) type DescribeConfigRequestResource struct { diff --git a/protocol/describeconfigs/describeconfigs.go b/protocol/describeconfigs/describeconfigs.go index 9f7fc18b2..acda6eb65 100644 --- a/protocol/describeconfigs/describeconfigs.go +++ b/protocol/describeconfigs/describeconfigs.go @@ -1,6 +1,14 @@ package describeconfigs -import "github.com/segmentio/kafka-go/protocol" +import ( + "strconv" + + "github.com/segmentio/kafka-go/protocol" +) + +const ( + resourceTypeBroker int8 = 4 +) func init() { protocol.Register(&Request{}, &Response{}) @@ -16,13 +24,55 @@ type Request struct { func (r *Request) ApiKey() protocol.ApiKey { return protocol.DescribeConfigs } func (r *Request) Broker(cluster protocol.Cluster) (protocol.Broker, error) { + // Broker metadata requests must be sent to the associated broker + for _, resource := range r.Resources { + if resource.ResourceType == resourceTypeBroker { + brokerID, err := strconv.Atoi(resource.ResourceName) + if err != nil { + return protocol.Broker{}, err + } + + return cluster.Brokers[int32(brokerID)], nil + } + } + return cluster.Brokers[cluster.Controller], nil } +func (r *Request) Split(cluster protocol.Cluster) ( + []protocol.Message, + protocol.Merger, + error, +) { + messages := []protocol.Message{} + topicsMessage := Request{ + Resources: []RequestResource{}, + } + + for _, resource := range r.Resources { + // Split out broker requests to separate brokers + if resource.ResourceType == resourceTypeBroker { + messages = append(messages, &Request{ + Resources: []RequestResource{resource}, + }) + } else { + topicsMessage.Resources = append( + topicsMessage.Resources, resource, + ) + } + } + + if len(topicsMessage.Resources) > 0 { + messages = append(messages, &topicsMessage) + } + + return messages, new(Response), nil +} + type RequestResource struct { ResourceType int8 `kafka:"min=v0,max=v3"` ResourceName string `kafka:"min=v0,max=v3"` - ConfigNames []string `kafka:"min=v0,max=v3"` + ConfigNames []string `kafka:"min=v0,max=v3,nullable"` } type Response struct { @@ -32,6 +82,23 @@ type Response struct { func (r *Response) ApiKey() protocol.ApiKey { return protocol.DescribeConfigs } +func (r *Response) Merge(requests []protocol.Message, results []interface{}) ( + protocol.Message, + error, +) { + response := &Response{} + + for _, result := range results { + brokerResp := result.(*Response) + response.Resources = append( + response.Resources, + brokerResp.Resources..., + ) + } + + return response, nil +} + type ResponseResource struct { ErrorCode int16 `kafka:"min=v0,max=v3"` ErrorMessage string `kafka:"min=v0,max=v3,nullable"`