Skip to content

Commit

Permalink
add public api
Browse files Browse the repository at this point in the history
  • Loading branch information
petedannemann committed Apr 19, 2023
1 parent f697fff commit 21f84b0
Showing 2 changed files with 167 additions and 0 deletions.
118 changes: 118 additions & 0 deletions alterclientquotas.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package kafka

import (
"context"
"fmt"
"net"
"time"

"github.com/segmentio/kafka-go/protocol/alterclientquotas"
)

// AlterClientQuotasRequest represents a request sent to a kafka broker to add
// alter client quotas.
type AlterClientQuotasRequest struct {
// Address of the kafka broker to send the request to.
Addr net.Addr

// List of client quotas entries to alter.
Entries []AlterClientQuotaEntry

// Whether the alteration should be validated, but not performed.
ValidateOnly bool
}

type AlterClientQuotaEntry struct {
Entities []AlterClientQuotaEntity
Ops []AlterClientQuotaOps
}

type AlterClientQuotaEntity struct {
EntityType string
EntityName string
}

type AlterClientQuotaOps struct {
Key string
Value float64
Remove bool
}

type AlterClientQuotaResponseQuotas struct {
ErrorCode int16
ErrorMessage string
Entities []AlterClientQuotaEntity
}

// AlterClientQuotasResponse represents a response from a kafka broker to an alter client
// quotas request.
type AlterClientQuotasResponse struct {
// The amount of time that the broker throttled the request.
Throttle time.Duration

// List of altered client quotas responses.
Entries []AlterClientQuotaResponseQuotas
}

// AlterClientQuotas sends client quotas alteration request to a kafka broker and returns
// the response.
func (c *Client) AlterClientQuotas(ctx context.Context, req *AlterClientQuotasRequest) (*AlterClientQuotasResponse, error) {
entries := make([]alterclientquotas.Entry, len(req.Entries))

for entryIdx, entry := range req.Entries {
entities := make([]alterclientquotas.Entity, len(entry.Entities))
for entityIdx, entity := range entry.Entities {
entities[entityIdx] = alterclientquotas.Entity{
EntityType: entity.EntityType,
EntityName: entity.EntityName,
}
}

ops := make([]alterclientquotas.Ops, len(entry.Ops))
for opsIdx, op := range entry.Ops {
ops[opsIdx] = alterclientquotas.Ops{
Key: op.Key,
Value: op.Value,
Remove: op.Remove,
}
}

entries[entryIdx] = alterclientquotas.Entry{
Entities: entities,
Ops: ops,
}
}

m, err := c.roundTrip(ctx, req.Addr, &alterclientquotas.Request{
Entries: entries,
ValidateOnly: req.ValidateOnly,
})
if err != nil {
return nil, fmt.Errorf("kafka.(*Client).AlterClientQuotas: %w", err)
}

res := m.(*alterclientquotas.Response)
responseEntries := make([]AlterClientQuotaResponseQuotas, len(res.Results))

for responseEntryIdx, responseEntry := range res.Results {
responseEntities := make([]AlterClientQuotaEntity, len(responseEntry.Entities))
for responseEntityIdx, responseEntity := range responseEntry.Entities {
responseEntities[responseEntityIdx] = AlterClientQuotaEntity{
EntityType: responseEntity.EntityType,
EntityName: responseEntity.EntityName,
}
}

responseEntries[responseEntryIdx] = AlterClientQuotaResponseQuotas{
ErrorCode: responseEntry.ErrorCode,
ErrorMessage: responseEntry.ErrorMessage,
Entities: responseEntities,
}
}
ret := &AlterClientQuotasResponse{
Throttle: makeDuration(res.ThrottleTimeMs),
Entries: responseEntries,
}

return ret, nil
}
49 changes: 49 additions & 0 deletions alterclientquotas_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package kafka

import (
"context"
"testing"

ktesting "github.com/segmentio/kafka-go/testing"
)

func TestClientAlterClientQuotas(t *testing.T) {
// Added in Version 2.6.0 https://issues.apache.org/jira/browse/KAFKA-7740
if !ktesting.KafkaIsAtLeast("2.6.0") {
return
}

const (
entityType = "client-id"
entityName = "my-client-id"
key = "producer_byte_rate"
value = 500000.0
)

client, shutdown := newLocalClient()
defer shutdown()

resp, err := client.AlterClientQuotas(context.Background(), &AlterClientQuotasRequest{
Entries: []AlterClientQuotaEntry{
AlterClientQuotaEntry{
Entities: []AlterClientQuotaEntity{
AlterClientQuotaEntity{
EntityType: entityType,
EntityName: entityName,
},
},
Ops: []AlterClientQuotaOps{
AlterClientQuotaOps{
Key: key,
Value: value,
Remove: false,
},
},
},
},
})

if err != nil {
t.Fatal(err)
}
}

0 comments on commit 21f84b0

Please sign in to comment.