forked from segmentio/kafka-go
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add protocol versions of listgroups and describegroups
- Loading branch information
1 parent
9069f0e
commit bc7cd50
Showing
4 changed files
with
368 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
package describegroups | ||
|
||
import ( | ||
"github.com/segmentio/kafka-go/protocol" | ||
) | ||
|
||
func init() { | ||
protocol.Register(&Request{}, &Response{}) | ||
} | ||
|
||
type Request struct { | ||
Groups []string `kafka:"min=v0,max=v4"` | ||
IncludeAuthorizedOperations bool `kafka:"min=v3,max=v4"` | ||
} | ||
|
||
func (r *Request) ApiKey() protocol.ApiKey { return protocol.DescribeGroups } | ||
|
||
func (r *Request) Group() string { | ||
// TODO: Handle other groups too (via splitter). | ||
return r.Groups[0] | ||
} | ||
|
||
type Response struct { | ||
ThrottleTimeMs int32 `kafka:"min=v1,max=v4"` | ||
Groups []ResponseGroup `kafka:"min=v0,max=v4"` | ||
} | ||
|
||
type ResponseGroup struct { | ||
ErrorCode int16 `kafka:"min=v0,max=v4"` | ||
GroupID string `kafka:"min=v0,max=v4"` | ||
GroupState string `kafka:"min=v0,max=v4"` | ||
ProtocolType string `kafka:"min=v0,max=v4"` | ||
ProtocolData string `kafka:"min=v0,max=v4"` | ||
Members []ResponseGroupMember `kafka:"min=v0,max=v4"` | ||
AuthorizedOperations int32 `kafka:"min=v3,max=v4"` | ||
} | ||
|
||
type ResponseGroupMember struct { | ||
MemberID string `kafka:"min=v0,max=v4"` | ||
GroupInstanceID string `kafka:"min=v4,max=v4,nullable"` | ||
ClientID string `kafka:"min=v0,max=v4"` | ||
ClientHost string `kafka:"min=v0,max=v4"` | ||
MemberMetadata []byte `kafka:"min=v0,max=v4"` | ||
MemberAssignment []byte `kafka:"min=v0,max=v4"` | ||
} | ||
|
||
func (r *Response) ApiKey() protocol.ApiKey { return protocol.DescribeGroups } |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
package listgroups | ||
|
||
import ( | ||
"errors" | ||
|
||
"github.com/segmentio/kafka-go/protocol" | ||
) | ||
|
||
func init() { | ||
protocol.Register(&Request{}, &Response{}) | ||
} | ||
|
||
type Request struct { | ||
brokerID int32 | ||
} | ||
|
||
func (r *Request) ApiKey() protocol.ApiKey { return protocol.ListGroups } | ||
|
||
func (r *Request) Broker(cluster protocol.Cluster) (protocol.Broker, error) { | ||
return cluster.Brokers[r.brokerID], nil | ||
} | ||
|
||
func (r *Request) Split(cluster protocol.Cluster) ( | ||
[]protocol.Message, | ||
protocol.Merger, | ||
error, | ||
) { | ||
messages := []protocol.Message{} | ||
|
||
for _, broker := range cluster.Brokers { | ||
messages = append(messages, &Request{broker.ID}) | ||
} | ||
|
||
return nil, new(Response), nil | ||
} | ||
|
||
type Response struct { | ||
ThrottleTimeMs int32 `kafka:"min=v1,max=v2"` | ||
ErrorCode int16 `kafka:"min=v0,max=v2"` | ||
Groups []ResponseGroup `kafka:"min=v0,max=v2"` | ||
} | ||
|
||
type ResponseGroup struct { | ||
GroupID string `kafka:"min=v0,max=v2"` | ||
ProtocolType string `kafka:"min=v0,max=v2"` | ||
|
||
// Use this to store which broker returned the response | ||
BrokerID int32 `kafka:"-"` | ||
} | ||
|
||
func (r *Response) ApiKey() protocol.ApiKey { return protocol.ListGroups } | ||
|
||
func (r *Response) Merge(requests []protocol.Message, results []interface{}) ( | ||
protocol.Message, | ||
error, | ||
) { | ||
response := &Response{} | ||
|
||
for r, result := range results { | ||
brokerResp, ok := result.(*Response) | ||
if !ok { | ||
return nil, errors.New("Unexpected response type") | ||
} | ||
|
||
respGroups := []ResponseGroup{} | ||
|
||
for _, brokerResp := range brokerResp.Groups { | ||
respGroups = append( | ||
respGroups, | ||
ResponseGroup{ | ||
GroupID: brokerResp.GroupID, | ||
ProtocolType: brokerResp.ProtocolType, | ||
BrokerID: requests[r].(*Request).brokerID, | ||
}, | ||
) | ||
} | ||
|
||
response.Groups = append(response.Groups, brokerResp.Groups...) | ||
} | ||
|
||
return response, nil | ||
} |