forked from segmentio/kafka-go
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add CreateAcls Admin API support (segmentio#839)
This adds a CreateACLs method to the kafka Client to supports the CreateAcls Admin API. Signed-off-by: Guillaume Fillon <guillaume.fillon@auth0.com>
- Loading branch information
Showing
7 changed files
with
273 additions
and
17 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
package kafka | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
|
||
ktesting "github.com/segmentio/kafka-go/testing" | ||
) | ||
|
||
func TestClientCreateACLs(t *testing.T) { | ||
if !ktesting.KafkaIsAtLeast("2.0.1") { | ||
return | ||
} | ||
|
||
client, shutdown := newLocalClient() | ||
defer shutdown() | ||
|
||
res, err := client.CreateACLs(context.Background(), &CreateACLsRequest{ | ||
ACLs: []ACLEntry{ | ||
{ | ||
Principal: "User:alice", | ||
PermissionType: ACLPermissionTypeAllow, | ||
Operation: ACLOperationTypeRead, | ||
ResourceType: ResourceTypeTopic, | ||
ResourcePatternType: PatternTypeLiteral, | ||
ResourceName: "fake-topic-for-alice", | ||
Host: "*", | ||
}, | ||
{ | ||
Principal: "User:bob", | ||
PermissionType: ACLPermissionTypeAllow, | ||
Operation: ACLOperationTypeRead, | ||
ResourceType: ResourceTypeGroup, | ||
ResourcePatternType: PatternTypeLiteral, | ||
ResourceName: "fake-group-for-bob", | ||
Host: "*", | ||
}, | ||
}, | ||
}) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
for _, err := range res.Errors { | ||
if err != nil { | ||
t.Error(err) | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
package kafka | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"net" | ||
"time" | ||
|
||
"github.com/segmentio/kafka-go/protocol/createacls" | ||
) | ||
|
||
// CreateACLsRequest represents a request sent to a kafka broker to add | ||
// new ACLs. | ||
type CreateACLsRequest struct { | ||
// Address of the kafka broker to send the request to. | ||
Addr net.Addr | ||
|
||
// List of ACL to create. | ||
ACLs []ACLEntry | ||
} | ||
|
||
// CreateACLsResponse represents a response from a kafka broker to an ACL | ||
// creation request. | ||
type CreateACLsResponse struct { | ||
// The amount of time that the broker throttled the request. | ||
Throttle time.Duration | ||
|
||
// List of errors that occurred while attempting to create | ||
// the ACLs. | ||
// | ||
// The errors contain the kafka error code. Programs may use the standard | ||
// errors.Is function to test the error against kafka error codes. | ||
Errors []error | ||
} | ||
|
||
type ACLPermissionType int8 | ||
|
||
const ( | ||
ACLPermissionTypeUnknown ACLPermissionType = 0 | ||
ACLPermissionTypeAny ACLPermissionType = 1 | ||
ACLPermissionTypeDeny ACLPermissionType = 2 | ||
ACLPermissionTypeAllow ACLPermissionType = 3 | ||
) | ||
|
||
type ACLOperationType int8 | ||
|
||
const ( | ||
ACLOperationTypeUnknown ACLOperationType = 0 | ||
ACLOperationTypeAny ACLOperationType = 1 | ||
ACLOperationTypeAll ACLOperationType = 2 | ||
ACLOperationTypeRead ACLOperationType = 3 | ||
ACLOperationTypeWrite ACLOperationType = 4 | ||
ACLOperationTypeCreate ACLOperationType = 5 | ||
ACLOperationTypeDelete ACLOperationType = 6 | ||
ACLOperationTypeAlter ACLOperationType = 7 | ||
ACLOperationTypeDescribe ACLOperationType = 8 | ||
ACLOperationTypeClusterAction ACLOperationType = 9 | ||
ACLOperationTypeDescribeConfigs ACLOperationType = 10 | ||
ACLOperationTypeAlterConfigs ACLOperationType = 11 | ||
ACLOperationTypeIdempotentWrite ACLOperationType = 12 | ||
) | ||
|
||
type ACLEntry struct { | ||
ResourceType ResourceType | ||
ResourceName string | ||
ResourcePatternType PatternType | ||
Principal string | ||
Host string | ||
Operation ACLOperationType | ||
PermissionType ACLPermissionType | ||
} | ||
|
||
// CreateACLs sends ACLs creation request to a kafka broker and returns the | ||
// response. | ||
func (c *Client) CreateACLs(ctx context.Context, req *CreateACLsRequest) (*CreateACLsResponse, error) { | ||
acls := make([]createacls.RequestACLs, 0, len(req.ACLs)) | ||
|
||
for _, acl := range req.ACLs { | ||
acls = append(acls, createacls.RequestACLs{ | ||
ResourceType: int8(acl.ResourceType), | ||
ResourceName: acl.ResourceName, | ||
ResourcePatternType: int8(acl.ResourcePatternType), | ||
Principal: acl.Principal, | ||
Host: acl.Host, | ||
Operation: int8(acl.Operation), | ||
PermissionType: int8(acl.PermissionType), | ||
}) | ||
} | ||
|
||
m, err := c.roundTrip(ctx, req.Addr, &createacls.Request{ | ||
Creations: acls, | ||
}) | ||
if err != nil { | ||
return nil, fmt.Errorf("kafka.(*Client).CreateACLs: %w", err) | ||
} | ||
|
||
res := m.(*createacls.Response) | ||
ret := &CreateACLsResponse{ | ||
Throttle: makeDuration(res.ThrottleTimeMs), | ||
Errors: make([]error, 0, len(res.Results)), | ||
} | ||
|
||
for _, t := range res.Results { | ||
ret.Errors = append(ret.Errors, makeError(t.ErrorCode, t.ErrorMessage)) | ||
} | ||
|
||
return ret, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
package createacls | ||
|
||
import "github.com/segmentio/kafka-go/protocol" | ||
|
||
func init() { | ||
protocol.Register(&Request{}, &Response{}) | ||
} | ||
|
||
type Request struct { | ||
// We need at least one tagged field to indicate that v2+ uses "flexible" | ||
// messages. | ||
_ struct{} `kafka:"min=v2,max=v2,tag"` | ||
|
||
Creations []RequestACLs `kafka:"min=v0,max=v2"` | ||
} | ||
|
||
func (r *Request) ApiKey() protocol.ApiKey { return protocol.CreateAcls } | ||
|
||
func (r *Request) Broker(cluster protocol.Cluster) (protocol.Broker, error) { | ||
return cluster.Brokers[cluster.Controller], nil | ||
} | ||
|
||
type RequestACLs struct { | ||
ResourceType int8 `kafka:"min=v0,max=v2"` | ||
ResourceName string `kafka:"min=v0,max=v2"` | ||
ResourcePatternType int8 `kafka:"min=v0,max=v2"` | ||
Principal string `kafka:"min=v0,max=v2"` | ||
Host string `kafka:"min=v0,max=v2"` | ||
Operation int8 `kafka:"min=v0,max=v2"` | ||
PermissionType int8 `kafka:"min=v0,max=v2"` | ||
} | ||
|
||
type Response struct { | ||
// We need at least one tagged field to indicate that v2+ uses "flexible" | ||
// messages. | ||
_ struct{} `kafka:"min=v2,max=v2,tag"` | ||
|
||
ThrottleTimeMs int32 `kafka:"min=v0,max=v2"` | ||
Results []ResponseACLs `kafka:"min=v0,max=v2"` | ||
} | ||
|
||
func (r *Response) ApiKey() protocol.ApiKey { return protocol.CreateAcls } | ||
|
||
type ResponseACLs struct { | ||
ErrorCode int16 `kafka:"min=v0,max=v2"` | ||
ErrorMessage string `kafka:"min=v0,max=v2,nullable"` | ||
} | ||
|
||
var _ protocol.BrokerMessage = (*Request)(nil) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
package kafka | ||
|
||
// https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java | ||
type ResourceType int8 | ||
|
||
const ( | ||
ResourceTypeUnknown ResourceType = 0 | ||
ResourceTypeAny ResourceType = 1 | ||
ResourceTypeTopic ResourceType = 2 | ||
ResourceTypeGroup ResourceType = 3 | ||
// See https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java#L36 | ||
ResourceTypeBroker ResourceType = 4 | ||
ResourceTypeCluster ResourceType = 4 | ||
ResourceTypeTransactionalID ResourceType = 5 | ||
ResourceTypeDelegationToken ResourceType = 6 | ||
) | ||
|
||
// https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/resource/PatternType.java | ||
type PatternType int8 | ||
|
||
const ( | ||
// PatternTypeUnknown represents any PatternType which this client cannot | ||
// understand. | ||
PatternTypeUnknown PatternType = 0 | ||
// PatternTypeAny matches any resource pattern type. | ||
PatternTypeAny PatternType = 1 | ||
// PatternTypeMatch perform pattern matching. | ||
PatternTypeMatch PatternType = 2 | ||
// PatternTypeLiteral represents a literal name. | ||
// A literal name defines the full name of a resource, e.g. topic with name | ||
// 'foo', or group with name 'bob'. | ||
PatternTypeLiteral PatternType = 3 | ||
// PatternTypePrefixed represents a prefixed name. | ||
// A prefixed name defines a prefix for a resource, e.g. topics with names | ||
// that start with 'foo'. | ||
PatternTypePrefixed PatternType = 4 | ||
) |