Skip to content

Commit

Permalink
change post review
Browse files Browse the repository at this point in the history
  • Loading branch information
Pryz committed Jul 16, 2018
1 parent ee7d654 commit 420836d
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 42 deletions.
2 changes: 1 addition & 1 deletion batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (batch *Batch) ReadMessage() (Message, error) {
if err != nil {
return msg, err
}
return msg.Decode()
return msg.decode()
}

func (batch *Batch) readMessage(
Expand Down
47 changes: 19 additions & 28 deletions compression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,29 @@ func TestCompression(t *testing.T) {
Value: []byte("message"),
}

testEncodeDecode(t, msg, kafka.CompressionNoneCode)
testEncodeDecode(t, msg, gzip.Code)
testEncodeDecode(t, msg, snappy.Code)
testEncodeDecode(t, msg, lz4.Code)
testUnknownCodec(t, msg, 42)
testEncodeDecode(t, msg, nil)
testEncodeDecode(t, msg, gzip.NewCompressionCodec())
testEncodeDecode(t, msg, snappy.NewCompressionCodec())
testEncodeDecode(t, msg, lz4.NewCompressionCodec())
}

func testEncodeDecode(t *testing.T, m kafka.Message, codec int8) {
func testEncodeDecode(t *testing.T, m kafka.Message, codec kafka.CompressionCodec) {
var r1, r2 kafka.Message
var err error
var code int8

t.Run("encode with "+codecToStr(codec), func(t *testing.T) {
if codec != nil {
code = codec.Code()
}

t.Run("encode with "+codecToStr(code), func(t *testing.T) {
m.CompressionCodec = codec
r1, err = m.Encode()
if err != nil {
t.Error(err)
}
})
t.Run("encode with "+codecToStr(codec), func(t *testing.T) {
t.Run("encode with "+codecToStr(code), func(t *testing.T) {
r2, err = r1.Decode()
if err != nil {
t.Error(err)
Expand All @@ -46,19 +50,6 @@ func testEncodeDecode(t *testing.T, m kafka.Message, codec int8) {
})
}

func testUnknownCodec(t *testing.T, m kafka.Message, codec int8) {
t.Run("unknown codec", func(t *testing.T) {
expectedErr := "codec 42 not imported."
m.CompressionCodec = codec
_, err := m.Encode()
if err.Error() != expectedErr {
t.Error("wrong error")
t.Log("got: ", err)
t.Error("expected: ", expectedErr)
}
})
}

func codecToStr(codec int8) string {
switch codec {
case kafka.CompressionNoneCode:
Expand All @@ -77,27 +68,27 @@ func codecToStr(codec int8) string {
func BenchmarkCompression(b *testing.B) {
benchmarks := []struct {
scenario string
codec int8
function func(*testing.B, int8, int, map[int][]byte)
codec kafka.CompressionCodec
function func(*testing.B, kafka.CompressionCodec, int, map[int][]byte)
}{
{
scenario: "None",
codec: kafka.CompressionNoneCode,
codec: nil,
function: benchmarkCompression,
},
{
scenario: "GZIP",
codec: gzip.Code,
codec: gzip.NewCompressionCodec(),
function: benchmarkCompression,
},
{
scenario: "Snappy",
codec: snappy.Code,
codec: snappy.NewCompressionCodec(),
function: benchmarkCompression,
},
{
scenario: "LZ4",
codec: lz4.Code,
codec: lz4.NewCompressionCodec(),
function: benchmarkCompression,
},
}
Expand Down Expand Up @@ -126,7 +117,7 @@ func BenchmarkCompression(b *testing.B) {

}

func benchmarkCompression(b *testing.B, codec int8, payloadSize int, payload map[int][]byte) {
func benchmarkCompression(b *testing.B, codec kafka.CompressionCodec, payloadSize int, payload map[int][]byte) {
msg := kafka.Message{
Value: payload[payloadSize],
CompressionCodec: codec,
Expand Down
2 changes: 1 addition & 1 deletion conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -801,7 +801,7 @@ func (c *Conn) WriteMessages(msgs ...Message) (int, error) {
}

var err error
msg, err = msg.Encode()
msg, err = msg.encode()
if err != nil {
return 0, err
}
Expand Down
9 changes: 9 additions & 0 deletions export_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package kafka

func (msg Message) Encode() (Message, error) {
return msg.encode()
}

func (msg Message) Decode() (Message, error) {
return msg.decode()
}
22 changes: 13 additions & 9 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type Message struct {
Time time.Time

// Compression codec used to encode the message value
CompressionCodec int8
CompressionCodec
}

func (msg Message) item() messageSetItem {
Expand All @@ -36,35 +36,39 @@ func (msg Message) item() messageSetItem {
}

func (msg Message) message() message {
var attrs int8
if msg.CompressionCodec != nil {
attrs = int8(msg.CompressionCodec.Code()) & compressionCodecMask
}

m := message{
MagicByte: 1,
Key: msg.Key,
Value: msg.Value,
Timestamp: timestamp(msg.Time),
Attributes: int8(msg.CompressionCodec) & compressionCodecMask,
Attributes: attrs,
}
m.CRC = m.crc32()
return m
}

// Encode encodes the Message using the CompressionCodec.
func (msg Message) Encode() (Message, error) {
if msg.CompressionCodec == CompressionNoneCode {
func (msg Message) encode() (Message, error) {
//if msg.CompressionCodec.Code() == CompressionNoneCode {
if msg.CompressionCodec == nil {
return msg, nil
}

codec, ok := codecs[msg.CompressionCodec]
codec, ok := codecs[msg.CompressionCodec.Code()]
if !ok {
return msg, fmt.Errorf("codec %d not imported.", msg.CompressionCodec)
return msg, fmt.Errorf("codec %d not imported.", msg.CompressionCodec.Code())
}

var err error
msg.Value, err = transform(msg.Value, codec.Encode)
return msg, err
}

// Decode decodes the Message using the CompressionCodec.
func (msg Message) Decode() (Message, error) {
func (msg Message) decode() (Message, error) {
c := msg.message().Attributes & compressionCodecMask
if c == CompressionNoneCode {
return msg, nil
Expand Down
4 changes: 2 additions & 2 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1148,7 +1148,7 @@ func (r *Reader) ReadMessage(ctx context.Context) (Message, error) {
}
}

return m.Decode()
return m.decode()
}

// FetchMessage reads and return the next message from the r. The method call
Expand Down Expand Up @@ -1206,7 +1206,7 @@ func (r *Reader) FetchMessage(ctx context.Context) (Message, error) {
if m.error != nil {
return m.message, m.error
}
return m.message.Decode()
return m.message.decode()
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ type WriterConfig struct {
Async bool

// CompressionCodec set the codec to be used to compress Kafka messages.
CompressionCodec int8
CompressionCodec

newPartitionWriter func(partition int, config WriterConfig, stats *writerStats) partitionWriter
}
Expand Down

0 comments on commit 420836d

Please sign in to comment.