Describe the bug
If you pass an invalid kafka.CreateTopicsRequest to kafka.(*Client).CreateTopics, the function never returns, even though the library internally receives a valid response from the broker containing a valid error message.
I have debugged the issue and think I know the cause. See end of this issue for details and possible solutions.
(Caveat: I am not familiar with this library's innards, nor Kafka's protocol).
To Reproduce
If you attempt to create an invalid topic via kafka.(*Client).CreateTopics as shown below, the function will never return:
func example(client *kafka.Client) error {
	ctx := context.Background()

	illegalPartitionCount := 0
	_, _ = client.CreateTopics(ctx, &kafka.CreateTopicsRequest{
		Topics: []kafka.TopicConfig{
			{
				Topic:             "some.topic.name",
				NumPartitions:     illegalPartitionCount,
				ReplicationFactor: 1,
			},
		},
	})
	panic("the code will never get here")
}
Expected behavior
I would expect to receive an error value from CreateTopics containing the error from the broker describing why the request was invalid.
Additional context
I have looked through the code and I believe the root cause is a defer in the kafka.(*connPool).roundTrip function. There is special handling for CreateTopics that forces a metadata refresh before returning to the caller, in order to ensure the topic has actually been created, since topic creation is asynchronous. Unfortunately, when the request is invalid, this defer polls forever for a topic that will never exist. Would something like this work?
func (p *connPool) roundTrip(ctx context.Context, req Request) (Response, error) {
	// This first select should never block after the first metadata response
	// that would mark the pool as `ready`.
	select {
	case <-p.ready:
	case <-ctx.Done():
		return nil, ctx.Err()
	}

	var expectTopics []string
-	defer func() {
-		if len(expectTopics) != 0 {
-			p.refreshMetadata(ctx, expectTopics)
-		}
-	}()

	state := p.grabState()
	var response promise

	switch m := req.(type) {
	case *meta.Request:
		// We serve metadata requests directly from the transport cache.
		//
		// This reduces the number of round trips to kafka brokers while keeping
		// the logic simple when applying partitioning strategies.
		if state.err != nil {
			return nil, state.err
		}
		return filterMetadataResponse(m, state.metadata), nil

	case *createtopics.Request:
		// Force an update of the metadata when adding topics,
		// otherwise the cached state would get out of sync.
		expectTopics = make([]string, len(m.Topics))
		for i := range m.Topics {
			expectTopics[i] = m.Topics[i].Name
		}

	case protocol.Splitter:
		// Messages that implement the Splitter interface trigger the creation of
		// multiple requests that are all merged back into a single results by
		// a merger.
		messages, merger, err := m.Split(state.layout)
		if err != nil {
			return nil, err
		}
		promises := make([]promise, len(messages))
		for i, m := range messages {
			promises[i] = p.sendRequest(ctx, m, state)
		}
		response = join(promises, messages, merger)
	}

	if response == nil {
		response = p.sendRequest(ctx, req, state)
	}

-	return response.await(ctx)
+	r, err := response.await(ctx)
+	if err != nil {
+		return r, err
+	}
+
+	if len(expectTopics) != 0 {
+		p.refreshMetadata(ctx, expectTopics)
+	}
+
+	return r, nil
}
Another possible alternative:
-func (p *connPool) roundTrip(ctx context.Context, req Request) (Response, error) {
+func (p *connPool) roundTrip(ctx context.Context, req Request) (r Response, err error) {
	// This first select should never block after the first metadata response
	// that would mark the pool as `ready`.
	select {
	case <-p.ready:
	case <-ctx.Done():
		return nil, ctx.Err()
	}

	var expectTopics []string
	defer func() {
-		if len(expectTopics) != 0 {
+		if err == nil && len(expectTopics) != 0 {
			p.refreshMetadata(ctx, expectTopics)
		}
	}()

	state := p.grabState()
	var response promise

	switch m := req.(type) {
	case *meta.Request:
		// We serve metadata requests directly from the transport cache.
		//
		// This reduces the number of round trips to kafka brokers while keeping
		// the logic simple when applying partitioning strategies.
		if state.err != nil {
			return nil, state.err
		}
		return filterMetadataResponse(m, state.metadata), nil

	case *createtopics.Request:
		// Force an update of the metadata when adding topics,
		// otherwise the cached state would get out of sync.
		expectTopics = make([]string, len(m.Topics))
		for i := range m.Topics {
			expectTopics[i] = m.Topics[i].Name
		}

	case protocol.Splitter:
		// Messages that implement the Splitter interface trigger the creation of
		// multiple requests that are all merged back into a single results by
		// a merger.
		messages, merger, err := m.Split(state.layout)
		if err != nil {
			return nil, err
		}
		promises := make([]promise, len(messages))
		for i, m := range messages {
			promises[i] = p.sendRequest(ctx, m, state)
		}
		response = join(promises, messages, merger)
	}

	if response == nil {
		response = p.sendRequest(ctx, req, state)
	}

	return response.await(ctx)
}
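This alternative relies on a Go language detail: a deferred closure observes the final value of a named return parameter, so changing the signature to (r Response, err error) lets the defer skip the refresh when an error is being returned. A minimal sketch of that mechanism, with hypothetical names (doRequest, refreshed) standing in for the real code:

```go
package main

import (
	"errors"
	"fmt"
)

// refreshed records whether the deferred "refresh" ran.
var refreshed bool

// doRequest mimics the alternative fix: because err is a named return
// value, the deferred closure sees the error actually returned to the
// caller and can skip the refresh on failure.
func doRequest(fail bool) (r string, err error) {
	defer func() {
		if err == nil {
			refreshed = true
		}
	}()
	if fail {
		return "", errors.New("invalid request")
	}
	return "ok", nil
}

func main() {
	_, err := doRequest(true)
	fmt.Println(err != nil, refreshed) // true false

	_, _ = doRequest(false)
	fmt.Println(refreshed) // true
}
```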
* Only refresh metadata on success (#672)
Implements changes discussed and agreed in GitHub issue.
* Second draft solution to issue 672
* Create unit test to detect issue 672
* Check for nil event in test 672
Kafka Version
This docker image: https://hub.docker.com/r/bitnami/kafka/
Seems to be version 2.8.0 of Kafka according to its docs.