Refactor the synchronization of the channels. Add more docs
thehydroimpulse committed May 20, 2017
1 parent 1b7a6be commit 319d285
Showing 3 changed files with 89 additions and 39 deletions.
43 changes: 34 additions & 9 deletions consul.go
@@ -11,27 +11,46 @@ import (

const baseKey = "group-readers/"

// The Consul reader implements consumer groups via Consul locks, ensuring that
// each reader acquires an exclusive partition. Partitions are not spread among
// readers; each reader claims exactly one partition, so a topic with 100 partitions
// needs 100 readers.
//
// The only reason this type implements the `Reader` interface itself is that the
// Consul session must be destroyed on Close so that the acquired locks are released.
type consulReader struct {
reader Reader
client *api.Client
id int
name string
// The underlying reader
reader Reader
// Consul client
client *api.Client
// The name of the group. Allows multiple groups without clashing.
name string
// The Consul session ID
session string
}

type GroupConfig struct {
Name string
Addr string
// The group name
Name string
// The Consul address. e.g., http://localhost:8500
Addr string
// List of Kafka brokers.
Brokers []string
Topic string
// Number of partitions
// The Kafka topic the readers should consume from.
Topic string
// Number of partitions the topic has. This can be lower than the true number of partitions
// but cannot be higher.
Partitions int

// Request-level settings.
RequestMaxWaitTime time.Duration
RequestMinBytes int
RequestMaxBytes int
}
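
// Illustrative only, not part of this commit: a GroupConfig might be filled in
// roughly as below; the group name, addresses, topic, and numbers are placeholders.
//
//	cfg := GroupConfig{
//		Name:       "example-group",
//		Addr:       "http://localhost:8500",
//		Brokers:    []string{"localhost:9092"},
//		Topic:      "example-topic",
//		Partitions: 8, // must not exceed the topic's real partition count
//
//		RequestMaxWaitTime: 100 * time.Millisecond,
//		RequestMinBytes:    1,
//		RequestMaxBytes:    1000000,
//	}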

// NewGroupReader creates a new Consul-based consumer group, returning a Kafka reader
// that has acquired an exclusive partition. To read from all partitions you must call
// `NewGroupReader` N times for N partitions; each call acquires a different partition.
func NewGroupReader(config GroupConfig) (Reader, error) {
conf := api.DefaultConfig()
conf.Address = config.Addr
@@ -45,11 +64,13 @@ func NewGroupReader(config GroupConfig) (Reader, error) {
return nil, errors.Wrap(err, "failed to create consul client")
}

// When the session expires or is removed, delete the locks that were acquired.
entry := api.SessionEntry{
Name: "group reader: " + config.Name,
Behavior: "delete",
}

// The session ID is kept so that the session can be destroyed when the group/reader is closed.
sessionID, _, err := client.Session().Create(&entry, &api.WriteOptions{})
if err != nil {
return nil, errors.Wrap(err, "failed to create consul session")
@@ -63,7 +84,7 @@ func NewGroupReader(config GroupConfig) (Reader, error) {

partitionID, err := consulReader.acquire(config.Partitions)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "failed to acquire an exclusive lock on a partition")
}

consulReader.reader, err = NewReader(ReaderConfig{
@@ -106,18 +127,22 @@ func (reader *consulReader) acquire(partitions int) (int, error) {
return 0, errors.New("failed to acquire consul lock")
}
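
// A hedged illustration of the technique only: the actual body of `acquire` is
// collapsed in this view, and the key layout below is an assumption, not this
// commit's code. Acquiring an exclusive partition with Consul locks amounts to
// trying each key under baseKey with the session until one Acquire succeeds
// (this sketch assumes `fmt` is imported).
//
//	for id := 0; id < partitions; id++ {
//		key := fmt.Sprintf("%s%s/%d", baseKey, reader.name, id)
//		acquired, _, err := reader.client.KV().Acquire(&api.KVPair{
//			Key:     key,
//			Session: reader.session,
//		}, nil)
//		if err != nil {
//			return 0, err
//		}
//		if acquired {
//			return id, nil
//		}
//	}
//	return 0, errors.New("failed to acquire consul lock")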

// Read the next message from the underlying reader
func (reader *consulReader) Read(ctx context.Context) (Message, error) {
return reader.reader.Read(ctx)
}

// Return the current offset from the underlying reader
func (reader *consulReader) Offset() int64 {
return reader.reader.Offset()
}

// Seek the underlying reader to a new offset
func (reader *consulReader) Seek(ctx context.Context, offset int64) (int64, error) {
return reader.reader.Seek(ctx, offset)
}

// Release any locks/keys that were acquired and close the underlying reader.
func (reader *consulReader) Close() error {
reader.client.Session().Destroy(reader.session, &api.WriteOptions{})
return reader.reader.Close()
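
As a hedged usage sketch only (not part of this commit; the group name, addresses, topic, and settings are placeholders), a process that wants to consume an entire topic through this API would call NewGroupReader once per partition, since each call acquires a single exclusive partition, and close each reader to release its Consul lock. The sketch assumes it lives in the same package as the readers and needs only the standard library (context, log, sync, time); it also seeks before reading because, in this revision, the asynchronous fetch task is started by Seek.

// consumeAllPartitions is an illustrative sketch, not part of this commit.
func consumeAllPartitions(ctx context.Context, partitions int) {
	var wg sync.WaitGroup

	for i := 0; i < partitions; i++ {
		wg.Add(1)
		go func(worker int) {
			defer wg.Done()

			r, err := NewGroupReader(GroupConfig{
				Name:       "example-group",            // placeholder
				Addr:       "http://localhost:8500",    // placeholder Consul address
				Brokers:    []string{"localhost:9092"}, // placeholder broker
				Topic:      "example-topic",
				Partitions: partitions,

				RequestMaxWaitTime: 100 * time.Millisecond,
				RequestMinBytes:    1,
				RequestMaxBytes:    1000000,
			})
			if err != nil {
				log.Printf("reader %d: %v", worker, err)
				return
			}
			// Close destroys the Consul session, releasing the partition lock.
			defer r.Close()

			// In this revision the async fetch task is spawned by Seek, so
			// position the reader before reading. Offset 0 is illustrative.
			if _, err := r.Seek(ctx, 0); err != nil {
				log.Printf("reader %d: seek: %v", worker, err)
				return
			}

			for {
				msg, err := r.Read(ctx)
				if err != nil {
					log.Printf("reader %d: read: %v", worker, err)
					return
				}
				_ = msg // hand the message off to the application here
			}
		}(i)
	}

	wg.Wait()
}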
5 changes: 5 additions & 0 deletions docker-compose.yml
@@ -15,3 +15,8 @@ zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"

consul:
image: consul
ports:
- "8500:8500"
80 changes: 50 additions & 30 deletions kafka.go
@@ -33,16 +33,18 @@ type kafkaReader struct {
partition int32
topic string

// async task
events chan Message
errors chan error
// Channels used by the async task
events chan Message
errors chan error
// Wait until the async task has finished. This is used to ensure that the task
// won't be sending on the channels and that we can close them.
asyncWait sync.WaitGroup
mu sync.RWMutex
// Determine if an async task has been spawned
spawned bool
// Cancel the async task
cancel context.CancelFunc

// The current offset that is synchronized via atomics.
offset int64

// kafka fetch configuration
@@ -51,7 +53,7 @@ maxWaitTime time.Duration
maxBytes int
}

// Create a new base Kafka reader given a topic and partition.
// Create a new Kafka reader given a topic and partition.
func NewReader(config ReaderConfig) (Reader, error) {
conf := sarama.NewConfig()
conf.Version = sarama.V0_10_1_0
@@ -80,14 +82,19 @@ func NewReader(config ReaderConfig) (Reader, error) {
}

return &kafkaReader{
events: make(chan Message),
errors: make(chan error),
spawned: false,
client: client,
offset: 0,
cancel: func() {},
topic: config.Topic,
partition: int32(config.Partition),
topic: config.Topic,
partition: int32(config.Partition),
client: client,
offset: 0,

// async task fields
events: make(chan Message),
errors: make(chan error),
// Provide a noop context canceling function.
cancel: func() {},
spawned: false,

// Request-level parameters
maxWaitTime: config.RequestMaxWaitTime,
minBytes: config.RequestMinBytes,
maxBytes: config.RequestMaxBytes,
@@ -98,9 +105,6 @@ func NewReader(config ReaderConfig) (Reader, error) {
// `Read` will block until a message is ready to be read. The asynchronous task
// will also block until `Read` is called.
func (kafka *kafkaReader) Read(ctx context.Context) (Message, error) {
kafka.mu.RLock()
defer kafka.mu.RUnlock()

select {
case <-ctx.Done():
return Message{}, ctx.Err()
@@ -116,25 +120,27 @@ func (kafka *kafkaReader) Read(ctx context.Context) (Message, error) {
func (kafka *kafkaReader) closeAsync() {
kafka.cancel()

// Reference the channels locally to avoid having to synchronize within
// the goroutine.
events := kafka.events
errors := kafka.errors

// Avoid blocking the async goroutine by emptying the channels.
go func() {
kafka.mu.RLock()
defer kafka.mu.RUnlock()

for _ = range kafka.events {
for _ = range events {
}
for _ = range kafka.errors {
for _ = range errors {
}
}()

// Wait for the async task to finish canceling. The channels cannot be closed
// until the task has returned, otherwise a send inside the task could panic.
kafka.asyncWait.Wait()

close(kafka.events)
close(kafka.errors)
close(events)
close(errors)

kafka.mu.Lock()
defer kafka.mu.Unlock()
// In-case `Seek` is called again and we need to these channels
// Re-establish the channels for future uses.
kafka.events = make(chan Message)
kafka.errors = make(chan error)
}
@@ -150,6 +156,14 @@ func (kafka *kafkaReader) closeAsync() {
//
// Calling `Seek` again will cancel the previous task and create a new one.
func (kafka *kafkaReader) Seek(ctx context.Context, offset int64) (int64, error) {
select {
case <-ctx.Done():
return 0, ctx.Err()
default:
}

// Closing the async task before another Seek operation prevents the channels from
// carrying partial data left over from a previous Seek operation.
if kafka.spawned {
kafka.closeAsync()
}
@@ -171,9 +185,6 @@ func (kafka *kafkaReader) Seek(ctx context.Context, offset int64) (int64, error)

kafka.asyncWait.Add(1)

kafka.mu.RLock()
defer kafka.mu.RUnlock()

go kafka.fetchMessagesAsync(asyncCtx, kafka.events, kafka.errors)
kafka.spawned = true

@@ -286,11 +297,20 @@ func (kafka *kafkaReader) fetchMessagesAsync(ctx context.Context, eventsCh chan<
continue
}

// Kafka reads messages from disk and may return a partial trailing message. Consumers
// need to prune it and re-request it on a subsequent fetch.
if partition.MsgSet.PartialTrailingMessage {
// Remove the trailing message. The next fetch, starting at the new offset, will return it in full.
msgSet = msgSet[:len(msgSet)-1]
}

// Bump the current offset to the last offset in the message set. The new offset will
// be used the next time we fetch a block from Kafka.
//
// This doesn't commit the offset in any way, it only allows the iterator to continue to
// make progress.
//
// If there was a trailing message this next offset will start there.
offset = msgSet[len(msgSet)-1].Offset + 1
atomic.StoreInt64(&kafka.offset, offset)

@@ -307,7 +327,7 @@ func (kafka *kafkaReader) fetchMessagesAsync(ctx context.Context, eventsCh chan<
}
}

// Shutdown the Kafka client.
// Shutdown the async task and the Kafka client.
func (kafka *kafkaReader) Close() (err error) {
kafka.closeAsync()
return kafka.client.Close()
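
The heart of this commit is the hand-off in closeAsync between the fetch goroutine and the channels it sends on: cancel the task, drain the channels so the task is never blocked on a send, wait on the WaitGroup so the task has definitely returned, and only then close the channels. The following standalone sketch (the names are illustrative, not from this repository) shows the same cancel / drain / wait / close sequence in isolation:

package main

import (
	"context"
	"fmt"
	"sync"
)

// producerTask mirrors the shape of the async fetch task: a goroutine that
// sends on a channel until its context is canceled, tracked by a WaitGroup.
type producerTask struct {
	events chan int
	wg     sync.WaitGroup
	cancel context.CancelFunc
}

func (t *producerTask) start() {
	ctx, cancel := context.WithCancel(context.Background())
	t.cancel = cancel
	t.events = make(chan int)

	t.wg.Add(1)
	go func() {
		defer t.wg.Done()
		for i := 0; ; i++ {
			select {
			case <-ctx.Done():
				return
			case t.events <- i: // blocks until someone reads, like the fetch loop
			}
		}
	}()
}

func (t *producerTask) stop() {
	// 1. Ask the producer to stop.
	t.cancel()

	// 2. Reference the channel locally (as closeAsync does) and drain it so
	//    the producer is never stuck on a send while we wait.
	events := t.events
	go func() {
		for range events {
		}
	}()

	// 3. Only after the producer has returned is it safe to close the channel;
	//    closing earlier could make the producer panic on send.
	t.wg.Wait()
	close(events)
}

func main() {
	t := &producerTask{}
	t.start()
	fmt.Println(<-t.events, <-t.events) // 0 1
	t.stop()
}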
