Skip to content

Commit

Permalink
Skip refresh metadata for errored topic, fix segmentio#806 (segmentio…
Browse files Browse the repository at this point in the history
…#820)

* Skip refresh metadata for errored topic.

Skip refrash metadata in roundTrip about errored topic,
such as write message to a not exist topic,
kafka will response error UNKNOWN_TOPIC_OR_PARTITION to the  topic,
it causes indefinitely metadata refreshing.

Fixes segmentio#806

* Create unit test to detect issue segmentio#806
  • Loading branch information
1000Delta authored Jan 13, 2022
1 parent cdc927e commit f487e01
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 1 deletion.
13 changes: 12 additions & 1 deletion transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,18 @@ func (p *connPool) roundTrip(ctx context.Context, req Request) (Response, error)
// we didn't have that topic in our cache so we should update
// the cache.
if m.AllowAutoTopicCreation {
p.refreshMetadata(ctx, m.TopicNames)
topicsToRefresh := make([]string, 0, len(resp.Topics))
for _, topic := range resp.Topics {
// fixes issue 806: don't refresh topics that failed to create,
// it may means kafka doesn't enable auto topic creation.
// This causes the library to hang indefinitely, same as createtopics process.
if topic.ErrorCode != 0 {
continue
}

topicsToRefresh = append(topicsToRefresh, topic.Name)
}
p.refreshMetadata(ctx, topicsToRefresh)
}
}

Expand Down
139 changes: 139 additions & 0 deletions transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/segmentio/kafka-go/protocol"
"github.com/segmentio/kafka-go/protocol/createtopics"
meta "github.com/segmentio/kafka-go/protocol/metadata"
)

func TestIssue477(t *testing.T) {
Expand Down Expand Up @@ -165,3 +166,141 @@ func TestIssue672(t *testing.T) {
t.Fatalf("expected a createtopics.Response but got %T", r)
}
}

func TestIssue806(t *testing.T) {
// ensure the test times out if the bug is re-introduced
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

// simulate unknown topic want auto create with unknownTopicName,
const unknownTopicName = "unknown-topic"
const okTopicName = "good-topic"

// make the connection pool think it's immediately ready to send
ready := make(chan struct{})
close(ready)

// allow the system to wake as much as it wants
wake := make(chan event)
defer close(wake)
go func() {
for {
select {
case <-ctx.Done():
return
case e := <-wake:
if e == nil {
return
}
e.trigger()
}
}
}()

// handle requests by immediately resolving them with a create topics response,
// the "unknown topic" will have err UNKNOWN_TOPIC_OR_PARTITION
requests := make(chan connRequest, 1)
defer close(requests)
go func() {
request := <-requests
request.res.resolve(&meta.Response{
Topics: []meta.ResponseTopic{
{
Name: unknownTopicName,
ErrorCode: int16(UnknownTopicOrPartition),
},
{
Name: okTopicName,
Partitions: []meta.ResponsePartition{
{
PartitionIndex: 0,
},
},
},
},
})
}()

pool := &connPool{
ready: ready,
wake: wake,
conns: map[int32]*connGroup{},
}

// configure the state,
//
// set cached metadata only have good topic,
// so it need to request metadata,
// caused by unknown topic cannot find in cached metadata
//
// set layout only have good topic,
// so it can find the good topic, but not the one that fails to create
pool.setState(connPoolState{
metadata: &meta.Response{
Topics: []meta.ResponseTopic{
{
Name: okTopicName,
Partitions: []meta.ResponsePartition{
{
PartitionIndex: 0,
},
},
},
},
},
layout: protocol.Cluster{
Topics: map[string]protocol.Topic{
okTopicName: {
Name: okTopicName,
Partitions: map[int32]protocol.Partition{
0: {},
},
},
},
},
})

// trick the connection pool into thinking it has a valid connection to request metadata
pool.ctrl = &connGroup{
pool: pool,
broker: Broker{},
idleConns: []*conn{
{
reqs: requests,
},
},
}

// perform the round trip:
// - if the issue is presenting this will hang waiting for metadata to arrive that will
// never arrive, causing a deadline timeout.
// - if the issue is fixed this will resolve almost instantaneously
r, err := pool.roundTrip(ctx, &meta.Request{
TopicNames: []string{unknownTopicName},
AllowAutoTopicCreation: true,
})
// detect if the issue is presenting using the context timeout (note that checking the err return value
// isn't good enough as the original implementation didn't return the context cancellation error due to
// being run in a defer)
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
t.Fatalf("issue 806 is presenting! roundTrip should not have timed out")
}

// ancillary assertions as general house-keeping, not directly related to the issue:

// we're not expecting any errors in this test
if err != nil {
t.Fatalf("unexpected error provoking connection pool roundTrip: %v", err)
}

// we expect a response containing the errors from the broker
if r == nil {
t.Fatal("expected a non-nil response")
}

// we expect to have the create topic response with created earlier
_, ok := r.(*meta.Response)
if !ok {
t.Fatalf("expected a meta.Response but got %T", r)
}
}

0 comments on commit f487e01

Please sign in to comment.