forked from segmentio/kafka-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgroupbalancer.go
339 lines (292 loc) · 10.1 KB
/
groupbalancer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
package kafka
import (
"sort"
)
// GroupMember describes a single participant in a consumer group.
type GroupMember struct {
// ID is the unique ID for this member as taken from the JoinGroup response.
ID string
// Topics is a list of topics that this member is consuming.
Topics []string
// UserData contains any information that the GroupBalancer sent to the
// consumer group coordinator.
UserData []byte
}
// GroupMemberAssignments holds MemberID => topic => partitions
type GroupMemberAssignments map[string]map[string][]int
// GroupBalancer encapsulates the client side rebalancing logic
type GroupBalancer interface {
// ProtocolName of the GroupBalancer
ProtocolName() string
// UserData provides the GroupBalancer an opportunity to embed custom
// UserData into the metadata.
//
// Will be used by JoinGroup to begin the consumer group handshake.
//
// See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-JoinGroupRequest
UserData() ([]byte, error)
// DefineMemberships returns which members will be consuming
// which topic partitions
AssignGroups(members []GroupMember, partitions []Partition) GroupMemberAssignments
}
// RangeGroupBalancer groups consumers by partition
//
// Example: 5 partitions, 2 consumers
// C0: [0, 1, 2]
// C1: [3, 4]
//
// Example: 6 partitions, 3 consumers
// C0: [0, 1]
// C1: [2, 3]
// C2: [4, 5]
//
type RangeGroupBalancer struct{}
func (r RangeGroupBalancer) ProtocolName() string {
return "range"
}
func (r RangeGroupBalancer) UserData() ([]byte, error) {
return nil, nil
}
func (r RangeGroupBalancer) AssignGroups(members []GroupMember, topicPartitions []Partition) GroupMemberAssignments {
groupAssignments := GroupMemberAssignments{}
membersByTopic := findMembersByTopic(members)
for topic, members := range membersByTopic {
partitions := findPartitions(topic, topicPartitions)
partitionCount := len(partitions)
memberCount := len(members)
for memberIndex, member := range members {
assignmentsByTopic, ok := groupAssignments[member.ID]
if !ok {
assignmentsByTopic = map[string][]int{}
groupAssignments[member.ID] = assignmentsByTopic
}
minIndex := memberIndex * partitionCount / memberCount
maxIndex := (memberIndex + 1) * partitionCount / memberCount
for partitionIndex, partition := range partitions {
if partitionIndex >= minIndex && partitionIndex < maxIndex {
assignmentsByTopic[topic] = append(assignmentsByTopic[topic], partition)
}
}
}
}
return groupAssignments
}
// RoundrobinGroupBalancer divides partitions evenly among consumers
//
// Example: 5 partitions, 2 consumers
// C0: [0, 2, 4]
// C1: [1, 3]
//
// Example: 6 partitions, 3 consumers
// C0: [0, 3]
// C1: [1, 4]
// C2: [2, 5]
//
type RoundRobinGroupBalancer struct{}
func (r RoundRobinGroupBalancer) ProtocolName() string {
return "roundrobin"
}
func (r RoundRobinGroupBalancer) UserData() ([]byte, error) {
return nil, nil
}
func (r RoundRobinGroupBalancer) AssignGroups(members []GroupMember, topicPartitions []Partition) GroupMemberAssignments {
groupAssignments := GroupMemberAssignments{}
membersByTopic := findMembersByTopic(members)
for topic, members := range membersByTopic {
partitionIDs := findPartitions(topic, topicPartitions)
memberCount := len(members)
for memberIndex, member := range members {
assignmentsByTopic, ok := groupAssignments[member.ID]
if !ok {
assignmentsByTopic = map[string][]int{}
groupAssignments[member.ID] = assignmentsByTopic
}
for partitionIndex, partition := range partitionIDs {
if (partitionIndex % memberCount) == memberIndex {
assignmentsByTopic[topic] = append(assignmentsByTopic[topic], partition)
}
}
}
}
return groupAssignments
}
// RackAffinityGroupBalancer makes a best effort to pair up consumers with
// partitions whose leader is in the same rack. This strategy can have
// performance benefits by minimizing round trip latency between the consumer
// and the broker. In environments where network traffic across racks incurs
// charges (such as cross AZ data transfer in AWS), this strategy is also a cost
// optimization measure because it keeps network traffic within the local rack
// where possible.
//
// The primary objective is to spread partitions evenly across consumers with a
// secondary focus on maximizing the number of partitions where the leader and
// the consumer are in the same rack. For best affinity, it's recommended to
// have a balanced spread of consumers and partition leaders across racks.
//
// This balancer requires Kafka version 0.10.0.0+ or later. Earlier versions do
// not return the brokers' racks in the metadata request.
type RackAffinityGroupBalancer struct {
// Rack is the name of the rack where this consumer is running. It will be
// communicated to the consumer group leader via the UserData so that
// assignments can be made with affinity to the partition leader.
Rack string
}
func (r RackAffinityGroupBalancer) ProtocolName() string {
return "rack-affinity"
}
func (r RackAffinityGroupBalancer) AssignGroups(members []GroupMember, partitions []Partition) GroupMemberAssignments {
membersByTopic := make(map[string][]GroupMember)
for _, m := range members {
for _, t := range m.Topics {
membersByTopic[t] = append(membersByTopic[t], m)
}
}
partitionsByTopic := make(map[string][]Partition)
for _, p := range partitions {
partitionsByTopic[p.Topic] = append(partitionsByTopic[p.Topic], p)
}
assignments := GroupMemberAssignments{}
for topic := range membersByTopic {
topicAssignments := r.assignTopic(membersByTopic[topic], partitionsByTopic[topic])
for member, parts := range topicAssignments {
memberAssignments, ok := assignments[member]
if !ok {
memberAssignments = make(map[string][]int)
assignments[member] = memberAssignments
}
memberAssignments[topic] = parts
}
}
return assignments
}
func (r RackAffinityGroupBalancer) UserData() ([]byte, error) {
return []byte(r.Rack), nil
}
func (r *RackAffinityGroupBalancer) assignTopic(members []GroupMember, partitions []Partition) map[string][]int {
zonedPartitions := make(map[string][]int)
for _, part := range partitions {
zone := part.Leader.Rack
zonedPartitions[zone] = append(zonedPartitions[zone], part.ID)
}
zonedConsumers := make(map[string][]string)
for _, member := range members {
zone := string(member.UserData)
zonedConsumers[zone] = append(zonedConsumers[zone], member.ID)
}
targetPerMember := len(partitions) / len(members)
remainder := len(partitions) % len(members)
assignments := make(map[string][]int)
// assign as many as possible in zone. this will assign up to partsPerMember
// to each consumer. it will also prefer to allocate remainder partitions
// in zone if possible.
for zone, parts := range zonedPartitions {
consumers := zonedConsumers[zone]
if len(consumers) == 0 {
continue
}
// don't over-allocate. cap partition assignments at the calculated
// target.
partsPerMember := len(parts) / len(consumers)
if partsPerMember > targetPerMember {
partsPerMember = targetPerMember
}
for _, consumer := range consumers {
assignments[consumer] = append(assignments[consumer], parts[:partsPerMember]...)
parts = parts[partsPerMember:]
}
// if we had enough partitions for each consumer in this zone to hit its
// target, attempt to use any leftover partitions to satisfy the total
// remainder by adding at most 1 partition per consumer.
leftover := len(parts)
if partsPerMember == targetPerMember {
if leftover > remainder {
leftover = remainder
}
if leftover > len(consumers) {
leftover = len(consumers)
}
remainder -= leftover
}
// this loop covers the case where we're assigning extra partitions or
// if there weren't enough to satisfy the targetPerMember and the zoned
// partitions didn't divide evenly.
for i := 0; i < leftover; i++ {
assignments[consumers[i]] = append(assignments[consumers[i]], parts[i])
}
parts = parts[leftover:]
if len(parts) == 0 {
delete(zonedPartitions, zone)
} else {
zonedPartitions[zone] = parts
}
}
// assign out remainders regardless of zone.
var remaining []int
for _, partitions := range zonedPartitions {
remaining = append(remaining, partitions...)
}
for _, member := range members {
assigned := assignments[member.ID]
delta := targetPerMember - len(assigned)
// if it were possible to assign the remainder in zone, it's been taken
// care of already. now we will portion out any remainder to a member
// that can take it.
if delta >= 0 && remainder > 0 {
delta++
remainder--
}
if delta > 0 {
assignments[member.ID] = append(assigned, remaining[:delta]...)
remaining = remaining[delta:]
}
}
return assignments
}
// findPartitions extracts the partition ids associated with the topic from the
// list of Partitions provided
func findPartitions(topic string, partitions []Partition) []int {
var ids []int
for _, partition := range partitions {
if partition.Topic == topic {
ids = append(ids, partition.ID)
}
}
return ids
}
// findMembersByTopic groups the memberGroupMetadata by topic
func findMembersByTopic(members []GroupMember) map[string][]GroupMember {
membersByTopic := map[string][]GroupMember{}
for _, member := range members {
for _, topic := range member.Topics {
membersByTopic[topic] = append(membersByTopic[topic], member)
}
}
// normalize ordering of members to enabling grouping across topics by partitions
//
// Want:
// C0 [T0/P0, T1/P0]
// C1 [T0/P1, T1/P1]
//
// Not:
// C0 [T0/P0, T1/P1]
// C1 [T0/P1, T1/P0]
//
// Even though the later is still round robin, the partitions are crossed
//
for _, members := range membersByTopic {
sort.Slice(members, func(i, j int) bool {
return members[i].ID < members[j].ID
})
}
return membersByTopic
}
// findGroupBalancer returns the GroupBalancer with the specified protocolName
// from the slice provided
func findGroupBalancer(protocolName string, balancers []GroupBalancer) (GroupBalancer, bool) {
for _, balancer := range balancers {
if balancer.ProtocolName() == protocolName {
return balancer, true
}
}
return nil, false
}