Skip to content

Commit

Permalink
Stream compression (segmentio#306)
Browse files Browse the repository at this point in the history
* implement stream compression

* fix zstd + pass benchmarks

* add comments and remove old API

* add kafka.CompressionCodec.Name()

* avoid a copy when the output buffer for decoding snappy compressed data is large enough

* fix xerial framing + validate against go-xerial-snappy

* remove unused function

* PR feedback

* cleanup APIs + improve benchmarks

* optimize gzip reader

* fix snappy compression codec

* support running decompression benchmarks alone
  • Loading branch information
Achille authored Jul 22, 2019
1 parent 03ea927 commit 59f58f0
Show file tree
Hide file tree
Showing 12 changed files with 1,129 additions and 353 deletions.
35 changes: 22 additions & 13 deletions compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,28 @@ package kafka

import (
"errors"
"io"
"sync"
)

var errUnknownCodec = errors.New("the compression code is invalid or its codec has not been imported")
const (
CompressionNoneCode = 0

var codecs = make(map[int8]CompressionCodec)
var codecsMutex sync.RWMutex
compressionCodecMask = 0x07
)

var (
// errUnknownCodec is the sentinel error for a compression code that has
// no usable codec.
errUnknownCodec = errors.New("the compression code is invalid or its codec has not been imported")

// codecs holds the registered compression codecs, keyed by the value each
// codec returns from Code().
codecs = make(map[int8]CompressionCodec)
// codecsMutex is held while the codecs map is written during registration.
codecsMutex sync.RWMutex
)

// RegisterCompressionCodec registers a compression codec so it can be used by a Writer.
func RegisterCompressionCodec(codec func() CompressionCodec) {
c := codec()
func RegisterCompressionCodec(codec CompressionCodec) {
code := codec.Code()
codecsMutex.Lock()
codecs[c.Code()] = c
codecs[code] = codec
codecsMutex.Unlock()
}

Expand All @@ -40,12 +49,12 @@ type CompressionCodec interface {
// Code returns the compression codec code
Code() int8

// Encode encodes the src data
Encode(src []byte) ([]byte, error)
// Human-readable name for the codec.
Name() string

// Decode decodes the src data
Decode(src []byte) ([]byte, error)
}
// Constructs a new reader which decompresses data from r.
NewReader(r io.Reader) io.ReadCloser

const compressionCodecMask int8 = 0x07
const CompressionNoneCode = 0
// Constructs a new writer which writes compressed data to w.
NewWriter(w io.Writer) io.WriteCloser
}
215 changes: 138 additions & 77 deletions compression_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
package kafka_test

import (
"bytes"
compressGzip "compress/gzip"
"context"
"fmt"
"math/rand"
"io"
"io/ioutil"
"os"
"path/filepath"
"strconv"
"testing"
"text/tabwriter"
"time"

"github.com/segmentio/kafka-go"
kafka "github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/gzip"
"github.com/segmentio/kafka-go/lz4"
"github.com/segmentio/kafka-go/snappy"
Expand All @@ -29,24 +35,46 @@ func TestCompression(t *testing.T) {
}
}

// compress streams src through a writer obtained from codec and returns the
// bytes that writer produced. The writer is always closed: on a copy failure
// its Close error is discarded in favour of the copy error, otherwise a Close
// error is reported to the caller.
func compress(codec kafka.CompressionCodec, src []byte) ([]byte, error) {
	var out bytes.Buffer
	zw := codec.NewWriter(&out)

	_, err := io.Copy(zw, bytes.NewReader(src))
	if err != nil {
		zw.Close()
		return nil, err
	}

	if err = zw.Close(); err != nil {
		return nil, err
	}

	return out.Bytes(), nil
}

// decompress reads src through a reader obtained from codec and returns
// everything that reader yields. The reader is always closed: on a copy
// failure its Close error is discarded in favour of the copy error, otherwise
// a Close error is reported to the caller.
func decompress(codec kafka.CompressionCodec, src []byte) ([]byte, error) {
	var out bytes.Buffer
	zr := codec.NewReader(bytes.NewReader(src))

	_, err := io.Copy(&out, zr)
	if err != nil {
		zr.Close()
		return nil, err
	}

	if err = zr.Close(); err != nil {
		return nil, err
	}

	return out.Bytes(), nil
}

func testEncodeDecode(t *testing.T, m kafka.Message, codec kafka.CompressionCodec) {
var r1, r2 []byte
var err error
var code int8

if codec != nil {
code = codec.Code()
}

t.Run("encode with "+codecToStr(code), func(t *testing.T) {
r1, err = codec.Encode(m.Value)
t.Run("encode with "+codec.Name(), func(t *testing.T) {
r1, err = compress(codec, m.Value)
if err != nil {
t.Error(err)
}
})

t.Run("decode with "+codecToStr(code), func(t *testing.T) {
r2, err = codec.Decode(r1)
t.Run("decode with "+codec.Name(), func(t *testing.T) {
r2, err = decompress(codec, r1)
if err != nil {
t.Error(err)
}
Expand All @@ -58,23 +86,6 @@ func testEncodeDecode(t *testing.T, m kafka.Message, codec kafka.CompressionCode
})
}

func codecToStr(codec int8) string {
switch codec {
case kafka.CompressionNoneCode:
return "none"
case gzip.Code:
return "gzip"
case snappy.Code:
return "snappy"
case lz4.Code:
return "lz4"
case zstd.Code:
return "zstd"
default:
return "unknown"
}
}

func TestCompressedMessages(t *testing.T) {
testCompressedMessages(t, gzip.NewCompressionCodec())
testCompressedMessages(t, snappy.NewCompressionCodec())
Expand All @@ -86,7 +97,7 @@ func TestCompressedMessages(t *testing.T) {
}

func testCompressedMessages(t *testing.T, codec kafka.CompressionCodec) {
t.Run("produce/consume with"+codecToStr(codec.Code()), func(t *testing.T) {
t.Run("produce/consume with"+codec.Name(), func(t *testing.T) {
t.Parallel()

topic := kafka.CreateTopic(t, 1)
Expand Down Expand Up @@ -232,98 +243,148 @@ func (noopCodec) Code() int8 {
return 0
}

func (noopCodec) Encode(src []byte) ([]byte, error) {
return src, nil
// Name returns the fixed label "none" for the pass-through codec.
func (noopCodec) Name() string {
return "none"
}

func (noopCodec) Decode(src []byte) ([]byte, error) {
return src, nil
// NewReader returns r wrapped by ioutil.NopCloser, so reads come straight
// from r and Close does nothing.
func (noopCodec) NewReader(r io.Reader) io.ReadCloser {
return ioutil.NopCloser(r)
}

// NewWriter returns w wrapped in a nopWriteCloser, so writes go straight to
// w without any transformation.
func (noopCodec) NewWriter(w io.Writer) io.WriteCloser {
return nopWriteCloser{w}
}

// nopWriteCloser turns any io.Writer into an io.WriteCloser. Write is
// promoted from the embedded writer, so data passes through unchanged.
type nopWriteCloser struct {
	io.Writer
}

// Close satisfies io.Closer. There is nothing to release, so it always
// returns nil and leaves the embedded writer untouched.
func (nopWriteCloser) Close() error {
	return nil
}

func BenchmarkCompression(b *testing.B) {
benchmarks := []struct {
scenario string
codec kafka.CompressionCodec
function func(*testing.B, kafka.CompressionCodec, int, map[int][]byte)
function func(*testing.B, kafka.CompressionCodec, *bytes.Buffer, []byte) float64
}{
{
scenario: "None",
codec: &noopCodec{},
function: benchmarkCompression,
},
{
scenario: "GZIP",
codec: gzip.NewCompressionCodec(),
function: benchmarkCompression,
},
{
scenario: "Snappy",
codec: snappy.NewCompressionCodec(),
function: benchmarkCompression,
},
{
scenario: "LZ4",
codec: lz4.NewCompressionCodec(),
function: benchmarkCompression,
},
{
scenario: "zstd",
codec: zstd.NewCompressionCodec(),
function: benchmarkCompression,
},
}

payload := map[int][]byte{
1024: randomPayload(1024),
4096: randomPayload(4096),
8192: randomPayload(8192),
16384: randomPayload(16384),
f, err := os.Open(filepath.Join(os.Getenv("GOROOT"), "src/encoding/json/testdata/code.json.gz"))
if err != nil {
b.Fatal(err)
}
defer f.Close()

for _, benchmark := range benchmarks {
b.Run(benchmark.scenario+"1024", func(b *testing.B) {
benchmark.function(b, benchmark.codec, 1024, payload)
})
b.Run(benchmark.scenario+"4096", func(b *testing.B) {
benchmark.function(b, benchmark.codec, 4096, payload)
})
b.Run(benchmark.scenario+"8192", func(b *testing.B) {
benchmark.function(b, benchmark.codec, 8192, payload)
})
b.Run(benchmark.scenario+"16384", func(b *testing.B) {
benchmark.function(b, benchmark.codec, 16384, payload)
})
z, err := compressGzip.NewReader(f)
if err != nil {
b.Fatal(err)
}

}
payload, err := ioutil.ReadAll(z)
if err != nil {
b.Fatal(err)
}

func benchmarkCompression(b *testing.B, codec kafka.CompressionCodec, payloadSize int, payload map[int][]byte) {
msg := kafka.Message{
Value: payload[payloadSize],
buffer := bytes.Buffer{}
buffer.Grow(len(payload))

ts := &bytes.Buffer{}
tw := tabwriter.NewWriter(ts, 0, 8, 0, '\t', 0)
defer func() {
tw.Flush()
fmt.Printf("input => %.2f MB\n", float64(len(payload))/(1024*1024))
fmt.Println(ts)
}()

b.ResetTimer()

for i := range benchmarks {
benchmark := &benchmarks[i]
ratio := 0.0

b.Run(fmt.Sprintf("%s", benchmark.codec.Name()), func(b *testing.B) {
ratio = benchmark.function(b, benchmark.codec, &buffer, payload)
})

fmt.Fprintf(tw, " %s:\t%.2f%%\n", benchmark.codec.Name(), 100*ratio)
}
}

for i := 0; i < b.N; i++ {
m1, err := codec.Encode(msg.Value)
if err != nil {
b.Fatal(err)
func benchmarkCompression(b *testing.B, codec kafka.CompressionCodec, buf *bytes.Buffer, payload []byte) float64 {
// In case only the decompression benchmarks are run, we use this flag to
// detect whether we have to compress the payload before the decompression
// benchmarks.
compressed := false

b.Run("compress", func(b *testing.B) {
compressed = true
r := bytes.NewReader(payload)

for i := 0; i < b.N; i++ {
buf.Reset()
r.Reset(payload)
w := codec.NewWriter(buf)

_, err := io.Copy(w, r)
if err != nil {
b.Fatal(err)
}
if err := w.Close(); err != nil {
b.Fatal(err)
}
}

b.SetBytes(int64(len(m1)))
b.SetBytes(int64(buf.Len()))
})

if !compressed {
r := bytes.NewReader(payload)
w := codec.NewWriter(buf)

_, err = codec.Decode(m1)
_, err := io.Copy(w, r)
if err != nil {
b.Fatal(err)
}

if err := w.Close(); err != nil {
b.Fatal(err)
}
}
}

const dataset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890"
b.Run("decompress", func(b *testing.B) {
c := bytes.NewReader(buf.Bytes())

func randomPayload(n int) []byte {
b := make([]byte, n)
for i := range b {
b[i] = dataset[rand.Intn(len(dataset))]
}
return b
for i := 0; i < b.N; i++ {
c.Reset(buf.Bytes())
r := codec.NewReader(c)

n, err := io.Copy(ioutil.Discard, r)
if err != nil {
b.Fatal(err)
}
if err := r.Close(); err != nil {
b.Fatal(err)
}

b.SetBytes(n)
}
})

return 1 - (float64(buf.Len()) / float64(len(payload)))
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.11

require (
github.com/DataDog/zstd v1.4.0
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21
github.com/golang/snappy v0.0.1
github.com/pierrec/lz4 v2.0.5+incompatible
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
github.com/DataDog/zstd v1.4.0 h1:vhoV+DUHnRZdKW1i5UMjAk2G4JY8wN4ayRfYDNdEhwo=
github.com/DataDog/zstd v1.4.0/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
Expand Down
Loading

0 comments on commit 59f58f0

Please sign in to comment.