diff --git a/consul.go b/consul.go
index 4fcaa0b3c..27a3cddb3 100644
--- a/consul.go
+++ b/consul.go
@@ -11,27 +11,46 @@ import (
 
 const baseKey = "group-readers/"
 
+// Consul reader implements consumer groups via Consul locks to ensure
+// that each reader acquires an exclusive partition. This doesn't spread partitions
+// across readers; it assigns exactly one partition to each reader. If there are
+// 100 partitions then there need to be 100 readers.
+//
+// The only reason this type implements the `Reader` interface itself is that
+// the session needs to be terminated so that the acquired locks are released.
 type consulReader struct {
-	reader  Reader
-	client  *api.Client
-	id      int
-	name    string
+	// The underlying reader
+	reader Reader
+	// Consul client
+	client *api.Client
+	// The name of the group. Allows multiple groups without clashing.
+	name string
+	// The Consul session ID
 	session string
 }
 
 type GroupConfig struct {
-	Name    string
-	Addr    string
+	// The group name
+	Name string
+	// The Consul address, e.g., http://localhost:8500
+	Addr string
+	// List of Kafka brokers.
 	Brokers []string
-	Topic   string
-	// Number of partitions
+	// The Kafka topic the readers should consume from.
+	Topic string
+	// Number of partitions the topic has. This can be lower than the true number
+	// of partitions but cannot be higher.
 	Partitions int
 
+	// Request-level settings.
 	RequestMaxWaitTime time.Duration
 	RequestMinBytes    int
 	RequestMaxBytes    int
 }
 
+// Create a new Consul-based consumer group, returning a Kafka reader that has
+// acquired a partition. To read from all partitions you must call `NewGroupReader`
+// N times for N partitions. Each reader will acquire an exclusive partition.
 func NewGroupReader(config GroupConfig) (Reader, error) {
 	conf := api.DefaultConfig()
 	conf.Address = config.Addr
@@ -45,11 +64,13 @@ func NewGroupReader(config GroupConfig) (Reader, error) {
 		return nil, errors.Wrap(err, "failed to create consul client")
 	}
 
+	// When the session expires or is removed, delete the locks that were acquired.
 	entry := api.SessionEntry{
 		Name:     "group reader: " + config.Name,
 		Behavior: "delete",
 	}
 
+	// The session ID needs to be kept so that it can be deleted when the group/reader is closed.
 	sessionID, _, err := client.Session().Create(&entry, &api.WriteOptions{})
 	if err != nil {
 		return nil, errors.Wrap(err, "failed to create consul session")
@@ -63,7 +84,7 @@ func NewGroupReader(config GroupConfig) (Reader, error) {
 
 	partitionID, err := consulReader.acquire(config.Partitions)
 	if err != nil {
-		return nil, err
+		return nil, errors.Wrap(err, "failed to acquire an exclusive lock on a partition")
 	}
 
 	consulReader.reader, err = NewReader(ReaderConfig{
@@ -106,18 +127,22 @@ func (reader *consulReader) acquire(partitions int) (int, error) {
 	return 0, errors.New("failed to acquire consul lock")
 }
 
+// Read the next message from the underlying reader
 func (reader *consulReader) Read(ctx context.Context) (Message, error) {
 	return reader.reader.Read(ctx)
 }
 
+// Return the current offset from the underlying reader
 func (reader *consulReader) Offset() int64 {
 	return reader.reader.Offset()
 }
 
+// Seek the underlying reader to a new offset
 func (reader *consulReader) Seek(ctx context.Context, offset int64) (int64, error) {
 	return reader.reader.Seek(ctx, offset)
 }
 
+// Release any locks/keys that were acquired and close the underlying reader.
 func (reader *consulReader) Close() error {
 	reader.client.Session().Destroy(reader.session, &api.WriteOptions{})
 	return reader.reader.Close()
diff --git a/docker-compose.yml b/docker-compose.yml
index 5751ea746..14625a83e 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -15,3 +15,8 @@ zookeeper:
   image: wurstmeister/zookeeper
   ports:
     - "2181:2181"
+
+consul:
+  image: consul
+  ports:
+    - "8500:8500"
diff --git a/kafka.go b/kafka.go
index 8941ab0e1..ee5db1269 100644
--- a/kafka.go
+++ b/kafka.go
@@ -33,16 +33,18 @@ type kafkaReader struct {
 	partition int32
 	topic     string
 
-	// async task
-	events chan Message
-	errors chan error
+	// Channels used by the async task
+	events chan Message
+	errors chan error
+	// Wait until the async task has finished. This is used to ensure that the task
+	// won't be sending on the channels and that we can close them.
 	asyncWait sync.WaitGroup
 
-	mu sync.RWMutex
 	// Determine if an async task has been spawned
 	spawned bool
 	// Cancel the async task
 	cancel context.CancelFunc
+	// The current offset, synchronized via atomics.
 	offset int64
 
 	// kafka fetch configuration
@@ -51,7 +53,7 @@ type kafkaReader struct {
 	maxBytes    int
 }
 
-// Create a new base Kafka reader given a topic and partition.
+// Create a new Kafka reader given a topic and partition.
 func NewReader(config ReaderConfig) (Reader, error) {
 	conf := sarama.NewConfig()
 	conf.Version = sarama.V0_10_1_0
@@ -80,14 +82,19 @@ func NewReader(config ReaderConfig) (Reader, error) {
 	}
 
 	return &kafkaReader{
-		events:    make(chan Message),
-		errors:    make(chan error),
-		spawned:   false,
-		client:    client,
-		offset:    0,
-		cancel:    func() {},
-		topic:     config.Topic,
-		partition: int32(config.Partition),
+		topic:     config.Topic,
+		partition: int32(config.Partition),
+		client:    client,
+		offset:    0,
+
+		// async task fields
+		events: make(chan Message),
+		errors: make(chan error),
+		// Provide a noop context canceling function.
+		cancel:  func() {},
+		spawned: false,
+
+		// Request-level parameters
 		maxWaitTime: config.RequestMaxWaitTime,
 		minBytes:    config.RequestMinBytes,
 		maxBytes:    config.RequestMaxBytes,
@@ -98,9 +105,6 @@ func NewReader(config ReaderConfig) (Reader, error) {
 // `Read` will block until a message is ready to be read. The asynchronous task
 // will also block until `Read` is called.
 func (kafka *kafkaReader) Read(ctx context.Context) (Message, error) {
-	kafka.mu.RLock()
-	defer kafka.mu.RUnlock()
-
 	select {
 	case <-ctx.Done():
 		return Message{}, ctx.Err()
@@ -116,25 +120,27 @@ func (kafka *kafkaReader) Read(ctx context.Context) (Message, error) {
 func (kafka *kafkaReader) closeAsync() {
 	kafka.cancel()
 
+	// Reference the channels locally to avoid having to synchronize within
+	// the goroutine.
+	events := kafka.events
+	errors := kafka.errors
+
+	// Avoid blocking the async goroutine by emptying the channels.
 	go func() {
-		kafka.mu.RLock()
-		defer kafka.mu.RUnlock()
-
-		for _ = range kafka.events {
+		for range events {
 		}
 
-		for _ = range kafka.errors {
+		for range errors {
 		}
 	}()
 
+	// Wait for the async task to finish canceling. The channels cannot be closed
+	// until the task has returned, otherwise a send could panic.
 	kafka.asyncWait.Wait()
 
-	close(kafka.events)
-	close(kafka.errors)
+	close(events)
+	close(errors)
 
-	kafka.mu.Lock()
-	defer kafka.mu.Unlock()
-
-	// In-case `Seek` is called again and we need to these channels
+	// Re-establish the channels for future use.
 	kafka.events = make(chan Message)
 	kafka.errors = make(chan error)
 }
@@ -150,6 +156,14 @@ func (kafka *kafkaReader) closeAsync() {
 //
 // Calling `Seek` again will cancel the previous task and create a new one.
 func (kafka *kafkaReader) Seek(ctx context.Context, offset int64) (int64, error) {
+	select {
+	case <-ctx.Done():
+		return 0, ctx.Err()
+	default:
+	}
+
+	// Close the previous async task before spawning another so the channels
+	// can't hold partial data from an earlier Seek operation.
 	if kafka.spawned {
 		kafka.closeAsync()
 	}
@@ -171,9 +185,6 @@ func (kafka *kafkaReader) Seek(ctx context.Context, offset int64) (int64, error)
 
 	kafka.asyncWait.Add(1)
 
-	kafka.mu.RLock()
-	defer kafka.mu.RUnlock()
-
 	go kafka.fetchMessagesAsync(asyncCtx, kafka.events, kafka.errors)
 	kafka.spawned = true
 
@@ -286,11 +297,26 @@ func (kafka *kafkaReader) fetchMessagesAsync(ctx context.Context, eventsCh chan<
 			continue
 		}
 
+		// Kafka reads message sets straight from disk and may return a partial
+		// trailing message. Consumers need to prune it and re-request it later.
+		if partition.MsgSet.PartialTrailingMessage {
+			// Remove the trailing message. The next offset will fetch it in full.
+			msgSet = msgSet[:len(msgSet)-1]
+
+			// If the partial message was the only one returned, there is nothing
+			// to process; the next fetch will request it from the same offset.
+			if len(msgSet) == 0 {
+				continue
+			}
+		}
+
 		// Bump the current offset to the last offset in the message set. The new offset will
 		// be used the next time we fetch a block from Kafka.
 		//
 		// This doesn't commit the offset in any way, it only allows the iterator to continue to
 		// make progress.
+		//
+		// If there was a trailing message, this next offset will start there.
 		offset = msgSet[len(msgSet)-1].Offset + 1
 		atomic.StoreInt64(&kafka.offset, offset)
 
@@ -307,7 +333,7 @@ func (kafka *kafkaReader) fetchMessagesAsync(ctx context.Context, eventsCh chan<
 	}
 }
 
-// Shutdown the Kafka client.
+// Shutdown the async task and the Kafka client.
 func (kafka *kafkaReader) Close() (err error) {
 	kafka.closeAsync()
 	return kafka.client.Close()
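A note for reviewers: below is a minimal usage sketch of the group API this diff introduces, one `NewGroupReader` call per partition as the doc comment describes. The import path, broker address, topic, starting offset, and the `msg.Offset` field are illustrative assumptions, not things this diff establishes.

```go
package main

import (
	"context"
	"log"
	"time"

	kafka "github.com/segmentio/kafka-go" // assumed import path; adjust to this repo's module
)

func main() {
	const partitions = 3 // assumed partition count; must not exceed the topic's real count

	for i := 0; i < partitions; i++ {
		reader, err := kafka.NewGroupReader(kafka.GroupConfig{
			Name:       "example-group",
			Addr:       "http://localhost:8500",
			Brokers:    []string{"localhost:9092"},
			Topic:      "example-topic",
			Partitions: partitions,

			RequestMaxWaitTime: 100 * time.Millisecond,
			RequestMinBytes:    1,
			RequestMaxBytes:    1 << 20,
		})
		if err != nil {
			log.Fatal(err)
		}
		// Close destroys the Consul session, which releases the partition lock.
		defer reader.Close()

		go func(r kafka.Reader) {
			ctx := context.Background()

			// Seek spawns the async fetch task; Read blocks until a message
			// arrives. Offset 0 is used here purely for illustration.
			if _, err := r.Seek(ctx, 0); err != nil {
				log.Print(err)
				return
			}

			for {
				msg, err := r.Read(ctx)
				if err != nil {
					return
				}
				log.Printf("message at offset %d", msg.Offset) // Offset field assumed
			}
		}(reader)
	}

	select {} // block forever; real code would wire up coordinated shutdown
}
```

Because the session is created with the "delete" behavior, closing any reader (or losing its session) removes its lock keys, so a replacement process can re-acquire the freed partition.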
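The body of `acquire` falls outside this diff's context lines. For reviewers unfamiliar with Consul, the lock-per-partition loop it performs looks roughly like the sketch below. This is a hypothetical reconstruction from `baseKey` and the error string visible in the hunk above, not the commit's actual code; `KV().Acquire` is the real Consul client call, which atomically binds a key to at most one session.

```go
package kafka // package name assumed

import (
	"strconv"

	"github.com/hashicorp/consul/api"
	"github.com/pkg/errors"
)

// acquirePartition sketches a lock-per-partition loop: the first partition key
// this session manages to lock determines which partition the reader owns.
func acquirePartition(client *api.Client, session, group string, partitions int) (int, error) {
	const baseKey = "group-readers/" // same constant declared in consul.go

	for id := 0; id < partitions; id++ {
		pair := &api.KVPair{
			// One well-known key per partition, namespaced by group name so
			// multiple groups can consume the same topic without clashing.
			Key:     baseKey + group + "/" + strconv.Itoa(id),
			Session: session,
		}

		// Acquire is atomic: at most one session holds a key's lock. With the
		// session's "delete" behavior, the key is removed when the session ends.
		locked, _, err := client.KV().Acquire(pair, nil)
		if err != nil {
			return 0, errors.Wrap(err, "failed to acquire consul lock")
		}
		if locked {
			return id, nil
		}
	}

	// Every partition key is already held by another reader.
	return 0, errors.New("failed to acquire consul lock")
}
```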
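One more note on `closeAsync`: the cancel, drain, wait, close ordering is the crux of this refactor, since it is what lets the mutex go away. The standalone toy below (unrelated to the Kafka types, no assumptions about this repo) shows why each step exists: the producer may be parked on a channel send when it is canceled, so something must drain the channel before `Wait` can return and make `close` safe.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	events := make(chan int)

	var wg sync.WaitGroup
	wg.Add(1)

	// The producer blocks on an unbuffered send until a receiver is ready,
	// checking for cancellation between sends, like fetchMessagesAsync does.
	go func() {
		defer wg.Done()
		for i := 0; ; i++ {
			select {
			case <-ctx.Done():
				return
			case events <- i:
			}
		}
	}()

	fmt.Println("got:", <-events)

	// Shutdown, in the same order as closeAsync:
	cancel() // 1. signal the producer to stop

	go func() { // 2. drain so a producer blocked mid-send can proceed
		for range events {
		}
	}()

	wg.Wait()     // 3. wait until the producer has returned...
	close(events) // 4. ...so closing cannot race with a send and panic
}
```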