
Commit

Only refresh metadata on success (segmentio#672) (segmentio#673)
* Only refresh metadata on success (segmentio#672)

Implements the changes discussed and agreed in GitHub issue segmentio#672.

* Second draft solution to issue 672

* Create unit test to detect issue 672

* Check for nil event in test 672
omaskery authored Jul 2, 2021
1 parent f49f2cc commit 680bd71
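In outline, the fix moves the metadata refresh from request time to response time, so the pool only waits for topics the broker actually created. A minimal sketch of that filtering step as a standalone helper (successfullyCreated is a hypothetical name, not part of the library; the types are the ones used in the diff below):

package kafka

import "github.com/segmentio/kafka-go/protocol/createtopics"

// successfullyCreated returns the names of the topics the broker reported
// as created without error. Only these can be expected to appear in
// refreshed metadata; a topic with a non-zero ErrorCode never will.
func successfullyCreated(resp *createtopics.Response) []string {
	names := make([]string, 0, len(resp.Topics))
	for _, t := range resp.Topics {
		if t.ErrorCode != 0 {
			continue // creation failed, so waiting for this topic would never finish
		}
		names = append(names, t.Name)
	}
	return names
}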
Showing 2 changed files with 156 additions and 16 deletions.
39 changes: 23 additions & 16 deletions transport.go
@@ -339,13 +339,6 @@ func (p *connPool) roundTrip(ctx context.Context, req Request) (Response, error)
 		return nil, ctx.Err()
 	}

-	var expectTopics []string
-	defer func() {
-		if len(expectTopics) != 0 {
-			p.refreshMetadata(ctx, expectTopics)
-		}
-	}()
-
 	state := p.grabState()
 	var response promise

@@ -360,14 +353,6 @@ func (p *connPool) roundTrip(ctx context.Context, req Request) (Response, error)
 		}
 		return filterMetadataResponse(m, state.metadata), nil

-	case *createtopics.Request:
-		// Force an update of the metadata when adding topics,
-		// otherwise the cached state would get out of sync.
-		expectTopics = make([]string, len(m.Topics))
-		for i := range m.Topics {
-			expectTopics[i] = m.Topics[i].Name
-		}
-
 	case protocol.Splitter:
 		// Messages that implement the Splitter interface trigger the creation of
 		// multiple requests that are all merged back into a single results by
@@ -387,7 +372,29 @@ func (p *connPool) roundTrip(ctx context.Context, req Request) (Response, error)
 		response = p.sendRequest(ctx, req, state)
 	}

-	return response.await(ctx)
+	r, err := response.await(ctx)
+	if err != nil {
+		return r, err
+	}
+
+	switch resp := r.(type) {
+	case *createtopics.Response:
+		// Force an update of the metadata when adding topics,
+		// otherwise the cached state would get out of sync.
+		topicsToRefresh := make([]string, 0, len(resp.Topics))
+		for _, topic := range resp.Topics {
+			// fixes issue 672: skip topics that failed to create; refreshing them would hang forever waiting for metadata that never arrives
+			if topic.ErrorCode != 0 {
+				continue
+			}
+
+			topicsToRefresh = append(topicsToRefresh, topic.Name)
+		}
+
+		p.refreshMetadata(ctx, topicsToRefresh)
+	}
+
+	return r, nil
 }

 // refreshMetadata forces an update of the cached cluster metadata, and waits
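A side effect of awaiting the response before refreshing metadata is that per-topic failures now reach the caller inside the createtopics.Response itself. A hedged sketch of how calling code might surface them (reportTopicErrors is a hypothetical helper; the field names match those used in the test below):

package kafka

import (
	"log"

	"github.com/segmentio/kafka-go/protocol/createtopics"
)

// reportTopicErrors logs every topic the broker refused to create. In the
// Kafka protocol an error code of 0 means success, so any non-zero value
// marks a per-topic failure.
func reportTopicErrors(resp *createtopics.Response) {
	for _, t := range resp.Topics {
		if t.ErrorCode != 0 {
			log.Printf("topic %q was not created: %s (error code %d)", t.Name, t.ErrorMessage, t.ErrorCode)
		}
	}
}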
133 changes: 133 additions & 0 deletions transport_test.go
@@ -3,8 +3,13 @@ package kafka
 import (
 	"context"
 	"crypto/tls"
+	"errors"
 	"net"
 	"testing"
+	"time"
+
+	"github.com/segmentio/kafka-go/protocol"
+	"github.com/segmentio/kafka-go/protocol/createtopics"
 )

 func TestIssue477(t *testing.T) {
@@ -32,3 +37,131 @@ func TestIssue477(t *testing.T) {
 		t.Error("no error was reported when attempting to establish a TLS connection to a non-TLS endpoint")
 	}
 }
+
+func TestIssue672(t *testing.T) {
+	// ensure the test times out if the bug is re-introduced
+	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+	defer cancel()
+
+	// we'll simulate a situation with one good topic and one bad topic (bad configuration)
+	const brokenTopicName = "bad-topic"
+	const okTopicName = "good-topic"
+
+	// make the connection pool think it's immediately ready to send
+	ready := make(chan struct{})
+	close(ready)
+
+	// allow the system to wake as much as it wants
+	wake := make(chan event)
+	defer close(wake)
+	go func() {
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case e := <-wake:
+				if e == nil {
+					return
+				}
+				e.trigger()
+			}
+		}
+	}()
+
+	// handle requests by immediately resolving them with a create topics response;
+	// the "bad topic" will have an error value
+	requests := make(chan connRequest, 1)
+	defer close(requests)
+	go func() {
+		request := <-requests
+		request.res.resolve(&createtopics.Response{
+			ThrottleTimeMs: 0,
+			Topics: []createtopics.ResponseTopic{
+				{
+					Name:         brokenTopicName,
+					ErrorCode:    int16(InvalidPartitionNumber),
+					ErrorMessage: InvalidPartitionNumber.Description(),
+				},
+				{
+					Name:              okTopicName,
+					NumPartitions:     1,
+					ReplicationFactor: 1,
+				},
+			},
+		})
+	}()
+
+	pool := &connPool{
+		ready: ready,
+		wake:  wake,
+		conns: map[int32]*connGroup{},
+	}
+
+	// configure the state so it can find the good topic, but not the one that fails to create
+	pool.setState(connPoolState{
+		layout: protocol.Cluster{
+			Topics: map[string]protocol.Topic{
+				okTopicName: {
+					Name: okTopicName,
+					Partitions: map[int32]protocol.Partition{
+						0: {},
+					},
+				},
+			},
+		},
+	})
+
+	// trick the connection pool into thinking it has a valid connection to a broker
+	pool.conns[0] = &connGroup{
+		pool:   pool,
+		broker: Broker{},
+		idleConns: []*conn{
+			{
+				reqs: requests,
+			},
+		},
+	}
+
+	// perform the round trip:
+	// - if the issue is present, this will hang waiting for metadata that will
+	//   never arrive, and the context deadline will expire.
+	// - if the issue is fixed, this will resolve almost instantaneously.
+	r, err := pool.roundTrip(ctx, &createtopics.Request{
+		Topics: []createtopics.RequestTopic{
+			{
+				Name:              brokenTopicName,
+				NumPartitions:     0,
+				ReplicationFactor: 1,
+			},
+			{
+				Name:              okTopicName,
+				NumPartitions:     1,
+				ReplicationFactor: 1,
+			},
+		},
+	})
+	// detect whether the issue is present using the context timeout (note that checking the err
+	// return value isn't good enough: the original implementation ran the refresh in a defer, so
+	// the context cancellation error was never returned)
+	if errors.Is(ctx.Err(), context.DeadlineExceeded) {
+		t.Fatalf("issue 672 is present: roundTrip should not have timed out")
+	}
+
+	// ancillary assertions as general house-keeping, not directly related to the issue:
+
+	// we're not expecting any errors in this test
+	if err != nil {
+		t.Fatalf("unexpected error provoking connection pool roundTrip: %v", err)
+	}
+
+	// we expect a response containing the errors from the broker
+	if r == nil {
+		t.Fatal("expected a non-nil response")
+	}
+
+	// we expect the createtopics response we resolved earlier
+	_, ok := r.(*createtopics.Response)
+	if !ok {
+		t.Fatalf("expected a createtopics.Response but got %T", r)
+	}
+}
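The test's timeout-as-assertion idea generalizes to any deadlock regression test: run the suspect operation under a short context deadline and treat deadline expiry as the failure signal. A minimal sketch of the pattern in isolation (TestNoDeadlockPattern and doSomething are hypothetical names; a real test would call into the code path that previously hung, here pool.roundTrip):

package kafka

import (
	"context"
	"testing"
	"time"
)

// doSomething stands in for the operation under test.
func doSomething(ctx context.Context) {}

// TestNoDeadlockPattern fails fast if the operation blocks past the deadline
// instead of letting the whole test binary time out.
func TestNoDeadlockPattern(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	done := make(chan struct{})
	go func() {
		defer close(done)
		doSomething(ctx)
	}()

	select {
	case <-done:
		// completed before the deadline: no deadlock
	case <-ctx.Done():
		t.Fatal("operation appears to be deadlocked: context deadline exceeded")
	}
}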
