package kafka import ( "context" "fmt" "net" "time" "github.com/segmentio/kafka-go/protocol/alterclientquotas" ) // AlterClientQuotasRequest represents a request sent to a kafka broker to // alter client quotas. type AlterClientQuotasRequest struct { // Address of the kafka broker to send the request to. Addr net.Addr // List of client quotas entries to alter. Entries []AlterClientQuotaEntry // Whether the alteration should be validated, but not performed. ValidateOnly bool } type AlterClientQuotaEntry struct { // The quota entities to alter. Entities []AlterClientQuotaEntity // An individual quota configuration entry to alter. Ops []AlterClientQuotaOps } type AlterClientQuotaEntity struct { // The quota entity type. EntityType string // The name of the quota entity, or null if the default. EntityName string } type AlterClientQuotaOps struct { // The quota configuration key. Key string // The quota configuration value to set, otherwise ignored if the value is to be removed. Value float64 // Whether the quota configuration value should be removed, otherwise set. Remove bool } type AlterClientQuotaResponseQuotas struct { // Error is set to a non-nil value including the code and message if a top-level // error was encountered when doing the update. Error error // The altered quota entities. Entities []AlterClientQuotaEntity } // AlterClientQuotasResponse represents a response from a kafka broker to an alter client // quotas request. type AlterClientQuotasResponse struct { // The amount of time that the broker throttled the request. Throttle time.Duration // List of altered client quotas responses. Entries []AlterClientQuotaResponseQuotas } // AlterClientQuotas sends client quotas alteration request to a kafka broker and returns // the response. func (c *Client) AlterClientQuotas(ctx context.Context, req *AlterClientQuotasRequest) (*AlterClientQuotasResponse, error) { entries := make([]alterclientquotas.Entry, len(req.Entries)) for entryIdx, entry := range req.Entries { entities := make([]alterclientquotas.Entity, len(entry.Entities)) for entityIdx, entity := range entry.Entities { entities[entityIdx] = alterclientquotas.Entity{ EntityType: entity.EntityType, EntityName: entity.EntityName, } } ops := make([]alterclientquotas.Ops, len(entry.Ops)) for opsIdx, op := range entry.Ops { ops[opsIdx] = alterclientquotas.Ops{ Key: op.Key, Value: op.Value, Remove: op.Remove, } } entries[entryIdx] = alterclientquotas.Entry{ Entities: entities, Ops: ops, } } m, err := c.roundTrip(ctx, req.Addr, &alterclientquotas.Request{ Entries: entries, ValidateOnly: req.ValidateOnly, }) if err != nil { return nil, fmt.Errorf("kafka.(*Client).AlterClientQuotas: %w", err) } res := m.(*alterclientquotas.Response) responseEntries := make([]AlterClientQuotaResponseQuotas, len(res.Results)) for responseEntryIdx, responseEntry := range res.Results { responseEntities := make([]AlterClientQuotaEntity, len(responseEntry.Entities)) for responseEntityIdx, responseEntity := range responseEntry.Entities { responseEntities[responseEntityIdx] = AlterClientQuotaEntity{ EntityType: responseEntity.EntityType, EntityName: responseEntity.EntityName, } } responseEntries[responseEntryIdx] = AlterClientQuotaResponseQuotas{ Error: makeError(responseEntry.ErrorCode, responseEntry.ErrorMessage), Entities: responseEntities, } } ret := &AlterClientQuotasResponse{ Throttle: makeDuration(res.ThrottleTimeMs), Entries: responseEntries, } return ret, nil }