Skip to content

Commit

Permalink
Add support for testing message headers.
Browse files Browse the repository at this point in the history
For v2 message set builders, we serialize message headers, and
then test that the headers are set on the messages read from
the message set reader.
  • Loading branch information
Collin Van Dyck committed Oct 5, 2021
1 parent 8b94ffa commit 68b6956
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 6 deletions.
8 changes: 7 additions & 1 deletion builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,13 @@ func (f v2MessageSetBuilder) bytes() []byte {
wb.Write(msg.Key) // key bytes
wb.writeVarInt(int64(len(msg.Value))) // value len
wb.Write(msg.Value) // value bytes
wb.writeVarInt(0) // number of headers
wb.writeVarInt(int64(len(msg.Headers))) // number of headers
for _, header := range msg.Headers {
wb.writeVarInt(int64(len(header.Key)))
wb.Write([]byte(header.Key))
wb.writeVarInt(int64(len(header.Value)))
wb.Write(header.Value)
}
})
wb.writeVarInt(int64(len(bs)))
wb.Write(bs)
Expand Down
28 changes: 23 additions & 5 deletions message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ func TestMessageSetReader(t *testing.T) {
Offset: int64(i + startOffset),
Key: []byte(fmt.Sprintf("key-%d", i)),
Value: []byte(fmt.Sprintf("val-%d", i)),
Headers: []Header{
{
Key: fmt.Sprintf("header-key-%d", i),
Value: []byte(fmt.Sprintf("header-value-%d", i)),
},
},
}
}
defaultHeader := fetchResponseHeader{
Expand Down Expand Up @@ -280,11 +286,23 @@ func TestMessageSetReader(t *testing.T) {
}
rh.offset = tc.builder.messages()[0].Offset
rh.debug = tc.debug
for _, expected := range tc.builder.messages() {
msg := rh.readMessage()
require.Equal(t, string(expected.Key), string(msg.Key))
require.Equal(t, string(expected.Value), string(msg.Value))
require.Equal(t, expected.Offset, msg.Offset)
for _, messageSet := range tc.builder.msgSets {
for _, expected := range messageSet.messages() {
msg := rh.readMessage()
require.Equal(t, string(expected.Key), string(msg.Key))
require.Equal(t, string(expected.Value), string(msg.Value))
switch messageSet.(type) {
case v0MessageSetBuilder, v1MessageSetBuilder:
// v0 and v1 message sets do not have headers
require.Len(t, msg.Headers, 0)
case v2MessageSetBuilder:
// v2 message sets can have headers
require.EqualValues(t, expected.Headers, msg.Headers)
default:
t.Fatalf("unknown builder: %T", messageSet)
}
require.Equal(t, expected.Offset, msg.Offset)
}
}
// verify the reader stack is empty
require.EqualValues(t, 0, rh.remain)
Expand Down

0 comments on commit 68b6956

Please sign in to comment.