Skip to content

Commit

Permalink
feat: reuse decompressed buffer to avoid allocating buffer whenever a batch is read (segmentio#920)
Browse files: browse the repository at this point in the history
  • Loading branch information
vmihailenco authored May 31, 2022
1 parent 294fbdb commit 4788faf
Showing 1 changed file with 17 additions and 13 deletions.
30 changes: 17 additions & 13 deletions message_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ type messageSetReader struct {
//
// This is used to detect truncation of the response.
lengthRemain int

decompressed bytes.Buffer
}

type readerStack struct {
Expand Down Expand Up @@ -162,14 +164,15 @@ func (r *messageSetReader) readMessageV1(min int64, key readBytesFunc, val readB
if err = r.discardN(4); err != nil {
return
}

// read and decompress the contained message set.
var decompressed bytes.Buffer
if err = r.readBytesWith(func(r *bufio.Reader, sz int, n int) (remain int, err error) {
r.decompressed.Reset()
if err = r.readBytesWith(func(br *bufio.Reader, sz int, n int) (remain int, err error) {
// x4 as a guess that the average compression ratio is near 75%
decompressed.Grow(4 * n)
limitReader := io.LimitedReader{R: r, N: int64(n)}
r.decompressed.Grow(4 * n)
limitReader := io.LimitedReader{R: br, N: int64(n)}
codecReader := codec.NewReader(&limitReader)
_, err = decompressed.ReadFrom(codecReader)
_, err = r.decompressed.ReadFrom(codecReader)
remain = sz - (n - int(limitReader.N))
codecReader.Close()
return
Expand All @@ -184,7 +187,7 @@ func (r *messageSetReader) readMessageV1(min int64, key readBytesFunc, val readB
// messages at offsets 10-13, then the container message will have
// offset 13 and the contained messages will be 0,1,2,3. the base
// offset for the container, then is 13-3=10.
if offset, err = extractOffset(offset, decompressed.Bytes()); err != nil {
if offset, err = extractOffset(offset, r.decompressed.Bytes()); err != nil {
return
}

Expand All @@ -196,8 +199,8 @@ func (r *messageSetReader) readMessageV1(min int64, key readBytesFunc, val readB
// Allocate a buffer of size 0, which gets capped at 16 bytes
// by the bufio package. We are already reading buffered data
// here, no need to reserve another 4KB buffer.
reader: bufio.NewReaderSize(&decompressed, 0),
remain: decompressed.Len(),
reader: bufio.NewReaderSize(&r.decompressed, 0),
remain: r.decompressed.Len(),
base: offset,
parent: r.readerStack,
}
Expand Down Expand Up @@ -263,19 +266,20 @@ func (r *messageSetReader) readMessageV2(_ int64, key readBytesFunc, val readByt
err = fmt.Errorf("batch remain < 0 (%d)", batchRemain)
return
}
var decompressed bytes.Buffer
decompressed.Grow(4 * batchRemain)
r.decompressed.Reset()
// x4 as a guess that the average compression ratio is near 75%
r.decompressed.Grow(4 * batchRemain)
limitReader := io.LimitedReader{R: r.reader, N: int64(batchRemain)}
codecReader := codec.NewReader(&limitReader)
_, err = decompressed.ReadFrom(codecReader)
_, err = r.decompressed.ReadFrom(codecReader)
codecReader.Close()
if err != nil {
return
}
r.remain -= batchRemain - int(limitReader.N)
r.readerStack = &readerStack{
reader: bufio.NewReaderSize(&decompressed, 0), // the new stack reads from the decompressed buffer
remain: decompressed.Len(),
reader: bufio.NewReaderSize(&r.decompressed, 0), // the new stack reads from the decompressed buffer
remain: r.decompressed.Len(),
base: -1, // base is unused here
parent: r.readerStack,
header: r.header,
Expand Down

0 comments on commit 4788faf

Please sign in to comment.