consumer groups: ensure deadline is set on broker interactions (segmentio#509)

Since the Conn does not have a timeout by default, it's possible for a
connection to hang if the remote side crashes before responding.  This
PR ensures that every interaction with the broker has a deadline so that
the consumer group won't get stuck if the coordinator goes down.
Steve van Loben Sels authored Sep 14, 2020
1 parent ff55ecc commit b9217e4
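
For context, a minimal sketch (not part of this commit) of how the new Timeout option might be set when constructing a consumer group; the broker address, group ID, and topic name are placeholders:

package main

import (
	"time"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	group, err := kafka.NewConsumerGroup(kafka.ConsumerGroupConfig{
		ID:      "example-group",            // placeholder group ID
		Brokers: []string{"localhost:9092"}, // placeholder broker
		Topics:  []string{"example-topic"},  // placeholder topic
		// Deadline applied to every interaction with the group
		// coordinator; leaving it zero falls back to the 5s default.
		Timeout: 10 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer group.Close()
}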
Showing 2 changed files with 108 additions and 13 deletions.
119 changes: 107 additions & 12 deletions consumergroup.go
@@ -55,6 +55,10 @@ const (
 	// defaultPartitionWatchTime contains the amount of time the kafka-go will wait to
 	// query the brokers looking for partition changes.
 	defaultPartitionWatchTime = 5 * time.Second
+
+	// defaultTimeout is the deadline to set when interacting with the
+	// consumer group coordinator.
+	defaultTimeout = 5 * time.Second
 )

// ConsumerGroupConfig is a configuration object used to create new instances of
@@ -143,6 +147,17 @@ type ConsumerGroupConfig struct {
 	// back to using Logger instead.
 	ErrorLogger Logger
 
+	// Timeout is the network timeout used when communicating with the consumer
+	// group coordinator. This value should not be too small since errors
+	// communicating with the broker will generally cause a consumer group
+	// rebalance, and it's undesirable that a transient network error introduce
+	// that overhead. Similarly, it should not be too large or the consumer
+	// group may be slow to respond to the coordinator failing over to another
+	// broker.
+	//
+	// Default: 5s
+	Timeout time.Duration
+
 	// connect is a function for dialing the coordinator. This is provided for
 	// unit testing to mock broker connections.
 	connect func(dialer *Dialer, brokers ...string) (coordinator, error)
@@ -231,8 +246,12 @@ func (config *ConsumerGroupConfig) Validate() error {
 		return errors.New(fmt.Sprintf("StartOffset is not valid %d", config.StartOffset))
 	}
 
+	if config.Timeout == 0 {
+		config.Timeout = defaultTimeout
+	}
+
 	if config.connect == nil {
-		config.connect = connect
+		config.connect = makeConnect(config.Timeout)
 	}
 
 	return nil
@@ -442,7 +461,7 @@ func (g *Generation) partitionWatcher(interval time.Duration, topic string) {
 	ticker := time.NewTicker(interval)
 	defer ticker.Stop()
 
-	ops, err := g.conn.ReadPartitions(topic)
+	ops, err := g.conn.readPartitions(topic)
 	if err != nil {
 		g.logError(func(l Logger) {
 			l.Printf("Problem getting partitions during startup, %v\n, Returning and setting up nextGeneration", err)
@@ -455,7 +474,7 @@ func (g *Generation) partitionWatcher(interval time.Duration, topic string) {
 		case <-ctx.Done():
 			return
 		case <-ticker.C:
-			ops, err := g.conn.ReadPartitions(topic)
+			ops, err := g.conn.readPartitions(topic)
 			switch err {
 			case nil, UnknownTopicOrPartition:
 				if len(ops) != oParts {
@@ -480,8 +499,6 @@ func (g *Generation) partitionWatcher(interval time.Duration, topic string) {
 	})
 }
 
-var _ coordinator = &Conn{}
-
 // coordinator is a subset of the functionality in Conn in order to facilitate
 // testing the consumer group...especially for error conditions that are
 // difficult to instigate with a live broker running in docker.
@@ -494,7 +511,78 @@ type coordinator interface {
 	heartbeat(heartbeatRequestV0) (heartbeatResponseV0, error)
 	offsetFetch(offsetFetchRequestV1) (offsetFetchResponseV1, error)
 	offsetCommit(offsetCommitRequestV2) (offsetCommitResponseV2, error)
-	ReadPartitions(...string) ([]Partition, error)
+	readPartitions(...string) ([]Partition, error)
 }
+
+// timeoutCoordinator wraps the Conn to ensure that every operation has a
+// deadline. Otherwise, it would be possible for requests to block indefinitely
+// if the remote server never responds. There are many spots where the consumer
+// group needs to interact with the broker, so it feels less error prone to
+// factor all of the deadline management into this shared location as opposed to
+// peppering it all through where the code actually interacts with the broker.
+type timeoutCoordinator struct {
+	timeout time.Duration
+	conn    *Conn
+}
+
+func (t *timeoutCoordinator) Close() error {
+	return t.conn.Close()
+}
+
+func (t *timeoutCoordinator) findCoordinator(req findCoordinatorRequestV0) (findCoordinatorResponseV0, error) {
+	if err := t.conn.SetDeadline(time.Now().Add(t.timeout)); err != nil {
+		return findCoordinatorResponseV0{}, err
+	}
+	return t.conn.findCoordinator(req)
+}
+
+func (t *timeoutCoordinator) joinGroup(req joinGroupRequestV1) (joinGroupResponseV1, error) {
+	if err := t.conn.SetDeadline(time.Now().Add(t.timeout)); err != nil {
+		return joinGroupResponseV1{}, err
+	}
+	return t.conn.joinGroup(req)
+}
+
+func (t *timeoutCoordinator) syncGroup(req syncGroupRequestV0) (syncGroupResponseV0, error) {
+	if err := t.conn.SetDeadline(time.Now().Add(t.timeout)); err != nil {
+		return syncGroupResponseV0{}, err
+	}
+	return t.conn.syncGroup(req)
+}
+
+func (t *timeoutCoordinator) leaveGroup(req leaveGroupRequestV0) (leaveGroupResponseV0, error) {
+	if err := t.conn.SetDeadline(time.Now().Add(t.timeout)); err != nil {
+		return leaveGroupResponseV0{}, err
+	}
+	return t.conn.leaveGroup(req)
+}
+
+func (t *timeoutCoordinator) heartbeat(req heartbeatRequestV0) (heartbeatResponseV0, error) {
+	if err := t.conn.SetDeadline(time.Now().Add(t.timeout)); err != nil {
+		return heartbeatResponseV0{}, err
+	}
+	return t.conn.heartbeat(req)
+}
+
+func (t *timeoutCoordinator) offsetFetch(req offsetFetchRequestV1) (offsetFetchResponseV1, error) {
+	if err := t.conn.SetDeadline(time.Now().Add(t.timeout)); err != nil {
+		return offsetFetchResponseV1{}, err
+	}
+	return t.conn.offsetFetch(req)
+}
+
+func (t *timeoutCoordinator) offsetCommit(req offsetCommitRequestV2) (offsetCommitResponseV2, error) {
+	if err := t.conn.SetDeadline(time.Now().Add(t.timeout)); err != nil {
+		return offsetCommitResponseV2{}, err
+	}
+	return t.conn.offsetCommit(req)
+}
+
+func (t *timeoutCoordinator) readPartitions(topics ...string) ([]Partition, error) {
+	if err := t.conn.SetDeadline(time.Now().Add(t.timeout)); err != nil {
+		return nil, err
+	}
+	return t.conn.ReadPartitions(topics...)
+}
 
 // NewConsumerGroup creates a new ConsumerGroup. It returns an error if the
@@ -726,13 +814,20 @@ func (cg *ConsumerGroup) nextGeneration(memberID string) (string, error) {
 }
 
 // connect returns a connection to ANY broker
-func connect(dialer *Dialer, brokers ...string) (conn coordinator, err error) {
-	for _, broker := range brokers {
-		if conn, err = dialer.Dial("tcp", broker); err == nil {
-			return
+func makeConnect(timeout time.Duration) func(dialer *Dialer, brokers ...string) (coordinator, error) {
+	return func(dialer *Dialer, brokers ...string) (coordinator, error) {
+		var err error
+		for _, broker := range brokers {
+			var conn *Conn
+			if conn, err = dialer.Dial("tcp", broker); err == nil {
+				return &timeoutCoordinator{
+					conn:    conn,
+					timeout: timeout,
+				}, nil
+			}
 		}
+		return nil, err // err will be non-nil
 	}
-	return // err will be non-nil
 }
 
 // coordinator establishes a connection to the coordinator for this consumer
@@ -868,7 +963,7 @@ func (cg *ConsumerGroup) assignTopicPartitions(conn coordinator, group joinGroup
 	}
 
 	topics := extractTopics(members)
-	partitions, err := conn.ReadPartitions(topics...)
+	partitions, err := conn.readPartitions(topics...)
 
 	// it's not a failure if the topic doesn't exist yet. it results in no
 	// assignments for the topic. this matches the behavior of the official
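The reason each wrapper method above re-arms the deadline is that Go connection deadlines are absolute instants, not per-operation durations: set once at dial time, a deadline would eventually expire and fail every later request. A minimal sketch of the same pattern on a plain net.Conn (callWithDeadline is a hypothetical helper, not part of kafka-go):

package example

import (
	"io"
	"net"
	"time"
)

// callWithDeadline re-arms the deadline before a single request/response
// round trip. The deadline covers both the request write and the response
// read, so if the peer crashes without answering, the read fails with a
// timeout error instead of blocking forever.
func callWithDeadline(conn net.Conn, timeout time.Duration, req, resp []byte) error {
	if err := conn.SetDeadline(time.Now().Add(timeout)); err != nil {
		return err
	}
	if _, err := conn.Write(req); err != nil {
		return err
	}
	_, err := io.ReadFull(conn, resp)
	return err
}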
2 changes: 1 addition & 1 deletion consumergroup_test.go
@@ -82,7 +82,7 @@ func (c mockCoordinator) offsetCommit(req offsetCommitRequestV2) (offsetCommitRe
 	return c.offsetCommitFunc(req)
 }
 
-func (c mockCoordinator) ReadPartitions(topics ...string) ([]Partition, error) {
+func (c mockCoordinator) readPartitions(topics ...string) ([]Partition, error) {
 	if c.readPartitionsFunc == nil {
 		return nil, errors.New("no Readpartitions behavior specified")
 	}
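Because the consumer group talks to the broker only through the coordinator interface, a test inside this package could inject a timeout failure directly through the mock above. A hypothetical sketch (os.ErrDeadlineExceeded is the sentinel that a missed connection deadline matches via errors.Is as of Go 1.15):

mc := mockCoordinator{
	readPartitionsFunc: func(topics ...string) ([]Partition, error) {
		// Simulate the coordinator never responding within the deadline.
		return nil, os.ErrDeadlineExceeded
	},
}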
