Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Message writer for record batches #163

Merged
merged 26 commits into from
Jan 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
55ba9d6
Add branching for record batch writer
VictorDenisov Dec 2, 2018
1c90763
Make key and value from headers public
VictorDenisov Dec 2, 2018
a8cd851
Add writeVarInt function
VictorDenisov Dec 3, 2018
d18779b
Implement writeRecord method
VictorDenisov Dec 4, 2018
582f1d1
Write record batch method
VictorDenisov Dec 9, 2018
29674e8
Use zigzag encoding for varint
VictorDenisov Dec 10, 2018
0784aa5
Drop logging
VictorDenisov Dec 18, 2018
faddbdc
Add unit test for v2 record batch
VictorDenisov Dec 19, 2018
a57d973
Remove commented out code
VictorDenisov Jan 6, 2019
57693cf
Use int constants from math package
VictorDenisov Jan 6, 2019
a96cefa
Use exact record size instead of extra buffer
VictorDenisov Jan 12, 2019
60e7246
Give a simpler name to calcVarIntLen
VictorDenisov Jan 15, 2019
8128763
Move error check out of if else block
VictorDenisov Jan 15, 2019
fc101c3
Extract message set size in a function
VictorDenisov Jan 15, 2019
eca66e2
Give calcRecordSize a shorter name
VictorDenisov Jan 15, 2019
f4bc280
Protect recordSize from confusing order of params
VictorDenisov Jan 15, 2019
3da360b
Use recordSize intead of estimatedRecordSize
VictorDenisov Jan 15, 2019
d4b332e
Extract record batch size
VictorDenisov Jan 15, 2019
8c48777
Use record size from calculation
VictorDenisov Jan 18, 2019
36e6a36
Assign offsets to messages in record batch
VictorDenisov Jan 21, 2019
8798908
Write message set straight to output writer
VictorDenisov Jan 21, 2019
6464be6
Write record batch straight to output writer
VictorDenisov Jan 21, 2019
60de28c
Get rid of remainder buffer
VictorDenisov Jan 21, 2019
e22b69b
Remove commented out code
VictorDenisov Jan 23, 2019
d2cd545
Use bytes.Equal for comparing slices of bytes
VictorDenisov Jan 23, 2019
93017f4
Compress messages before calculating size
VictorDenisov Jan 27, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type Message struct {
Offset int64
Key []byte
Value []byte
Headers []Header

// If not set at the creation, Time will be automatically set when
// writing the message.
Expand Down Expand Up @@ -251,3 +252,8 @@ func extractOffset(base int64, msgSet []byte) (offset int64, err error) {
offset = base - offset
return
}

type Header struct {
Key string
Value []byte
}
257 changes: 235 additions & 22 deletions write.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"bytes"
"encoding/binary"
"fmt"
"hash/crc32"
"math"
"time"
)

Expand Down Expand Up @@ -45,6 +47,25 @@ func writeInt64(w *bufio.Writer, i int64) {
w.WriteByte(b[7])
}

func writeVarInt(w *bufio.Writer, i int64) {
i = i<<1 ^ i>>63
for i&0x7f != i {
w.WriteByte(byte(i&0x7f | 0x80))
i >>= 7
}
w.WriteByte(byte(i))
}

func varIntLen(i int64) (l int) {
i = i<<1 ^ i>>63
for i&0x7f != i {
l++
i >>= 7
}
l++
return l
}

func writeString(w *bufio.Writer, s string) {
writeInt16(w, int16(len(s)))
w.WriteString(s)
Expand Down Expand Up @@ -179,29 +200,30 @@ func writeListOffsetRequestV1(w *bufio.Writer, correlationID int32, clientID, to
return w.Flush()
}

func writeProduceRequestV2(w *bufio.Writer, codec CompressionCodec, correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, msgs ...Message) error {
var size int32
attributes := int8(CompressionNoneCode)

// if compressing, replace the slice of messages with a single compressed
// message set.
if codec != nil {
var err error
if msgs, err = compress(codec, msgs...); err != nil {
return err
func hasHeaders(msgs ...Message) bool {
for _, msg := range msgs {
if len(msg.Headers) > 0 {
return true
}
attributes = codec.Code()
}
return false
}

for _, msg := range msgs {
size += 8 + // offset
4 + // message size
4 + // crc
1 + // magic byte
1 + // attributes
8 + // timestamp
sizeofBytes(msg.Key) +
sizeofBytes(msg.Value)
func writeProduceRequestV2(w *bufio.Writer, codec CompressionCodec, correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, msgs ...Message) (err error) {

attributes := int8(CompressionNoneCode)

var size int32
if hasHeaders(msgs...) {
size = recordBatchSize(msgs...)
} else {
if codec != nil {
if msgs, err = compress(codec, msgs...); err != nil {
return err
}
attributes = codec.Code()
}
size = messageSetSize(msgs...)
}

h := requestHeader{
Expand Down Expand Up @@ -233,11 +255,176 @@ func writeProduceRequestV2(w *bufio.Writer, codec CompressionCodec, correlationI
writeInt32(w, partition)
writeInt32(w, size)

if hasHeaders(msgs...) {
err = writeRecordBatch(w, codec, correlationID, clientID, topic, partition, timeout, requiredAcks, msgs...)
} else {
for _, msg := range msgs {
writeMessage(w, msg.Offset, attributes, msg.Time, msg.Key, msg.Value)
}

}
if err != nil {
return
}
return w.Flush()
}

func messageSetSize(msgs ...Message) (size int32) {
for _, msg := range msgs {
writeMessage(w, msg.Offset, attributes, msg.Time, msg.Key, msg.Value)
size += 8 + // offset
4 + // message size
4 + // crc
1 + // magic byte
1 + // attributes
8 + // timestamp
sizeofBytes(msg.Key) +
sizeofBytes(msg.Value)
}
return
}

return w.Flush()
func recordBatchSize(msgs ...Message) (size int32) {
size = 8 + // base offset
4 + // batch length
4 + // partition leader epoch
1 + // magic
4 + // crc
2 + // attributes
4 + // last offset delta
8 + // first timestamp
8 + // max timestamp
8 + // producer id
2 + // producer epoch
4 + // base sequence
4 // msg count

baseTime := msgs[0].Time

baseOffset := baseOffset(msgs...)

for _, msg := range msgs {

sz := recordSize(&msg, msg.Time.Sub(baseTime), msg.Offset-baseOffset)

size += int32(sz + varIntLen(int64(sz)))
}
return
}

func writeRecordBatch(w *bufio.Writer, codec CompressionCodec, correlationID int32, clientId, topic string, partition int32, timeout time.Duration, requiredAcks int16, msgs ...Message) error {

baseTime := msgs[0].Time

baseOffset := int64(0)

for i := 0; i < len(msgs); i++ {
msgs[i].Offset = int64(i)
}

size := recordBatchSize(msgs...)

writeInt64(w, baseOffset)

writeInt32(w, int32(size-12)) // 12 = batch length + base offset sizes

writeInt32(w, -1) // partition leader epoch
writeInt8(w, 2) // magic byte

crcBuf := &bytes.Buffer{}
crcBuf.Grow(int(size - 12)) // 12 = batch length + base offset sizes
crcWriter := bufio.NewWriter(crcBuf)

writeInt16(crcWriter, 0) // attributes no compression, timestamp type 0 - create time, not part of a transaction, no control messages
writeInt32(crcWriter, int32(len(msgs)-1)) // max offset
writeInt64(crcWriter, timestamp(baseTime))
lastTime := timestamp(msgs[len(msgs)-1].Time)
writeInt64(crcWriter, int64(lastTime))
writeInt64(crcWriter, -1) // default producer id for now
writeInt16(crcWriter, -1) // default producer epoch for now
writeInt32(crcWriter, -1) // default base sequence
writeInt32(crcWriter, int32(len(msgs))) // record count

for _, msg := range msgs {
writeRecord(crcWriter, CompressionNoneCode, baseTime, baseOffset, msg)
}
crcWriter.Flush()

crcTable := crc32.MakeTable(crc32.Castagnoli)
crcChecksum := crc32.Checksum(crcBuf.Bytes(), crcTable)

writeInt32(w, int32(crcChecksum))
w.Write(crcBuf.Bytes())

return nil
}

var maxDate time.Time = time.Date(5000, time.January, 0, 0, 0, 0, 0, time.UTC)

func baseTime(msgs ...Message) (baseTime time.Time) {
baseTime = maxDate
for _, msg := range msgs {
if msg.Time.Before(baseTime) {
baseTime = msg.Time
}
}
return
}

func baseOffset(msgs ...Message) (baseOffset int64) {
baseOffset = math.MaxInt64
for _, msg := range msgs {
if msg.Offset < baseOffset {
baseOffset = msg.Offset
}
}
return
}

func maxOffsetDelta(baseOffset int64, msgs ...Message) (maxDelta int64) {
maxDelta = 0
for _, msg := range msgs {
curDelta := msg.Offset - baseOffset
if maxDelta > curDelta {
maxDelta = curDelta
}
}
return
}

func estimatedRecordSize(msg *Message) (size int32) {
size += 8 + // length
1 + // attributes
8 + // timestamp delta
8 + // offset delta
8 + // key length
int32(len(msg.Key)) +
8 + // value length
int32(len(msg.Value))
for _, h := range msg.Headers {
size += 8 + // header key length
int32(len(h.Key)) +
8 + // header value length
int32(len(h.Value))
}
return
}

func recordSize(msg *Message, timestampDelta time.Duration, offsetDelta int64) (size int) {
size += 1 + // attributes
varIntLen(int64(timestampDelta)) +
varIntLen(offsetDelta) +
varIntLen(int64(len(msg.Key))) +
len(msg.Key) +
varIntLen(int64(len(msg.Value))) +
len(msg.Value) +
varIntLen(int64(len(msg.Headers)))
for _, h := range msg.Headers {
size += varIntLen(int64(len([]byte(h.Key)))) +
len([]byte(h.Key)) +
varIntLen(int64(len(h.Value))) +
len(h.Value)
}
return
}

func compress(codec CompressionCodec, msgs ...Message) ([]Message, error) {
Expand Down Expand Up @@ -286,3 +473,29 @@ func msgSize(key, value []byte) int32 {
sizeofBytes(key) +
sizeofBytes(value)
}

// Messages with magic >2 are called records. This method writes messages using message format 2.
func writeRecord(w *bufio.Writer, attributes int8, baseTime time.Time, baseOffset int64, msg Message) {

timestampDelta := msg.Time.Sub(baseTime)
offsetDelta := int64(msg.Offset - baseOffset)

writeVarInt(w, int64(recordSize(&msg, timestampDelta, offsetDelta)))

writeInt8(w, attributes)
writeVarInt(w, int64(timestampDelta))
writeVarInt(w, offsetDelta)

writeVarInt(w, int64(len(msg.Key)))
w.Write(msg.Key)
writeVarInt(w, int64(len(msg.Value)))
w.Write(msg.Value)
writeVarInt(w, int64(len(msg.Headers)))

for _, h := range msg.Headers {
writeVarInt(w, int64(len(h.Key)))
w.Write([]byte(h.Key))
writeVarInt(w, int64(len(h.Value)))
w.Write(h.Value)
}
}
54 changes: 52 additions & 2 deletions write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package kafka
import (
"bufio"
"bytes"
"context"
"fmt"
"testing"
"time"
)
Expand All @@ -14,6 +16,34 @@ const (
testPartition = 42
)

type WriteVarIntTestCase struct {
v []byte
tc int64
}

func TestWriteVarInt(t *testing.T) {
testCases := []*WriteVarIntTestCase{
&WriteVarIntTestCase{v: []byte{0}, tc: 0},
&WriteVarIntTestCase{v: []byte{2}, tc: 1},
&WriteVarIntTestCase{v: []byte{1}, tc: -1},
&WriteVarIntTestCase{v: []byte{3}, tc: -2},
&WriteVarIntTestCase{v: []byte{128, 2}, tc: 128},
&WriteVarIntTestCase{v: []byte{254, 1}, tc: 127},
&WriteVarIntTestCase{v: []byte{142, 6}, tc: 391},
&WriteVarIntTestCase{v: []byte{142, 134, 6}, tc: 49543},
}

for _, tc := range testCases {
buf := &bytes.Buffer{}
bufWriter := bufio.NewWriter(buf)
writeVarInt(bufWriter, tc.tc)
bufWriter.Flush()
if !bytes.Equal(buf.Bytes(), tc.v) {
t.Errorf("Expected %v; got %v", tc.v, buf.Bytes())
}
}
}

func TestWriteOptimizations(t *testing.T) {
t.Parallel()
t.Run("writeFetchRequestV2", testWriteFetchRequestV2)
Expand Down Expand Up @@ -108,8 +138,7 @@ func testWriteProduceRequestV2(t *testing.T) {
TopicName: testTopic,
Partitions: []produceRequestPartitionV2{{
Partition: testPartition,
MessageSetSize: msg.size(),
MessageSet: messageSet{msg},
MessageSetSize: msg.size(), MessageSet: messageSet{msg},
}},
}},
},
Expand Down Expand Up @@ -160,3 +189,24 @@ func testWriteOptimization(t *testing.T, h requestHeader, r request, f func(*buf
}
}
}

func testWriteV2RecordBatch(t *testing.T) {
msgs := make([]Message, 3)
for i := range msgs {
value := fmt.Sprintf("Sample message content: %d!", i)
msgs[i] = Message{Key: []byte("Key"), Value: []byte(value), Headers: []Header{Header{Key: "hk", Value: []byte("hv")}}}
}
w := NewWriter(WriterConfig{
Brokers: []string{"localhost:9092"},
Topic: "test-topic",
BatchSize: 1,
})
defer w.Close()

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := w.WriteMessages(ctx, msgs...); err != nil {
t.Error("Failed to write v2 messages to kafka")
return
}
}