Skip to content

Commit

Permalink
add LeastBytes balancer
Browse files Browse the repository at this point in the history
  • Loading branch information
Achille Roussel committed Jun 20, 2017
1 parent 7c60a03 commit 27321f0
Showing 1 changed file with 68 additions and 0 deletions.
68 changes: 68 additions & 0 deletions balancer.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package kafka

import "sort"

// The Balancer interface provides an abstraction of the message distribution
// logic used by Writer instances to route messages to the partitions available
// on a kafka cluster.
Expand All @@ -10,6 +12,11 @@ package kafka
type Balancer interface {
// Balance receives a message and a set of available partitions and
// returns the partition number that the message should be routed to.
//
// An application should refrain from using a balancer to manage multiple
// sets of partitions (from different topics for examples), use one balancer
// instance for each partition set, so the balancer can detect when the
// partitions change and assume that the kafka topic has been rebalanced.
Balance(msg Message, partitions ...int) (partition int)
}

Expand All @@ -35,3 +42,64 @@ func (rr *RoundRobin) Balance(msg Message, partitions ...int) int {
rr.offset++
return partitions[offset%length]
}

// LeastBytes is a Balancer implementation that routes messages to the partition
// that has received the least amount of data.
//
// Note that no coordination is done between multiple producers, having good
// balancing relies on the fact that each producer using a LeastBytes balancer
// should produce well balanced messages.
type LeastBytes struct {
counters []leastBytesCounter
}

type leastBytesCounter struct {
partition int
bytes uint64
}

// Balance satisfies the Balancer interface.
func (lb *LeastBytes) Balance(msg Message, partitions ...int) int {
for _, p := range partitions {
if c := lb.counterOf(p); c == nil {
lb.counters = lb.makeCounters(partitions...)
}
}

minBytes := lb.counters[0].bytes
minIndex := 0

for i, c := range lb.counters[1:] {
if c.bytes < minBytes {
minIndex = i + 1
minBytes = c.bytes
}
}

c := &lb.counters[minIndex]
c.bytes += uint64(len(msg.Key) + len(msg.Value))
return c.partition
}

func (lb *LeastBytes) counterOf(partition int) *leastBytesCounter {
i := sort.Search(len(lb.counters), func(i int) bool {
return lb.counters[i].partition >= partition
})
if i == len(lb.counters) || lb.counters[i].partition != partition {
return nil
}
return &lb.counters[i]
}

func (lb *LeastBytes) makeCounters(partitions ...int) (counters []leastBytesCounter) {
counters = make([]leastBytesCounter, len(partitions))

for i, p := range partitions {
counters[i].partition = p
}

sort.Slice(counters, func(i int, j int) bool {
return counters[i].partition < counters[j].partition
})
return
}

0 comments on commit 27321f0

Please sign in to comment.