Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Message writer for record batches #163

Merged
merged 26 commits into from
Jan 28, 2019
Merged
Changes from 1 commit
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
55ba9d6
Add branching for record batch writer
VictorDenisov Dec 2, 2018
1c90763
Make key and value from headers public
VictorDenisov Dec 2, 2018
a8cd851
Add writeVarInt function
VictorDenisov Dec 3, 2018
d18779b
Implement writeRecord method
VictorDenisov Dec 4, 2018
582f1d1
Write record batch method
VictorDenisov Dec 9, 2018
29674e8
Use zigzag encoding for varint
VictorDenisov Dec 10, 2018
0784aa5
Drop logging
VictorDenisov Dec 18, 2018
faddbdc
Add unit test for v2 record batch
VictorDenisov Dec 19, 2018
a57d973
Remove commented out code
VictorDenisov Jan 6, 2019
57693cf
Use int constants from math package
VictorDenisov Jan 6, 2019
a96cefa
Use exact record size instead of extra buffer
VictorDenisov Jan 12, 2019
60e7246
Give a simpler name to calcVarIntLen
VictorDenisov Jan 15, 2019
8128763
Move error check out of if else block
VictorDenisov Jan 15, 2019
fc101c3
Extract message set size in a function
VictorDenisov Jan 15, 2019
eca66e2
Give calcRecordSize a shorter name
VictorDenisov Jan 15, 2019
f4bc280
Protect recordSize from confusing order of params
VictorDenisov Jan 15, 2019
3da360b
Use recordSize intead of estimatedRecordSize
VictorDenisov Jan 15, 2019
d4b332e
Extract record batch size
VictorDenisov Jan 15, 2019
8c48777
Use record size from calculation
VictorDenisov Jan 18, 2019
36e6a36
Assign offsets to messages in record batch
VictorDenisov Jan 21, 2019
8798908
Write message set straight to output writer
VictorDenisov Jan 21, 2019
6464be6
Write record batch straight to output writer
VictorDenisov Jan 21, 2019
60de28c
Get rid of remainder buffer
VictorDenisov Jan 21, 2019
e22b69b
Remove commented out code
VictorDenisov Jan 23, 2019
d2cd545
Use bytes.Equal for comparing slices of bytes
VictorDenisov Jan 23, 2019
93017f4
Compress messages before calculating size
VictorDenisov Jan 27, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Write record batch straight to output writer
  • Loading branch information
VictorDenisov committed Jan 21, 2019
commit 6464be641f436b73d84a5df566de41701afdf9e2
19 changes: 6 additions & 13 deletions write.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,10 +247,8 @@ func writeProduceRequestV2(w *bufio.Writer, codec CompressionCodec, correlationI
writeInt32(w, partition)
writeInt32(w, size)

var msgBuf []byte
if hasHeaders(msgs...) {
msgBuf, err = writeRecordBatch(codec, correlationID, clientID, topic, partition, timeout, requiredAcks, msgs...)
w.Write(msgBuf)
err = writeRecordBatch(w, codec, correlationID, clientID, topic, partition, timeout, requiredAcks, msgs...)
} else {
err = writeMessageSet(w, codec, correlationID, clientID, topic, partition, timeout, requiredAcks, msgs...)
}
Expand Down Expand Up @@ -323,7 +321,7 @@ func recordBatchSize(msgs ...Message) (size int32) {
return
}

func writeRecordBatch(codec CompressionCodec, correlationID int32, clientId, topic string, partition int32, timeout time.Duration, requiredAcks int16, msgs ...Message) ([]byte, error) {
func writeRecordBatch(w *bufio.Writer, codec CompressionCodec, correlationID int32, clientId, topic string, partition int32, timeout time.Duration, requiredAcks int16, msgs ...Message) error {

baseTime := msgs[0].Time

Expand All @@ -335,11 +333,7 @@ func writeRecordBatch(codec CompressionCodec, correlationID int32, clientId, top

size := recordBatchSize(msgs...)

buf := &bytes.Buffer{}
buf.Grow(int(size))
bufWriter := bufio.NewWriter(buf)

writeInt64(bufWriter, baseOffset)
writeInt64(w, baseOffset)

remainderBuf := &bytes.Buffer{}
remainderBuf.Grow(int(size - 12)) // 12 = batch length + base offset sizes
Expand Down Expand Up @@ -378,11 +372,10 @@ func writeRecordBatch(codec CompressionCodec, correlationID int32, clientId, top
if remainderBuf.Len() != int(size-12) {
panic(fmt.Sprintf("Buf len doesn't match: size - %v, buf - %v", size-12, remainderBuf.Len()))
}
writeInt32(bufWriter, int32(remainderBuf.Len()))
bufWriter.Write(remainderBuf.Bytes())
bufWriter.Flush()
writeInt32(w, int32(remainderBuf.Len()))
w.Write(remainderBuf.Bytes())

return buf.Bytes(), nil
return nil
}

var maxDate time.Time = time.Date(5000, time.January, 0, 0, 0, 0, 0, time.UTC)
Expand Down