Message writer for record batches #163
Looking great, thanks for your contribution 👍
write.go
Outdated
writeInt32(remainderWriter, -1) // partition leader epoch
writeInt8(remainderWriter, 2) // magic byte

//writeInt16(remainderWriter, 0) // TODO write crc
Is this expected to remain commented out?
It should be removed. I calculate the CRC below.
write.go
Outdated
const MaxUint = ^uint64(0)
const MinUint = 0
const MaxInt = int64(MaxUint >> 1)
const MinInt = -MaxInt - 1
Do we have to export those constants?
Also, why not use math.MaxUint64, math.MaxInt64, etc.?
Indeed. I should use standard values.
write.go
Outdated
// Messages with magic >2 are called records. This method writes messages using message format 2.
func writeRecord(w *bufio.Writer, attributes int8, baseTime time.Time, baseOffset int64, msg Message) {
buf := &bytes.Buffer{}
This intermediary buffer is a bit worrying performance-wise, but we can follow up with an optimization to compute the size later.
👍 I'll send a separate PR for it.
@achille-roussel I addressed the comments.
@achille-roussel I pushed the exact calculation of record size to this branch. It turned out to be trivial enough.
2979f06 to a9b1d80
a9b1d80 to a96cefa
write.go
Outdated
w.WriteByte(byte(i))
}

func calcVarIntLen(i int64) (l int) {
Those calc* function names are a bit uncommon in Go; we could get away with just varIntLen without impacting readability, I think.
Not a big deal tho, those are internal APIs.
write.go
Outdated
msgBuf, err = writeRecordBatch(codec, correlationID, clientID, topic, partition, timeout, requiredAcks, msgs...)
if err != nil {
return
}
This code could be simplified by moving the error check out of the if/else block, it doesn't change the behavior tho.
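A minimal sketch of the suggested shape, for illustration only: encodeV1 and encodeV2 are hypothetical stand-ins for the two writer paths, not functions from this PR. Each branch only assigns, and the error is checked once after the branch.

```go
package main

import (
	"errors"
	"fmt"
)

// encodeV1 and encodeV2 are hypothetical stand-ins for the message-set
// and record-batch code paths.
func encodeV1(msg string) ([]byte, error) {
	return []byte("v1:" + msg), nil
}

func encodeV2(msg string) ([]byte, error) {
	if msg == "" {
		return nil, errors.New("empty message")
	}
	return []byte("v2:" + msg), nil
}

// encode selects a format, then handles the error in a single place
// instead of repeating the check inside each branch.
func encode(useV2 bool, msg string) ([]byte, error) {
	var (
		buf []byte
		err error
	)
	if useV2 {
		buf, err = encodeV2(msg)
	} else {
		buf, err = encodeV1(msg)
	}
	if err != nil {
		return nil, err
	}
	return buf, nil
}

func main() {
	out, err := encode(true, "hello")
	fmt.Println(string(out), err)
}
```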
write.go
Outdated
return w.Flush()
buf := &bytes.Buffer{}
buf.Grow(int(size))
bufWriter := bufio.NewWriter(buf)
This is likely going to introduce a significant performance regression for producers that use kafka-go. This code introduces two new temporary buffers on code that sits on the hot paths of our data pipelines (I'd assume other companies' as well). Those buffers will result in two extra copies of the message set and increase the memory footprint of Kafka producers. We need to introduce this change without risking performance regressions.
Ok. I'll try to do something about it.
@achille-roussel, I got rid of as many extra buffers as possible. The only remaining buffer is the CRC buffer. bufio.Writer doesn't allow me to jump back and forth in the output buffer. To calculate the CRC, I would need to leave space for the CRC value, write the data, calculate the CRC, and then go back and update the CRC value (as is actually done in the C implementation of the Kafka client). I thought that would require a bigger refactoring, which would be better off in a separate PR if we decide to go down this route.
Sounds good to me 👍 Thanks for making the changes!
I don't mind too much about new code being a bit less efficient, we can figure out a way to optimize later, I just wanted to avoid introducing performance regressions on existing code. Since this new behavior is opt-in we should be fine.
I'll see if @stevevls can give this one last pass tomorrow, but this is looking good enough to me 💯
write.go
Outdated
sz := recordSize(&msg, msg.Time.Sub(baseTime), msg.Offset-baseOffset)

size += int32(sz + varIntLen(int64(sz)))
//size += int32(sz)
Should we remove this comment?
write_test.go
Outdated
bufWriter := bufio.NewWriter(buf)
writeVarInt(bufWriter, tc.tc)
bufWriter.Flush()
if !reflect.DeepEqual(buf.Bytes(), tc.v) {
Maybe replace with bytes.Equal?
@achille-roussel don't merge just yet. It looks like this change breaks the tests. I need to look into it.
@achille-roussel I fixed the issue. The tests pass now.
Looking good, let's get this merged! We'll merge #186 to run the new code against more versions of Kafka.
Thanks a ton for your contributions 💯
* Fix lookup partition (#1)
* Test against multiple Kafka versions
* Message writer for record batches (#163)
* Add branching for record batch writer
* Make key and value from headers public
* Add writeVarInt function
* Implement writeRecord method
* Write record batch method
* Use zigzag encoding for varint
* Drop logging
* Add unit test for v2 record batch
* Remove commented out code
* Use int constants from math package
* Use exact record size instead of extra buffer
* Give a simpler name to calcVarIntLen
* Move error check out of if else block
* Extract message set size in a function
* Give calcRecordSize a shorter name
* Protect recordSize from confusing order of params
* Use recordSize instead of estimatedRecordSize
* Extract record batch size
* Use record size from calculation
* Assign offsets to messages in record batch
* Write message set straight to output writer
* Write record batch straight to output writer
* Get rid of remainder buffer
* Remove commented out code
* Use bytes.Equal for comparing slices of bytes
* Compress messages before calculating size
* Run tests for kafka 0.10 (#187) Added a utility method to check broker version so that tests for newer functionality can be skipped.
* fix
* Fix memory leak in LookupPartition
* Fix ReadPartitions