
kafka.(*Client).CreateTopics never returns when kafka responds with an error - includes possible fixes #672

Closed
omaskery opened this issue May 24, 2021 · 2 comments

@omaskery
Contributor

Describe the bug
If you pass an invalid kafka.CreateTopicsRequest to kafka.(*Client).CreateTopics the function never returns, even though internally the library receives a valid response from the broker containing a valid error message.

I have debugged the issue and think I know the cause; see the end of this issue for details and possible solutions.
(Caveat: I am not familiar with this library's internals, nor with Kafka's protocol.)

Kafka Version
This docker image: https://hub.docker.com/r/bitnami/kafka/
It appears to be Kafka version 2.8.0, according to its docs.

To Reproduce

If you attempt to create an invalid topic via kafka.(*Client).CreateTopics as shown below, the function will never return:

func example(ctx context.Context, client *kafka.Client) error {
	illegalPartitionCount := 0 // invalid: Kafka requires at least one partition
	// This call never returns, even though the broker replies with an error.
	_, err := client.CreateTopics(ctx, &kafka.CreateTopicsRequest{
		Topics: []kafka.TopicConfig{
			{
				Topic:             "some.topic.name",
				NumPartitions:     illegalPartitionCount,
				ReplicationFactor: 1,
			},
		},
	})
	_ = err
	panic("the code will never get here")
}

Expected behavior
I would expect to receive an error value from CreateTopics containing the error from the broker describing why the request was invalid.
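
For illustration, here is a sketch of the caller-side handling I would expect to work once this is fixed. It assumes kafka.CreateTopicsResponse reports per-topic broker errors in its Errors map, keyed by topic name; treat that exact shape as an assumption on my part:

func createAndCheck(ctx context.Context, client *kafka.Client, req *kafka.CreateTopicsRequest) error {
	resp, err := client.CreateTopics(ctx, req)
	if err != nil {
		// Transport-level failure, e.g. the broker was unreachable.
		return err
	}
	// Per-topic errors reported by the broker, keyed by topic name.
	for topic, topicErr := range resp.Errors {
		if topicErr != nil {
			return fmt.Errorf("broker rejected topic %q: %w", topic, topicErr)
		}
	}
	return nil
}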

Additional context
I have looked through the code and believe the cause is a defer in the kafka.(*connPool).roundTrip function. There is special handling for CreateTopics that forces a metadata refresh before returning to the caller, to ensure the topic has actually been created (topic creation is an asynchronous operation). Unfortunately, when the request was invalid, this defer polls indefinitely for a topic that will never exist:
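
To make the failure mode concrete, here is a conceptual sketch of the polling behaviour the deferred refresh amounts to; waitForTopics and its fetch parameter are hypothetical stand-ins for illustration, not the library's actual code:

func waitForTopics(ctx context.Context, expectTopics []string, fetch func() []string) error {
	// Keep fetching metadata until every expected topic appears. A topic
	// the broker rejected never appears, so without a context deadline
	// this loop never terminates.
	for {
		existing := make(map[string]bool)
		for _, name := range fetch() {
			existing[name] = true
		}
		allPresent := true
		for _, name := range expectTopics {
			if !existing[name] {
				allPresent = false
				break
			}
		}
		if allPresent {
			return nil
		}
		select {
		case <-time.After(100 * time.Millisecond):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}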

Would something like this work?

 func (p *connPool) roundTrip(ctx context.Context, req Request) (Response, error) {
 	// This first select should never block after the first metadata response
 	// that would mark the pool as `ready`.
 	select {
 	case <-p.ready:
 	case <-ctx.Done():
 		return nil, ctx.Err()
 	}
 
 	var expectTopics []string
-	defer func() {
-		if len(expectTopics) != 0 {
-			p.refreshMetadata(ctx, expectTopics)
-		}
-	}()
 
 	state := p.grabState()
 	var response promise
 
 	switch m := req.(type) {
 	case *meta.Request:
 		// We serve metadata requests directly from the transport cache.
 		//
 		// This reduces the number of round trips to kafka brokers while keeping
 		// the logic simple when applying partitioning strategies.
 		if state.err != nil {
 			return nil, state.err
 		}
 		return filterMetadataResponse(m, state.metadata), nil
 
 	case *createtopics.Request:
 		// Force an update of the metadata when adding topics,
 		// otherwise the cached state would get out of sync.
 		expectTopics = make([]string, len(m.Topics))
 		for i := range m.Topics {
 			expectTopics[i] = m.Topics[i].Name
 		}
 
 	case protocol.Splitter:
 		// Messages that implement the Splitter interface trigger the creation of
 		// multiple requests that are all merged back into a single result by
 		// a merger.
 		messages, merger, err := m.Split(state.layout)
 		if err != nil {
 			return nil, err
 		}
 		promises := make([]promise, len(messages))
 		for i, m := range messages {
 			promises[i] = p.sendRequest(ctx, m, state)
 		}
 		response = join(promises, messages, merger)
 	}
 
 	if response == nil {
 		response = p.sendRequest(ctx, req, state)
 	}
 
+	r, err := response.await(ctx)
+	if err != nil {
+		return r, err
+	}
+	
+	if len(expectTopics) != 0 {
+		p.refreshMetadata(ctx, expectTopics)
+	}
+	
+	return r, nil
-	return response.await(ctx)
 }

An alternative approach:

-func (p *connPool) roundTrip(ctx context.Context, req Request) (Response, error) {
+func (p *connPool) roundTrip(ctx context.Context, req Request) (r Response, err error) {
 	// This first select should never block after the first metadata response
 	// that would mark the pool as `ready`.
 	select {
 	case <-p.ready:
 	case <-ctx.Done():
 		return nil, ctx.Err()
 	}
 
 	var expectTopics []string
 	defer func() {
-		if len(expectTopics) != 0 {
+		if err == nil && len(expectTopics) != 0 {
 			p.refreshMetadata(ctx, expectTopics)
 		}
 	}()
 
 	state := p.grabState()
 	var response promise
 
 	switch m := req.(type) {
 	case *meta.Request:
 		// We serve metadata requests directly from the transport cache.
 		//
 		// This reduces the number of round trips to kafka brokers while keeping
 		// the logic simple when applying partitioning strategies.
 		if state.err != nil {
 			return nil, state.err
 		}
 		return filterMetadataResponse(m, state.metadata), nil
 
 	case *createtopics.Request:
 		// Force an update of the metadata when adding topics,
 		// otherwise the cached state would get out of sync.
 		expectTopics = make([]string, len(m.Topics))
 		for i := range m.Topics {
 			expectTopics[i] = m.Topics[i].Name
 		}
 
 	case protocol.Splitter:
 		// Messages that implement the Splitter interface trigger the creation of
 		// multiple requests that are all merged back into a single result by
 		// a merger.
 		messages, merger, err := m.Split(state.layout)
 		if err != nil {
 			return nil, err
 		}
 		promises := make([]promise, len(messages))
 		for i, m := range messages {
 			promises[i] = p.sendRequest(ctx, m, state)
 		}
 		response = join(promises, messages, merger)
 	}
 
 	if response == nil {
 		response = p.sendRequest(ctx, req, state)
 	}
 
 	return response.await(ctx)
 }
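
This second alternative leans on a Go detail worth spelling out: a deferred closure can read the function's named return values, which is what lets it skip the refresh on the error path. A minimal standalone illustration (hypothetical demo function, not library code):

package main

import (
	"errors"
	"fmt"
)

// demo shows that a deferred closure observes the named return value
// err as it is at the moment the function actually returns.
func demo() (err error) {
	defer func() {
		if err == nil {
			fmt.Println("success path: safe to refresh metadata")
		} else {
			fmt.Println("error path: skip the refresh")
		}
	}()
	return errors.New("broker rejected the request")
}

func main() {
	_ = demo() // prints "error path: skip the refresh"
}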
@omaskery omaskery added the bug label May 24, 2021
@achille-roussel
Contributor

Hello @omaskery, thanks for reporting the issue!

Your first suggestion sounds great; would you be able to open a pull request to submit the fix?

@achille-roussel achille-roussel self-assigned this May 28, 2021
@omaskery
Contributor Author

Sure, will do! What I'll do is dumbly make the change, open the PR, and then ask for guidance on how to perform any necessary testing.

omaskery added a commit to omaskery/kafka-go that referenced this issue May 29, 2021
Implements changes discussed and agreed in GitHub issue.
achille-roussel pushed a commit that referenced this issue Jul 2, 2021
* Only refresh metadata on success (#672)

Implements changes discussed and agreed in GitHub issue.

* Second draft solution to issue 672

* Create unit test to detect issue 672

* Check for nil event in test 672