
Commit: init
thehydroimpulse committed May 9, 2017
0 parents commit 30cd00f
Showing 5 changed files with 392 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -0,0 +1 @@
vendor/**/
119 changes: 119 additions & 0 deletions Readme.md
@@ -0,0 +1,119 @@
# Kafka Go

(Mostly just some ideas on potential abstractions) -- Feel free to send PRs

Kafka library in Go. It builds on [Sarama](https://github.com/Shopify/sarama) but differs in a few ways:

- It doesn't use Kafka's built-in consumer groups.
- It exposes an iterator-based API.
- It provides a low-level interface that can be composed into higher-level components.

# Getting Started

**Consuming messages from a single partition:**

```golang
import "github.com/segmentio/kafka-go"

reader, err := kafka.NewReader(ReaderConfig{
BrokerAddrs: []string{"localhost:9092"},
Topic: "events",
Partition: 0,
})

if err != nil {
panic(err)
}

iter := reader.Read(context.Background(), kafka.Offset(0))

var msg kafka.Message
for iter.Next(&msg) {
fmt.Printf("offset: %d, key: %s, value: %s\n", int64(msg.Offset), string(msg.Key), string(msg.Value))
}
```
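
When you're finished, close the iterator and then the reader. The iterator's `Close` cancels the underlying fetch and returns any error it accumulated; the reader's `Close` shuts down the Kafka client. Neither persists the offset. A minimal cleanup sketch (assuming the standard `log` package):

```golang
if err := iter.Close(); err != nil {
    // Fetch errors and context cancellation surface here.
    log.Printf("iterator error: %v", err)
}

if err := reader.Close(); err != nil {
    log.Printf("closing reader: %v", err)
}
```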

**Consuming from all partitions:**

```golang
import "github.com/segmentio/kafka-go"

group, err := kafka.NewConsulGroup(ConsulConfig{
Name: "my-consumer-group",
Addr: "localhost:8500",
})

reader, err := group.NewReader(ReaderConfig{
BrokerAddrs: []string{"localhost:9092"},
Topic: "events",
Partitions: 100,
})
```

Each reader consumes from a single partition, so a topic with 100 partitions needs 100 readers. Those readers can be spread across instances if you want.
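
A minimal sketch that fans the readers out across goroutines (assuming a hypothetical `handle` callback; error handling elided):

```golang
for i := 0; i < 100; i++ {
    reader, err := group.NewReader(kafka.ReaderConfig{
        BrokerAddrs: []string{"localhost:9092"},
        Topic:       "events",
        Partitions:  100,
    })
    if err != nil {
        panic(err)
    }

    go func(r kafka.Reader) {
        iter := r.Read(context.Background(), kafka.Offset(0))
        var msg kafka.Message
        for iter.Next(&msg) {
            handle(msg) // hypothetical per-message callback
        }
    }(reader)
}
```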

**Consume from multiple readers:**

Iterators are naturally composable.

```golang
import "github.com/segmentio/kafka-go"

group, err := kafka.NewConsulGroup(ConsulConfig{
Name: "my-consumer-group",
Addr: "localhost:8500",
})

readers := []kafka.Reader{}
iterators := []kakfa.MessageIter{}

for i := 0; i < 100; i++ {
reader, err := group.NewReader(ReaderConfig{
BrokerAddrs: []string{"localhost:9092"},
Topic: "events",
Partitions: 100,
})

iter := reader.Read(context.Background(), kafka.Offset(0))

readers = append(readers, reader)
iterators = append(iterators, iter)
}

iter := kafka.NewMultiIter(iterators)

var msg kafka.Message
for iter.Next(&msg) {
// ...
}
```
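
Note that Kafka only guarantees message ordering within a single partition; a combined iterator interleaves messages from its underlying partitions in no particular order.

`NewMultiIter` is referenced above but isn't part of this initial commit. A minimal sketch of how such a fan-in combinator could be built on the `MessageIter` interface, written as it might live inside the package (an assumption, not the committed implementation; it needs `sync` imported):

```golang
// multiIter drains each underlying iterator from its own goroutine into a
// shared channel.
type multiIter struct {
    iters []MessageIter
    msgs  chan Message
    done  chan struct{}
    once  sync.Once
    wg    sync.WaitGroup
}

func NewMultiIter(iters []MessageIter) MessageIter {
    m := &multiIter{
        iters: iters,
        msgs:  make(chan Message),
        done:  make(chan struct{}),
    }
    m.wg.Add(len(iters))
    for _, it := range iters {
        go func(it MessageIter) {
            defer m.wg.Done()
            var msg Message
            for it.Next(&msg) {
                select {
                case m.msgs <- msg:
                case <-m.done: // Close was called; stop draining.
                    return
                }
            }
        }(it)
    }
    // Close the fan-in channel once every underlying iterator is exhausted.
    go func() {
        m.wg.Wait()
        close(m.msgs)
    }()
    return m
}

func (m *multiIter) Next(msg *Message) bool {
    val, ok := <-m.msgs
    if ok {
        *msg = val
    }
    return ok
}

func (m *multiIter) Close() error {
    m.once.Do(func() { close(m.done) })
    var err error
    for _, it := range m.iters {
        err = appendError(err, it.Close())
    }
    return err
}
```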

**Spread partitions across consumers:**

So you have N consumers and K partitions that you want to spread around?

```golang
group, err := kafka.NewConsulGroup(kafka.ConsulConfig{
    Name:   "my-consumer-group",
    Scheme: kafka.Spread,
    Addr:   "localhost:8500",
})
if err != nil {
    panic(err)
}

readers, err := group.NewReaders(...)
if err != nil {
    panic(err)
}

iterators := []kafka.MessageIter{}
for _, reader := range readers {
    iterators = append(iterators, reader.Read(context.Background(), kafka.Offset(0)))
}

iter := kafka.NewMultiIter(iterators)

var msg kafka.Message
for iter.Next(&msg) {
    // ...
}
```
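
As with a single reader, check the error from `iter.Close()` once you're done iterating, and close each reader to shut down its client connection.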

# License

MIT
36 changes: 36 additions & 0 deletions error.go
@@ -0,0 +1,36 @@
package kafka

import "strings"

type errorList []error

func (errors errorList) Error() string {
	switch len(errors) {
	case 0:
		return ""
	case 1:
		return errors[0].Error()
	default:
		s := make([]string, len(errors))
		for i, e := range errors {
			s[i] = e.Error()
		}
		return strings.Join(s, ": ")
	}
}

// appendError merges err into to, accumulating multiple errors into an
// errorList. A brief illustration:
//
//	var total error
//	total = appendError(total, errA) // total == errA
//	total = appendError(total, errB) // total == errorList{errA, errB}
func appendError(to error, err error) error {
	if err == nil {
		return to
	}

	if to == nil {
		return err
	}

	if errlist, ok := to.(errorList); ok {
		return append(errlist, err)
	}

	return errorList{to, err}
}
215 changes: 215 additions & 0 deletions kafka.go
@@ -0,0 +1,215 @@
package kafka

import (
"context"
"fmt"

"github.com/Shopify/sarama"
"github.com/pkg/errors"
)

// kafkaIter implements the `MessageIter` interface.
//
// The underlying Kafka reader sends messages to the `msgs` channel and
// errors to the `errs` channel, and is responsible for closing both
// channels.
type kafkaIter struct {
	msgs   chan Message
	errs   chan error
	err    error
	ctx    context.Context
	cancel context.CancelFunc
}

func newKafkaIter(ctx context.Context, cancel context.CancelFunc, msgs chan Message, errs chan error) *kafkaIter {
	return &kafkaIter{
		msgs:   msgs,
		errs:   errs,
		err:    nil,
		ctx:    ctx,
		cancel: cancel,
	}
}

func (iter *kafkaIter) Next(msg *Message) bool {
	if iter.err != nil {
		return false
	}

	select {
	case <-iter.ctx.Done():
		iter.err = appendError(iter.err, iter.ctx.Err())
		return false
	case err := <-iter.errs:
		iter.err = appendError(iter.err, err)
		return false
	case val, ok := <-iter.msgs:
		if !ok {
			return false
		}

		*msg = val
	}

	return true
}

func (iter *kafkaIter) Close() error {
	iter.cancel()
	// Drain the remaining messages and errors so that the underlying reader
	// can finish and return; otherwise its goroutine would leak.
	for range iter.msgs {
	}
	for range iter.errs {
	}
	return iter.err
}

type kafkaReader struct {
	client    sarama.Client
	partition int32
	buffer    int
	topic     string

	maxWaitTime uint
	minBytes    uint32
	maxBytes    uint32
}

type ReaderConfig struct {
	BrokerAddrs []string

	// Size of the iterator channel. Setting it to 0 keeps the underlying
	// consumer and the iterator fully in sync: the consumer only makes
	// progress as the iterator does. Setting it >0 allows the consumer to
	// fetch data ahead of the iterator.
	Buffer    int
	Topic     string
	Partition uint

	// A fetch request waits until at least `RequestMinBytes` of data is
	// available or `RequestMaxWaitTime` (milliseconds) has elapsed,
	// whichever comes first, but never returns more than `RequestMaxBytes`.
	RequestMaxWaitTime uint
	RequestMinBytes    uint32
	RequestMaxBytes    uint32
}
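
// An illustrative configuration (the specific values below are assumptions;
// tune them for your workload):
//
//	reader, err := NewReader(ReaderConfig{
//		BrokerAddrs:        []string{"localhost:9092"},
//		Topic:              "events",
//		Partition:          0,
//		Buffer:             64,      // let the fetcher run ahead of the iterator
//		RequestMaxWaitTime: 250,     // milliseconds
//		RequestMinBytes:    1 << 10, // 1 KiB
//		RequestMaxBytes:    1 << 20, // 1 MiB
//	})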

func NewReader(config ReaderConfig) (Reader, error) {
	conf := sarama.NewConfig()
	conf.Version = sarama.V0_10_0_0

	client, err := sarama.NewClient(config.BrokerAddrs, conf)
	if err != nil {
		return nil, err
	}

	return &kafkaReader{
		client:      client,
		topic:       config.Topic,
		partition:   int32(config.Partition),
		buffer:      config.Buffer,
		maxWaitTime: config.RequestMaxWaitTime,
		minBytes:    config.RequestMinBytes,
		maxBytes:    config.RequestMaxBytes,
	}, nil
}

// Read starts consuming from Kafka at the given `offset`. It returns a
// sequential iterator that makes progress as it is read. Rewinding or
// resetting the offset requires calling `Read` again, which returns a new
// iterator.
func (kafka *kafkaReader) Read(ctx context.Context, offset Offset) MessageIter {
	messagesCh := make(chan Message, kafka.buffer)
	errsCh := make(chan error, 1)

	// If the iterator were closed before the context is canceled, it would
	// block indefinitely while flushing the msgs/errs channels. The iterator
	// therefore cancels the context itself, which propagates to the
	// `asyncFetch` goroutine.
	ctx, cancel := context.WithCancel(ctx)

	go kafka.asyncFetch(ctx, offset, messagesCh, errsCh)

	return newKafkaIter(ctx, cancel, messagesCh, errsCh)
}

// asyncFetch fetches blocks of messages from Kafka, sending each message to
// the underlying iterator. The fetcher progresses in sync with the iterator:
// if the iterator blocks or is not being consumed, so does the fetcher.
//
// Offset management is left to the consumer of the iterator. The offset is
// advanced here as messages are read from Kafka, but it is never persisted.
func (kafka *kafkaReader) asyncFetch(ctx context.Context, offset Offset, messagesCh chan<- Message, errsCh chan<- error) {
	defer close(messagesCh)
	defer close(errsCh)

	for {
		select {
		case <-ctx.Done():
			return
		default:
		}

		// Find the broker that is the leader for the given partition.
		// Failure to fetch the leader means either the topic/partition is
		// invalid or the broker/leader is unavailable, e.g. because a leader
		// election is in progress (and thus the leader has changed).
		broker, err := kafka.client.Leader(kafka.topic, kafka.partition)
		if err != nil {
			errsCh <- err
			continue
		}

		// The request waits until at least `minBytes` of data is available
		// or `maxWaitTime` (milliseconds) has elapsed, whichever happens
		// first.
		request := sarama.FetchRequest{
			MaxWaitTime: int32(kafka.maxWaitTime),
			MinBytes:    int32(kafka.minBytes),
		}

		request.AddBlock(kafka.topic, kafka.partition, int64(offset), int32(kafka.maxBytes))
		res, err := broker.Fetch(&request)
		if err != nil {
			errsCh <- errors.Wrap(err, "kafka reader failed to fetch a block")
			continue
		}

		block, ok := res.Blocks[kafka.topic]
		if !ok {
			continue
		}

		// The only way Kafka does _not_ return a block is if the
		// partition is invalid.
		partition, ok := block[kafka.partition]
		if !ok {
			errsCh <- fmt.Errorf("kafka partition is invalid (partition: %d)", kafka.partition)
			continue
		}

		// Possible errors: https://godoc.org/github.com/Shopify/sarama#KError
		if partition.Err != sarama.ErrNoError {
			errsCh <- errors.Wrap(partition.Err, "kafka block returned an error")
			continue
		}

		// A fetch can legitimately return no messages (e.g. nothing new was
		// produced within `maxWaitTime`); guard against indexing an empty
		// message set.
		msgSet := partition.MsgSet.Messages
		if len(msgSet) == 0 {
			continue
		}

		// Advance the offset past the last message in the set so the next
		// fetch doesn't re-deliver it. This doesn't commit the offset in any
		// way; it only lets the iterator continue to make progress.
		offset = Offset(msgSet[len(msgSet)-1].Offset + 1)

		for _, msg := range msgSet {
			// Hand the message to the iterator. This blocks if the consumer
			// of the iterator is blocked or not calling `Next`, which keeps
			// the Kafka reader in sync with the consumer.
			messagesCh <- Message{
				Offset: Offset(msg.Offset),
				Key:    msg.Msg.Key,
				Value:  msg.Msg.Value,
			}
		}
	}
}

// Close shuts down the Kafka client. The reader does not persist the offset
// when closing, so any iterator progress is lost.
func (kafka *kafkaReader) Close() error {
	return kafka.client.Close()
}
21 changes: 21 additions & 0 deletions reader.go
@@ -0,0 +1,21 @@
package kafka

import "context"

// Offset is a position within a Kafka partition.
type Offset uint64

// Message is a single Kafka message.
type Message struct {
	Offset Offset
	Key    []byte
	Value  []byte
}

// MessageIter iterates over a stream of messages. Next fills in the given
// Message and reports whether one was read; Close releases the underlying
// resources and returns any accumulated error.
type MessageIter interface {
	Next(*Message) bool
	Close() error
}

// Reader consumes a Kafka partition, producing an iterator that starts at a
// given offset.
type Reader interface {
	Read(context.Context, Offset) MessageIter
	Close() error
}
