Skip to content

Commit

Permalink
Merge pull request segmentio#753 from segmentio/mixed-mode-reader
Browse files Browse the repository at this point in the history
Allow different batch versions in fetch response.
  • Loading branch information
Collin Van Dyck authored Oct 18, 2021
2 parents d8dd9b0 + ab97659 commit 5d063eb
Show file tree
Hide file tree
Showing 27 changed files with 1,498 additions and 455 deletions.
1 change: 1 addition & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
fixtures/*.hex binary
267 changes: 267 additions & 0 deletions builder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
package kafka

import (
"bytes"
"fmt"
"io"
"time"

"github.com/segmentio/kafka-go/compress"
)

// This file defines builders to assist in creating kafka payloads for unit testing.

// fetchResponseBuilder builds v10 fetch responses. The version of the v10 fetch
// responses are not as important as the message sets contained within, as this
// type is ultimately used to unit test the message set reader that consumes the
// rest of the response once the header has been parsed.
type fetchResponseBuilder struct {
header fetchResponseHeader
msgSets []messageSetBuilder
rendered []byte
}

type fetchResponseHeader struct {
throttle int32
errorCode int16
sessionID int32
topic string
partition int32
partitionErrorCode int16
highWatermarkOffset int64
lastStableOffset int64
logStartOffset int64
}

func (b *fetchResponseBuilder) messages() (res []Message) {
for _, set := range b.msgSets {
res = append(res, set.messages()...)
}
return
}

func (b *fetchResponseBuilder) bytes() []byte {
if b.rendered == nil {
b.rendered = newWB().call(func(wb *kafkaWriteBuffer) {
wb.writeInt32(b.header.throttle)
wb.writeInt16(b.header.errorCode)
wb.writeInt32(b.header.sessionID)
wb.writeInt32(1) // num topics
wb.writeString(b.header.topic)
wb.writeInt32(1) // how many partitions
wb.writeInt32(b.header.partition)
wb.writeInt16(b.header.partitionErrorCode)
wb.writeInt64(b.header.highWatermarkOffset)
wb.writeInt64(b.header.lastStableOffset)
wb.writeInt64(b.header.logStartOffset)
wb.writeInt32(-1) // num aborted tx
wb.writeBytes(newWB().call(func(wb *kafkaWriteBuffer) {
for _, msgSet := range b.msgSets {
wb.Write(msgSet.bytes())
}
}))
})
}
return b.rendered
}

func (b *fetchResponseBuilder) Len() int {
return len(b.bytes())
}

type messageSetBuilder interface {
bytes() []byte
messages() []Message
}

type v0MessageSetBuilder struct {
msgs []Message
codec CompressionCodec
}

func (f v0MessageSetBuilder) messages() []Message {
return f.msgs
}

func (f v0MessageSetBuilder) bytes() []byte {
bs := newWB().call(func(wb *kafkaWriteBuffer) {
for _, msg := range f.msgs {
bs := newWB().call(func(wb *kafkaWriteBuffer) {
wb.writeInt64(msg.Offset) // offset
wb.writeBytes(newWB().call(func(wb *kafkaWriteBuffer) {
wb.writeInt32(-1) // crc, unused
wb.writeInt8(0) // magic
wb.writeInt8(0) // attributes -- zero, no compression for the inner message
wb.writeBytes(msg.Key)
wb.writeBytes(msg.Value)
}))
})
wb.Write(bs)
}
})
if f.codec != nil {
bs = newWB().call(func(wb *kafkaWriteBuffer) {
wb.writeInt64(f.msgs[0].Offset) // offset
wb.writeBytes(newWB().call(func(wb *kafkaWriteBuffer) {
compressed := mustCompress(bs, f.codec)
wb.writeInt32(-1) // crc, unused
wb.writeInt8(0) // magic
wb.writeInt8(f.codec.Code()) // attributes
wb.writeBytes(nil) // key is always nil for compressed
wb.writeBytes(compressed) // the value is the compressed message
}))
})
}
return bs
}

type v1MessageSetBuilder struct {
msgs []Message
codec CompressionCodec
}

func (f v1MessageSetBuilder) messages() []Message {
return f.msgs
}

func (f v1MessageSetBuilder) bytes() []byte {
bs := newWB().call(func(wb *kafkaWriteBuffer) {
for i, msg := range f.msgs {
bs := newWB().call(func(wb *kafkaWriteBuffer) {
if f.codec != nil {
wb.writeInt64(int64(i)) // compressed inner message offsets are relative
} else {
wb.writeInt64(msg.Offset) // offset
}
wb.writeBytes(newWB().call(func(wb *kafkaWriteBuffer) {
wb.writeInt32(-1) // crc, unused
wb.writeInt8(1) // magic
wb.writeInt8(0) // attributes -- zero, no compression for the inner message
wb.writeInt64(msg.Time.UnixMilli()) // timestamp
wb.writeBytes(msg.Key)
wb.writeBytes(msg.Value)
}))
})
wb.Write(bs)
}
})
if f.codec != nil {
bs = newWB().call(func(wb *kafkaWriteBuffer) {
wb.writeInt64(f.msgs[len(f.msgs)-1].Offset) // offset of the wrapper message is the last offset of the inner messages
wb.writeBytes(newWB().call(func(wb *kafkaWriteBuffer) {
bs := mustCompress(bs, f.codec)
wb.writeInt32(-1) // crc, unused
wb.writeInt8(1) // magic
wb.writeInt8(f.codec.Code()) // attributes
wb.writeInt64(f.msgs[0].Time.UnixMilli()) // timestamp
wb.writeBytes(nil) // key is always nil for compressed
wb.writeBytes(bs) // the value is the compressed message
}))
})
}
return bs
}

type v2MessageSetBuilder struct {
msgs []Message
codec CompressionCodec
}

func (f v2MessageSetBuilder) messages() []Message {
return f.msgs
}

func (f v2MessageSetBuilder) bytes() []byte {
attributes := int16(0)
if f.codec != nil {
attributes = int16(f.codec.Code()) // set codec code on attributes
}
return newWB().call(func(wb *kafkaWriteBuffer) {
wb.writeInt64(f.msgs[0].Offset)
wb.writeBytes(newWB().call(func(wb *kafkaWriteBuffer) {
wb.writeInt32(0) // leader epoch
wb.writeInt8(2) // magic = 2
wb.writeInt32(0) // crc, unused
wb.writeInt16(attributes) // record set attributes
wb.writeInt32(0) // record set last offset delta
wb.writeInt64(f.msgs[0].Time.UnixMilli()) // record set first timestamp
wb.writeInt64(f.msgs[0].Time.UnixMilli()) // record set last timestamp
wb.writeInt64(0) // record set producer id
wb.writeInt16(0) // record set producer epoch
wb.writeInt32(0) // record set base sequence
wb.writeInt32(int32(len(f.msgs))) // record set count
bs := newWB().call(func(wb *kafkaWriteBuffer) {
for i, msg := range f.msgs {
wb.Write(newWB().call(func(wb *kafkaWriteBuffer) {
bs := newWB().call(func(wb *kafkaWriteBuffer) {
wb.writeInt8(0) // record attributes, not used here
wb.writeVarInt(time.Now().UnixMilli() - msg.Time.UnixMilli()) // timestamp
wb.writeVarInt(int64(i)) // offset delta
wb.writeVarInt(int64(len(msg.Key))) // key len
wb.Write(msg.Key) // key bytes
wb.writeVarInt(int64(len(msg.Value))) // value len
wb.Write(msg.Value) // value bytes
wb.writeVarInt(int64(len(msg.Headers))) // number of headers
for _, header := range msg.Headers {
wb.writeVarInt(int64(len(header.Key)))
wb.Write([]byte(header.Key))
wb.writeVarInt(int64(len(header.Value)))
wb.Write(header.Value)
}
})
wb.writeVarInt(int64(len(bs)))
wb.Write(bs)
}))
}
})
if f.codec != nil {
bs = mustCompress(bs, f.codec)
}
wb.Write(bs)
}))
})
}

// kafkaWriteBuffer is a write buffer that helps writing fetch responses
type kafkaWriteBuffer struct {
writeBuffer
buf bytes.Buffer
}

func newWB() *kafkaWriteBuffer {
res := kafkaWriteBuffer{}
res.writeBuffer.w = &res.buf
return &res
}

func (f *kafkaWriteBuffer) Bytes() []byte {
return f.buf.Bytes()
}

// call is a convenience method that allows the kafkaWriteBuffer to be used
// in a functional manner. This is helpful when building
// nested structures, as the return value can be fed into
// other fwWB APIs.
func (f *kafkaWriteBuffer) call(cb func(wb *kafkaWriteBuffer)) []byte {
cb(f)
bs := f.Bytes()
if bs == nil {
bs = []byte{}
}
return bs
}

func mustCompress(bs []byte, codec compress.Codec) (res []byte) {
buf := bytes.Buffer{}
codecWriter := codec.NewWriter(&buf)
_, err := io.Copy(codecWriter, bytes.NewReader(bs))
if err != nil {
panic(fmt.Errorf("compress: %w", err))
}
err = codecWriter.Close()
if err != nil {
panic(fmt.Errorf("close codec writer: %w", err))
}
res = buf.Bytes()
return
}
1 change: 1 addition & 0 deletions fixtures/v1-v1.hex
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
000001660000000a00000000000015c79861000000010009746573742d6564677900000001000000000000000000000000000400000000000000040000000000000000ffffffff0000011f00000000000000000000003ca293717501000000017c4f08dc7f00000005616c706861000000217b22636f756e74223a302c2266696c6c6572223a2261616161616161616161227d00000000000000010000003b3d4abab001000000017c4f08dc970000000462657461000000217b22636f756e74223a302c2266696c6c6572223a2262626262626262626262227d00000000000000020000003cbcad5cde01000000017c4f09b16d0000000567616d6d61000000217b22636f756e74223a302c2266696c6c6572223a2263636363636363636363227d00000000000000030000003c8585230b01000000017c4f09b6b20000000564656c7461000000217b22636f756e74223a302c2266696c6c6572223a2264646464646464646464227d
Binary file added fixtures/v1-v1.pcapng
Binary file not shown.
1 change: 1 addition & 0 deletions fixtures/v1-v1c-v2-v2c-v2b-v2b-v2b-v2bc-v1b-v1bc.hex
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
000006b40000000a00000000000021f08796000000010007746573742d383800000001000000000000000000000000001400000000000000140000000000000000ffffffff0000066f00000000000000000000003c42f0d0f101000000017c477ab6a500000005616c706861000000217b22636f756e74223a302c2266696c6c6572223a2261616161616161616161227d00000000000000010000003bf4f7a99e01000000017c477abb610000000462657461000000217b22636f756e74223a302c2266696c6c6572223a2262626262626262626262227d00000000000000020000005fd3cf85ff01010000017c477b3bcbffffffff000000491f8b0800000000000000636080039bba2d51db18810cc61af76aebd340066b624e41462290a158ad949c5f9a57a26465a0a394969993935aa464a59408074ab5001b5f3ee14800000000000000000000030000005e5d1733a801010000017c477b408fffffffff000000481f8b080000000000000063608003eb95673d5f3002198c35eed50efd40064b526a49229056ac564ace2fcd2b51b232d0514acbccc9492d52b2524a8203a55a002737831e4700000000000000000000040000005e00000000020ab23c660000000000000000017c477d995f0000017c477d995fffffffffffffffffffffffffffff00000001580000000a67616d6d61427b22636f756e74223a302c2266696c6c6572223a2263636363636363636363227d0000000000000000050000005e000000000238c0553f0000000000000000017c477d9ec80000017c477d9ec8ffffffffffffffffffffffffffff00000001580000000a64656c7461427b22636f756e74223a302c2266696c6c6572223a2264646464646464646464227d0000000000000000060000006a0000000002188627120001000000000000017c477dd0b70000017c477dd0b7ffffffffffffffffffffffffffff000000011f8b08000000000000008b606060e04a4fcccd4d74aa564ace2fcd2b51b232d0514acbccc9492d52b2524a8603a55a0600788108a12d00000000000000000000070000006a0000000002b08e2b720001000000000000017c477dd7ef0000017c477dd7efffffffffffffffffffffffffffff000000011f8b08000000000000008b606060e04a49cd294974aa564ace2fcd2b51b232d0514acbccc9492d52b2524a8103a55a0600496dfe822d00000000000000000000080000008d00000000023cc016270000000000010000017c4784fe490000017c47850044ffffffffffffffffffffffffffff000000025c0000000e657073696c6f6e427b22636f756e74223a302c2266696c6c6572223a2265656565656565656565227d005800f60702087a657461427b22636f756e74223a302c2266696c6c6572223a2266666666666666666666227d00000000000000000a0000007d00000000026e844d550001000000010000017c4785514b0000017c47855423ffffffffffffffffffffffffffff000000021f8b08000000000000008b616060e04b2d28ceccc9cf73aa564ace2fcd2b51b232d0514acbccc9492d52b2524a8503a55a8608860ddc4c1c55a92589d815a7c1015031003ebb53a15c000000000000000000000c0000008a0000000002e5dfd9e20000000000010000017c4c8f1e1c0000017c4c8f20e8ffffffffffffffffffffffffffff000000025400000006657461427b22636f756e74223a302c2266696c6c6572223a2267676767676767676767227d005a00980b020a7468657461427b22636f756e74223a302c2266696c6c6572223a2268686868686868686868227d00000000000000000e0000007700000000020f80521f0001000000010000017c4c8f4da50000017c4c8f4fb8ffffffffffffffffffffffffffff000000021f8b08000000000000000b616060604b2d4974aa564ace2fcd2b51b232d0514acbccc9492d52b2524a8703a55a862886651c4c5c2519385567c001503500b01e3aa95900000000000000000000100000003a3b6d4cf601000000017c4cadbd7300000003657461000000217b22636f756e74223a302c2266696c6c6572223a2267676767676767676767227d00000000000000110000003c857f5cd501000000017c4cadbd99000000057468657461000000217b22636f756e74223a302c2266696c6c6572223a2268686868686868686868227d000000000000001300000076dbf0a20a01010000017c4cadf305ffffffff000000601f8b080000000000000063608003ab0d4959758c4006638dcfda8fcf810ce6d4924420a558ad949c5f9a57a26465a0a394969993935aa464a5940e074ab55013409a6d9412cf95c14cf9cc0a64b09664e03327030e946a01e34da7538e000000
Binary file not shown.
1 change: 1 addition & 0 deletions fixtures/v1c-v1-v1c.hex
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
000002350000000a0000000000003d15acfe00000001000b746573742d627265657a7900000001000000000000000000000000000600000000000000060000000000000000ffffffff000001ec000000000000000100000079779afa8b01010000017c4f11cdc9ffffffff000000631f8b0800000000000000636080039bf9617b7418810cc61a7fc1b32b810cd6c49c828c442043b15a2939bf34af44c9ca4047292d332727b548c94a29110e946aa16680b45b5b967f780937e72490c192945a82db98243850aa05001ea2107b8f00000000000000000000020000003cda0e410e01000000017c4f1212630000000567616d6d61000000217b22636f756e74223a302c2266696c6c6572223a2263636363636363636363227d00000000000000030000003c0470399301000000017c4f12154e0000000564656c7461000000217b22636f756e74223a302c2266696c6c6572223a2264646464646464646464227d0000000000000004000000613b0e4db601010000017c4f124947ffffffff0000004b1f8b080000000000000063608003bb67b39e743302198c35fe429eee40067b6a4171664e7e1e90a958ad949c5f9a57a26465a0a394969993935aa464a5940a074ab5007d95b7894a00000000000000000000050000005edb50180901010000017c4f124fd0ffffffff000000481f8b080000000000000063608003ebfbf2b32c18810cc61a7f21ff0b40064b556a49229056ac564ace2fcd2b51b232d0514acbccc9492d52b2524a8303a55a005ec594df47000000
Binary file added fixtures/v1c-v1-v1c.pcapng
Binary file not shown.
1 change: 1 addition & 0 deletions fixtures/v1c-v1c.hex
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
000001a20000000a0000000000001abffa5700000001000a746573742d677574737900000001000000000000000000000000000400000000000000040000000000000000ffffffff0000015a0000000000000001000000789125e5e201010000017c4f0ee474ffffffff000000621f8b0800000000000000636080039bfcfd51598c4006638d3fdf131f20833531a7202311c850ac564ace2fcd2b51b232d0514acbccc9492d52b2524a8403a55aa81920edd67a221c2e70734a800c96a4d412dcc624c181522d001d8564f48f00000000000000000000020000005f66e75d9b01010000017c4f0f55f5ffffffff000000491f8b0800000000000000636080039bfd2566fe8c4006638d3f7fe8572083353d31373711c850ac564ace2fcd2b51b232d0514acbccc9492d52b2524a8603a55a008ef7186d4800000000000000000000030000005f3cff26a901010000017c4f0f5d5cffffffff000000491f8b0800000000000000636080031b6f8db3d18c4006638d3f7f6c0c90c19a929a5392086428562b25e797e695285919e828a565e6e4a416295929a5c081522d00dd1f6ff148000000
Binary file added fixtures/v1c-v1c.pcapng
Binary file not shown.
1 change: 1 addition & 0 deletions fixtures/v2-v2.hex
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
000001760000000a0000000000001163921100000001000a746573742d6c7563696400000001000000000000000000000000000400000000000000040000000000000000ffffffff0000012e00000000000000000000008a00000000023978fc3b0000000000010000017c4f173eb90000017c4f173ed2ffffffffffffffffffffffffffff00000002580000000a616c706861427b22636f756e74223a302c2266696c6c6572223a2261616161616161616161227d00560032020862657461427b22636f756e74223a302c2266696c6c6572223a2262626262626262626262227d0000000000000000020000008c0000000002fa7514ab0000000000010000017c4f175fa00000017c4f17631fffffffffffffffffffffffffffff00000002580000000a67616d6d61427b22636f756e74223a302c2266696c6c6572223a2263636363636363636363227d005a00fe0d020a64656c7461427b22636f756e74223a302c2266696c6c6572223a2264646464646464646464227d00
Binary file added fixtures/v2-v2.pcapng
Binary file not shown.
1 change: 1 addition & 0 deletions fixtures/v2b-v1.hex
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
0000016e0000000a00000000000023f24a1a00000001000b746573742d66656973747900000001000000000000000000000000000400000000000000040000000000000000ffffffff0000012500000000000000000000008a000000000267762fd10000000000010000017c4e71efe10000017c4e71effdffffffffffffffffffffffffffff00000002580000000a616c706861427b22636f756e74223a302c2266696c6c6572223a2261616161616161616161227d00560038020862657461427b22636f756e74223a302c2266696c6c6572223a2262626262626262626262227d0000000000000000020000003c0d5ba69301000000017c4e743d2100000005616c706861000000217b22636f756e74223a302c2266696c6c6572223a2261616161616161616161227d00000000000000030000003be6e3d42501000000017c4e743d410000000462657461000000217b22636f756e74223a302c2266696c6c6572223a2262626262626262626262227d
Binary file added fixtures/v2b-v1.pcapng
Binary file not shown.
1 change: 1 addition & 0 deletions fixtures/v2bc-v1-v1c.hex
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
000001e60000000a000000000000530076a100000001000a746573742d686172647900000001000000000000000000000000000600000000000000060000000000000000ffffffff0000019e000000000000000000000079000000000214d2dc1d0001000000010000017c4ead43a90000017c4ead43c3ffffffffffffffffffffffffffff000000021f8b08000000000000008b606060e04acc29c84874aa564ace2fcd2b51b232d0514acbccc9492d52b2524a8403a55a86300613268ea4d4121c6a93e000a81600538562275900000000000000000000020000003c48deb52601000000017c4eae54050000000567616d6d61000000217b22636f756e74223a302c2266696c6c6572223a2263636363636363636363227d00000000000000030000003ca2ba5edc01000000017c4eae5cff0000000564656c7461000000217b22636f756e74223a302c2266696c6c6572223a2264646464646464646464227d00000000000000050000007d2e0ea95201010000017c4eb07250ffffffff000000671f8b080000000000000063608003bb5b69b1958c4006638ddf86fcef40067b6a4171664e7e1e90a958ad949c5f9a57a26465a0a394969993935aa464a5940a074ab550534006587fdc55d00633a92800c860a94a2d49c4694c1a1c28d5020087e0fa5d91000000
Binary file added fixtures/v2bc-v1-v1c.pcapng
Binary file not shown.
1 change: 1 addition & 0 deletions fixtures/v2bc-v1.hex
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
0000015d0000000a0000000000006d36526200000001000a746573742d686172647900000001000000000000000000000000000400000000000000040000000000000000ffffffff00000115000000000000000000000079000000000214d2dc1d0001000000010000017c4ead43a90000017c4ead43c3ffffffffffffffffffffffffffff000000021f8b08000000000000008b606060e04acc29c84874aa564ace2fcd2b51b232d0514acbccc9492d52b2524a8403a55a86300613268ea4d4121c6a93e000a81600538562275900000000000000000000020000003c48deb52601000000017c4eae54050000000567616d6d61000000217b22636f756e74223a302c2266696c6c6572223a2263636363636363636363227d00000000000000030000003ca2ba5edc01000000017c4eae5cff0000000564656c7461000000217b22636f756e74223a302c2266696c6c6572223a2264646464646464646464227d
Binary file added fixtures/v2bc-v1.pcapng
Binary file not shown.
1 change: 1 addition & 0 deletions fixtures/v2bc-v1c.hex
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
000001520000000a0000000000004aa4215500000001000b746573742d6b61726d696300000001000000000000000000000000000400000000000000040000000000000000ffffffff00000109000000000000000000000079000000000218f2e1220001000000010000017c4e8edde60000017c4e8eddffffffffffffffffffffffffffffff000000021f8b08000000000000008b606060e04acc29c84874aa564ace2fcd2b51b232d0514acbccc9492d52b2524a8403a55a86300623268ea4d4121c6a93e000a81600b5931557590000000000000000000003000000785a33562401010000017c4e8f57f1ffffffff000000621f8b0800000000000000636080031b93ef814f19810cc61abffef0ab40066b624e41462290a158ad949c5f9a57a26465a0a394969993935aa464a59408074ab5503340daad5d459b6ec0cdf90864b024a596e03626090e946a016143eac78f000000
Binary file added fixtures/v2bc-v1c.pcapng
Binary file not shown.
1 change: 1 addition & 0 deletions fixtures/v2c-v2-v2c.hex
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
000001ee0000000a000000000000670352ac00000001000a746573742d6e6174747900000001000000000000000000000000000600000000000000060000000000000000ffffffff000001a600000000000000000000007900000000025da9bf740001000000010000017c4f1eea730000017c4f1eea8dffffffffffffffffffffffffffff000000021f8b08000000000000008b606060e04acc29c84874aa564ace2fcd2b51b232d0514acbccc9492d52b2524a8403a55a86300613268ea4d4121c6a93e000a81600538562275900000000000000000000020000008c0000000002f53e2b600000000000010000017c4f1f1a600000017c4f1f1c65ffffffffffffffffffffffffffff00000002580000000a67616d6d61427b22636f756e74223a302c2266696c6c6572223a2263636363636363636363227d005a008a08020a64656c7461427b22636f756e74223a302c2266696c6c6572223a2264646464646464646464227d0000000000000000040000007d000000000268a8ca640001000000010000017c4f1f49f90000017c4f1f4db4ffffffffffffffffffffffffffff000000021f8b08000000000000008b616060e04b2d28ceccc9cf73aa564ace2fcd2b51b232d0514acbccc9492d52b2524a8503a55a8608866f7c4c1c55a92589d815a7c101503100ebf4f0655c000000
Binary file added fixtures/v2c-v2-v2c.pcapng
Binary file not shown.
Loading

0 comments on commit 5d063eb

Please sign in to comment.