Skip to content

Commit

Permalink
Start modification based on review
Browse files Browse the repository at this point in the history
  • Loading branch information
Pryz committed Jul 2, 2018
1 parent dc05c0a commit 8000f96
Show file tree
Hide file tree
Showing 10 changed files with 217 additions and 122 deletions.
2 changes: 1 addition & 1 deletion batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (batch *Batch) ReadMessage() (Message, error) {
if err != nil {
return msg, err
}
return msg.decode()
return msg.Decode()
}

func (batch *Batch) readMessage(
Expand Down
40 changes: 40 additions & 0 deletions compression.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package kafka

const (
CompressionNone int8 = iota
CompressionGZIP
CompressionSnappy
CompressionLZ4
)

type CompressionCodec struct {
str func() string
encode func(src []byte) ([]byte, error)
decode func(src []byte) ([]byte, error)
}

const compressionCodecMask int8 = 0x03
const defaultCompressionLevel int = -1

func init() {
RegisterCompressionCodec(0,
func() string { return "none" },
func(src []byte) ([]byte, error) { return src, nil },
func(src []byte) ([]byte, error) { return src, nil },
)
}

func codecToStr(codec int8) string {
switch codec {
case CompressionNone:
return "none"
case CompressionGZIP:
return "gzip"
case CompressionSnappy:
return "snappy"
case CompressionLZ4:
return "lz4"
default:
return "unknown"
}
}
57 changes: 57 additions & 0 deletions compression_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package kafka_test

import (
"testing"

kafka "github.com/segmentio/kafka-go"
_ "github.com/segmentio/kafka-go/gzip"
)

func TestCompression(t *testing.T) {
msg := kafka.Message{
Value: []byte("message"),
}

testEncodeDecode(t, msg, kafka.CompressionNone)
testEncodeDecode(t, msg, kafka.CompressionGZIP)
}

func testEncodeDecode(t *testing.T, m kafka.Message, codec int8) {
var r1, r2 kafka.Message
var err error

t.Run("encode with "+codecToStr(codec), func(t *testing.T) {
m.CompressionCodec = codec
r1, err = m.Encode()
if err != nil {
t.Error(err)
}
})

t.Run("encode with "+codecToStr(codec), func(t *testing.T) {
r2, err = r1.Decode()
if err != nil {
t.Error(err)
}
if string(r2.Value) != "message" {
t.Error("bad message")
t.Log("got: ", string(m.Value))
t.Log("expected: message")
}
})
}

func codecToStr(codec int8) string {
switch codec {
case kafka.CompressionNone:
return "none"
case kafka.CompressionGZIP:
return "gzip"
case kafka.CompressionSnappy:
return "snappy"
case kafka.CompressionLZ4:
return "lz4"
default:
return "unknown"
}
}
2 changes: 1 addition & 1 deletion conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -801,7 +801,7 @@ func (c *Conn) WriteMessages(msgs ...Message) (int, error) {
}

var err error
msg, err = msg.encode()
msg, err = msg.Encode()
if err != nil {
return 0, err
}
Expand Down
44 changes: 44 additions & 0 deletions gzip/gzip.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package gzip

import (
"bytes"
"compress/gzip"
"io/ioutil"

kafka "github.com/segmentio/kafka-go"
)

func init() {
kafka.RegisterCompressionCodec(1, String, Encode, Decode)
}

func Code() int8 {
return 1
}

func String() string {
return "gzip"
}

//TODO: compression level
func Encode(src []byte) ([]byte, error) {
var buf bytes.Buffer
writer := gzip.NewWriter(&buf)
_, err := writer.Write(src)
if err != nil {
return nil, err
}
err = writer.Close()
if err != nil {
return nil, err
}
return buf.Bytes(), nil
}

func Decode(src []byte) ([]byte, error) {
reader, err := gzip.NewReader(bytes.NewReader(src))
if err != nil {
return nil, err
}
return ioutil.ReadAll(reader)
}
36 changes: 36 additions & 0 deletions gzip/gzip_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package gzip

import (
"bytes"
"testing"
)

func TestGzip(t *testing.T) {
var r1, r2 []byte
var err error
payload := []byte("message")

t.Run("encode", func(t *testing.T) {
r1, err = Encode(payload)
if err != nil {
t.Error(err)
}
if bytes.Equal(payload, r1) {
t.Error("failed to encode payload")
t.Log("got: ", r1)
}
})

t.Run("decode", func(t *testing.T) {
r2, err = Decode(r1)
if err != nil {
t.Error(err)
}
if !bytes.Equal(payload, r2) {
t.Error("failed to decode payload")
t.Log("expected: ", payload)
t.Log("got: ", r2)
}
})

}
111 changes: 35 additions & 76 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,42 +2,25 @@ package kafka

import (
"bufio"
"bytes"
"compress/gzip"
"fmt"
"io/ioutil"
"time"

"github.com/eapache/go-xerial-snappy"
)

const compressionCodecMask int8 = 0x03
const defaultCompressionLevel int = -1

// CompressionCodec represents the compression codec available in Kafka
// See : https://cwiki.apache.org/confluence/display/KAFKA/Compression
type CompressionCodec int8
var codecs map[int8]CompressionCodec

const (
CompressionNone CompressionCodec = iota
CompressionGZIP
CompressionSnappy
CompressionLZ4
)
func RegisterCompressionCodec(code int8, str func() string, encode, decode func(src []byte) ([]byte, error)) error {
if codecs == nil {
codecs = make(map[int8]CompressionCodec)
}

func (c CompressionCodec) String() string {
switch c {
case CompressionNone:
return "none"
case CompressionGZIP:
return "gzip"
case CompressionSnappy:
return "snappy"
case CompressionLZ4:
return "lz4"
default:
return "unknown"
codecs[code] = CompressionCodec{
str: str,
encode: encode,
decode: decode,
}
return nil
}

// Message is a data structure representing kafka messages.
Expand All @@ -56,7 +39,7 @@ type Message struct {
Time time.Time

// Compression codec used to encode the message value
CompressionCodec CompressionCodec
CompressionCodec int8

// Compression level for the codec if supported (only gzip)
CompressionLevel int
Expand All @@ -83,59 +66,35 @@ func (msg Message) message() message {
return m
}

func (msg Message) encode() (Message, error) {
var err error
switch msg.CompressionCodec {
case CompressionNone:
return msg, nil
case CompressionGZIP:
var buf bytes.Buffer
var writer *gzip.Writer

if msg.CompressionLevel != defaultCompressionLevel {
writer, err = gzip.NewWriterLevel(&buf, msg.CompressionLevel)
if err != nil {
return msg, err
}
} else {
writer = gzip.NewWriter(&buf)
}
if _, err := writer.Write(msg.Value); err != nil {
return msg, err
}
if err := writer.Close(); err != nil {
return msg, err
}
msg.Value = buf.Bytes()
return msg, nil
case CompressionSnappy:
msg.Value = snappy.Encode(msg.Value)
return msg, nil
default:
return msg, fmt.Errorf("compression codec not supported.")
func (msg Message) Encode() (Message, error) {
codec, ok := codecs[msg.CompressionCodec]
if !ok {
return msg, fmt.Errorf("codec %s not imported.", codecToStr(msg.CompressionCodec))
}
}

func (msg Message) decode() (Message, error) {
var err error

codec := msg.message().Attributes & compressionCodecMask
switch CompressionCodec(codec) {
case CompressionNone:
return msg, nil
case CompressionGZIP:
reader, err := gzip.NewReader(bytes.NewReader(msg.Value))
if err != nil {
return msg, err
}
msg.Value, err = ioutil.ReadAll(reader)
encodedValue, err := codec.encode(msg.Value)
if err != nil {
return msg, err
case CompressionSnappy:
msg.Value, err = snappy.Decode(msg.Value)
}

msg.Value = encodedValue
return msg, nil
}

func (msg Message) Decode() (Message, error) {
c := msg.message().Attributes & compressionCodecMask
codec, ok := codecs[c]
if !ok {
return msg, fmt.Errorf("codec %s not imported.", codecToStr(msg.CompressionCodec))
}

decodedValue, err := codec.decode(msg.Value)
if err != nil {
return msg, err
default:
return msg, fmt.Errorf("compression codec not supported.")
}

msg.Value = decodedValue
return msg, nil
}

type message struct {
Expand Down
41 changes: 0 additions & 41 deletions message_test.go

This file was deleted.

Loading

0 comments on commit 8000f96

Please sign in to comment.