Skip to content

Commit

Permalink
Add snappy compression
Browse files Browse the repository at this point in the history
  • Loading branch information
Pryz committed Jun 26, 2018
1 parent 536dc15 commit 1630fbe
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 4 deletions.
25 changes: 21 additions & 4 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,19 @@ import (
"time"
)

const compressionCodecMask int8 = 0x03

// CompressionCodec represents the compression codec available in Kafka
// See : https://cwiki.apache.org/confluence/display/KAFKA/Compression
type CompressionCodec int8

const (
CompressionNone CompressionCodec = iota
CompressionGZIP
CompressionSnappy
CompressionLZ4
)

// Message is a data structure representing kafka messages.
type Message struct {
// Topic is reads only and MUST NOT be set when writing messages
Expand All @@ -19,6 +32,9 @@ type Message struct {
// If not set at the creation, Time will be automatically set when
// writing the message.
Time time.Time

// Compression codec used by the message. Default to none.
CompressionCodec int8
}

func (msg Message) item() messageSetItem {
Expand All @@ -32,10 +48,11 @@ func (msg Message) item() messageSetItem {

func (msg Message) message() message {
m := message{
MagicByte: 1,
Key: msg.Key,
Value: msg.Value,
Timestamp: timestamp(msg.Time),
MagicByte: 1,
Key: msg.Key,
Value: msg.Value,
Timestamp: timestamp(msg.Time),
Attributes: msg.CompressionCodec & compressionCodecMask,
}
m.CRC = m.crc32()
return m
Expand Down
3 changes: 3 additions & 0 deletions writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ type WriterConfig struct {
// whether the messages were written to kafka.
Async bool

// CompressionCodec set the codec to be used to compress Kafka messages.
CompressionCodec int8

newPartitionWriter func(partition int, config WriterConfig, stats *writerStats) partitionWriter
}

Expand Down

0 comments on commit 1630fbe

Please sign in to comment.