Skip to content

Commit

Permalink
Added support for message set compression (segmentio#135)
Browse files Browse the repository at this point in the history
  • Loading branch information
stevevls authored Nov 5, 2018
1 parent f38dc45 commit fd74aea
Show file tree
Hide file tree
Showing 18 changed files with 532 additions and 460 deletions.
63 changes: 25 additions & 38 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@ type Batch struct {
mutex sync.Mutex
conn *Conn
lock *sync.Mutex
reader *bufio.Reader
msgs *messageSetReader
deadline time.Time
throttle time.Duration
remain int
topic string
partition int
offset int64
Expand Down Expand Up @@ -65,7 +64,9 @@ func (batch *Batch) close() (err error) {

batch.conn = nil
batch.lock = nil
batch.discard(batch.remain)
if batch.msgs != nil {
batch.msgs.discard()
}

if err = batch.err; err == io.EOF {
err = nil
Expand Down Expand Up @@ -166,10 +167,7 @@ func (batch *Batch) ReadMessage() (Message, error) {
msg.Offset = offset
msg.Time = timestampToTime(timestamp)

if err != nil {
return msg, err
}
return msg.decode()
return msg, err
}

func (batch *Batch) readMessage(
Expand All @@ -180,47 +178,36 @@ func (batch *Batch) readMessage(
return
}

offset, timestamp, batch.remain, err = readMessage(
batch.reader,
batch.remain,
batch.offset,
key, val,
)

offset, timestamp, err = batch.msgs.readMessage(batch.offset, key, val)
switch err {
case nil:
batch.offset = offset + 1
case errShortRead:
// As an "optimization" kafka truncates the returned response after
// producing MaxBytes, which could then cause the code to return
// errShortRead.
err = batch.discard(batch.remain)
err = batch.msgs.discard()
switch {
case err != nil:
batch.err = err
case batch.msgs.remaining() == 0:
// Because we use the adjusted deadline we could end up returning
// before the actual deadline occurred. This is necessary otherwise
// timing out the connection for real could end up leaving it in an
// unpredictable state, which would require closing it.
// This design decision was made to maximize the chances of keeping
// the connection open, the trade off being to lose precision on the
// read deadline management.
if !batch.deadline.IsZero() && time.Now().After(batch.deadline) {
err = RequestTimedOut
} else {
err = io.EOF
}
batch.err = err
}
default:
batch.err = err
}

return
}

// discard drops the next n bytes of the current batch from the
// connection's buffered reader, updating batch.remain with the number
// of bytes still to be consumed for this batch.
//
// When the batch has been fully consumed (batch.remain == 0) and no
// prior error was recorded, the method records io.EOF as the terminal
// batch error — or RequestTimedOut when the read deadline has already
// elapsed — so callers can distinguish a clean end-of-batch from a
// timeout.
func (batch *Batch) discard(n int) (err error) {
	batch.remain, err = discardN(batch.reader, batch.remain, n)
	switch {
	case err != nil:
		batch.err = err
	case batch.err == nil && batch.remain == 0:
		// Because we use the adjusted deadline we could end up returning
		// before the actual deadline occurred. This is necessary otherwise
		// timing out the connection for real could end up leaving it in an
		// unpredictable state, which would require closing it.
		// This design decision was made to maximize the chances of keeping
		// the connection open, the trade off being to lose precision on the
		// read deadline management.
		if !batch.deadline.IsZero() && time.Now().After(batch.deadline) {
			err = RequestTimedOut
		} else {
			err = io.EOF
		}
		batch.err = err
	}
	return
}
34 changes: 25 additions & 9 deletions compression.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package kafka

import "sync"
import (
"errors"
"sync"
)

// errUnknownCodec is returned by resolveCodec when no compression
// codec has been registered under the requested code.
var errUnknownCodec = errors.New("invalid codec")

// codecs is the process-wide registry of compression codecs, keyed by
// the codec's Code() value; codecsMutex guards all access to it.
var codecs = make(map[int8]CompressionCodec)
var codecsMutex sync.RWMutex
Expand All @@ -13,22 +18,33 @@ func RegisterCompressionCodec(codec func() CompressionCodec) {
codecsMutex.Unlock()
}

// resolveCodec returns the CompressionCodec registered under the given
// code, or errUnknownCodec when no codec with that code is known.
func resolveCodec(code int8) (codec CompressionCodec, err error) {
	codecsMutex.RLock()
	defer codecsMutex.RUnlock()

	if codec = codecs[code]; codec == nil {
		err = errUnknownCodec
	}
	return
}

// CompressionCodec represents a compression codec to encode and decode
// the messages.
// See : https://cwiki.apache.org/confluence/display/KAFKA/Compression
//
// A CompressionCodec must be safe for concurrent access by multiple go
// routines.
//
// NOTE(review): the diff view interleaved the old (dst, src) signatures
// with the new ones, leaving duplicate method declarations; this is the
// post-commit interface with the allocate-and-return API.
type CompressionCodec interface {
	// Code returns the compression codec code
	Code() int8

	// Encode encodes the src data
	Encode(src []byte) ([]byte, error)

	// Decode decodes the src data
	Decode(src []byte) ([]byte, error)
}

const compressionCodecMask int8 = 0x03
Expand Down
Loading

0 comments on commit fd74aea

Please sign in to comment.