Skip to content

Commit

Permalink
Added compression support for RecordBatch (#234)
Browse files Browse the repository at this point in the history
Includes upgrading to Produce Request v3 when supported.
  • Loading branch information
stevevls authored Mar 16, 2019
1 parent 1f95bef commit 1c2c0d0
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 103 deletions.
13 changes: 13 additions & 0 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -980,6 +980,19 @@ func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message)
func(deadline time.Time, id int32) error {
now := time.Now()
deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
if c.apiVersions[produceRequest].MaxVersion >= 3 {
return writeProduceRequestV3(
&c.wbuf,
codec,
id,
c.clientID,
c.topic,
c.partition,
deadlineToTimeout(deadline, now),
int16(atomic.LoadInt32(&c.requiredAcks)),
msgs...,
)
}
return writeProduceRequestV2(
&c.wbuf,
codec,
Expand Down
195 changes: 96 additions & 99 deletions write.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"encoding/binary"
"fmt"
"hash/crc32"
"math"
"time"
)

Expand Down Expand Up @@ -242,31 +241,16 @@ func writeListOffsetRequestV1(w *bufio.Writer, correlationID int32, clientID, to
return w.Flush()
}

// hasHeaders reports whether at least one of msgs carries one or more headers.
// It returns false when msgs is empty.
func hasHeaders(msgs ...Message) bool {
	for i := range msgs {
		if len(msgs[i].Headers) != 0 {
			return true
		}
	}
	return false
}

func writeProduceRequestV2(w *bufio.Writer, codec CompressionCodec, correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, msgs ...Message) (err error) {

attributes := int8(CompressionNoneCode)

var size int32
if hasHeaders(msgs...) {
size = recordBatchSize(msgs...)
} else {
if codec != nil {
if msgs, err = compress(codec, msgs...); err != nil {
return err
}
attributes = codec.Code()
if codec != nil {
if msgs, err = compress(codec, msgs...); err != nil {
return err
}
size = messageSetSize(msgs...)
attributes = codec.Code()
}
size := messageSetSize(msgs...)

h := requestHeader{
ApiKey: int16(produceRequest),
Expand Down Expand Up @@ -295,19 +279,85 @@ func writeProduceRequestV2(w *bufio.Writer, codec CompressionCodec, correlationI
// partition array
writeArrayLen(w, 1)
writeInt32(w, partition)

writeInt32(w, size)
for _, msg := range msgs {
writeMessage(w, msg.Offset, attributes, msg.Time, msg.Key, msg.Value)
}
return w.Flush()
}

if hasHeaders(msgs...) {
err = writeRecordBatch(w, codec, correlationID, clientID, topic, partition, timeout, requiredAcks, msgs...)
} else {
for _, msg := range msgs {
writeMessage(w, msg.Offset, attributes, msg.Time, msg.Key, msg.Value)
// writeProduceRequestV3 writes a version 3 Produce request that carries msgs
// as a single record batch for the given topic and partition, then flushes w.
//
// When codec is non-nil the records are serialized into a temporary buffer,
// compressed with codec.Encode, and the batch attributes are set to
// codec.Code(). Otherwise the records are written uncompressed with
// attributes 0. The transactional id is always written as null (-1).
//
// msgs must contain at least one message: the batch is sized with
// recordBatchSize, which reads msgs[0].
//
// It returns the first error reported while buffering the records, by
// codec.Encode, by writeRecordBatch, or by the final flush of w.
func writeProduceRequestV3(w *bufio.Writer, codec CompressionCodec, correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, msgs ...Message) (err error) {
	var (
		size       int32
		attributes int16
		// writeRecords emits the records section of the batch (everything
		// that follows the record batch header).
		writeRecords func(*bufio.Writer)
	)

	if codec != nil {
		// Serialize the records on their own so that they can be compressed
		// as one payload.
		recordBuf := &bytes.Buffer{}
		recordBuf.Grow(int(recordBatchSize(msgs...)))
		compressedWriter := bufio.NewWriter(recordBuf)
		for i, msg := range msgs {
			writeRecord(compressedWriter, 0, msgs[0].Time, int64(i), msg)
		}
		if err = compressedWriter.Flush(); err != nil {
			return err
		}

		var compressed []byte
		if compressed, err = codec.Encode(recordBuf.Bytes()); err != nil {
			return err
		}

		attributes = int16(codec.Code())
		size = recordBatchHeaderSize() + int32(len(compressed))
		writeRecords = func(bw *bufio.Writer) {
			// bufio.Writer remembers the first write error and reports it
			// from Flush, so the result is intentionally not checked here.
			bw.Write(compressed)
		}
	} else {
		size = recordBatchSize(msgs...)
		writeRecords = func(bw *bufio.Writer) {
			for i, msg := range msgs {
				writeRecord(bw, 0, msgs[0].Time, int64(i), msg)
			}
		}
	}

	h := requestHeader{
		ApiKey:        int16(produceRequest),
		ApiVersion:    int16(v3),
		CorrelationID: correlationID,
		ClientID:      clientID,
	}
	h.Size = (h.size() - 4) +
		2 + // transactional_id
		2 + // required acks
		4 + // timeout
		4 + // topic array length
		sizeofString(topic) + // topic
		4 + // partition array length
		4 + // partition
		4 + // message set size
		size

	h.writeTo(w)
	writeInt16(w, -1)           // null transactional_id
	writeInt16(w, requiredAcks) // required acks
	writeInt32(w, milliseconds(timeout))

	// topic array
	writeArrayLen(w, 1)
	writeString(w, topic)

	// partition array
	writeArrayLen(w, 1)
	writeInt32(w, partition)

	writeInt32(w, size)
	if err = writeRecordBatch(w, attributes, size, writeRecords, msgs...); err != nil {
		return err
	}

	return w.Flush()
}

Expand All @@ -325,8 +375,8 @@ func messageSetSize(msgs ...Message) (size int32) {
return
}

func recordBatchSize(msgs ...Message) (size int32) {
size = 8 + // base offset
func recordBatchHeaderSize() int32 {
return 8 + // base offset
4 + // batch length
4 + // partition leader epoch
1 + // magic
Expand All @@ -339,33 +389,27 @@ func recordBatchSize(msgs ...Message) (size int32) {
2 + // producer epoch
4 + // base sequence
4 // msg count
}

baseTime := msgs[0].Time
func recordBatchSize(msgs ...Message) (size int32) {
size = recordBatchHeaderSize()

baseOffset := baseOffset(msgs...)
baseTime := msgs[0].Time

for _, msg := range msgs {
for i, msg := range msgs {

sz := recordSize(&msg, msg.Time.Sub(baseTime), msg.Offset-baseOffset)
sz := recordSize(&msg, msg.Time.Sub(baseTime), int64(i))

size += int32(sz + varIntLen(int64(sz)))
}
return
}

func writeRecordBatch(w *bufio.Writer, codec CompressionCodec, correlationID int32, clientId, topic string, partition int32, timeout time.Duration, requiredAcks int16, msgs ...Message) error {
func writeRecordBatch(w *bufio.Writer, attributes int16, size int32, write func(*bufio.Writer), msgs ...Message) error {

baseTime := msgs[0].Time

baseOffset := int64(0)

for i := 0; i < len(msgs); i++ {
msgs[i].Offset = int64(i)
}

size := recordBatchSize(msgs...)

writeInt64(w, baseOffset)
writeInt64(w, int64(0))

writeInt32(w, int32(size-12)) // 12 = batch length + base offset sizes

Expand All @@ -376,7 +420,7 @@ func writeRecordBatch(w *bufio.Writer, codec CompressionCodec, correlationID int
crcBuf.Grow(int(size - 12)) // 12 = batch length + base offset sizes
crcWriter := bufio.NewWriter(crcBuf)

writeInt16(crcWriter, 0) // attributes no compression, timestamp type 0 - create time, not part of a transaction, no control messages
writeInt16(crcWriter, attributes) // attributes, timestamp type 0 - create time, not part of a transaction, no control messages
writeInt32(crcWriter, int32(len(msgs)-1)) // max offset
writeInt64(crcWriter, timestamp(baseTime))
lastTime := timestamp(msgs[len(msgs)-1].Time)
Expand All @@ -386,70 +430,23 @@ func writeRecordBatch(w *bufio.Writer, codec CompressionCodec, correlationID int
writeInt32(crcWriter, -1) // default base sequence
writeInt32(crcWriter, int32(len(msgs))) // record count

for _, msg := range msgs {
writeRecord(crcWriter, CompressionNoneCode, baseTime, baseOffset, msg)
write(crcWriter)
if err := crcWriter.Flush(); err != nil {
return err
}
crcWriter.Flush()

crcTable := crc32.MakeTable(crc32.Castagnoli)
crcChecksum := crc32.Checksum(crcBuf.Bytes(), crcTable)

writeInt32(w, int32(crcChecksum))
w.Write(crcBuf.Bytes())

return nil
}

var maxDate time.Time = time.Date(5000, time.January, 0, 0, 0, 0, 0, time.UTC)

func baseTime(msgs ...Message) (baseTime time.Time) {
baseTime = maxDate
for _, msg := range msgs {
if msg.Time.Before(baseTime) {
baseTime = msg.Time
}
if _, err := w.Write(crcBuf.Bytes()); err != nil {
return err
}
return
}

// baseOffset returns the smallest Offset found among msgs. When msgs is empty
// the result is math.MaxInt64.
func baseOffset(msgs ...Message) (baseOffset int64) {
	baseOffset = math.MaxInt64
	for i := range msgs {
		if off := msgs[i].Offset; off < baseOffset {
			baseOffset = off
		}
	}
	return baseOffset
}

func maxOffsetDelta(baseOffset int64, msgs ...Message) (maxDelta int64) {
maxDelta = 0
for _, msg := range msgs {
curDelta := msg.Offset - baseOffset
if maxDelta > curDelta {
maxDelta = curDelta
}
}
return
return nil
}

// estimatedRecordSize returns a size estimate, in bytes, for msg written as a
// record. Every length and delta field is budgeted at 8 bytes and the
// attributes at 1 byte; the key, the value, and each header key and value add
// their actual byte lengths.
func estimatedRecordSize(msg *Message) (size int32) {
	const (
		fieldBudget = 8

		// Overhead that every record pays regardless of its content.
		recordOverhead = fieldBudget + // length
			1 + // attributes
			fieldBudget + // timestamp delta
			fieldBudget + // offset delta
			fieldBudget + // key length
			fieldBudget // value length

		// Overhead added by each header: key length + value length.
		headerOverhead = 2 * fieldBudget
	)

	size = recordOverhead + int32(len(msg.Key)) + int32(len(msg.Value))
	for i := range msg.Headers {
		hdr := &msg.Headers[i]
		size += headerOverhead + int32(len(hdr.Key)) + int32(len(hdr.Value))
	}
	return size
}
var maxDate = time.Date(5000, time.January, 0, 0, 0, 0, 0, time.UTC)

func recordSize(msg *Message, timestampDelta time.Duration, offsetDelta int64) (size int) {
size += 1 + // attributes
Expand Down Expand Up @@ -517,10 +514,10 @@ func msgSize(key, value []byte) int32 {
}

// Messages with magic >= 2 are called records. This function writes messages using message format 2.
func writeRecord(w *bufio.Writer, attributes int8, baseTime time.Time, baseOffset int64, msg Message) {
func writeRecord(w *bufio.Writer, attributes int8, baseTime time.Time, offset int64, msg Message) {

timestampDelta := msg.Time.Sub(baseTime)
offsetDelta := int64(msg.Offset - baseOffset)
offsetDelta := int64(offset)

writeVarInt(w, int64(recordSize(&msg, timestampDelta, offsetDelta)))

Expand Down
18 changes: 14 additions & 4 deletions write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,19 +191,28 @@ func testWriteOptimization(t *testing.T, h requestHeader, r request, f func(*buf
}

func TestWriteV2RecordBatch(t *testing.T) {

if !KafkaIsAtLeast("0.11.0") {
t.Skip("RecordBatch was added in kafka 0.11.0")
return
}

topic := CreateTopic(t, 1)
msgs := make([]Message, 15)
for i := range msgs {
value := fmt.Sprintf("Sample message content: %d!", i)
msgs[i] = Message{Key: []byte("Key"), Value: []byte(value), Headers: []Header{Header{Key: "hk", Value: []byte("hv")}}}
}
w := NewWriter(WriterConfig{
Brokers: []string{"localhost:9092"},
Topic: topic,
BatchSize: 5,
Brokers: []string{"localhost:9092"},
Topic: topic,
BatchTimeout: 100 * time.Millisecond,
BatchSize: 5,
})

ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

if err := w.WriteMessages(ctx, msgs...); err != nil {
t.Errorf("Failed to write v2 messages to kafka: %v", err)
return
Expand All @@ -213,6 +222,7 @@ func TestWriteV2RecordBatch(t *testing.T) {
r := NewReader(ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: topic,
MaxWait: 100 * time.Millisecond,
})
defer r.Close()

Expand Down

0 comments on commit 1c2c0d0

Please sign in to comment.