Description
Running a group of kafka-go consumers that contains more consumers than there are partitions in a topic results in the surplus consumers continuously attempting to rejoin the group with no delay.
The library will log these three messages 20+ times per second.
INFO[0015] rebalancing consumer group, example
INFO[0015] joinGroup succeeded for response, example. generationID=51, memberID=example@local (github.com/segmentio/kafka-go)-72f08ec6-b36e-48cd-94b9-a33e32fb82e6
INFO[0015] rebalance failed for consumer group, example: received empty assignments for group, example as member example@local (github.com/segmentio/kafka-go)-72f08ec6-b36e-48cd-94b9-a33e32fb82e6 for generation 51
Running unnecessary extra consumers isn't usually desirable, but it's something that can happen. For example, a Kubernetes event might result in additional replicas of a service being run for some length of time. If nothing else, the large number of connections and log entries generated by kafka-go can cause problems for connection-tracking firewalls and aggregated logging systems.
I'm quite new to Kafka. Is having a consumer with no partition assignments a problem? It seems like it could safely stay idle until the next rebalance. If it is very undesirable, then would some kind of delay/backoff in the run() for loop be worthwhile?

(Permalink snippet: lines 822 to 833 in 62cc6a8, the run() loop referenced above.)
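To illustrate the kind of delay/backoff I mean (just a sketch, not the library's actual run() code; the backoff bounds are arbitrary placeholders):

```go
// Sketch only: throttle repeated rejoin attempts with exponential backoff
// instead of retrying immediately. The min/max values are placeholders.
package sketch

import (
	"context"
	"time"
)

func rejoinWithBackoff(ctx context.Context, rejoin func() error) {
	const (
		backoffMin = 250 * time.Millisecond
		backoffMax = 30 * time.Second
	)
	delay := backoffMin
	for {
		if err := rejoin(); err == nil {
			// joined and received assignments; nothing more to do here
			return
		}
		select {
		case <-ctx.Done():
			return
		case <-time.After(delay):
		}
		// double the delay up to the cap before the next attempt
		delay *= 2
		if delay > backoffMax {
			delay = backoffMax
		}
	}
}
```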
Activity
abuchanan-nr commented on Mar 8, 2019
@stevevls we're going to need this very soon, so I'm going to take a look. Just a heads up in case you already have a bunch of work done.
stevevls commented on Mar 8, 2019
@abuchanan-nr Thanks for the heads up! I haven't dug in on this bug yet, so please feel free to send in a PR. Let me know if I can help.
abuchanan-nr commented on Mar 8, 2019
@stevevls I think this is the source of the behavior: https://github.com/segmentio/kafka-go/blob/master/reader.go#L437
Question is, what to do instead. In the case where you have too many consumers (this bug), this should be an info log, I think. Is there another case? I can't think of one yet.
I can PR that simple change, but wanted to get some careful thought from others first.
pawanrawal commented on Mar 10, 2019
This also happens when the topic doesn't exist and auto-creation of topics is disabled on the server. This loop: https://github.com/segmentio/kafka-go/blob/master/reader.go#L868 should try the reconnect with a delay in such cases.
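One possible application-side workaround (just a sketch; the broker address, topic name, and poll interval are placeholders) would be to wait for the topic to show up before constructing the reader, so the reconnect loop never spins on a missing topic:

```go
// Sketch: poll until the topic exists before starting a consumer, so the
// reader's reconnect loop doesn't spin on a missing topic. The broker
// address, topic name, and poll interval are placeholders.
package sketch

import (
	"context"
	"time"

	kafka "github.com/segmentio/kafka-go"
)

func waitForTopic(ctx context.Context, broker, topic string) error {
	for {
		conn, err := kafka.Dial("tcp", broker)
		if err == nil {
			parts, rerr := conn.ReadPartitions(topic)
			conn.Close()
			if rerr == nil && len(parts) > 0 {
				return nil // topic exists, safe to start the reader
			}
		}
		// broker unreachable or topic missing: wait before trying again
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(5 * time.Second):
		}
	}
}
```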
abuchanan-nr commented on Mar 11, 2019
@pawanrawal I'm glad you mentioned that. By the way, it turns out that retry loop (handshake()) is not throttled by a backoff. It probably should be. I think I can craft a way to use UnknownTopicOrPartition to retry that specific case. The simple way to do that is to add a retry loop here: https://github.com/segmentio/kafka-go/blob/master/reader.go#L255

This is what I have so far: master...newrelic-forks:ab/rebalance-too-many
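For reference, UnknownTopicOrPartition is one of the kafka.Error protocol error codes, so the retry can single that case out with something like this (isMissingTopic is just an illustrative helper, not part of the library):

```go
// Sketch: recognize the "unknown topic or partition" protocol error so that
// only this case is retried. isMissingTopic is a hypothetical helper, not
// part of kafka-go.
package sketch

import kafka "github.com/segmentio/kafka-go"

func isMissingTopic(err error) bool {
	e, ok := err.(kafka.Error)
	return ok && e == kafka.UnknownTopicOrPartition
}
```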
stevevls commented on Mar 11, 2019
@abuchanan-nr I agree with your assessment. It's not an error from the point of view of the consumer group protocol if there are more consumers than partitions, so we should just log a warning (since it's almost certainly an undesirable state) and then return success. That should cause the consumer to hang out idle until another CG rebalance occurs.
Can you give me a little more context on when the UnknownTopicOrPartition error comes up? It seems like we should know about it before we try to subscribe, since the topic is passed in the joinGroupRequest. I would expect that we should get an error when attempting to join the group.

Re: the throttling, I think that's a good change. Thanks!
abuchanan-nr commented on Mar 11, 2019
@stevevls Re: UnknownTopicOrPartition comes up via run -> handshake -> rebalance -> joinGroup -> assignTopicPartitions -> conn.ReadPartitions. So yes, it does come from joining a group. As far as I can tell, run() is the only code to initiate that pathway, so retrying in assignTopicPartitions seems reasonable (i.e. it doesn't negatively affect other code).

Sounds like I'm on the right track, so I'll open a PR soon.
stevevls commented on Mar 11, 2019
@abuchanan-nr Great...thanks for the call graph. One thing to keep in mind is that assignTopicPartitions only happens on the consumer group leader, in between calling joinGroup and syncGroup. If we put a blocking retry loop there, it could have unanticipated side effects. I suspect that if the topic doesn't come into existence soon enough, it would cause the sync operation to time out for the other consumers and result in the leader getting evicted from the group because it never checked in with assignments.

I just refreshed myself on the protocol for consumer groups, and there's no way I can see for the CG leader to pass an error response to the other members of the group. Therefore, I would suggest that we treat UnknownTopicOrPartition as a special case of no assignments. There's a config option on the reader called WatchPartitionChanges that can be enabled to watch for the topic to come into existence and trigger a rebalance. What do you think?

abuchanan-nr commented on Mar 12, 2019
@stevevls Agree that blocking joinGroup is bad. I looked into your suggested approach, but it led to hunting down lots of places to handle UnknownTopicOrPartition.

This is what I have so far: #229
I created a blocking loop that will wait for the topic to exist before starting the rebalance process (joinGroup, syncGroup, etc).
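In the meantime, for anyone who just wants the user-facing option stevevls mentioned above, WatchPartitionChanges is set on the ReaderConfig; a minimal sketch (broker address, group ID, and topic name are placeholders):

```go
// Sketch: a consumer-group reader that watches for the topic/partitions to
// appear and triggers a rebalance when they change. Broker address, group
// ID, and topic name are placeholders.
package main

import (
	"context"
	"fmt"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:               []string{"localhost:9092"},
		GroupID:               "example",
		Topic:                 "example-topic",
		WatchPartitionChanges: true,
	})
	defer r.Close()

	for {
		m, err := r.ReadMessage(context.Background())
		if err != nil {
			fmt.Println("read error:", err)
			return
		}
		fmt.Printf("offset %d: %s\n", m.Offset, string(m.Value))
	}
}
```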
WillAbides commented on Mar 22, 2019
Let me chime in as another person who has run across this and would like to avoid all the rebalances. There are two more scenarios where you would want more consumers than partitions:

1. Services that consume multiple topics. As it stands, you need to scale them to have the same number of instances as the number of partitions on the smallest topic.
2. Redundant availability zones. I would like to have full capacity on multiple Kubernetes clusters so that losing a cluster doesn't cause a slowdown in processing messages.