forked from segmentio/kafka-go
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathincrementalalterconfigs.go
133 lines (108 loc) · 3.57 KB
/
incrementalalterconfigs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package kafka
import (
"context"
"net"
"github.com/segmentio/kafka-go/protocol/incrementalalterconfigs"
)
// ConfigOperation identifies the kind of change to apply to a single config
// entry of an IncrementalAlterConfigs request. Its underlying type is int8, and
// Client.IncrementalAlterConfigs converts it with int8(...) when building the
// protocol-level request.
type ConfigOperation int8
// Supported ConfigOperation values. The numeric values are passed through
// unchanged to the protocol-level request, so they must not be renumbered.
//
// NOTE(review): the per-constant descriptions below follow the constant names;
// the exact broker-side semantics are defined by Kafka rather than by this
// file — confirm against the Kafka protocol documentation.
const (
// ConfigOperationSet (0): presumably sets the config to the supplied value.
ConfigOperationSet ConfigOperation = 0
// ConfigOperationDelete (1): presumably removes the config override.
ConfigOperationDelete ConfigOperation = 1
// ConfigOperationAppend (2): presumably appends the supplied value to the
// existing config value.
ConfigOperationAppend ConfigOperation = 2
// ConfigOperationSubtract (3): presumably removes the supplied value from
// the existing config value.
ConfigOperationSubtract ConfigOperation = 3
)
// IncrementalAlterConfigsRequest is a request to the IncrementalAlterConfigs API.
// It is the input of Client.IncrementalAlterConfigs.
type IncrementalAlterConfigsRequest struct {
// Addr is the address of the kafka broker to send the request to.
Addr net.Addr
// Resources contains the list of resources to update configs for. Each
// entry carries its own list of config changes.
Resources []IncrementalAlterConfigsRequestResource
// ValidateOnly indicates whether Kafka should validate the changes without actually
// applying them.
ValidateOnly bool
}
// IncrementalAlterConfigsRequestResource contains the details of a single resource whose
// configs should be altered.
type IncrementalAlterConfigsRequestResource struct {
// ResourceType is the type of resource to update.
ResourceType ResourceType
// ResourceName is the name of the resource to update (i.e., topic name or broker ID).
ResourceName string
// Configs contains the list of config changes to apply to this resource.
Configs []IncrementalAlterConfigsRequestConfig
}
// IncrementalAlterConfigsRequestConfig describes a single config key/value pair that should
// be altered, together with the operation to perform on it.
type IncrementalAlterConfigsRequestConfig struct {
// Name is the name of the config.
Name string
// Value is the value to use for this config.
Value string
// ConfigOperation indicates how this config should be updated; it is one of
// the ConfigOperation constants (ConfigOperationSet, ConfigOperationDelete,
// ConfigOperationAppend, ConfigOperationSubtract). The zero value is
// ConfigOperationSet.
ConfigOperation ConfigOperation
}
// IncrementalAlterConfigsResponse is a response from the IncrementalAlterConfigs API.
// It is returned by Client.IncrementalAlterConfigs.
type IncrementalAlterConfigsResponse struct {
// Resources contains one entry per resource result reported in the
// response. It is left nil when the response contains no results.
Resources []IncrementalAlterConfigsResponseResource
}
// IncrementalAlterConfigsResponseResource contains the response details for a single resource
// whose configs were updated.
type IncrementalAlterConfigsResponseResource struct {
// Error is set to a non-nil value if an error occurred while updating this specific
// resource. It is built from the error code and error message that the
// response reports for the resource.
Error error
// ResourceType is the type of resource that was updated.
ResourceType ResourceType
// ResourceName is the name of the resource that was updated.
ResourceName string
}
// IncrementalAlterConfigs sends an IncrementalAlterConfigs request to the kafka
// broker at req.Addr and returns the per-resource results.
//
// Every resource in req.Resources, together with its config entries, is copied
// into the protocol-level request, and req.ValidateOnly is forwarded as is.
//
// The returned error is non-nil only when the round trip itself fails. Errors
// reported for an individual resource are not returned here; they are stored
// in the Error field of the matching entry of the response's Resources.
func (c *Client) IncrementalAlterConfigs(
	ctx context.Context,
	req *IncrementalAlterConfigsRequest,
) (*IncrementalAlterConfigsResponse, error) {
	wireReq := &incrementalalterconfigs.Request{}
	wireReq.ValidateOnly = req.ValidateOnly

	// Translate each public resource (and its config changes) into its
	// protocol-level counterpart.
	for i := range req.Resources {
		src := &req.Resources[i]

		var wireConfigs []incrementalalterconfigs.RequestConfig
		for j := range src.Configs {
			entry := &src.Configs[j]
			wireConfigs = append(wireConfigs, incrementalalterconfigs.RequestConfig{
				Name:            entry.Name,
				Value:           entry.Value,
				ConfigOperation: int8(entry.ConfigOperation),
			})
		}

		wireReq.Resources = append(wireReq.Resources, incrementalalterconfigs.RequestResource{
			ResourceType: int8(src.ResourceType),
			ResourceName: src.ResourceName,
			Configs:      wireConfigs,
		})
	}

	msg, err := c.roundTrip(ctx, req.Addr, wireReq)
	if err != nil {
		return nil, err
	}

	// The message returned for this request is expected to be the
	// IncrementalAlterConfigs response type.
	wireResp := msg.(*incrementalalterconfigs.Response)

	out := new(IncrementalAlterConfigsResponse)
	for _, r := range wireResp.Responses {
		result := IncrementalAlterConfigsResponseResource{
			Error:        makeError(r.ErrorCode, r.ErrorMessage),
			ResourceType: ResourceType(r.ResourceType),
			ResourceName: r.ResourceName,
		}
		out.Resources = append(out.Resources, result)
	}

	return out, nil
}