Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract record batch structure #314

Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1057,28 +1057,38 @@ func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message)
deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
switch version := c.apiVersions[produceRequest].MaxVersion; {
case version >= 7:
recordBatch :=
&recordBatch{
codec: codec,
msgs: msgs,
}
recordBatch.init()
return c.wb.writeProduceRequestV7(
codec,
id,
c.clientID,
c.topic,
c.partition,
deadlineToTimeout(deadline, now),
int16(atomic.LoadInt32(&c.requiredAcks)),
c.transactionalID,
msgs...,
recordBatch,
)
case version >= 3:
recordBatch :=
&recordBatch{
codec: codec,
msgs: msgs,
}
recordBatch.init()
return c.wb.writeProduceRequestV3(
codec,
id,
c.clientID,
c.topic,
c.partition,
deadlineToTimeout(deadline, now),
int16(atomic.LoadInt32(&c.requiredAcks)),
c.transactionalID,
msgs...,
recordBatch,
)
default:
return c.wb.writeProduceRequestV2(
Expand Down
104 changes: 104 additions & 0 deletions recordbatch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package kafka

import (
"bytes"
"time"
)

// recordBatchHeaderSize is the fixed byte size of the header that precedes
// the records in a v2 record batch (Kafka message format v2, KIP-98).
// The itemized sum mirrors the on-wire field layout in order.
const recordBatchHeaderSize int32 = 0 +
	8 + // base offset
	4 + // batch length
	4 + // partition leader epoch
	1 + // magic
	4 + // crc
	2 + // attributes
	4 + // last offset delta
	8 + // first timestamp
	8 + // max timestamp
	8 + // producer id
	2 + // producer epoch
	4 + // base sequence
	4 // msg count

// recordBatchSize returns the total serialized size in bytes of a v2 record
// batch holding msgs: the fixed batch header plus each record, where every
// record is preceded by the varint encoding of its own length.
//
// Timestamp deltas are computed relative to the first message's Time and
// offset deltas are the message's index in the batch, matching how the
// records are later written.
//
// An empty msgs slice yields just the header size; the original indexed
// msgs[0] unconditionally and panicked on empty input.
func recordBatchSize(msgs ...Message) (size int32) {
	size = recordBatchHeaderSize
	if len(msgs) == 0 {
		return
	}
	baseTime := msgs[0].Time

	for i := range msgs {
		msg := &msgs[i]
		msz := recordSize(msg, msg.Time.Sub(baseTime), int64(i))
		// Each record is length-prefixed with a varint of its size.
		size += int32(msz + varIntLen(int64(msz)))
	}

	return
}

// compressRecordBatch serializes msgs as v2 records through the codec's
// compressing writer. It returns the pooled buffer holding the compressed
// records, the batch attributes bits (the codec's code), and the total
// batch size including the fixed header. If closing the compressor fails,
// the buffer is released back to the pool before returning the error.
func compressRecordBatch(codec CompressionCodec, msgs ...Message) (compressed *bytes.Buffer, attributes int16, size int32, err error) {
	compressed = acquireBuffer()
	compressor := codec.NewWriter(compressed)
	recordWriter := &writeBuffer{w: compressor}

	// Offsets are the message index; timestamps are deltas from the first
	// message, so every record shares msgs[0].Time as its base.
	for i := range msgs {
		recordWriter.writeRecord(0, msgs[0].Time, int64(i), msgs[i])
	}

	if err = compressor.Close(); err != nil {
		releaseBuffer(compressed)
		return
	}

	attributes = int16(codec.Code())
	size = recordBatchHeaderSize + int32(compressed.Len())
	return
}

// recordBatch bundles the inputs and precomputed state needed to write a
// v2 record batch into a produce request. Callers set codec and msgs, then
// call init to populate the derived fields before writeTo is used.
type recordBatch struct {
	// required input parameters
	codec      CompressionCodec // nil means the records are written uncompressed
	attributes int16            // batch attribute bits; set from codec.Code() by init when compressing
	msgs       []Message

	// parameters calculated during init
	compressed *bytes.Buffer // pooled buffer of compressed records; nil when codec is nil
	size       int32         // total serialized batch size, fixed header included
}

// init computes the derived fields of the batch: its total serialized size
// and, when a compression codec is configured, the compressed record payload
// and attribute bits. A recordBatch must not be written before init is
// called. The only possible error comes from the compression path.
func (r *recordBatch) init() (err error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like it's never valid to use a recordBatch without calling init, how about making it a constructor instead?

func newRecordBatch(codec CompressionCodec, msgs []Message) *recordBatch {
    ...
}

One challenge is managing the error returned by compressRecordBatch, maybe the recordBatch type can hold the error value like we do for writeBuffer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I was choosing between this option and the option that I implemented. My motivation was to not duplicate the fields in the record batch and in the declaration of the method. I don't have any strong attachment to either solution. Would you prefer your variant?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since a recordBatch value is never fully initialized without calling init I think it would make the code harder to use wrong to have the construction contained in a single function 👍

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

	if r.codec == nil {
		// Uncompressed path: only the total size needs precomputing.
		r.size = recordBatchSize(r.msgs...)
	} else {
		// Compressed path: records are serialized through the codec up
		// front so the exact batch size is known before writing the request.
		r.compressed, r.attributes, r.size, err = compressRecordBatch(r.codec, r.msgs...)
	}
	return
}

// writeTo serializes the batch into wb: a leading int32 size, then the
// batch header and records via writeRecordBatch. When the records were
// pre-compressed by init, the pooled buffer is written out and released;
// otherwise each record is written inline, with offsets equal to the
// message index and timestamps relative to the first message.
func (r *recordBatch) writeTo(wb *writeBuffer) {
	wb.writeInt32(r.size)

	first := r.msgs[0].Time
	last := r.msgs[len(r.msgs)-1].Time

	// Select how the record payload is produced, then emit the batch once.
	var writeRecords func(*writeBuffer)
	if r.compressed != nil {
		writeRecords = func(wb *writeBuffer) {
			wb.Write(r.compressed.Bytes())
		}
	} else {
		writeRecords = func(wb *writeBuffer) {
			for i := range r.msgs {
				wb.writeRecord(0, first, int64(i), r.msgs[i])
			}
		}
	}

	wb.writeRecordBatch(r.attributes, r.size, len(r.msgs), first, last, writeRecords)

	if r.compressed != nil {
		// Return the compression buffer to the pool now that it is flushed.
		releaseBuffer(r.compressed)
	}
}

// recordSize returns the serialized size in bytes of a single v2 record
// built from msg, excluding the varint length prefix that precedes each
// record in the batch. timestampDelta and offsetDelta are the record's
// deltas from the batch base values.
func recordSize(msg *Message, timestampDelta time.Duration, offsetDelta int64) int {
	headersSize := varArrayLen(len(msg.Headers), func(i int) int {
		header := &msg.Headers[i]
		return varStringLen(header.Key) + varBytesLen(header.Value)
	})

	size := 1 // attributes byte
	size += varIntLen(int64(milliseconds(timestampDelta)))
	size += varIntLen(offsetDelta)
	size += varBytesLen(msg.Key)
	size += varBytesLen(msg.Value)
	size += headersSize
	return size
}
125 changes: 6 additions & 119 deletions write.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,19 +379,7 @@ func (wb *writeBuffer) writeProduceRequestV2(codec CompressionCodec, correlation
return wb.Flush()
}

func (wb *writeBuffer) writeProduceRequestV3(codec CompressionCodec, correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, transactionalID *string, msgs ...Message) (err error) {
var size int32
var attributes int16
var compressed *bytes.Buffer

if codec == nil {
size = recordBatchSize(msgs...)
} else {
compressed, attributes, size, err = compressRecordBatch(codec, msgs...)
if err != nil {
return
}
}
func (wb *writeBuffer) writeProduceRequestV3(correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, transactionalID *string, recordBatch *recordBatch) (err error) {

h := requestHeader{
ApiKey: int16(produceRequest),
Expand All @@ -409,7 +397,7 @@ func (wb *writeBuffer) writeProduceRequestV3(codec CompressionCodec, correlation
4 + // partition array length
4 + // partition
4 + // message set size
size
recordBatch.size

h.writeTo(wb)
wb.writeNullableString(transactionalID)
Expand All @@ -424,39 +412,12 @@ func (wb *writeBuffer) writeProduceRequestV3(codec CompressionCodec, correlation
wb.writeArrayLen(1)
wb.writeInt32(partition)

wb.writeInt32(size)
baseTime := msgs[0].Time
lastTime := msgs[len(msgs)-1].Time

if compressed != nil {
wb.writeRecordBatch(attributes, size, len(msgs), baseTime, lastTime, func(wb *writeBuffer) {
wb.Write(compressed.Bytes())
})
releaseBuffer(compressed)
} else {
wb.writeRecordBatch(attributes, size, len(msgs), baseTime, lastTime, func(wb *writeBuffer) {
for i, msg := range msgs {
wb.writeRecord(0, msgs[0].Time, int64(i), msg)
}
})
}
recordBatch.writeTo(wb)

return wb.Flush()
}

func (wb *writeBuffer) writeProduceRequestV7(codec CompressionCodec, correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, transactionalID *string, msgs ...Message) (err error) {
var size int32
var attributes int16
var compressed *bytes.Buffer

if codec == nil {
size = recordBatchSize(msgs...)
} else {
compressed, attributes, size, err = compressRecordBatch(codec, msgs...)
if err != nil {
return
}
}
func (wb *writeBuffer) writeProduceRequestV7(correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, transactionalID *string, recordBatch *recordBatch) (err error) {

h := requestHeader{
ApiKey: int16(produceRequest),
Expand All @@ -473,7 +434,7 @@ func (wb *writeBuffer) writeProduceRequestV7(codec CompressionCodec, correlation
4 + // partition array length
4 + // partition
4 + // message set size
size
recordBatch.size

h.writeTo(wb)
wb.writeNullableString(transactionalID)
Expand All @@ -488,22 +449,7 @@ func (wb *writeBuffer) writeProduceRequestV7(codec CompressionCodec, correlation
wb.writeArrayLen(1)
wb.writeInt32(partition)

wb.writeInt32(size)
baseTime := msgs[0].Time
lastTime := msgs[len(msgs)-1].Time

if compressed != nil {
wb.writeRecordBatch(attributes, size, len(msgs), baseTime, lastTime, func(wb *writeBuffer) {
wb.Write(compressed.Bytes())
})
releaseBuffer(compressed)
} else {
wb.writeRecordBatch(attributes, size, len(msgs), baseTime, lastTime, func(wb *writeBuffer) {
for i, msg := range msgs {
wb.writeRecord(0, msgs[0].Time, int64(i), msg)
}
})
}
recordBatch.writeTo(wb)

return wb.Flush()
}
Expand Down Expand Up @@ -572,25 +518,6 @@ func compressMessageSet(codec CompressionCodec, msgs ...Message) (compressed *by
return
}

// compressRecordBatch writes msgs as v2 records through the codec's
// compressing writer, returning the pooled buffer of compressed records,
// the batch attributes bits (the codec's code), and the total batch size
// including the fixed header. On compressor close failure the buffer is
// released back to the pool and err is set.
func compressRecordBatch(codec CompressionCodec, msgs ...Message) (compressed *bytes.Buffer, attributes int16, size int32, err error) {
	compressed = acquireBuffer()
	compressor := codec.NewWriter(compressed)
	wb := &writeBuffer{w: compressor}

	// Offsets are message indices; timestamps are deltas from msgs[0].Time.
	for i, msg := range msgs {
		wb.writeRecord(0, msgs[0].Time, int64(i), msg)
	}

	if err = compressor.Close(); err != nil {
		releaseBuffer(compressed)
		return
	}

	attributes = int16(codec.Code())
	size = recordBatchHeaderSize + int32(compressed.Len())
	return
}

func (wb *writeBuffer) writeMessage(offset int64, attributes int8, time time.Time, key, value []byte, cw *crc32Writer) {
const magicByte = 1 // compatible with kafka 0.10.0.0+

Expand Down Expand Up @@ -685,43 +612,3 @@ func messageSetSize(msgs ...Message) (size int32) {
}
return
}

// recordSize returns the serialized size in bytes of a single v2 record,
// excluding the varint length prefix that precedes each record in a batch.
// timestampDelta and offsetDelta are the record's deltas from the batch
// base timestamp and base offset.
func recordSize(msg *Message, timestampDelta time.Duration, offsetDelta int64) int {
	return 1 + // attributes
		varIntLen(int64(milliseconds(timestampDelta))) +
		varIntLen(offsetDelta) +
		varBytesLen(msg.Key) +
		varBytesLen(msg.Value) +
		varArrayLen(len(msg.Headers), func(i int) int {
			h := &msg.Headers[i]
			return varStringLen(h.Key) + varBytesLen(h.Value)
		})
}

// recordBatchHeaderSize is the fixed byte size of the header preceding the
// records in a v2 record batch (Kafka message format v2); the sum mirrors
// the on-wire field layout in order.
const recordBatchHeaderSize int32 = 0 +
	8 + // base offset
	4 + // batch length
	4 + // partition leader epoch
	1 + // magic
	4 + // crc
	2 + // attributes
	4 + // last offset delta
	8 + // first timestamp
	8 + // max timestamp
	8 + // producer id
	2 + // producer epoch
	4 + // base sequence
	4 // msg count

// recordBatchSize returns the total serialized size in bytes of a v2 record
// batch holding msgs: the fixed header plus each record and its varint
// length prefix. Deltas are relative to the first message, so msgs must be
// non-empty — msgs[0] is indexed unconditionally.
func recordBatchSize(msgs ...Message) (size int32) {
	size = recordBatchHeaderSize
	baseTime := msgs[0].Time

	for i := range msgs {
		msg := &msgs[i]
		msz := recordSize(msg, msg.Time.Sub(baseTime), int64(i))
		// Each record is length-prefixed with a varint of its own size.
		size += int32(msz + varIntLen(int64(msz)))
	}

	return
}