forked from segmentio/kafka-go
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
f697fff
commit 21f84b0
Showing
2 changed files
with
167 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,118 @@ | ||
package kafka | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"net" | ||
"time" | ||
|
||
"github.com/segmentio/kafka-go/protocol/alterclientquotas" | ||
) | ||
|
||
// AlterClientQuotasRequest represents a request sent to a kafka broker to add | ||
// alter client quotas. | ||
type AlterClientQuotasRequest struct { | ||
// Address of the kafka broker to send the request to. | ||
Addr net.Addr | ||
|
||
// List of client quotas entries to alter. | ||
Entries []AlterClientQuotaEntry | ||
|
||
// Whether the alteration should be validated, but not performed. | ||
ValidateOnly bool | ||
} | ||
|
||
type AlterClientQuotaEntry struct { | ||
Entities []AlterClientQuotaEntity | ||
Ops []AlterClientQuotaOps | ||
} | ||
|
||
type AlterClientQuotaEntity struct { | ||
EntityType string | ||
EntityName string | ||
} | ||
|
||
type AlterClientQuotaOps struct { | ||
Key string | ||
Value float64 | ||
Remove bool | ||
} | ||
|
||
type AlterClientQuotaResponseQuotas struct { | ||
ErrorCode int16 | ||
ErrorMessage string | ||
Entities []AlterClientQuotaEntity | ||
} | ||
|
||
// AlterClientQuotasResponse represents a response from a kafka broker to an alter client | ||
// quotas request. | ||
type AlterClientQuotasResponse struct { | ||
// The amount of time that the broker throttled the request. | ||
Throttle time.Duration | ||
|
||
// List of altered client quotas responses. | ||
Entries []AlterClientQuotaResponseQuotas | ||
} | ||
|
||
// AlterClientQuotas sends client quotas alteration request to a kafka broker and returns | ||
// the response. | ||
func (c *Client) AlterClientQuotas(ctx context.Context, req *AlterClientQuotasRequest) (*AlterClientQuotasResponse, error) { | ||
entries := make([]alterclientquotas.Entry, len(req.Entries)) | ||
|
||
for entryIdx, entry := range req.Entries { | ||
entities := make([]alterclientquotas.Entity, len(entry.Entities)) | ||
for entityIdx, entity := range entry.Entities { | ||
entities[entityIdx] = alterclientquotas.Entity{ | ||
EntityType: entity.EntityType, | ||
EntityName: entity.EntityName, | ||
} | ||
} | ||
|
||
ops := make([]alterclientquotas.Ops, len(entry.Ops)) | ||
for opsIdx, op := range entry.Ops { | ||
ops[opsIdx] = alterclientquotas.Ops{ | ||
Key: op.Key, | ||
Value: op.Value, | ||
Remove: op.Remove, | ||
} | ||
} | ||
|
||
entries[entryIdx] = alterclientquotas.Entry{ | ||
Entities: entities, | ||
Ops: ops, | ||
} | ||
} | ||
|
||
m, err := c.roundTrip(ctx, req.Addr, &alterclientquotas.Request{ | ||
Entries: entries, | ||
ValidateOnly: req.ValidateOnly, | ||
}) | ||
if err != nil { | ||
return nil, fmt.Errorf("kafka.(*Client).AlterClientQuotas: %w", err) | ||
} | ||
|
||
res := m.(*alterclientquotas.Response) | ||
responseEntries := make([]AlterClientQuotaResponseQuotas, len(res.Results)) | ||
|
||
for responseEntryIdx, responseEntry := range res.Results { | ||
responseEntities := make([]AlterClientQuotaEntity, len(responseEntry.Entities)) | ||
for responseEntityIdx, responseEntity := range responseEntry.Entities { | ||
responseEntities[responseEntityIdx] = AlterClientQuotaEntity{ | ||
EntityType: responseEntity.EntityType, | ||
EntityName: responseEntity.EntityName, | ||
} | ||
} | ||
|
||
responseEntries[responseEntryIdx] = AlterClientQuotaResponseQuotas{ | ||
ErrorCode: responseEntry.ErrorCode, | ||
ErrorMessage: responseEntry.ErrorMessage, | ||
Entities: responseEntities, | ||
} | ||
} | ||
ret := &AlterClientQuotasResponse{ | ||
Throttle: makeDuration(res.ThrottleTimeMs), | ||
Entries: responseEntries, | ||
} | ||
|
||
return ret, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
package kafka | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
|
||
ktesting "github.com/segmentio/kafka-go/testing" | ||
) | ||
|
||
func TestClientAlterClientQuotas(t *testing.T) { | ||
// Added in Version 2.6.0 https://issues.apache.org/jira/browse/KAFKA-7740 | ||
if !ktesting.KafkaIsAtLeast("2.6.0") { | ||
return | ||
} | ||
|
||
const ( | ||
entityType = "client-id" | ||
entityName = "my-client-id" | ||
key = "producer_byte_rate" | ||
value = 500000.0 | ||
) | ||
|
||
client, shutdown := newLocalClient() | ||
defer shutdown() | ||
|
||
resp, err := client.AlterClientQuotas(context.Background(), &AlterClientQuotasRequest{ | ||
Entries: []AlterClientQuotaEntry{ | ||
AlterClientQuotaEntry{ | ||
Entities: []AlterClientQuotaEntity{ | ||
AlterClientQuotaEntity{ | ||
EntityType: entityType, | ||
EntityName: entityName, | ||
}, | ||
}, | ||
Ops: []AlterClientQuotaOps{ | ||
AlterClientQuotaOps{ | ||
Key: key, | ||
Value: value, | ||
Remove: false, | ||
}, | ||
}, | ||
}, | ||
}, | ||
}) | ||
|
||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
} |