Skip to content

Commit

Permalink
Optimize message size calculation (segmentio#329)
Browse files Browse the repository at this point in the history
* optimize message size calculation

* remove benchmark

* create const
  • Loading branch information
sagarkrkv authored and Achille committed Sep 9, 2019
1 parent 0748d0d commit 752cbf8
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 5 deletions.
8 changes: 7 additions & 1 deletion message.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ func (msg Message) message(cw *crc32Writer) message {
return m
}

const timestampSize = 8

func (msg Message) size() int32 {
return 4 + 1 + 1 + sizeofBytes(msg.Key) + sizeofBytes(msg.Value) + timestampSize
}

type message struct {
CRC int32
MagicByte int8
Expand All @@ -62,7 +68,7 @@ func (m message) crc32(cw *crc32Writer) int32 {
func (m message) size() int32 {
size := 4 + 1 + 1 + sizeofBytes(m.Key) + sizeofBytes(m.Value)
if m.MagicByte != 0 {
size += 8 // Timestamp
size += timestampSize
}
return size
}
Expand Down
31 changes: 31 additions & 0 deletions message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package kafka

import (
"bufio"
"math/rand"
"testing"
"time"
)

func TestMessageSetReaderEmpty(t *testing.T) {
Expand Down Expand Up @@ -34,3 +36,32 @@ func TestMessageSetReaderEmpty(t *testing.T) {
t.Errorf("unexpected error from discard(): %v", m.discard())
}
}

func TestMessageSize(t *testing.T) {
rand.Seed(time.Now().UnixNano())
for i := 0; i < 20; i++ {
t.Run("Run", func(t *testing.T) {
msg := Message{
Key: make([]byte, rand.Intn(200)),
Value: make([]byte, rand.Intn(200)),
Time: randate(),
}
expSize := msg.message(nil).size()
gotSize := msg.size()
if expSize != gotSize {
t.Errorf("Expected size %d, but got size %d", expSize, gotSize)
}
})
}

}

// https://stackoverflow.com/questions/43495745/how-to-generate-random-date-in-go-lang/43497333#43497333
func randate() time.Time {
min := time.Date(1970, 1, 0, 0, 0, 0, 0, time.UTC).Unix()
max := time.Date(2070, 1, 0, 0, 0, 0, 0, time.UTC).Unix()
delta := max - min

sec := rand.Int63n(delta) + min
return time.Unix(sec, 0)
}
8 changes: 4 additions & 4 deletions writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error {
}

for i, msg := range msgs {
if int(msg.message(nil).size()) > w.config.BatchBytes {
if int(msg.size()) > w.config.BatchBytes {
err := MessageTooLargeError{
Message: msg,
Remaining: msgs[i+1:],
Expand Down Expand Up @@ -651,7 +651,7 @@ func (w *writer) run() {
if lastMsg.res != nil {
resch = append(resch, lastMsg.res)
}
batchSizeBytes += int(lastMsg.msg.message(nil).size())
batchSizeBytes += int(lastMsg.msg.size())
lastMsg = writerMessage{}
if !batchTimerRunning {
batchTimer.Reset(w.batchTimeout)
Expand All @@ -663,7 +663,7 @@ func (w *writer) run() {
if !ok {
done, mustFlush = true, true
} else {
if int(wm.msg.message(nil).size())+batchSizeBytes > w.maxMessageBytes {
if int(wm.msg.size())+batchSizeBytes > w.maxMessageBytes {
// If the size of the current message puts us over the maxMessageBytes limit,
// store the message but don't send it in this batch.
mustFlush = true
Expand All @@ -674,7 +674,7 @@ func (w *writer) run() {
if wm.res != nil {
resch = append(resch, wm.res)
}
batchSizeBytes += int(wm.msg.message(nil).size())
batchSizeBytes += int(wm.msg.size())
mustFlush = len(batch) >= w.batchSize || batchSizeBytes >= w.maxMessageBytes
}
if !batchTimerRunning {
Expand Down

0 comments on commit 752cbf8

Please sign in to comment.