Skip to content

Commit

Permalink
Add CompressionLevel
Browse files Browse the repository at this point in the history
  • Loading branch information
Pryz committed Jul 2, 2018
1 parent fd9e299 commit 4b43636
Show file tree
Hide file tree
Showing 8 changed files with 26 additions and 10 deletions.
4 changes: 2 additions & 2 deletions compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const (

type CompressionCodec struct {
str func() string
encode func(src []byte) ([]byte, error)
encode func(src []byte, level int) ([]byte, error)
decode func(src []byte) ([]byte, error)
}

Expand All @@ -19,7 +19,7 @@ const defaultCompressionLevel int = -1
func init() {
RegisterCompressionCodec(0,
func() string { return "none" },
func(src []byte) ([]byte, error) { return src, nil },
func(src []byte, level int) ([]byte, error) { return src, nil },
func(src []byte) ([]byte, error) { return src, nil },
)
}
Expand Down
14 changes: 14 additions & 0 deletions compression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ func TestCompression(t *testing.T) {
testEncodeDecode(t, msg, kafka.CompressionGZIP)
testEncodeDecode(t, msg, kafka.CompressionSnappy)
testEncodeDecode(t, msg, kafka.CompressionLZ4)
testUnknownCodec(t, msg, 42)
}

func testEncodeDecode(t *testing.T, m kafka.Message, codec int8) {
Expand Down Expand Up @@ -45,6 +46,19 @@ func testEncodeDecode(t *testing.T, m kafka.Message, codec int8) {
})
}

func testUnknownCodec(t *testing.T, m kafka.Message, codec int8) {
t.Run("unknown codec", func(t *testing.T) {
expectedErr := "codec unknown not imported."
m.CompressionCodec = codec
_, err := m.Encode()
if err.Error() != expectedErr {
t.Error("wrong error")
t.Log("got: ", err)
t.Error("expected: ", expectedErr)
}
})
}

func codecToStr(codec int8) string {
switch codec {
case kafka.CompressionNone:
Expand Down
3 changes: 1 addition & 2 deletions gzip/gzip.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ func String() string {
return "gzip"
}

//TODO: compression level
func Encode(src []byte) ([]byte, error) {
func Encode(src []byte, level int) ([]byte, error) {
var buf bytes.Buffer
writer := gzip.NewWriter(&buf)
_, err := writer.Write(src)
Expand Down
2 changes: 1 addition & 1 deletion gzip/gzip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ func TestGzip(t *testing.T) {
payload := []byte("message")

t.Run("encode", func(t *testing.T) {
r1, err = Encode(payload)
r1, err = Encode(payload, 1)
if err != nil {
t.Error(err)
}
Expand Down
2 changes: 1 addition & 1 deletion lz4/lz4.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func String() string {
return "lz4"
}

func Encode(src []byte) ([]byte, error) {
func Encode(src []byte, level int) ([]byte, error) {
var buf bytes.Buffer
writer := lz4.NewWriter(&buf)
_, err := writer.Write(src)
Expand Down
4 changes: 2 additions & 2 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
// See : https://cwiki.apache.org/confluence/display/KAFKA/Compression
var codecs map[int8]CompressionCodec

func RegisterCompressionCodec(code int8, str func() string, encode, decode func(src []byte) ([]byte, error)) error {
func RegisterCompressionCodec(code int8, str func() string, encode func(src []byte, level int) ([]byte, error), decode func(src []byte) ([]byte, error)) error {
if codecs == nil {
codecs = make(map[int8]CompressionCodec)
}
Expand Down Expand Up @@ -72,7 +72,7 @@ func (msg Message) Encode() (Message, error) {
return msg, fmt.Errorf("codec %s not imported.", codecToStr(msg.CompressionCodec))
}

encodedValue, err := codec.encode(msg.Value)
encodedValue, err := codec.encode(msg.Value, msg.CompressionLevel)
if err != nil {
return msg, err
}
Expand Down
2 changes: 1 addition & 1 deletion snappy/snappy.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func String() string {
return "snappy"
}

func Encode(src []byte) ([]byte, error) {
func Encode(src []byte, level int) ([]byte, error) {
return snappy.Encode(src), nil
}

Expand Down
5 changes: 4 additions & 1 deletion writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,10 @@ func NewWriter(config WriterConfig) *Writer {
config.RebalanceInterval = 15 * time.Second
}

if config.CompressionLevel == 0 {
config.CompressionLevel = defaultCompressionLevel
}

w := &Writer{
config: config,
msgs: make(chan writerMessage, config.QueueCapacity),
Expand Down Expand Up @@ -266,7 +270,6 @@ func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error {
}

for _, msg := range msgs {
//TODO: is it the right place for the compression configuration ?
msg.CompressionCodec = w.config.CompressionCodec
msg.CompressionLevel = w.config.CompressionLevel

Expand Down

0 comments on commit 4b43636

Please sign in to comment.