forked from segmentio/kafka-go
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
added consumer group support to Reader (segmentio#55)
* added consumer group support to Reader * simplified ready logic around NewReader Previously, if consumer groups were enabled we waited until after rebalancing and the first subscriptions were complete before returning from NewReader. Instead, we now advance Reader.version to 1 to ensure readers are only spawned from the rebalance func * fixed typo * exposing RebalanceTimeout to ReaderConfig * fixed validation error with SessionTimeout * renamed hbLoop to heartbeatLoop for clarity * removed backoff on rebalance. this is likely to cause more harm than help. the broker naturally slows down the rate of rebalancing through join group and sync group. * pulled loop logic for run out into separate func * introduced runGroup to simplify management of rebalance goroutines * renamed variable subs to assignments for consistency * renamed newJoinGroupRequestV2 to makeJoinGroupRequestV2 to follow Go conventions when returning a value (and not a pointer) * removing unnecessary Partition definition for consumer groups * corrected docs * making var declaration style consistent * updated readme example as per recommendation * pushing withErrorLogger up from runOnce to run * embedding withErrorLogger messages into error * continuing to convert withErrorLogger to errors * moving unsubscribe call to the top of the runOnce loop * converted withErrorLogger to withLogger for semantic correctness * converting time.After to time.NewTimer as per request * added comment to clarify reasoning for size == 0 check in (*groupAssignment).readFrom * renamed funcs that returned values makeXXX rather than newXXX * extracted logic, r.config.GroupID != "" into useConsumerGroup func as per suggestion * replaced a few additional cases of r.config.GroupID == "" with !r.useConsumerGroup() * added leaveGroup for more graceful interaction with broker * cleaned up noisy error handling in heartbeat. intentionally stopped handling InvalidMemberId in heartbeat * updated docs and log messages * renamed (*Reader).broker and (*Reader).connect to connect and coordinator respectively to clarify responsibilities * leaveGroup now called on Close when using consumer groups * ReadMessage now automatically commits offsets. Introduced FetchMessage to handle explicit commits (in conjunction with CommitMessage) * refactored shape of CommitMessage to CommitMessages to accept a context and multiple messages * renamed ProtocolMetadata to GroupMetadata and updated associated docs for clarity * created type offsetStash to managing temporary cache of offsets * promoted lifecycle of offsetStash to the same as Reader (previously was one consumer group generation) * commit will now retry up to defaultCommitRetries times before failing * added test case to verify consumer group rebalancing across many partitions * resolves edge case where rebalance happens during call to (*Reader).Close
- Loading branch information
1 parent
17c1389
commit 949ee27
Showing
14 changed files
with
2,742 additions
and
50 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.