Skip to content

Commit

Permalink
clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
Pryz committed Jul 6, 2018
1 parent 21a7974 commit e1db756
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 57 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ Compression can be enable on the writer :
w := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{"localhost:9092"},
Topic: "topic-A",
CompressionCodecopic: kafka.CompressionSnappy,
CompressionCodec: kafka.CompressionSnappy,
})
```

Expand Down
25 changes: 19 additions & 6 deletions compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,22 @@ const (
CompressionLZ4
)

var codecs map[int8]CompressionCodec

// RegisterCompressionCodec registers a compression codec so it can be used by a Writer.
func RegisterCompressionCodec(code int8, codec func() CompressionCodec) {
if codecs == nil {
codecs = make(map[int8]CompressionCodec)
}

codecs[code] = codec()
}

// CompressionCodec represents a compression codec to encode and decode
// the messages.
// See : https://cwiki.apache.org/confluence/display/KAFKA/Compression
type CompressionCodec interface {
String() string
Code() int8
Encode(dst, src []byte) (int, error)
Decode(dst, src []byte) (int, error)
}
Expand All @@ -19,15 +31,16 @@ const compressionCodecMask int8 = 0x03
const DefaultCompressionLevel int = -1

func init() {
RegisterCompressionCodec(0, func() CompressionCodec {
return CompressionCodecNone{}
codec := CompressionCodecNone{}
RegisterCompressionCodec(codec.Code(), func() CompressionCodec {
return codec
})
}

type CompressionCodecNone struct{}

func (c CompressionCodecNone) String() string {
return "none"
func (c CompressionCodecNone) Code() int8 {
return 0
}

func (c CompressionCodecNone) Encode(dst, src []byte) (int, error) {
Expand All @@ -38,7 +51,7 @@ func (c CompressionCodecNone) Decode(dst, src []byte) (int, error) {
return copy(dst, src), nil
}

func codecToStr(codec int8) string {
func Codec(codec int8) string {
switch codec {
case CompressionNone:
return "none"
Expand Down
19 changes: 2 additions & 17 deletions compression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ func testEncodeDecode(t *testing.T, m kafka.Message, codec int8) {
var r1, r2 kafka.Message
var err error

t.Run("encode with "+codecToStr(codec), func(t *testing.T) {
t.Run("encode with "+kafka.Codec(codec), func(t *testing.T) {
m.CompressionCodec = codec
r1, err = m.Encode()
if err != nil {
t.Error(err)
}
})
t.Run("encode with "+codecToStr(codec), func(t *testing.T) {
t.Run("encode with "+kafka.Codec(codec), func(t *testing.T) {
r2, err = r1.Decode()
if err != nil {
t.Error(err)
Expand All @@ -57,18 +57,3 @@ func testUnknownCodec(t *testing.T, m kafka.Message, codec int8) {
}
})
}

func codecToStr(codec int8) string {
switch codec {
case kafka.CompressionNone:
return "none"
case kafka.CompressionGZIP:
return "gzip"
case kafka.CompressionSnappy:
return "snappy"
case kafka.CompressionLZ4:
return "lz4"
default:
return "unknown"
}
}
19 changes: 11 additions & 8 deletions gzip/gzip.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,29 @@ import (
)

func init() {
kafka.RegisterCompressionCodec(1, func() kafka.CompressionCodec {
return CompressionCodec{
CompressionLevel: kafka.DefaultCompressionLevel,
}
codec := NewCompressionCodec()
kafka.RegisterCompressionCodec(codec.Code(), func() kafka.CompressionCodec {
return codec
})
}

type CompressionCodec struct {
CompressionLevel int
}

func NewCompressionCodec(level int) CompressionCodec {
func NewCompressionCodec() CompressionCodec {
return NewCompressionCodecWith(kafka.DefaultCompressionLevel)
}

func NewCompressionCodecWith(level int) CompressionCodec {
return CompressionCodec{
CompressionLevel: level,
}
}

// String implements the kafka.CompressionCodec interface.
func (c CompressionCodec) String() string {
return "gzip"
// Code implements the kafka.CompressionCodec interface.
func (c CompressionCodec) Code() int8 {
return 1
}

// Encode implements the kafka.CompressionCodec interface.
Expand Down
15 changes: 10 additions & 5 deletions lz4/lz4.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,21 @@ import (
)

func init() {
kafka.RegisterCompressionCodec(3, func() kafka.CompressionCodec {
return CompressionCodec{}
codec := NewCompressionCodec()
kafka.RegisterCompressionCodec(codec.Code(), func() kafka.CompressionCodec {
return codec
})
}

type CompressionCodec struct{}

// String implements the kafka.CompressionCodec interface.
func (c CompressionCodec) String() string {
return "lz4"
func NewCompressionCodec() CompressionCodec {
return CompressionCodec{}
}

// Code implements the kafka.CompressionCodec interface.
func (c CompressionCodec) Code() int8 {
return 3
}

// Encode implements the kafka.CompressionCodec interface.
Expand Down
17 changes: 2 additions & 15 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,6 @@ import (
"time"
)

// CompressionCodec represents the compression codec available in Kafka
// See : https://cwiki.apache.org/confluence/display/KAFKA/Compression
var codecs map[int8]CompressionCodec

// RegisterCompressionCodec registers a compression codec so it can be used by a Writer.
func RegisterCompressionCodec(code int8, codec func() CompressionCodec) {
if codecs == nil {
codecs = make(map[int8]CompressionCodec)
}

codecs[code] = codec()
}

// Message is a data structure representing kafka messages.
type Message struct {
// Topic is reads only and MUST NOT be set when writing messages
Expand Down Expand Up @@ -68,7 +55,7 @@ func (msg Message) Encode() (Message, error) {
var err error
codec, ok := codecs[msg.CompressionCodec]
if !ok {
return msg, fmt.Errorf("codec %s not imported.", codecToStr(msg.CompressionCodec))
return msg, fmt.Errorf("codec %s not imported.", Codec(msg.CompressionCodec))
}
msg.Value, err = transform(msg.Value, codec.Encode)
return msg, err
Expand All @@ -80,7 +67,7 @@ func (msg Message) Decode() (Message, error) {
c := msg.message().Attributes & compressionCodecMask
codec, ok := codecs[c]
if !ok {
return msg, fmt.Errorf("codec %s not imported.", codecToStr(msg.CompressionCodec))
return msg, fmt.Errorf("codec %s not imported.", Codec(msg.CompressionCodec))
}
msg.Value, err = transform(msg.Value, codec.Decode)
return msg, err
Expand Down
15 changes: 10 additions & 5 deletions snappy/snappy.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,21 @@ import (
)

func init() {
kafka.RegisterCompressionCodec(2, func() kafka.CompressionCodec {
return CompressionCodec{}
codec := NewCompressionCodec()
kafka.RegisterCompressionCodec(codec.Code(), func() kafka.CompressionCodec {
return codec
})
}

type CompressionCodec struct{}

// String implements the kafka.CompressionCodec interface.
func (c CompressionCodec) String() string {
return "snappy"
func NewCompressionCodec() CompressionCodec {
return CompressionCodec{}
}

// Code implements the kafka.CompressionCodec interface.
func (c CompressionCodec) Code() int8 {
return 2
}

// Encode implements the kafka.CompressionCodec interface.
Expand Down

0 comments on commit e1db756

Please sign in to comment.