Skip to content

Commit

Permalink
Add CRC32 and Murmur2 balancers (segmentio#334)
Browse files Browse the repository at this point in the history
  • Loading branch information
Steve van Loben Sels authored Aug 9, 2019
1 parent b4c376a commit 18548e9
Show file tree
Hide file tree
Showing 3 changed files with 376 additions and 4 deletions.
37 changes: 33 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -231,20 +231,49 @@ w.Close()
**Note:** Even though kafka.Message contains ```Topic``` and ```Partition``` fields, they **MUST NOT** be
set when writing messages. They are intended for read use only.

### Compatibility with Sarama
### Compatibility with other clients

#### Sarama

If you're switching from Sarama and need/want to use the same algorithm for message
partitioning, you can use the ```kafka.Hash``` balancer. ```kafka.Hash``` routes
messages to the same partitions that sarama's default partitioner would route to.
messages to the same partitions that Sarama's default partitioner would route to.

```go
w := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{"localhost:9092"},
Topic: "topic-A",
Brokers: []string{"localhost:9092"},
Topic: "topic-A",
Balancer: &kafka.Hash{},
})
```

#### librdkafka and confluent-kafka-go

Use the ```kafka.CRC32Balancer``` balancer to get the same behaviour as librdkafka's
default ```consistent_random``` partition strategy.

```go
w := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{"localhost:9092"},
Topic: "topic-A",
Balancer: kafka.CRC32Balancer{},
})
```

#### Java

Use the ```kafka.Murmur2Balancer``` balancer to get the same behaviour as the canonical
Java client's default partitioner. Note: the Java class also allows you to specify the
partition directly; that is not permitted here.

```go
w := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{"localhost:9092"},
Topic: "topic-A",
Balancer: kafka.Murmur2Balancer{},
})
```

### Compression

Compression can be enabled on the `Writer` by configuring the `CompressionCodec`:
Expand Down
127 changes: 127 additions & 0 deletions balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package kafka

import (
"hash"
"hash/crc32"
"hash/fnv"
"math/rand"
"sort"
"sync"
)
Expand Down Expand Up @@ -158,3 +160,128 @@ func (h *Hash) Balance(msg Message, partitions ...int) (partition int) {

return
}

// randomBalancer is a Balancer that routes each message to a pseudo-randomly
// chosen partition. It serves as the fallback for the hash based balancers
// when a message has no usable key.
type randomBalancer struct {
mock int // mocked return value, used for testing; ignored when zero
}

// Balance returns the mocked partition when one has been configured (any
// non-zero value); otherwise it returns a pseudo-randomly selected entry of
// partitions. The message itself is not inspected.
func (b randomBalancer) Balance(msg Message, partitions ...int) (partition int) {
	if b.mock == 0 {
		pick := rand.Int() % len(partitions)
		return partitions[pick]
	}
	return b.mock
}

// CRC32Balancer is a Balancer that uses the CRC32 hash function to determine
// which partition to route messages to. This ensures that messages with the
// same key are routed to the same partition. This balancer is compatible with
// the built-in hash partitioners in librdkafka and the language bindings that
// are built on top of it, including the
// github.com/confluentinc/confluent-kafka-go Go package.
//
// With the Consistent field false (default), this partitioner is equivalent to
// the "consistent_random" setting in librdkafka. When Consistent is true, this
// partitioner is equivalent to the "consistent" setting. The latter will hash
// empty or nil keys into the same partition.
//
// Unless you are absolutely certain that all your messages will have keys, it's
// best to leave the Consistent flag off. Otherwise, you run the risk of
// creating a very hot partition.
type CRC32Balancer struct {
// Consistent, when true, hashes empty and nil keys like any other key
// instead of routing those messages to a random partition.
Consistent bool
// random picks the partition for messages with an empty or nil key while
// Consistent is false.
random randomBalancer
}

// Balance selects the partition for msg. Messages whose key is empty or nil
// are handed to the random balancer unless Consistent is set; every other
// message is routed by the CRC32 (IEEE) checksum of its key, taken modulo the
// number of partitions.
func (b CRC32Balancer) Balance(msg Message, partitions ...int) (partition int) {
	// librdkafka's crc32 partitioners make no distinction between a nil key
	// and an empty one: both count as "no key".
	hashKey := b.Consistent || len(msg.Key) != 0
	if !hashKey {
		return b.random.Balance(msg, partitions...)
	}

	sum := crc32.ChecksumIEEE(msg.Key)
	return partitions[sum%uint32(len(partitions))]
}

// Murmur2Balancer is a Balancer that uses the Murmur2 hash function to
// determine which partition to route messages to. This ensures that messages
// with the same key are routed to the same partition. This balancer is
// compatible with the partitioner used by the Java library and by librdkafka's
// "murmur2" and "murmur2_random" partitioners.
//
// With the Consistent field false (default), this partitioner is equivalent to
// the "murmur2_random" setting in librdkafka. When Consistent is true, this
// partitioner is equivalent to the "murmur2" setting. The latter will hash
// nil keys into the same partition. Empty, non-nil keys are always hashed to
// the same partition regardless of configuration.
//
// Unless you are absolutely certain that all your messages will have keys, it's
// best to leave the Consistent flag off. Otherwise, you run the risk of
// creating a very hot partition.
//
// Note that the librdkafka documentation states that "murmur2_random" is
// functionally equivalent to the default Java partitioner. It is only
// functionally equivalent, rather than identical, because the Java partitioner
// uses a round robin balancer instead of a random one for nil keys. We follow
// librdkafka's implementation because it arguably has a larger install base.
type Murmur2Balancer struct {
// Consistent, when true, hashes nil keys like any other key instead of
// routing those messages to a random partition.
Consistent bool
// random picks the partition for messages with a nil key while Consistent
// is false.
random randomBalancer
}

// Balance selects the partition for msg. A nil key is handed to the random
// balancer unless Consistent is set; every other key (including an empty,
// non-nil one) is routed by its murmur2 hash with the sign bit cleared, taken
// modulo the number of partitions.
func (b Murmur2Balancer) Balance(msg Message, partitions ...int) (partition int) {
	// Java and librdkafka's murmur2 partitioners regard a nil key as absent,
	// whereas an empty slice is a real value that gets hashed.
	if b.Consistent || msg.Key != nil {
		positive := murmur2(msg.Key) & 0x7fffffff
		return partitions[positive%uint32(len(partitions))]
	}
	return b.random.Balance(msg, partitions...)
}

// murmur2 computes the 32-bit Murmur2 hash of data. It is a Go port of the
// Java client's implementation:
// https://github.com/apache/kafka/blob/1.0/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L353
func murmur2(data []byte) uint32 {
	const (
		seed uint32 = 0x9747b28c
		// Mixing constants chosen offline by the algorithm's author; there is
		// nothing special about them beyond mixing well.
		m = 0x5bd1e995
		r = 24
	)

	// The starting state folds the input length into the seed.
	h := seed ^ uint32(len(data))

	// Consume the input one little-endian 32-bit word at a time.
	rest := data
	for ; len(rest) >= 4; rest = rest[4:] {
		k := uint32(rest[0]) | uint32(rest[1])<<8 | uint32(rest[2])<<16 | uint32(rest[3])<<24
		k *= m
		k ^= k >> r
		k *= m
		h *= m
		h ^= k
	}

	// Fold in the 1-3 trailing bytes that did not fill a whole word.
	switch len(rest) {
	case 3:
		h ^= uint32(rest[2]) << 16
		fallthrough
	case 2:
		h ^= uint32(rest[1]) << 8
		fallthrough
	case 1:
		h ^= uint32(rest[0])
		h *= m
	}

	// Final avalanche.
	h ^= h >> 13
	h *= m
	h ^= h >> 15

	return h
}
Loading

0 comments on commit 18548e9

Please sign in to comment.