From 18548e996817df17c7f4f9b88c73c3dcb41e4bdf Mon Sep 17 00:00:00 2001
From: Steve van Loben Sels
Date: Fri, 9 Aug 2019 13:05:33 -0700
Subject: [PATCH] Add CRC32 and Murmur2 balancers (#334)

---
 README.md        |  37 +++++++-
 balancer.go      | 127 ++++++++++++++++++++++++++++
 balancer_test.go | 216 +++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 376 insertions(+), 4 deletions(-)

diff --git a/README.md b/README.md
index 4454a2d2a..614584c52 100644
--- a/README.md
+++ b/README.md
@@ -231,20 +231,49 @@ w.Close()
 **Note:** Even though kafka.Message contains ```Topic``` and ```Partition```
 fields, they **MUST NOT** be set when writing messages. They are intended for
 read use only.
 
-### Compatibility with Sarama
+### Compatibility with other clients
+
+#### Sarama
 
 If you're switching from Sarama and need/want to use the same algorithm for message
 partitioning, you can use the ```kafka.Hash``` balancer. ```kafka.Hash``` routes
-messages to the same partitions that sarama's default partitioner would route to.
+messages to the same partitions that Sarama's default partitioner would route to.
 
 ```go
 w := kafka.NewWriter(kafka.WriterConfig{
-	Brokers: []string{"localhost:9092"},
-	Topic:   "topic-A",
+	Brokers:  []string{"localhost:9092"},
+	Topic:    "topic-A",
 	Balancer: &kafka.Hash{},
 })
 ```
 
+#### librdkafka and confluent-kafka-go
+
+Use the ```kafka.CRC32Balancer``` balancer to get the same behaviour as librdkafka's
+default ```consistent_random``` partition strategy.
+
+```go
+w := kafka.NewWriter(kafka.WriterConfig{
+	Brokers:  []string{"localhost:9092"},
+	Topic:    "topic-A",
+	Balancer: kafka.CRC32Balancer{},
+})
+```
+
+#### Java
+
+Use the ```kafka.Murmur2Balancer``` balancer to get the same behaviour as the canonical
+Java client's default partitioner. Note: the Java class also lets you specify the
+partition directly on a message, which this client does not permit.
+
+```go
+w := kafka.NewWriter(kafka.WriterConfig{
+	Brokers:  []string{"localhost:9092"},
+	Topic:    "topic-A",
+	Balancer: kafka.Murmur2Balancer{},
+})
+```
+
 ### Compression
 
 Compression can be enabled on the `Writer` by configuring the `CompressionCodec`:
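Both new balancers export `Balance` as an ordinary method, so you can also call them directly to preview where a given key will land, for example to cross-check against a Java or librdkafka producer. The sketch below assumes a topic with eight partitions numbered 0 through 7 and an arbitrary `user-42` key; neither is part of the patch.

```go
package main

import (
	"fmt"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	// Partition IDs of the target topic (assumed here to be 0 through 7).
	partitions := []int{0, 1, 2, 3, 4, 5, 6, 7}
	msg := kafka.Message{Key: []byte("user-42")}

	// The partition the canonical Java client's default partitioner would
	// pick for this key.
	fmt.Println(kafka.Murmur2Balancer{}.Balance(msg, partitions...))

	// The partition librdkafka's consistent_random strategy would pick for
	// a non-empty key.
	fmt.Println(kafka.CRC32Balancer{}.Balance(msg, partitions...))
}
```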
diff --git a/balancer.go b/balancer.go
index cac92417e..219a61029 100644
--- a/balancer.go
+++ b/balancer.go
@@ -2,7 +2,9 @@ package kafka
 
 import (
 	"hash"
+	"hash/crc32"
 	"hash/fnv"
+	"math/rand"
 	"sort"
 	"sync"
 )
@@ -158,3 +160,128 @@ func (h *Hash) Balance(msg Message, partitions ...int) (partition int) {
 
 	return
 }
+
+type randomBalancer struct {
+	mock int // mocked return value, used for testing
+}
+
+func (b randomBalancer) Balance(msg Message, partitions ...int) (partition int) {
+	if b.mock != 0 {
+		return b.mock
+	}
+	return partitions[rand.Int()%len(partitions)]
+}
+
+// CRC32Balancer is a Balancer that uses the CRC32 hash function to determine
+// which partition to route messages to. This ensures that messages with the
+// same key are routed to the same partition. This balancer is compatible with
+// the built-in hash partitioners in librdkafka and the language bindings that
+// are built on top of it, including the
+// github.com/confluentinc/confluent-kafka-go Go package.
+//
+// With the Consistent field false (default), this partitioner is equivalent to
+// the "consistent_random" setting in librdkafka. When Consistent is true, this
+// partitioner is equivalent to the "consistent" setting. The latter will hash
+// empty or nil keys into the same partition.
+//
+// Unless you are absolutely certain that all your messages will have keys,
+// it's best to leave the Consistent flag off. Otherwise, you run the risk of
+// creating a very hot partition.
+type CRC32Balancer struct {
+	Consistent bool
+	random     randomBalancer
+}
+
+func (b CRC32Balancer) Balance(msg Message, partitions ...int) (partition int) {
+	// NOTE: the crc32 balancers in librdkafka don't differentiate between nil
+	// and empty keys. Both cases are treated as unset.
+	if len(msg.Key) == 0 && !b.Consistent {
+		return b.random.Balance(msg, partitions...)
+	}
+
+	idx := crc32.ChecksumIEEE(msg.Key) % uint32(len(partitions))
+	return partitions[idx]
+}
+
+// Murmur2Balancer is a Balancer that uses the Murmur2 hash function to
+// determine which partition to route messages to. This ensures that messages
+// with the same key are routed to the same partition. This balancer is
+// compatible with the partitioner used by the Java library and by librdkafka's
+// "murmur2" and "murmur2_random" partitioners.
+//
+// With the Consistent field false (default), this partitioner is equivalent to
+// the "murmur2_random" setting in librdkafka. When Consistent is true, this
+// partitioner is equivalent to the "murmur2" setting. The latter will hash
+// nil keys into the same partition. Empty, non-nil keys are always hashed to
+// the same partition regardless of configuration.
+//
+// Unless you are absolutely certain that all your messages will have keys,
+// it's best to leave the Consistent flag off. Otherwise, you run the risk of
+// creating a very hot partition.
+//
+// Note that the librdkafka documentation states that "murmur2_random" is
+// functionally equivalent to the default Java partitioner. That's not strictly
+// true, because the Java partitioner uses a round-robin balancer rather than a
+// random one for nil keys. We chose librdkafka's implementation because it
+// arguably has a larger install base.
+type Murmur2Balancer struct {
+	Consistent bool
+	random     randomBalancer
+}
+
+func (b Murmur2Balancer) Balance(msg Message, partitions ...int) (partition int) {
+	// NOTE: the murmur2 balancers in java and librdkafka treat a nil key as
+	// non-existent while treating an empty slice as a defined value.
+	if msg.Key == nil && !b.Consistent {
+		return b.random.Balance(msg, partitions...)
+	}
+
+	// Mask off the sign bit so the modulus is computed on a non-negative
+	// value, matching the Java client's toPositive() behavior.
+	idx := (murmur2(msg.Key) & 0x7fffffff) % uint32(len(partitions))
+	return partitions[idx]
+}
+
+// murmur2 is a Go port of the Java library's murmur2 function.
+// https://github.com/apache/kafka/blob/1.0/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L353
+func murmur2(data []byte) uint32 {
+	length := len(data)
+	const (
+		seed uint32 = 0x9747b28c
+		// 'm' and 'r' are mixing constants generated offline.
+		// They're not really 'magic', they just happen to work well.
+		m = 0x5bd1e995
+		r = 24
+	)
+
+	// Initialize the hash to a 'random' value
+	h := seed ^ uint32(length)
+	length4 := length / 4
+
+	for i := 0; i < length4; i++ {
+		i4 := i * 4
+		k := (uint32(data[i4+0]) & 0xff) + ((uint32(data[i4+1]) & 0xff) << 8) + ((uint32(data[i4+2]) & 0xff) << 16) + ((uint32(data[i4+3]) & 0xff) << 24)
+		k *= m
+		k ^= k >> r
+		k *= m
+		h *= m
+		h ^= k
+	}
+
+	// Handle the last few bytes of the input array
+	extra := length % 4
+	if extra >= 3 {
+		h ^= (uint32(data[(length & ^3)+2]) & 0xff) << 16
+	}
+	if extra >= 2 {
+		h ^= (uint32(data[(length & ^3)+1]) & 0xff) << 8
+	}
+	if extra >= 1 {
+		h ^= uint32(data[length & ^3]) & 0xff
+		h *= m
+	}
+
+	h ^= h >> 13
+	h *= m
+	h ^= h >> 15
+
+	return h
+}
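The NOTE comments above call out a subtle difference between the two balancers: CRC32Balancer treats nil and empty keys identically, while Murmur2Balancer randomizes only nil keys. A small sketch of the observable behaviour, assuming a hypothetical four-partition topic:

```go
package main

import (
	"fmt"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	parts := []int{0, 1, 2, 3}

	// Murmur2Balancer: an empty but non-nil key is always hashed, so the
	// result is stable across calls; a nil key is balanced randomly unless
	// Consistent is set.
	m := kafka.Murmur2Balancer{}
	fmt.Println(m.Balance(kafka.Message{Key: []byte{}}, parts...)) // stable
	fmt.Println(m.Balance(kafka.Message{}, parts...))              // random

	// CRC32Balancer with Consistent set treats nil and empty keys as the
	// same unset value, so both hash to the same partition.
	c := kafka.CRC32Balancer{Consistent: true}
	p1 := c.Balance(kafka.Message{}, parts...)
	p2 := c.Balance(kafka.Message{Key: []byte{}}, parts...)
	fmt.Println(p1 == p2) // true
}
```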
diff --git a/balancer_test.go b/balancer_test.go
index 230a56909..c0f49d0cf 100644
--- a/balancer_test.go
+++ b/balancer_test.go
@@ -1,6 +1,7 @@
 package kafka
 
 import (
+	"fmt"
 	"hash"
 	"hash/crc32"
 	"testing"
@@ -54,3 +55,218 @@ func TestHashBalancer(t *testing.T) {
 		})
 	}
 }
+
+func TestCRC32Balancer(t *testing.T) {
+	// These tests are taken from the default "consistent_random" partitioner from
+	// https://github.com/edenhill/librdkafka/blob/master/tests/0048-partitioner.c
+	partitionCount := 17
+	var partitions []int
+	for i := 0; i < partitionCount; i++ {
+		partitions = append(partitions, i*i)
+	}
+
+	testCases := map[string]struct {
+		Key        []byte
+		Partitions []int
+		Partition  int
+	}{
+		"nil": {
+			Key:        nil,
+			Partitions: partitions,
+			Partition:  -1,
+		},
+		"empty": {
+			Key:        []byte{},
+			Partitions: partitions,
+			Partition:  -1,
+		},
+		"unaligned": {
+			Key:        []byte("23456"),
+			Partitions: partitions,
+			Partition:  partitions[0xb1b451d7%partitionCount],
+		},
+		"long key": {
+			Key:        []byte("this is another string with more length to it perhaps"),
+			Partitions: partitions,
+			Partition:  partitions[0xb0150df7%partitionCount],
+		},
+		"short key": {
+			Key:        []byte("hejsan"),
+			Partitions: partitions,
+			Partition:  partitions[0xd077037e%partitionCount],
+		},
+	}
+
+	t.Run("default", func(t *testing.T) {
+		for label, test := range testCases {
+			t.Run(label, func(t *testing.T) {
+				b := CRC32Balancer{}
+				b.random.mock = -1
+
+				msg := Message{Key: test.Key}
+				partition := b.Balance(msg, test.Partitions...)
+				if partition != test.Partition {
+					t.Errorf("expected %v; got %v", test.Partition, partition)
+				}
+			})
+		}
+	})
+
+	t.Run("consistent", func(t *testing.T) {
+		b := CRC32Balancer{Consistent: true}
+		b.random.mock = -1
+
+		p := b.Balance(Message{}, partitions...)
+		if p < 0 {
+			t.Fatal("should not have gotten a random partition")
+		}
+		for i := 0; i < 10; i++ {
+			if p != b.Balance(Message{}, partitions...) {
+				t.Fatal("nil key should always hash consistently")
+			}
+			if p != b.Balance(Message{Key: []byte{}}, partitions...) {
+				t.Fatal("empty key should always hash consistently and have same result as nil key")
+			}
+		}
+	})
+}
+
+func TestMurmur2(t *testing.T) {
+	// These tests are taken from the "murmur2" implementation from
+	// https://github.com/edenhill/librdkafka/blob/master/src/rdmurmur2.c
+	testCases := []struct {
+		Key               []byte
+		JavaMurmur2Result uint32
+	}{
+		{Key: []byte("kafka"), JavaMurmur2Result: 0xd067cf64},
+		{Key: []byte("giberish123456789"), JavaMurmur2Result: 0x8f552b0c},
+		{Key: []byte("1234"), JavaMurmur2Result: 0x9fc97b14},
+		{Key: []byte("234"), JavaMurmur2Result: 0xe7c009ca},
+		{Key: []byte("34"), JavaMurmur2Result: 0x873930da},
+		{Key: []byte("4"), JavaMurmur2Result: 0x5a4b5ca1},
+		{Key: []byte("PreAmbleWillBeRemoved,ThePrePartThatIs"), JavaMurmur2Result: 0x78424f1c},
+		{Key: []byte("reAmbleWillBeRemoved,ThePrePartThatIs"), JavaMurmur2Result: 0x4a62b377},
+		{Key: []byte("eAmbleWillBeRemoved,ThePrePartThatIs"), JavaMurmur2Result: 0xe0e4e09e},
+		{Key: []byte("AmbleWillBeRemoved,ThePrePartThatIs"), JavaMurmur2Result: 0x62b8b43f},
+		{Key: []byte(""), JavaMurmur2Result: 0x106e08d9},
+		{Key: nil, JavaMurmur2Result: 0x106e08d9},
+	}
+
+	for _, test := range testCases {
+		t.Run(fmt.Sprintf("key:%s", test.Key), func(t *testing.T) {
+			got := murmur2(test.Key)
+			if got != test.JavaMurmur2Result {
+				t.Errorf("expected %v; got %v", test.JavaMurmur2Result, got)
+			}
+		})
+	}
+}
+
+func TestMurmur2Balancer(t *testing.T) {
+	// These tests are taken from the "murmur2_random" partitioner from
+	// https://github.com/edenhill/librdkafka/blob/master/tests/0048-partitioner.c
+	partitionCount := 17
+	librdkafkaPartitions := make([]int, partitionCount)
+	for i := 0; i < partitionCount; i++ {
+		librdkafkaPartitions[i] = i * i
+	}
+
+	// These tests are taken from the Murmur2Partitioner Python class from
+	// https://github.com/dpkp/kafka-python/blob/master/test/test_partitioner.py
+	pythonPartitions := make([]int, 1000)
+	for i := 0; i < 1000; i++ {
+		pythonPartitions[i] = i
+	}
+
+	testCases := map[string]struct {
+		Key        []byte
+		Partitions []int
+		Partition  int
+	}{
+		"librdkafka-nil": {
+			Key:        nil,
+			Partitions: librdkafkaPartitions,
+			Partition:  123,
+		},
+		"librdkafka-empty": {
+			Key:        []byte{},
+			Partitions: librdkafkaPartitions,
+			Partition:  librdkafkaPartitions[0x106e08d9%partitionCount],
+		},
+		"librdkafka-unaligned": {
+			Key:        []byte("23456"),
+			Partitions: librdkafkaPartitions,
+			Partition:  librdkafkaPartitions[0x058d780f%partitionCount],
+		},
+		"librdkafka-long key": {
+			Key:        []byte("this is another string with more length to it perhaps"),
+			Partitions: librdkafkaPartitions,
+			Partition:  librdkafkaPartitions[0x4f7703da%partitionCount],
+		},
+		"librdkafka-short key": {
+			Key:        []byte("hejsan"),
+			Partitions: librdkafkaPartitions,
+			Partition:  librdkafkaPartitions[0x5ec19395%partitionCount],
+		},
+		"python-empty": {
+			Key:        []byte(""),
+			Partitions: pythonPartitions,
+			Partition:  681,
+		},
+		"python-a": {
+			Key:        []byte("a"),
+			Partitions: pythonPartitions,
+			Partition:  524,
+		},
+		"python-ab": {
+			Key:        []byte("ab"),
+			Partitions: pythonPartitions,
+			Partition:  434,
+		},
+		"python-abc": {
+			Key:        []byte("abc"),
+			Partitions: pythonPartitions,
+			Partition:  107,
+		},
+		"python-123456789": {
+			Key:        []byte("123456789"),
+			Partitions: pythonPartitions,
+			Partition:  566,
+		},
+		"python-\x00 ": {
+			Key:        []byte{0, 32},
+			Partitions: pythonPartitions,
+			Partition:  742,
+		},
+	}
+
+	t.Run("default", func(t *testing.T) {
+		for label, test := range testCases {
+			t.Run(label, func(t *testing.T) {
+				b := Murmur2Balancer{}
+				b.random.mock = 123
+
+				msg := Message{Key: test.Key}
+				partition := b.Balance(msg, test.Partitions...)
+				if partition != test.Partition {
+					t.Errorf("expected %v; got %v", test.Partition, partition)
+				}
+			})
+		}
+	})
+
+	t.Run("consistent", func(t *testing.T) {
+		b := Murmur2Balancer{Consistent: true}
+		b.random.mock = -1
+
+		p := b.Balance(Message{}, librdkafkaPartitions...)
+		if p < 0 {
+			t.Fatal("should not have gotten a random partition")
+		}
+		for i := 0; i < 10; i++ {
+			if p != b.Balance(Message{}, librdkafkaPartitions...) {
+				t.Fatal("nil key should always hash consistently")
+			}
+		}
+	})
+}
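Taken together, typical end-to-end use looks like the sketch below: a `Writer` configured with `Murmur2Balancer`, so keyed messages co-partition with the Java client's output. The broker address, topic name, and keys are placeholders.

```go
package main

import (
	"context"
	"log"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	w := kafka.NewWriter(kafka.WriterConfig{
		Brokers:  []string{"localhost:9092"},
		Topic:    "topic-A",
		Balancer: kafka.Murmur2Balancer{},
	})
	defer w.Close()

	// Messages sharing a key land on the same partition the Java client's
	// default partitioner would choose for that key.
	err := w.WriteMessages(context.Background(),
		kafka.Message{Key: []byte("user-42"), Value: []byte("signed up")},
		kafka.Message{Key: []byte("user-42"), Value: []byte("logged in")},
	)
	if err != nil {
		log.Fatal(err)
	}
}
```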