
Don't rebalance on too many consumers. Wait for topic to exist. #229

Closed

Conversation

abuchanan-nr (Contributor)

Fix for #200

I'll try to write tests for this. Putting this up since there's some discussion in #200 on the best approach.

@stevevls (Contributor) left a comment

Thanks for the PR! The logging changes look good, but I think the wait is in the wrong place.

@@ -827,6 +858,9 @@ func (r *Reader) handshake() error {
_ = conn.Close()
}()

// wait for topic to exist
Contributor:

As mentioned in #200, I don't think this is a good place to insert the wait as it will cause the rebalance to time out if the topic doesn't exist. I think that as you test it, you should be able to produce that condition fairly easily, but LMK if you find otherwise!

Contributor Author:

Hrm, I'm confused. This is being called before rebalance, and therefore before joinGroup and syncGroup. Also, joinGroup is the only place I see config.RebalanceTimeout being used. There's no rebalance/join to time out at this point, unless I'm missing something.

Contributor:

This is a little bit subtle. In a nutshell, there are three types of participants in consumer groups: the coordinator, the leader (which is a special consumer), and the consumers. The coordinator is a Kafka broker whose responsibility is to manage group membership and coordinate rebalances.

When a rebalance occurs:

  1. All the consumers phone in to the coordinator to join the group:
    response, err := conn.joinGroup(request)
  2. Via the join group response, one of the consumers discovers that it has been appointed the leader and will be responsible for generating assignments:
    if iAmLeader := response.MemberID == response.LeaderID; iAmLeader {
  3. Upon successful join, all consumers including the leader call sync group.
    response, err := conn.syncGroups(request)
    The leader will produce assignments and include them in the sync group request. The other consumers leave that part of the request empty.
    if memberAssignments != nil {
  4. The coordinator takes the leader's assignments and returns them to the other consumers via the sync group response.

So if you put in a blocking call before generating assignments, your leader won't ever send the sync group request with the assignments. If that happens, the coordinator will eventually time out the sync, evict the leader from the group, and then run another rebalance. It's not immediately obvious from reading the code that all this is happening, but you can read up on the protocol here: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal.
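The leader/follower split described in steps 2 and 3 can be sketched as follows. This is a hypothetical, heavily simplified mirror of the flow, not kafka-go's real API: the `joinResponse` type, `buildSyncRequest` function, and the fixed partition count are all invented for illustration.

```go
package main

import "fmt"

// joinResponse stands in for the join group response; the real kafka-go
// types differ.
type joinResponse struct {
	MemberID string
	LeaderID string
	Topics   []string // topics visible to the group
}

// buildSyncRequest mirrors steps 2-3: only the member that discovers it is
// the leader generates assignments; every other consumer leaves the
// assignments part of the sync group request empty.
func buildSyncRequest(resp joinResponse, members []string) map[string][]int {
	iAmLeader := resp.MemberID == resp.LeaderID
	if !iAmLeader {
		return nil // followers send an empty sync request
	}
	// Leader: round-robin each topic's partitions across the members.
	assignments := make(map[string][]int)
	const partitionsPerTopic = 3 // assumed for this sketch
	i := 0
	for range resp.Topics {
		for p := 0; p < partitionsPerTopic; p++ {
			m := members[i%len(members)]
			assignments[m] = append(assignments[m], p)
			i++
		}
	}
	return assignments
}

func main() {
	members := []string{"member-a", "member-b"}
	leader := buildSyncRequest(joinResponse{MemberID: "member-a", LeaderID: "member-a", Topics: []string{"events"}}, members)
	follower := buildSyncRequest(joinResponse{MemberID: "member-b", LeaderID: "member-a", Topics: []string{"events"}}, members)
	fmt.Println("leader assignments:", leader)
	fmt.Println("follower assignments:", follower)
}
```

The point of the sketch: anything that blocks inside the leader's path before `buildSyncRequest` returns delays the sync for the whole group, which is why the coordinator eventually evicts the leader and re-runs the rebalance.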

As with the case of more consumers than partitions, it's not a protocol level error to request a non-existent topic...it's a perfectly valid state. It's kind of odd to me that the CG protocol doesn't provide a way to flag that as an error, but that's not for us mere mortals to debate. 😉 Anyway, that's why I was recommending to return empty assignments and then leverage the topic watcher feature that's already built in to trigger the rebalance when the topic appears. Does that make sense?

Contributor:

@stevevls What about moving the wait for topic logic earlier?
Maybe inside of the run func?
Like here https://github.com/segmentio/kafka-go/blob/master/reader.go#L867

From my understanding there isn't any reason to move into the handshake logic if the topic doesn't exist. By moving it higher we can also reduce the number of times we can block and reduce the overall number of network calls made.

Contributor:

@ShaneSaww That's an interesting idea, though I wonder how it would handle the case where you have a large group of consumers all waiting for a topic to exist. The thing is, their wait times won't be synchronized, so you will have an initial flurry of rebalances as the consumers discover the existence of the topic and join the group.

Contrast that to where the group has already gone through the handshake. In that case, only a single rebalance would be triggered. Furthermore, having gone through handshake, all the consumers will be in sync and heartbeating, so the first member to detect the existence of the topic will force the rebalance.

What do you think?

Contributor:

@ShaneSaww @abuchanan-nr I started to get curious about how other libraries handle the case of unknown topics, so I looked at Sarama and librdkafka.

Sarama handles it like kafka-go currently does: if the topic doesn't exist, the CG leader throws an error and the rebalance goes into a failure loop. Not terribly desirable.

AFAICT, librdkafka ignores unknown topics at assignment time: https://github.com/edenhill/librdkafka/blob/5140cd9d965406a144de067b775bc9ca5255aea1/src/rdkafka_cgrp.c#L924. But it does succeed the rebalance. I would appreciate another set of eyes on that code b/c I'm not a C++ or librdkafka expert. 😄 Either way, I think we should mimic whatever librdkafka is doing.
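The librdkafka-style behavior described here amounts to filtering unknown topics out at assignment time and treating an empty result as success. This is a hypothetical sketch under that assumption; `assignKnownTopics` and its signature are invented for illustration and are not part of kafka-go or librdkafka.

```go
package main

import "fmt"

// assignKnownTopics drops subscribed topics that the broker metadata does
// not know about, instead of failing the rebalance. An empty assignment is
// a valid outcome: the group stays healthy and the consumers stay joined.
func assignKnownTopics(subscribed []string, known map[string][]int) map[string][]int {
	assignments := make(map[string][]int)
	for _, topic := range subscribed {
		partitions, ok := known[topic]
		if !ok {
			continue // unknown topic: skip it rather than error out
		}
		assignments[topic] = partitions
	}
	return assignments // possibly empty; the rebalance still succeeds
}

func main() {
	known := map[string][]int{"orders": {0, 1, 2}}
	fmt.Println(assignKnownTopics([]string{"orders", "not-created-yet"}, known))
}
```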

On another note, I think we've gotten into the weeds a bit with handling the unknown topic. One thing we can consider is merging in the fix for number consumers > number of partitions and then solve the unknown topic case in a separate PR. Up to you!

@ShaneSaww (Contributor), Mar 14, 2019:

@stevevls I can look into the java code as well.

"merging in the fix for number consumers > number of partitions and then solve the unknown topic case in a separate PR."

I like this idea.

Contributor Author:

Thanks for all the great discussion.

I think it's important to move forward here. There are some important cases mentioned in #200 that could be very common.

We can revisit optimizing rebalances for unknown topics in future work. I agree it's important, but it might take a while.

We can either use this PR as-is, or I could move the waitForTopic into run where it would have its own connection that is not associated with a group coordinator.

Thoughts?

Contributor:

@ShaneSaww Did you get a chance to see how the java client works? I'm still of the opinion that we should take the librdkafka approach where a non-existent topic simply results in empty assignments, but I think it's most important to behave like other official libraries in case there are places where folks need to inter-operate between languages.

Contributor:

Yeah, from what I can tell, the java client does something like:

  • start everything as normal
  • wait for a metadata refresh
  • if the metadata refresh shows a topic that you care about, rebalance
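The steps above boil down to diffing successive metadata refreshes against the subscription. A minimal sketch of that check, with invented names (`needsRebalance` is not a real kafka-go or Java-client function):

```go
package main

import "fmt"

// needsRebalance compares the previous and current metadata snapshots and
// reports whether a subscribed topic newly appeared, which is the condition
// that should trigger a rebalance.
func needsRebalance(subscribed []string, prevTopics, currTopics map[string]bool) bool {
	for _, t := range subscribed {
		if currTopics[t] && !prevTopics[t] {
			return true // a topic we care about just came into existence
		}
	}
	return false
}

func main() {
	subscribed := []string{"audit-log"}
	prev := map[string]bool{}
	curr := map[string]bool{"audit-log": true}
	fmt.Println(needsRebalance(subscribed, prev, curr))
}
```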

@@ -436,7 +439,9 @@ func (r *Reader) syncGroup(conn *Conn, memberAssignments GroupMemberAssignments)

 if len(assignments.Topics) == 0 {
 	generation, memberID := r.membership()
-	return nil, fmt.Errorf("received empty assignments for group, %v as member %s for generation %d", r.config.GroupID, memberID, generation)
+	r.withLogger(func(l *log.Logger) {
Contributor:

If I'm reading this correctly, this is the only change required to address #200.

Could we make this a ReaderOption AllowEmptyAssignments? That would allow the option of preserving existing behavior for the non-existent topic case while allowing us to move forward with allowing more consumers than topics.
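For context, the suggested opt-out could look roughly like the following. This is purely hypothetical: the option was never added to kafka-go (and is withdrawn later in this thread), kafka-go configures readers through a `ReaderConfig` struct rather than functional options, and every name here is invented for illustration.

```go
package main

import "fmt"

// ReaderConfig is a stand-in for illustration, not kafka-go's real struct.
type ReaderConfig struct {
	GroupID               string
	AllowEmptyAssignments bool
}

// ReaderOption is a functional option over the illustrative config.
type ReaderOption func(*ReaderConfig)

// AllowEmptyAssignments would let a consumer treat an empty assignment as
// valid (sit idle) instead of erroring, preserving old behavior by default.
func AllowEmptyAssignments(allow bool) ReaderOption {
	return func(c *ReaderConfig) { c.AllowEmptyAssignments = allow }
}

// NewConfig applies the options to a base config.
func NewConfig(groupID string, opts ...ReaderOption) ReaderConfig {
	c := ReaderConfig{GroupID: groupID}
	for _, opt := range opts {
		opt(&c)
	}
	return c
}

func main() {
	c := NewConfig("my-group", AllowEmptyAssignments(true))
	fmt.Println(c.AllowEmptyAssignments)
}
```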

Contributor Author:

There are a few downsides to consider there:

  • It would be public, and therefore hard to get rid of.
  • I'd argue it should default to true, to avoid a risky, subtle, and possibly common situation.
  • Too much config is a bad thing, IMO.

Contributor:

I prefer not to have a configuration option. Currently, the code is incorrect because there are valid use cases and configurations that should work but don't. The downside of allowing empty assignments is that perhaps someone accidentally spins up too many consumers and some number of them sit idle. Preventing things like that feels like it's outside the scope of this library.

Contributor:

Those are good arguments for not making it an option. Consider my suggestion withdrawn.

@stevevls (Contributor):

@abuchanan-nr I'm going to re-float the idea of splitting this into two PRs. 😄 What do you think about scoping this one down to just the problem of num consumers > num partitions?

@abuchanan-nr (Contributor Author):

I'm going to re-float the idea of splitting this into two PRs.

So this PR would remove the waitForTopic stuff?

@stevevls (Contributor):

Yeah...deferring the waitForTopic functionality to another PR is an option. Everyone seems in agreement on allowing the assignments to be empty, and that fixes one immediate problem. I think there's still some discussion to be had on the correct approach for waitForTopic, so splitting the PR would allow us to fix one issue while we make progress on the other.

@abuchanan-nr (Contributor Author):

Works for me.

The only thing holding me up is figuring out how to write a test for this. Only idea so far: sleep long enough for rebalances to occur, then check ReaderStats. But it would need to sleep for a relatively long time (~30 seconds).

@stevevls (Contributor):

Yeah...that's a tough one...in part because we're trying to prove a negative (the rebalance doesn't churn). I looked through the code to see if there was any internal state of the Reader that we might be able to poke at, but nothing jumped out. Another thing that came to mind, though, was that you could create a topic with a single partition, spin up a bunch of readers (say 10?), then publish messages at a fixed rate and ensure the same reader received all of them. Sounds complicated, though, and also time-dependent. The rebalance idea may be simpler!

@abuchanan-nr (Contributor Author):

Created #242 for the more minimal fix in order to avoid nuking any comments going in this thread.

Feel free to close this PR if you want.

@stevevls (Contributor), Apr 4, 2019:

Closing this, but I opened #251 to make sure we eventually follow up. Thanks!

stevevls closed this Apr 4, 2019