WriteMessages blocks the goroutine when the topic does not exist and the client has no permission to create it #806

Closed
cridly opened this issue Dec 6, 2021 · 1 comment · Fixed by #820

cridly commented Dec 6, 2021

Describe the bug
When the topic does not exist and the client does not have permission to create the topic, the WriteMessages function will block indefinitely.

Kafka Version
v1.1.0

To Reproduce
Steps to reproduce the behavior. Bonus points for a code sample.

  1. The topic does not exist.
  2. The client does not have permission to create the topic.
  3. The client writes a message, which triggers an attempt to create the topic automatically.
  4. The goroutine blocks permanently and no error is returned.
  5. The problem exists in v0.4.23 and later versions.

Expected behavior
WriteMessages should not block the goroutine. It should return an error when the client has no permission to create the topic.

cridly added the bug label Dec 6, 2021
@1000Delta
Contributor

I ran into the same problem. As a workaround, I have to pass WriteMessages a context created with WithTimeout so that the call can be cancelled and return the error TopicOrPartitionNotFound.

I reviewed the WriteMessages code and saw that it calls RoundTripper.RoundTrip with a request that sets AllowAutoTopicCreation: true. This is what causes the indefinite block.

kafka-go/writer.go

Lines 704 to 707 in 8dfb915

	r, err := client.transport().RoundTrip(ctx, client.Addr, &metadataAPI.Request{
		TopicNames:             []string{topic},
		AllowAutoTopicCreation: true,
	})

Because of this option, the client refreshes its metadata to try to get the expected topics from the broker, so it calls connPool.refreshMetadata:

kafka-go/transport.go

Lines 418 to 420 in 8dfb915

	if m.AllowAutoTopicCreation {
		p.refreshMetadata(ctx, m.TopicNames)
	}

This method sends an event on p.wake to force a metadata update. Once notified, it checks whether every expected topic appears in the updated topic list. If not, it waits for a backoff that starts at minBackoff and doubles up to maxBackoff, then tries again. The loop ends only when all expected topics are found, the cancel channel fires, or ctx.Err() is non-nil:

kafka-go/transport.go

Lines 436 to 474 in 8dfb915

	for ctx.Err() == nil {
		notify := make(event)
		select {
		case <-cancel:
			return
		case p.wake <- notify:
			select {
			case <-notify:
			case <-cancel:
				return
			}
		}
		state := p.grabState()
		found := 0
		for _, topic := range expectTopics {
			if _, ok := state.layout.Topics[topic]; ok {
				found++
			}
		}
		if found == len(expectTopics) {
			return
		}
		if delay := time.Duration(rand.Int63n(int64(minBackoff))); delay > 0 {
			timer := time.NewTimer(minBackoff)
			select {
			case <-cancel:
			case <-timer.C:
			}
			timer.Stop()
			if minBackoff *= 2; minBackoff > maxBackoff {
				minBackoff = maxBackoff
			}
		}
	}

I think kafka-go should either expose AllowAutoTopicCreation as an option or automatically check the broker's auto topic creation setting. What do you think?

1000Delta added a commit to 1000Delta/kafka-go that referenced this issue Jan 7, 2022
Skip refreshing metadata in roundTrip for errored topics.
For example, when writing a message to a topic that does not exist,
Kafka responds with the error UNKNOWN_TOPIC_OR_PARTITION for that topic,
which causes metadata to be refreshed indefinitely.

Fixes segmentio#806
achille-roussel pushed a commit that referenced this issue Jan 13, 2022
* Skip refresh metadata for errored topic.

Skip refreshing metadata in roundTrip for errored topics.
For example, when writing a message to a topic that does not exist,
Kafka responds with the error UNKNOWN_TOPIC_OR_PARTITION for that topic,
which causes metadata to be refreshed indefinitely.

Fixes #806

* Create unit test to detect issue #806
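
The idea in the commit message, skipping the metadata refresh for topics the broker reported an error on, might look roughly like the sketch below. This is an illustration under stated assumptions and not the code from the fix: `topicMetadata` and `topicsToRefresh` are hypothetical names, and the only protocol detail relied on is that Kafka's UNKNOWN_TOPIC_OR_PARTITION error has code 3.

```go
package main

import "fmt"

// unknownTopicOrPartition is the Kafka protocol error code for
// UNKNOWN_TOPIC_OR_PARTITION.
const unknownTopicOrPartition int16 = 3

// topicMetadata is a hypothetical, reduced view of one topic entry in a
// metadata response. ErrorCode is 0 when the broker reported no error.
type topicMetadata struct {
	Name      string
	ErrorCode int16
}

// topicsToRefresh keeps only the topics that came back without an error, so
// the caller never waits for a topic the broker has already rejected.
func topicsToRefresh(topics []topicMetadata) []string {
	names := make([]string, 0, len(topics))
	for _, t := range topics {
		if t.ErrorCode != 0 {
			continue // errored topic: waiting for it could block forever
		}
		names = append(names, t.Name)
	}
	return names
}

func main() {
	resp := []topicMetadata{
		{Name: "orders"},
		{Name: "missing", ErrorCode: unknownTopicOrPartition},
	}
	fmt.Println(topicsToRefresh(resp))
}
```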