Refactored kafka reader
thehydroimpulse committed May 18, 2017
1 parent e53040c commit 7cb9bb1
Showing 4 changed files with 361 additions and 126 deletions.
17 changes: 17 additions & 0 deletions docker-compose.yml
@@ -0,0 +1,17 @@
kafka:
  image: wurstmeister/kafka
  links:
    - zookeeper
  ports:
    - "9092:9092"
  environment:
    KAFKA_VERSION: "0.10.1.0"
    KAFKA_CREATE_TOPICS: "events:2:1,test:1:1"
    KAFKA_ADVERTISED_HOST_NAME: "localhost"
    KAFKA_ADVERTISED_PORT: "9092"
    KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"

zookeeper:
  image: wurstmeister/zookeeper
  ports:
    - "2181:2181"
266 changes: 149 additions & 117 deletions kafka.go
@@ -3,187 +3,207 @@ package kafka
import (
"context"
"fmt"
"sync"
"sync/atomic"

"github.com/Shopify/sarama"
"github.com/pkg/errors"
)

// Implements the `MessageIter`.
//
// The underlying Kafka reader sends messages to the `msgs`
// channel and errors to the `err` channel. The Kafka reader is responsible
// for closing those channels.
type kafkaIter struct {
msgs chan Message
errs chan error
err error
ctx context.Context
cancel context.CancelFunc
}

func newKafkaIter(ctx context.Context, cancel context.CancelFunc, msgs chan Message, errs chan error) *kafkaIter {
return &kafkaIter{
msgs: msgs,
errs: errs,
err: nil,
ctx: ctx,
cancel: cancel,
}
}

func (iter *kafkaIter) Next(msg *Message) bool {
if iter.err != nil {
return false
}

select {
case <-iter.ctx.Done():
iter.err = appendError(iter.err, iter.ctx.Err())
return false
case err := <-iter.errs:
iter.err = appendError(iter.err, err)
return false
case val, ok := <-iter.msgs:
if !ok {
return false
}

*msg = val
}

return true
}

func (iter *kafkaIter) Close() error {
iter.cancel()
// Read the remaining messages so that the underlying reader may
// finish and return. Otherwise the goroutine will leak.
for _ = range iter.msgs {
}
for _ = range iter.errs {
}
return iter.err
}

type kafkaReader struct {
client sarama.Client
partition int32
buffer int
topic string

- maxWaitTime uint
- minBytes uint32
- maxBytes uint32
+ events chan Message
+ errors chan error
+ asyncWait sync.WaitGroup
+ spawned bool
+ cancel context.CancelFunc
+ offset int64

+ maxWaitMs uint
+ minBytes uint32
+ maxBytes uint32
}

type ReaderConfig struct {
BrokerAddrs []string
- // Size of the iterator channel. Setting it at 0 means the underlying consumer and the iterator are 100% in-sync.
- // The consumer will only make progress as the iterator does. Setting it >0 will allow the consumer to fetch data
- // potentially faster than the iterator can read.
- Buffer int
- Topic string
- Partition uint
+ Topic string
+ Partition uint

// Kafka requests wait for `RequestMaxWaitTime` OR `RequestMinBytes`, but
// always stops at `RequestMaxBytes`.
- RequestMaxWaitTime uint
- RequestMinBytes uint32
- RequestMaxBytes uint32
+ RequestMaxWaitMs uint
+ RequestMinBytes uint32
+ RequestMaxBytes uint32
}

func NewReader(config ReaderConfig) (Reader, error) {
conf := sarama.NewConfig()
- conf.Version = sarama.V0_10_0_0
+ conf.Version = sarama.V0_10_1_0

client, err := sarama.NewClient(config.BrokerAddrs, conf)
if err != nil {
return nil, err
}

if config.RequestMaxBytes == 0 {
config.RequestMaxBytes = 1000000
}

return &kafkaReader{
- client: client,
- topic: config.Topic,
- partition: int32(config.Partition),
- buffer: config.Buffer,
- maxWaitTime: config.RequestMaxWaitTime,
- minBytes: config.RequestMinBytes,
- maxBytes: config.RequestMaxBytes,
+ events: make(chan Message),
+ errors: make(chan error),
+ spawned: false,
+ client: client,
+ offset: 0,
+ cancel: func() {},
+ topic: config.Topic,
+ partition: int32(config.Partition),
+ maxWaitMs: config.RequestMaxWaitMs,
+ minBytes: config.RequestMinBytes,
+ maxBytes: config.RequestMaxBytes,
}, nil
}

- // Start consuming from Kafka starting at the given `offset`. `Read` will return a sequential iterator that makes progress
- // as its being read. Rewinding or needing to reset the offset will require calling `Read` again, returning a new iterator.
- func (kafka *kafkaReader) Read(ctx context.Context, offset Offset) MessageIter {
- messagesCh := make(chan Message, kafka.buffer)
- errsCh := make(chan error, 1)
+ func (kafka *kafkaReader) Read(ctx context.Context) (Message, error) {
+ select {
+ case <-ctx.Done():
+ return Message{}, ctx.Err()
+ case msg := <-kafka.events:
+ return msg, nil
+ case err := <-kafka.errors:
+ return Message{}, err
+ }
+ }

+ func (kafka *kafkaReader) closeAsync() {
+ kafka.cancel()

+ // Avoid blocking the async goroutine by emptying the channels.
+ go func() {
+ for _ = range kafka.events {
+ }
+ for _ = range kafka.errors {
+ }
+ }()

- // If the iterator is closed before the context is canceled it would block
- // indefinitely (to flush the msgs/errs channels). The iterator will now
- // cancel the context which will propagate to the `asyncFetch` goroutine.
- ctx, cancel := context.WithCancel(ctx)
+ kafka.asyncWait.Wait()

- go kafka.asyncFetch(ctx, offset, messagesCh, errsCh)
+ close(kafka.events)
+ close(kafka.errors)

- return newKafkaIter(ctx, cancel, messagesCh, errsCh)
+ kafka.events = make(chan Message)
+ kafka.errors = make(chan error)
}

- // Asynchronously fetch blocks of messages from Kafka, sending each message to the underlying iterator. The async consumer will progress
- // in-sync with the underlying's iterator's progression. If the iterator is not being consumed from or blocks, so does the async process.
- //
- // Offset management is up to the consumer of the iterator to implement. The offset is incremented by the async process as messages are
- // being read from Kafka but does not persist the offset in any way.
- func (kafka *kafkaReader) asyncFetch(ctx context.Context, offset Offset, messagesCh chan<- Message, errsCh chan<- error) {
- defer close(messagesCh)
- defer close(errsCh)
+ func (kafka *kafkaReader) Seek(ctx context.Context, offset int64) (int64, error) {
broker, err := kafka.client.Leader(kafka.topic, kafka.partition)
if err != nil {
return 0, err
}

offset, err = kafka.getOffset(broker, offset)
if err != nil {
return 0, err
}

atomic.StoreInt64(&kafka.offset, offset)

if !kafka.spawned {
asyncCtx, cancel := context.WithCancel(ctx)
kafka.cancel = cancel

kafka.asyncWait.Add(1)
go kafka.fetchMessagesAsync(asyncCtx)
kafka.spawned = true
}

return offset, nil
}

func (kafka *kafkaReader) getOffset(broker *sarama.Broker, offset int64) (int64, error) {
// An offset of -1/-2 means the partition has no offset state associated with it yet.
// -1 = Newest Offset
// -2 = Oldest Offset
if offset == -1 || offset == -2 {
request := &sarama.OffsetRequest{Version: 1}
request.AddBlock(kafka.topic, kafka.partition, int64(offset), 1)

offsetResponse, err := broker.GetAvailableOffsets(request)
if err != nil {
return offset, errors.Wrap(err, "failed to fetch available offsets")
}

block := offsetResponse.GetBlock(kafka.topic, kafka.partition)
if block == nil {
return offset, errors.Wrap(err, "fetching available offsets returned 0 blocks")
}

if block.Err != sarama.ErrNoError {
return offset, errors.Wrap(err, "fetching available offsets failed")
}

if block.Offset != 0 {
return block.Offset - 1, nil
}

return block.Offset, nil
}

return offset, nil
}

func (kafka *kafkaReader) Offset() int64 {
return atomic.LoadInt64(&kafka.offset)
}

func (kafka *kafkaReader) fetchMessagesAsync(ctx context.Context) {
defer kafka.asyncWait.Done()

for {
select {
- default:
- break
case <-ctx.Done():
return
+ default:
}

// Find the broker that is the given partition's leader. Failure to fetch the leader is either
// the result of an invalid topic/partition OR the broker/leader is unavailable. This can happen
// due to a leader election happening (and thus the leader has changed).
broker, err := kafka.client.Leader(kafka.topic, kafka.partition)
if err != nil {
- errsCh <- err
+ kafka.errors <- errors.Wrap(err, "failed to find leader for topic/partition")
continue
}

- // The request will wait at most `maxWaitTime` (milliseconds) OR at most `minBytes`,
+ // The request will wait at most `maxWaitMs` (milliseconds) OR at most `minBytes`,
// which ever happens first.
request := sarama.FetchRequest{
- MaxWaitTime: int32(kafka.maxWaitTime),
+ MaxWaitTime: int32(kafka.maxWaitMs),
MinBytes: int32(kafka.minBytes),
}

- request.AddBlock(kafka.topic, kafka.partition, int64(offset), int32(kafka.maxBytes))
+ offset := atomic.LoadInt64(&kafka.offset)

+ request.AddBlock(kafka.topic, kafka.partition, offset, int32(kafka.maxBytes))
res, err := broker.Fetch(&request)
if err != nil {
- errsCh <- errors.Wrap(err, "kafka reader failed to fetch a block")
+ kafka.errors <- errors.Wrap(err, "kafka reader failed to fetch a block")
continue
}

- block, ok := res.Blocks[kafka.topic]
- if !ok {
- continue
- }

// The only way Kafka does _not_ return a block is if the
// partition is invalid.
- partition, ok := block[kafka.partition]
- if !ok {
- errsCh <- fmt.Errorf("kafka partition is invalid (partition: %d)", kafka.partition)
+ partition := res.GetBlock(kafka.topic, kafka.partition)
+ if partition == nil {
+ kafka.errors <- fmt.Errorf("kafka topic/partition is invalid (topic: %s, partition: %d)", kafka.topic, kafka.partition)
continue
}

// Possible errors: https://godoc.org/github.com/Shopify/sarama#KError
if partition.Err != sarama.ErrNoError {
- errsCh <- errors.Wrap(partition.Err, "kafka block returned an error")
+ kafka.errors <- errors.Wrap(partition.Err, "kafka block returned an error")
continue
}

@@ -193,14 +213,25 @@ func (kafka *kafkaReader) asyncFetch(ctx context.Context, offset Offset, message
// This doesn't commit the offset in any way, it only allows the iterator to continue to
// make progress.
msgSet := partition.MsgSet.Messages
- offset = Offset(msgSet[len(msgSet)-1].Offset)

+ if len(msgSet) == 0 {
+ continue
+ }

+ // Bump the current offset to the last offset in the message set. The new offset will
+ // be used the next time we fetch a block from Kafka.
+ //
+ // This doesn't commit the offset in any way, it only allows the iterator to continue to
+ // make progress.
+ offset = msgSet[len(msgSet)-1].Offset + 1
+ atomic.StoreInt64(&kafka.offset, offset)

for _, msg := range msgSet {
// Give the message to the iterator. This will block if the consumer of the iterator
// is blocking or not calling `.Next(..)`. This allows the Kafka reader to stay in-sync
// with the consumer.
- messagesCh <- Message{
- Offset: Offset(msg.Offset),
+ kafka.events <- Message{
+ Offset: msg.Offset,
Key: msg.Msg.Key,
Value: msg.Msg.Value,
}
@@ -211,5 +242,6 @@ func (kafka *kafkaReader) asyncFetch(ctx context.Context, offset Offset, message
// Shutdown the Kafka client. The Kafka reader does not persist the offset
// when closing down and thus any iterator progress will be lost.
func (kafka *kafkaReader) Close() (err error) {
kafka.closeAsync()
return kafka.client.Close()
}
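
For context, here is a rough usage sketch (not part of the commit) of how the refactored reader appears intended to be driven after this change: call Seek once to pick a starting offset — the -1/-2 sentinels follow the getOffset comment above — then call Read in a loop. It is written as if it lived in this package so it can reuse ReaderConfig and NewReader directly, and it assumes the Reader interface returned by NewReader (defined in one of the changed files not shown here) exposes the Read, Seek, Offset and Close methods implemented above; the broker address, topic and loop bound are placeholders.

package kafka

import (
	"context"
	"log"
)

// readOldestTen is a usage sketch only; it is not part of this commit.
func readOldestTen() {
	ctx := context.Background()

	reader, err := NewReader(ReaderConfig{
		BrokerAddrs: []string{"localhost:9092"},
		Topic:       "events",
		Partition:   0,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer reader.Close()

	// -2 asks getOffset for the oldest available offset (-1 would be the newest).
	// Seek also spawns the background fetchMessagesAsync goroutine on first use.
	if _, err := reader.Seek(ctx, -2); err != nil {
		log.Fatal(err)
	}

	for i := 0; i < 10; i++ {
		msg, err := reader.Read(ctx)
		if err != nil {
			log.Fatal(err)
		}
		log.Printf("offset=%d key=%q value=%q (reader offset is now %d)",
			msg.Offset, msg.Key, msg.Value, reader.Offset())
	}
}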