Don't rebalance on too many consumers. Wait for topic to exist. #229
Conversation
Thanks for the PR! The logging changes look good, but I think the wait is in the wrong place.
```diff
@@ -827,6 +858,9 @@ func (r *Reader) handshake() error {
 		_ = conn.Close()
 	}()

+	// wait for topic to exist
```
As mentioned in #200, I don't think this is a good place to insert the wait as it will cause the rebalance to time out if the topic doesn't exist. I think that as you test it, you should be able to produce that condition fairly easily, but LMK if you find otherwise!
Hrm, I'm confused. This is being called before rebalance, and therefore before joinGroup and syncGroup. Also, joinGroup is the only place I see config.RebalanceTimeout being used. There's no rebalance/join to time out at this point, unless I'm missing something.
This is a little bit subtle. In a nutshell, there are three types of participants in consumer groups: the coordinator, the leader (which is a special consumer), and the consumers. The coordinator is a Kafka broker whose responsibility is to manage group membership and coordinate rebalances.
When a rebalance occurs:
- All the consumers phone in to the coordinator to join the group (line 303 in d19f52f: response, err := conn.joinGroup(request)).
- Via the join group response, one of the consumers discovers that it has been appointed the leader and will be responsible for generating assignments (line 337 in d19f52f: if iAmLeader := response.MemberID == response.LeaderID; iAmLeader {).
- Upon successful join, all consumers including the leader call sync group (line 410 in d19f52f: response, err := conn.syncGroups(request); line 369 in d19f52f: if memberAssignments != nil {).
- The coordinator takes the leader's assignments and returns them to the other consumers via the sync group response.
So if you put in a blocking call before generating assignments, your leader won't ever send the sync group request with the assignments. If that happens, the coordinator will eventually time out the sync, evict the leader from the group, and then run another rebalance. It's not immediately obvious from reading the code that all this is happening, but you can read up on the protocol here: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal.
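To make that flow concrete, here is a minimal, self-contained sketch of the join/leader/sync sequence described above. Every type and helper in it (fakeCoordinator, joinGroupResponse, and so on) is a hypothetical stand-in for illustration; it is not kafka-go's actual code.

```go
package main

import "fmt"

// joinGroupResponse is a hypothetical stand-in for the broker's join group reply.
type joinGroupResponse struct {
	MemberID string
	LeaderID string
	Members  []string
	Topic    string
}

// fakeCoordinator simulates the broker-side group coordinator for illustration.
type fakeCoordinator struct {
	partitions map[string][]int // topic -> partition IDs
}

func (c *fakeCoordinator) joinGroup(memberID string) joinGroupResponse {
	// The real coordinator elects a leader; here it is hard-coded.
	return joinGroupResponse{
		MemberID: memberID,
		LeaderID: "member-0",
		Members:  []string{"member-0", "member-1"},
		Topic:    "events",
	}
}

func (c *fakeCoordinator) syncGroup(memberID string, leaderAssignments map[string][]int) []int {
	// The real coordinator waits for the leader's sync request, then hands each
	// member its slice of the leader's assignments.
	return leaderAssignments[memberID]
}

// rebalance walks through the three steps described above for one consumer.
func rebalance(c *fakeCoordinator, memberID string) []int {
	// 1. Every consumer joins the group via the coordinator.
	join := c.joinGroup(memberID)

	// 2. Only the leader computes assignments for all members.
	var assignments map[string][]int
	if join.MemberID == join.LeaderID {
		assignments = map[string][]int{}
		for i, m := range join.Members {
			for _, p := range c.partitions[join.Topic] {
				if p%len(join.Members) == i { // trivial round-robin
					assignments[m] = append(assignments[m], p)
				}
			}
		}
	}

	// 3. Everyone (leader included) calls sync group. If the leader blocks
	//    before this point, the whole group stalls until the coordinator times
	//    out the sync and starts yet another rebalance.
	return c.syncGroup(memberID, assignments)
}

func main() {
	c := &fakeCoordinator{partitions: map[string][]int{"events": {0, 1, 2}}}
	fmt.Println("member-0 assigned partitions:", rebalance(c, "member-0"))
}
```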
As with the case of more consumers than partitions, it's not a protocol level error to request a non-existent topic...it's a perfectly valid state. It's kind of odd to me that the CG protocol doesn't provide a way to flag that as an error, but that's not for us mere mortals to debate. 😉 Anyway, that's why I was recommending to return empty assignments and then leverage the topic watcher feature that's already built in to trigger the rebalance when the topic appears. Does that make sense?
@stevevls What about moving the wait-for-topic logic earlier? Maybe inside of the run func?
Like here: https://github.com/segmentio/kafka-go/blob/master/reader.go#L867
From my understanding there isn't any reason to proceed into the handshake logic if the topic doesn't exist. By moving the wait higher we can also reduce the number of times we block and reduce the overall number of network calls made.
@ShaneSaww That's an interesting idea, though I wonder how it would handle the case where you have a large group of consumers all waiting for a topic to exist. The thing is, their wait times won't be synchronized, so you will have an initial flurry of rebalances as the consumers discover the existence of the topic and join the group.
Contrast that with the case where the group has already gone through the handshake. In that case, only a single rebalance would be triggered. Furthermore, having gone through the handshake, all the consumers will be in sync and heartbeating, so the first member to detect the existence of the topic will force the rebalance.
What do you think?
@ShaneSaww @abuchanan-nr I started to get curious about how other libraries handle the case of unknown topics, so I looked at Sarama and librdkafka.
Sarama handles it like kafka-go currently does: if the topic doesn't exist, the CG leader throws an error and the rebalance goes into a failure loop. Not terribly desirable.
AFAICT, librdkafka ignores unknown topics at assignment time: https://github.com/edenhill/librdkafka/blob/5140cd9d965406a144de067b775bc9ca5255aea1/src/rdkafka_cgrp.c#L924. But it does let the rebalance succeed. I would appreciate another set of eyes on that code b/c I'm not a C++ or librdkafka expert. 😄 Either way, I think we should mimic whatever librdkafka is doing.
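As a rough illustration of that librdkafka-style behavior (skip subscribed topics that are missing from metadata so the rebalance still succeeds, possibly with empty assignments), here is a small hypothetical sketch; the assign function and its inputs are invented for illustration and are not librdkafka or kafka-go code.

```go
package main

import "fmt"

// assign distributes partitions round-robin, silently skipping subscribed
// topics that are missing from the broker metadata instead of failing the
// rebalance. Members subscribed only to unknown topics end up with empty
// (but valid) assignments.
func assign(members, subscribed []string, metadata map[string][]int) map[string]map[string][]int {
	out := make(map[string]map[string][]int, len(members))
	for _, m := range members {
		out[m] = map[string][]int{}
	}
	for _, topic := range subscribed {
		partitions, ok := metadata[topic]
		if !ok {
			continue // unknown topic: ignore it rather than erroring out
		}
		for i, p := range partitions {
			m := members[i%len(members)]
			out[m][topic] = append(out[m][topic], p)
		}
	}
	return out
}

func main() {
	members := []string{"a", "b"}
	subscribed := []string{"exists", "does-not-exist-yet"}
	metadata := map[string][]int{"exists": {0, 1, 2}} // only one topic is known
	fmt.Println(assign(members, subscribed, metadata))
}
```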
On another note, I think we've gotten into the weeds a bit with handling the unknown topic. One thing we can consider is merging in the fix for number consumers > number of partitions and then solve the unknown topic case in a separate PR. Up to you!
@stevevls I can look into the java code as well.
"merging in the fix for number consumers > number of partitions and then solve the unknown topic case in a separate PR."
I like this idea.
Thanks for all the great discussion.
I think it's important to move forward here. There are some important cases mentioned in #200 that could be very common.
We can revisit optimizing rebalances for unknown topics in future work. I agree it's important, but it might take a while.
We can either use this PR as-is, or I could move the waitForTopic call into run, where it would have its own connection that is not associated with a group coordinator.
Thoughts?
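For concreteness, one possible shape of such a waitForTopic helper, sketched against kafka-go's public Conn API (DialContext and ReadPartitions). The broker address, topic name, and polling interval are placeholder assumptions, and this is not the PR's actual implementation.

```go
package main

import (
	"context"
	"log"
	"time"

	kafka "github.com/segmentio/kafka-go"
)

// waitForTopic polls broker metadata until the topic has at least one
// partition or the context is cancelled.
func waitForTopic(ctx context.Context, brokerAddr, topic string) error {
	for {
		conn, err := kafka.DialContext(ctx, "tcp", brokerAddr)
		if err == nil {
			partitions, perr := conn.ReadPartitions(topic)
			conn.Close()
			if perr == nil && len(partitions) > 0 {
				return nil // topic exists now
			}
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(time.Second): // back off before the next poll
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	// "localhost:9092" and "my-topic" are placeholder values.
	if err := waitForTopic(ctx, "localhost:9092", "my-topic"); err != nil {
		log.Fatal(err)
	}
	log.Println("topic is ready")
}
```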
@ShaneSaww Did you get a chance to see how the java client works? I'm still of the opinion that we should take the librdkafka approach where a non-existent topic simply results in empty assignments, but I think it's most important to behave like other official libraries in case there are places where folks need to inter-operate between languages.
Yeah, from what I can tell with the java client it does something like the following (see the sketch after this list):
- start everything as normal
- wait for a metadata refresh
- if the metadata refresh shows a topic that you care about, rebalance
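A rough sketch of that watch-then-rebalance pattern is below; the listTopics callback and triggerRebalance hook are hypothetical stand-ins rather than any client's real API.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// watchForTopic polls metadata on a fixed interval and fires triggerRebalance
// once the wanted topic shows up. Metadata errors are treated as transient.
func watchForTopic(ctx context.Context, topic string, interval time.Duration,
	listTopics func(context.Context) ([]string, error), triggerRebalance func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			topics, err := listTopics(ctx)
			if err != nil {
				continue // transient metadata failure; try again next tick
			}
			for _, t := range topics {
				if t == topic {
					triggerRebalance()
					return
				}
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	// Fake metadata source that "creates" the topic after a couple of polls.
	polls := 0
	listTopics := func(context.Context) ([]string, error) {
		polls++
		if polls < 3 {
			return []string{"other-topic"}, nil
		}
		return []string{"other-topic", "my-topic"}, nil
	}
	watchForTopic(ctx, "my-topic", 500*time.Millisecond, listTopics, func() {
		fmt.Println("topic appeared; triggering rebalance")
	})
}
```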
```diff
@@ -436,7 +439,9 @@ func (r *Reader) syncGroup(conn *Conn, memberAssignments GroupMemberAssignments)

 	if len(assignments.Topics) == 0 {
 		generation, memberID := r.membership()
-		return nil, fmt.Errorf("received empty assignments for group, %v as member %s for generation %d", r.config.GroupID, memberID, generation)
+		r.withLogger(func(l *log.Logger) {
```
If I'm reading this correctly, this is the only change required to address #200.
Could we make this a ReaderOption, AllowEmptyAssignments? That would preserve the existing behavior for the non-existent topic case while letting us move forward with allowing more consumers than partitions.
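For context, a hypothetical sketch of how such a flag might gate the current error path; the option name, config shape, and helper here are illustrative only and are not kafka-go API.

```go
package main

import (
	"errors"
	"fmt"
	"log"
)

// readerConfig is an illustrative stand-in, not kafka-go's ReaderConfig.
type readerConfig struct {
	GroupID               string
	AllowEmptyAssignments bool
}

// checkAssignments shows how such a flag could gate the current error path:
// with the flag set, an empty assignment is logged and tolerated; without it,
// the existing error behavior is preserved.
func checkAssignments(cfg readerConfig, assignedTopics []string) error {
	if len(assignedTopics) > 0 {
		return nil
	}
	if cfg.AllowEmptyAssignments {
		log.Printf("group %s: received empty assignments; consumer will sit idle", cfg.GroupID)
		return nil
	}
	return errors.New("received empty assignments for group " + cfg.GroupID)
}

func main() {
	cfg := readerConfig{GroupID: "example-group", AllowEmptyAssignments: true}
	fmt.Println(checkAssignments(cfg, nil)) // <nil>: tolerated and logged
}
```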
There are a few downsides to consider there:
- It would be public, and therefore hard to get rid of.
- I'd argue it should default to true, to avoid a risky, subtle, and possibly common situation.
- Too much config is a bad thing, IMO.
I prefer not to have a configuration option. Currently, the code is incorrect because there are valid use cases and configurations that should work but don't. The downside of allowing empty assignments is that perhaps someone accidentally spins up too many consumers and some number of them sit idle. Preventing things like that feels like it's outside the scope of this library.
Those are good arguments for not making it an option. Consider my suggestion withdrawn.
@abuchanan-nr I'm going to re-float the idea of splitting this into two PRs. 😄 What do you think about scoping this one down to just the problem of num consumers > num partitions?
So this PR would remove the
Yeah...deferring the
Works for me. The only thing holding me up is figuring out how to write a test for this. Only idea so far: sleep long enough for rebalances to occur, then check ReaderStats. But, it would need to sleep for a relatively long time (~30 seconds).
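One possible shape of that test idea, assuming the test environment provides a local broker at localhost:9092 and a pre-created single-partition topic named test-topic (both placeholders), and assuming the ReaderStats rebalance counter is the signal to check:

```go
package example

import (
	"context"
	"testing"
	"time"

	kafka "github.com/segmentio/kafka-go"
)

func TestNoRebalanceChurnWithExtraConsumers(t *testing.T) {
	const numReaders = 3 // more consumers than the single-partition topic needs

	readers := make([]*kafka.Reader, 0, numReaders)
	for i := 0; i < numReaders; i++ {
		r := kafka.NewReader(kafka.ReaderConfig{
			Brokers: []string{"localhost:9092"}, // placeholder broker
			GroupID: "churn-test-group",
			Topic:   "test-topic", // placeholder, pre-created with one partition
		})
		defer r.Close()
		readers = append(readers, r)
		// Force each reader to join the consumer group.
		go r.ReadMessage(context.Background())
	}

	// Long enough for several rebalance intervals to elapse if the group
	// were churning.
	time.Sleep(30 * time.Second)

	for i, r := range readers {
		// Each reader should have seen roughly the initial joins, not a
		// continuous stream of rebalances.
		if n := r.Stats().Rebalances; n > int64(numReaders) {
			t.Errorf("reader %d saw %d rebalances, expected no churn", i, n)
		}
	}
}
```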
Yeah...that's a tough one...in part because we're trying to prove a negative (the rebalance doesn't churn). I looked through the code to see if there was any internal state of the
Created #242 for the more minimal fix in order to avoid nuking any comments going in this thread. Feel free to close this PR if you want.
Closing this, but I opened #251 to make sure we eventually follow up. Thanks!
Fix for #200
I'll try to write tests for this. Putting this up since there's some discussion in #200 on the best approach.