
Message writer for record batches #163

Merged (26 commits) on Jan 28, 2019

Conversation

VictorDenisov (Contributor):
No description provided.

This was referenced Dec 19, 2018
achille-roussel (Contributor) left a comment

Looking great, thanks for your contribution 👍

write.go Outdated
writeInt32(remainderWriter, -1) // partition leader epoch
writeInt8(remainderWriter, 2) // magic byte

//writeInt16(remainderWriter, 0) // TODO write crc
Contributor:

Is this expected to remain commented out?

Contributor Author:

It should be removed. I calculate crc below.

write.go Outdated
const MaxUint = ^uint64(0)
const MinUint = 0
const MaxInt = int64(MaxUint >> 1)
const MinInt = -MaxInt - 1
Contributor:

Do we have to export those constants?

Also why not use math.MaxUint64, math.MaxInt64, etc... ?

Contributor Author:

Indeed. I should use standard values.

write.go Outdated

// Messages with magic >2 are called records. This method writes messages using message format 2.
func writeRecord(w *bufio.Writer, attributes int8, baseTime time.Time, baseOffset int64, msg Message) {
buf := &bytes.Buffer{}
Contributor:

This intermediary buffer is a bit worrying performance-wise, but we can follow up with an optimization to compute the size later.

Contributor Author:

👍 I'll send a separate PR for it.

@VictorDenisov (Contributor Author):

@achille-roussel I addressed the comments.

@VictorDenisov (Contributor Author):

@achille-roussel I pushed the exact calculation of record size to this branch - it turned out to be trivial enough.

write.go Outdated
w.WriteByte(byte(i))
}

func calcVarIntLen(i int64) (l int) {
Contributor:

Those calc* function names are a bit uncommon in Go; we could get away with just varIntLen without impacting readability, I think.

Not a big deal though, those are internal APIs.

write.go Outdated
msgBuf, err = writeRecordBatch(codec, correlationID, clientID, topic, partition, timeout, requiredAcks, msgs...)
if err != nil {
return
}
Contributor:

This code could be simplified by moving the error check out of the if/else block; it doesn't change the behavior, though.
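(A toy sketch of the refactor being suggested. The encoder names are stand-ins, not the PR's real functions:)

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins for the two encoders in this PR.
func encodeRecordBatch() ([]byte, error) { return []byte{2}, nil }
func encodeMessageSet() ([]byte, error)  { return nil, errors.New("boom") }

func encode(v2 bool) ([]byte, error) {
	// Instead of repeating `if err != nil { return }` inside each branch,
	// assign in the branches and check the error once afterwards.
	var buf []byte
	var err error
	if v2 {
		buf, err = encodeRecordBatch()
	} else {
		buf, err = encodeMessageSet()
	}
	if err != nil {
		return nil, err
	}
	return buf, nil
}

func main() {
	buf, err := encode(true)
	fmt.Println(buf, err)
}
```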

write.go Outdated
return w.Flush()
buf := &bytes.Buffer{}
buf.Grow(int(size))
bufWriter := bufio.NewWriter(buf)
Contributor:

This is likely to introduce a significant performance regression on producers that use kafka-go. This change adds two new temporary buffers on code that sits on the hot path of our data pipelines (and, I'd assume, other companies' as well). Those buffers will result in two extra copies of the message set and increase the memory footprint of Kafka producers. We need to introduce this change without risking performance regressions.

Contributor Author:

Ok. I'll try to do something about it.

@VictorDenisov (Contributor Author):

@achille-roussel, I got rid of as many extra buffers as possible. The only remaining buffer is the crc buffer. bufio.Writer doesn't let me jump back and forth in the output buffer. In order to calculate the crc I would need to leave space for the crc value, write the data, calculate the crc, then go back and update the crc value (as is actually done in the C implementation of the Kafka client). I thought that would require a bigger refactoring, which would be better off in a separate PR if we decide to go down this route.

achille-roussel (Contributor) left a comment

Sounds good to me 👍 Thanks for making the changes!

I don't mind too much about new code being a bit less efficient; we can figure out a way to optimize later. I just wanted to avoid introducing performance regressions on existing code. Since this new behavior is opt-in, we should be fine.

I'll see if @stevevls can give this one last pass tomorrow, but this is looking good enough to me 💯

write.go Outdated
sz := recordSize(&msg, msg.Time.Sub(baseTime), msg.Offset-baseOffset)

size += int32(sz + varIntLen(int64(sz)))
//size += int32(sz)
Contributor:

Should we remove this comment?

write_test.go Outdated
bufWriter := bufio.NewWriter(buf)
writeVarInt(bufWriter, tc.tc)
bufWriter.Flush()
if !reflect.DeepEqual(buf.Bytes(), tc.v) {
Contributor:

Maybe replace with bytes.Equal?

@VictorDenisov (Contributor Author):

@achille-roussel don't merge just yet. It looks like this change breaks the tests. I need to look into it.

@VictorDenisov (Contributor Author):

@achille-roussel I fixed the issue. The tests pass now.

achille-roussel (Contributor) left a comment

Looking good, let's get this merged! We'll merge #186 to run the new code against more versions of Kafka.

Thanks a ton for your contributions 💯

@achille-roussel achille-roussel merged commit c763f73 into segmentio:master Jan 28, 2019
@VictorDenisov VictorDenisov deleted the v2_message_writer branch January 29, 2019 03:52
achille-roussel pushed a commit that referenced this pull request Feb 11, 2019
* Fix lookup partition (#1)

* Test against multiple Kafka versions

* Message writer for record batches (#163)

* Add branching for record batch writer

* Make key and value from headers public

* Add writeVarInt function

* Implement writeRecord method

* Write record batch method

* Use zigzag encoding for varint

* Drop logging

* Add unit test for v2 record batch

* Remove commented out code

* Use int constants from math package

* Use exact record size instead of extra buffer

* Give a simpler name to calcVarIntLen

* Move error check out of if else block

* Extract message set size in a function

* Give calcRecordSize a shorter name

* Protect recordSize from confusing order of params

* Use recordSize instead of estimatedRecordSize

* Extract record batch size

* Use record size from calculation

* Assign offsets to messages in record batch

* Write message set straight to output writer

* Write record batch straight to output writer

* Get rid of remainder buffer

* Remove commented out code

* Use bytes.Equal for comparing slices of bytes

* Compress messages before calculating size

* Run tests for kafka 0.10 (#187)

Added a utility method to check broker version so that tests for
newer functionality can be skipped.

* fix

* Fix memory leak in LookupPartition

* Fix ReadPartitions