Skip to content

Commit

Permalink
Fix message builder compression code.
Browse files Browse the repository at this point in the history
Due to a bug in the builder code, a separate compressed message
set was being created for each individual message. This fixes
that behavior so that a single outer wrapper message composes
all N inner messages.

The difference in offset behavior between the v0 and v1 message
set types (absolute vs. relative inner offsets) is also accounted for.

See: https://kafka.apache.org/documentation/#messages
  • Loading branch information
Collin Van Dyck committed Oct 18, 2021
1 parent de5fea3 commit 92c7911
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 33 deletions.
68 changes: 37 additions & 31 deletions builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (f v0MessageSetBuilder) messages() []Message {
}

func (f v0MessageSetBuilder) bytes() []byte {
return newWB().call(func(wb *kafkaWriteBuffer) {
bs := newWB().call(func(wb *kafkaWriteBuffer) {
for _, msg := range f.msgs {
bs := newWB().call(func(wb *kafkaWriteBuffer) {
wb.writeInt64(msg.Offset) // offset
Expand All @@ -96,22 +96,23 @@ func (f v0MessageSetBuilder) bytes() []byte {
wb.writeBytes(msg.Value)
}))
})
if f.codec != nil {
bs = newWB().call(func(wb *kafkaWriteBuffer) {
wb.writeInt64(msg.Offset) // offset
wb.writeBytes(newWB().call(func(wb *kafkaWriteBuffer) {
compressed := mustCompress(bs, f.codec)
wb.writeInt32(-1) // crc, unused
wb.writeInt8(0) // magic
wb.writeInt8(f.codec.Code()) // attributes
wb.writeBytes(nil) // key is always nil for compressed
wb.writeBytes(compressed) // the value is the compressed message
}))
})
}
wb.Write(bs)
}
})
if f.codec != nil {
bs = newWB().call(func(wb *kafkaWriteBuffer) {
wb.writeInt64(f.msgs[0].Offset) // offset
wb.writeBytes(newWB().call(func(wb *kafkaWriteBuffer) {
compressed := mustCompress(bs, f.codec)
wb.writeInt32(-1) // crc, unused
wb.writeInt8(0) // magic
wb.writeInt8(f.codec.Code()) // attributes
wb.writeBytes(nil) // key is always nil for compressed
wb.writeBytes(compressed) // the value is the compressed message
}))
})
}
return bs
}

type v1MessageSetBuilder struct {
Expand All @@ -124,10 +125,14 @@ func (f v1MessageSetBuilder) messages() []Message {
}

func (f v1MessageSetBuilder) bytes() []byte {
return newWB().call(func(wb *kafkaWriteBuffer) {
for _, msg := range f.msgs {
bs := newWB().call(func(wb *kafkaWriteBuffer) {
for i, msg := range f.msgs {
bs := newWB().call(func(wb *kafkaWriteBuffer) {
wb.writeInt64(msg.Offset) // offset
if f.codec != nil {
wb.writeInt64(int64(i)) // compressed inner message offsets are relative
} else {
wb.writeInt64(msg.Offset) // offset
}
wb.writeBytes(newWB().call(func(wb *kafkaWriteBuffer) {
wb.writeInt32(-1) // crc, unused
wb.writeInt8(1) // magic
Expand All @@ -137,23 +142,24 @@ func (f v1MessageSetBuilder) bytes() []byte {
wb.writeBytes(msg.Value)
}))
})
if f.codec != nil {
bs = newWB().call(func(wb *kafkaWriteBuffer) {
wb.writeInt64(msg.Offset) // offset
wb.writeBytes(newWB().call(func(wb *kafkaWriteBuffer) {
bs := mustCompress(bs, f.codec)
wb.writeInt32(-1) // crc, unused
wb.writeInt8(1) // magic
wb.writeInt8(f.codec.Code()) // attributes
wb.writeInt64(msg.Time.UnixMilli()) // timestamp
wb.writeBytes(nil) // key is always nil for compressed
wb.writeBytes(bs) // the value is the compressed message
}))
})
}
wb.Write(bs)
}
})
if f.codec != nil {
bs = newWB().call(func(wb *kafkaWriteBuffer) {
wb.writeInt64(f.msgs[len(f.msgs)-1].Offset) // offset of the wrapper message is the last offset of the inner messages
wb.writeBytes(newWB().call(func(wb *kafkaWriteBuffer) {
bs := mustCompress(bs, f.codec)
wb.writeInt32(-1) // crc, unused
wb.writeInt8(1) // magic
wb.writeInt8(f.codec.Code()) // attributes
wb.writeInt64(f.msgs[0].Time.UnixMilli()) // timestamp
wb.writeBytes(nil) // key is always nil for compressed
wb.writeBytes(bs) // the value is the compressed message
}))
})
}
return bs
}

type v2MessageSetBuilder struct {
Expand Down
2 changes: 0 additions & 2 deletions message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,7 @@ func TestV1BatchOffsets(t *testing.T) {
return res
}
for _, expected := range tc.expected {
t.Logf("Want [%d] %s:%s", expected.Offset, expected.Key, expected.Value)
msg := filter(r.readMessage())
t.Logf("Read [%d] %s:%s", msg.Offset, msg.Key, msg.Value)
require.EqualValues(t, expected, msg)
}
// finally, verify no more bytes remain
Expand Down

0 comments on commit 92c7911

Please sign in to comment.