CompressionCodec interface
Pryz committed Jul 3, 2018
1 parent 262746a commit 8ba51bd
Showing 9 changed files with 119 additions and 79 deletions.
36 changes: 22 additions & 14 deletions compression.go
@@ -9,25 +9,33 @@ const (

// CompressionCodec represents a compression codec to encode and decode
// the messages.
-type CompressionCodec struct {
-str func() string
-encode func(dst, src []byte) (int, error)
-decode func(dst, src []byte) (int, error)
+type CompressionCodec interface {
+String() string
+Encode(dst, src []byte) (int, error)
+Decode(dst, src []byte) (int, error)
}

const compressionCodecMask int8 = 0x03
-const defaultCompressionLevel int = -1
+const DefaultCompressionLevel int = -1

func init() {
-RegisterCompressionCodec(0,
-func() string { return "none" },
-func(dst, src []byte) (int, error) {
-return copy(dst, src), nil
-},
-func(dst, src []byte) (int, error) {
-return copy(dst, src), nil
-},
-)
+RegisterCompressionCodec(0, func() CompressionCodec {
+return CompressionCodecNone{}
+})
}

+type CompressionCodecNone struct{}
+
+func (c CompressionCodecNone) String() string {
+return "none"
+}
+
+func (c CompressionCodecNone) Encode(dst, src []byte) (int, error) {
+return copy(dst, src), nil
+}
+
+func (c CompressionCodecNone) Decode(dst, src []byte) (int, error) {
+return copy(dst, src), nil
+}

func codecToStr(codec int8) string {
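The switch from a struct of function fields to an interface means a codec is now any type with these three methods, created through a factory passed to RegisterCompressionCodec. A minimal sketch of what an out-of-tree codec could look like, assuming the import path github.com/segmentio/kafka-go; the identity codec and its reuse of attribute code 0 are purely illustrative, not part of this commit:

package identity

import kafka "github.com/segmentio/kafka-go"

// Codec passes bytes through unchanged, mirroring CompressionCodecNone above.
type Codec struct{}

func (Codec) String() string { return "identity" }

func (Codec) Encode(dst, src []byte) (int, error) { return copy(dst, src), nil }

func (Codec) Decode(dst, src []byte) (int, error) { return copy(dst, src), nil }

func init() {
    // A real codec would claim one of the Kafka protocol codes (0=none,
    // 1=gzip, 2=snappy, 3=lz4); 0 is reused here only to keep the sketch short.
    kafka.RegisterCompressionCodec(0, func() kafka.CompressionCodec {
        return Codec{}
    })
}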
47 changes: 32 additions & 15 deletions gzip/gzip.go
@@ -9,28 +9,30 @@ import (
)

func init() {
-kafka.RegisterCompressionCodec(1, String, Encode, Decode)
+kafka.RegisterCompressionCodec(1, func() kafka.CompressionCodec {
+return CompressionCodec{
+CompressionLevel: kafka.DefaultCompressionLevel,
+}
+})
}

-func String() string {
-return "gzip"
+type CompressionCodec struct {
+CompressionLevel int
}

-type buffer struct {
-data []byte
-size int
+func NewCompressionCodec(level int) CompressionCodec {
+return CompressionCodec{
+CompressionLevel: level,
+}
}

-func (buf *buffer) Write(b []byte) (int, error) {
-n := copy(buf.data[buf.size:], b)
-buf.size += n
-if n != len(b) {
-return n, bytes.ErrTooLarge
-}
-return n, nil
+// String implements the kafka.CompressionCodec interface.
+func (c CompressionCodec) String() string {
+return "gzip"
}

-func Encode(dst, src []byte) (int, error) {
+// Encode implements the kafka.CompressionCodec interface.
+func (c CompressionCodec) Encode(dst, src []byte) (int, error) {
var buf bytes.Buffer
writer := gzip.NewWriter(&buf)
_, err := writer.Write(src)
@@ -48,7 +50,8 @@ func Encode(dst, src []byte) (int, error) {
return int(n), err
}

-func Decode(dst, src []byte) (int, error) {
+// Decode implements the kafka.CompressionCodec interface.
+func (c CompressionCodec) Decode(dst, src []byte) (int, error) {
reader, err := gzip.NewReader(bytes.NewReader(src))
if err != nil {
return 0, err
@@ -62,3 +65,17 @@ func Decode(dst, src []byte) (int, error) {
}
return buf.Write(data)
}

+type buffer struct {
+data []byte
+size int
+}
+
+func (buf *buffer) Write(b []byte) (int, error) {
+n := copy(buf.data[buf.size:], b)
+buf.size += n
+if n != len(b) {
+return n, bytes.ErrTooLarge
+}
+return n, nil
+}
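NewCompressionCodec exposes the compression level as a value carried by the codec itself rather than a package-level default. A round-trip sketch, assuming the subpackage import github.com/segmentio/kafka-go/gzip; the buffer sizing mirrors the tests below and is not prescriptive:

// Round-trip example; assumes imports:
//   kafka "github.com/segmentio/kafka-go"
//   "github.com/segmentio/kafka-go/gzip"
func gzipRoundTrip(src []byte) ([]byte, error) {
    codec := gzip.NewCompressionCodec(kafka.DefaultCompressionLevel)

    // Destination buffers are caller-allocated; 6*len(src) mirrors the tests below.
    compressed := make([]byte, 6*len(src))
    n, err := codec.Encode(compressed, src)
    if err != nil {
        return nil, err
    }

    out := make([]byte, len(src))
    m, err := codec.Decode(out, compressed[:n])
    return out[:m], err
}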
6 changes: 4 additions & 2 deletions gzip/gzip_test.go
@@ -10,8 +10,10 @@ func TestGzip(t *testing.T) {
r1 := make([]byte, 6*len(payload))
r2 := make([]byte, len(payload))

+c := CompressionCodec{}
+
t.Run("encode", func(t *testing.T) {
-n, err := Encode(r1, payload)
+n, err := c.Encode(r1, payload)
if err != nil {
t.Error(err)
}
@@ -23,7 +25,7 @@ })
})

t.Run("decode", func(t *testing.T) {
-n, err := Decode(r2, r1)
+n, err := c.Decode(r2, r1)
if err != nil {
t.Error(err)
}
43 changes: 25 additions & 18 deletions lz4/lz4.go
@@ -9,28 +9,20 @@ import (
)

func init() {
-kafka.RegisterCompressionCodec(3, String, Encode, Decode)
-}
-
-func String() string {
-return "lz4"
+kafka.RegisterCompressionCodec(3, func() kafka.CompressionCodec {
+return CompressionCodec{}
+})
}

-type buffer struct {
-data []byte
-size int
-}
+type CompressionCodec struct{}

-func (buf *buffer) Write(b []byte) (int, error) {
-n := copy(buf.data[buf.size:], b)
-buf.size += n
-if n != len(b) {
-return n, bytes.ErrTooLarge
-}
-return n, nil
+// String implements the kafka.CompressionCodec interface.
+func (c CompressionCodec) String() string {
+return "lz4"
}

-func Encode(dst, src []byte) (int, error) {
+// Encode implements the kafka.CompressionCodec interface.
+func (c CompressionCodec) Encode(dst, src []byte) (int, error) {
var buf bytes.Buffer
writer := lz4.NewWriter(&buf)
_, err := writer.Write(src)
@@ -49,7 +41,8 @@ func Encode(dst, src []byte) (int, error) {
return int(n), err
}

-func Decode(dst, src []byte) (int, error) {
+// Decode implements the kafka.CompressionCodec interface.
+func (c CompressionCodec) Decode(dst, src []byte) (int, error) {
reader := lz4.NewReader(bytes.NewReader(src))
data, err := ioutil.ReadAll(reader)
if err != nil {
@@ -60,3 +53,17 @@ func Decode(dst, src []byte) (int, error) {
}
return buf.Write(data)
}

+type buffer struct {
+data []byte
+size int
+}
+
+func (buf *buffer) Write(b []byte) (int, error) {
+n := copy(buf.data[buf.size:], b)
+buf.size += n
+if n != len(b) {
+return n, bytes.ErrTooLarge
+}
+return n, nil
+}
6 changes: 4 additions & 2 deletions lz4/lz4_test.go
@@ -10,8 +10,10 @@ func TestLZ4(t *testing.T) {
r1 := make([]byte, 6*len(payload))
r2 := make([]byte, len(payload))

+c := CompressionCodec{}
+
t.Run("encode", func(t *testing.T) {
-n, err := Encode(r1, payload)
+n, err := c.Encode(r1, payload)
if err != nil {
t.Error(err)
}
@@ -23,7 +25,7 @@ })
})

t.Run("decode", func(t *testing.T) {
-n, err := Decode(r2, r1)
+n, err := c.Decode(r2, r1)
if err != nil {
t.Error(err)
}
13 changes: 4 additions & 9 deletions message.go
@@ -12,17 +12,12 @@ import (
var codecs map[int8]CompressionCodec

// RegisterCompressionCodec registers a compression codec so it can be used by a Writer.
-func RegisterCompressionCodec(code int8, str func() string, encode, decode func(dst, src []byte) (int, error)) error {
+func RegisterCompressionCodec(code int8, codec func() CompressionCodec) {
if codecs == nil {
codecs = make(map[int8]CompressionCodec)
}

-codecs[code] = CompressionCodec{
-str: str,
-encode: encode,
-decode: decode,
-}
-return nil
+codecs[code] = codec()
}

// Message is a data structure representing kafka messages.
@@ -75,7 +70,7 @@ func (msg Message) Encode() (Message, error) {
if !ok {
return msg, fmt.Errorf("codec %s not imported.", codecToStr(msg.CompressionCodec))
}
-msg.Value, err = transform(msg.Value, codec.encode)
+msg.Value, err = transform(msg.Value, codec.Encode)
return msg, err
}

@@ -87,7 +82,7 @@ func (msg Message) Decode() (Message, error) {
if !ok {
return msg, fmt.Errorf("codec %s not imported.", codecToStr(msg.CompressionCodec))
}
-msg.Value, err = transform(msg.Value, codec.decode)
+msg.Value, err = transform(msg.Value, codec.Decode)
return msg, err
}

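Because each subpackage registers its codec from init, nothing changes for callers except making sure the relevant package is linked in; Message.Encode looks the codec up by its attribute byte and returns the "not imported" error otherwise. A sketch of that wiring, with import paths and the helper name as assumptions rather than part of this commit:

import (
    kafka "github.com/segmentio/kafka-go"
    _ "github.com/segmentio/kafka-go/gzip" // side effect: registers codec 1
)

// compressWithGzip is a hypothetical helper showing the lookup path.
func compressWithGzip(msg kafka.Message) (kafka.Message, error) {
    msg.CompressionCodec = 1 // gzip's attribute code
    return msg.Encode()      // fails with the "not imported" error without the blank import
}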
39 changes: 23 additions & 16 deletions snappy/snappy.go
@@ -8,13 +8,34 @@ import (
)

func init() {
-kafka.RegisterCompressionCodec(2, String, Encode, Decode)
+kafka.RegisterCompressionCodec(2, func() kafka.CompressionCodec {
+return CompressionCodec{}
+})
}

-func String() string {
+type CompressionCodec struct{}
+
+// String implements the kafka.CompressionCodec interface.
+func (c CompressionCodec) String() string {
return "snappy"
}

+// Encode implements the kafka.CompressionCodec interface.
+func (c CompressionCodec) Encode(dst, src []byte) (int, error) {
+buf := buffer{data: dst}
+return buf.Write(snappy.Encode(src))
+}
+
+// Decode implements the kafka.CompressionCodec interface.
+func (c CompressionCodec) Decode(dst, src []byte) (int, error) {
+buf := buffer{data: dst}
+data, err := snappy.Decode(src)
+if err != nil {
+return 0, err
+}
+return buf.Write(data)
+}

type buffer struct {
data []byte
size int
@@ -28,17 +49,3 @@ func (buf *buffer) Write(b []byte) (int, error) {
}
return n, nil
}

-func Encode(dst, src []byte) (int, error) {
-buf := buffer{data: dst}
-return buf.Write(snappy.Encode(src))
-}
-
-func Decode(dst, src []byte) (int, error) {
-buf := buffer{data: dst}
-data, err := snappy.Decode(src)
-if err != nil {
-return 0, err
-}
-return buf.Write(data)
-}
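Both methods hand their result to the small buffer helper, which copies into the caller's dst and reports bytes.ErrTooLarge when it does not fit, so the destination has to be sized up front. An illustrative guard; the helper name and sizes are made up:

// snappyEncode is a hypothetical helper; assumes imports "bytes" and
// "github.com/segmentio/kafka-go/snappy".
func snappyEncode(payload []byte) ([]byte, error) {
    c := snappy.CompressionCodec{}

    dst := make([]byte, 4096)
    n, err := c.Encode(dst, payload)
    if err == bytes.ErrTooLarge {
        // dst could not hold the snappy output; retry with a roomier buffer.
        dst = make([]byte, 2*len(payload)+64)
        n, err = c.Encode(dst, payload)
    }
    return dst[:n], err
}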
6 changes: 4 additions & 2 deletions snappy/snappy_test.go
@@ -10,8 +10,10 @@ func TestSnappy(t *testing.T) {
r1 := make([]byte, 6*len(payload))
r2 := make([]byte, len(payload))

+c := CompressionCodec{}
+
t.Run("encode", func(t *testing.T) {
-n, err := Encode(r1, payload)
+n, err := c.Encode(r1, payload)
if err != nil {
t.Error(err)
}
@@ -23,7 +25,7 @@ })
})

t.Run("decode", func(t *testing.T) {
-n, err := Decode(r2, r1)
+n, err := c.Decode(r2, r1)
if err != nil {
t.Error(err)
}
2 changes: 1 addition & 1 deletion writer.go
@@ -216,7 +216,7 @@ func NewWriter(config WriterConfig) *Writer {
}

if config.CompressionLevel == 0 {
-config.CompressionLevel = defaultCompressionLevel
+config.CompressionLevel = DefaultCompressionLevel
}

w := &Writer{
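Exporting DefaultCompressionLevel lets callers spell out the default that NewWriter already falls back to when CompressionLevel is left at zero. Illustrative only; the other WriterConfig fields are assumed from the package's usual surface, not from this diff:

w := kafka.NewWriter(kafka.WriterConfig{
    Brokers:          []string{"localhost:9092"},
    Topic:            "events",
    CompressionLevel: kafka.DefaultCompressionLevel, // equivalent to leaving it 0
})
defer w.Close()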
