
Consumer group larger than partition count causes constant reconnection attempts  #200

Closed
@cosmopetrich

Description

Running a group of kafka-go consumers that contains more consumers than there are partitions in a topic results in the surplus consumers continuously attempting to rejoin the group with no delay.

The library will log these three messages 20+ times per second.

INFO[0015] rebalancing consumer group, example
INFO[0015] joinGroup succeeded for response, example.  generationID=51, memberID=example@local (github.com/segmentio/kafka-go)-72f08ec6-b36e-48cd-94b9-a33e32fb82e6
INFO[0015] rebalance failed for consumer group, example: received empty assignments for group, example as member example@local (github.com/segmentio/kafka-go)-72f08ec6-b36e-48cd-94b9-a33e32fb82e6 for generation 51

Running unnecessary extra consumers isn't usually desirable but it's something that can happen. For example, a Kubernetes event might result in additional replicas of a service being run for some length of time. If nothing else the large amount of connections and log entries generated by kafka-go can cause problems for connection-tracking firewalls and aggregated logging systems.

I'm quite new to Kafka. Is having a consumer with no allocations a problem? It seems like it could safely stay idle until the next rebalance. If it is very undesirable then would some kind of delay/backoff in the run() for loop be worthwhile?

kafka-go/reader.go

Lines 822 to 833 in 62cc6a8

for {
	if err := r.handshake(); err != nil {
		r.withErrorLogger(func(l *log.Logger) {
			l.Println(err)
		})
	}

	select {
	case <-r.stctx.Done():
		return
	default:
	}

Activity

changed the title from "Consumer groups larger than partition count cause constant reconnection attempts" to "Consumer groups larger than partition count causes constant reconnection attempts" on Feb 11, 2019
changed the title from "Consumer groups larger than partition count causes constant reconnection attempts" to "Consumer group larger than partition count causes constant reconnection attempts" on Feb 11, 2019
self-assigned this on Feb 15, 2019
abuchanan-nr (Contributor) commented on Mar 8, 2019

@stevevls we're going to need this very soon, so I'm going to take a look. Just a heads up in case you already have a bunch of work done.

stevevls (Contributor) commented on Mar 8, 2019

@abuchanan-nr Thanks for the heads up! I haven't dug in on this bug yet, so please feel free to send in a PR. Let me know if I can help.

abuchanan-nr (Contributor) commented on Mar 8, 2019

@stevevls I think this is the source of the behavior: https://github.com/segmentio/kafka-go/blob/master/reader.go#L437

The question is what to do instead. In the case where there are too many consumers (this bug), I think this should just be an info log. Is there another case? I can't think of one yet.

I can PR that simple change, but wanted to get some careful thought from others first.

pawanrawal commented on Mar 10, 2019

This also happens when the topic doesn't exist and auto-creation of topics is disabled on the server. This loop (https://github.com/segmentio/kafka-go/blob/master/reader.go#L868) should retry with a delay in such cases.

abuchanan-nr (Contributor) commented on Mar 11, 2019

@pawanrawal I'm glad you mentioned that. By the way, it turns out that retry loop (handshake()) is not throttled by a backoff. It probably should be.

I think I can craft a way to use UnknownTopicOrPartition to retry that specific case. The simple way to do that is to add a retry loop here: https://github.com/segmentio/kafka-go/blob/master/reader.go#L255

This is what I have so far: master...newrelic-forks:ab/rebalance-too-many

stevevls (Contributor) commented on Mar 11, 2019

@abuchanan-nr I agree with your assessment. It's not an error from the point of view of the consumer group protocol if there are more consumers than partitions, so we should just log a warning (since it's almost certainly an undesirable state) and then return success. That should cause the consumer to hang out idle until another CG rebalance occurs.

Can you give me a little more context on when the UnknownTopicOrPartition error comes up? It seems like we should know about it before we try to subscribe since the topic is passed in the joinGroupRequest. I would expect that we should get an error when attempting to join the group.

Re: the throttling, I think that's a good change. Thanks!
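
For illustration, a rough sketch of that idea (the surrounding names like assignments, withLogger, and config are taken from reader.go's rebalance path at the time; this is not the merged change):

// Sketch only: treat an empty assignment set as success so a surplus consumer
// stays joined and idles until the next rebalance, rather than returning an
// error that triggers an immediate rejoin.
if len(assignments) == 0 {
	r.withLogger(func(l *log.Logger) {
		l.Printf("no partitions assigned to this member of group %s; idling until the next rebalance", r.config.GroupID)
	})
	return nil // previously this case produced an error and another join attempt
}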

abuchanan-nr (Contributor) commented on Mar 11, 2019

@stevevls Re: UnknownTopicOrPartition: it comes up via run -> handshake -> rebalance -> joinGroup -> assignTopicPartitions -> conn.ReadPartitions. So yes, it does come from joining a group. As far as I can tell, run() is the only code that initiates that pathway, so retrying in assignTopicPartitions seems reasonable (i.e. it doesn't negatively affect other code).

Sounds like I'm on the right track, so I'll open a PR soon.

stevevls (Contributor) commented on Mar 11, 2019

@abuchanan-nr Great...thanks for the call graph. One thing to keep in mind is that assignTopicPartitions only happens on the consumer group leader in between calling joinGroup and syncGroup. If we put a blocking retry loop there, it could have unanticipated side effects. I suspect that if the topic doesn't come into existence soon enough, it would cause the sync operation to time out for the other consumers and result in the leader getting evicted from the group because it never checked in with assignments.

I just refreshed myself on the protocol for consumer groups, and there's no way I can see for the CG leader to pass an error response to the other members of the group. Therefore, I would suggest that we treat UnknownTopicOrPartition as a special case of no assignments. There's a config option on the reader called WatchPartitionChanges that can be enabled to watch for the topic to come into existence and trigger a rebalance. What do you think?
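
For reference, enabling that option looks roughly like this (broker and topic values are placeholders; PartitionWatchInterval controls how often the reader polls for partition changes, to the best of my understanding of ReaderConfig):

package main

import (
	"time"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	// Watch for the topic/partitions to appear and trigger a rebalance when
	// they change, instead of failing fast when the topic does not exist yet.
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:                []string{"localhost:9092"},
		GroupID:                "example",
		Topic:                  "example-topic",
		WatchPartitionChanges:  true,
		PartitionWatchInterval: 5 * time.Second,
	})
	defer r.Close()
}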

abuchanan-nr (Contributor) commented on Mar 12, 2019

@stevevls Agree that blocking joinGroup is bad. I looked into your suggested approach, but it led to hunting down lots of places to handle UnknownTopicOrPartition.

This is what I have so far: #229

I created a blocking loop that will wait for the topic to exist before starting the rebalance process (joinGroup, syncGroup, etc).
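
Roughly, that idea looks like the sketch below (illustrative only, not the PR's actual code): poll ReadPartitions and back off while the topic is missing, instead of failing the handshake and rejoining immediately.

package example

import (
	"context"
	"errors"
	"time"

	kafka "github.com/segmentio/kafka-go"
)

// waitForTopic blocks until the topic exists, the connection returns an
// unexpected error, or the context is cancelled.
func waitForTopic(ctx context.Context, conn *kafka.Conn, topic string) error {
	for {
		_, err := conn.ReadPartitions(topic)
		switch {
		case err == nil:
			return nil // topic exists; safe to proceed with joinGroup/syncGroup
		case errors.Is(err, kafka.UnknownTopicOrPartition):
			// topic not created yet; wait and poll again
		default:
			return err
		}

		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(time.Second):
		}
	}
}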

WillAbides (Contributor) commented on Mar 22, 2019

Let me chime in as another person who has run across this and would like to avoid all the rebalances. There are two more scenarios I have run across where you would want more consumers than partitions.

  1. Services that consume multiple topics. As it stands currently, you need to scale them to have the same number of instances as the number of partitions on the smallest topic.

  2. Redundant availability zones. I would like to have full capacity on multiple kubernetes clusters so that losing a cluster doesn't cause a slow down in processing messages.
