Skip to content

Commit

Permalink
Fix NotLeaderForPartition error (segmentio#19)
Browse files Browse the repository at this point in the history
* Fix NotLeaderForPartition error
  • Loading branch information
thehydroimpulse authored Sep 26, 2017
1 parent 56ac8b3 commit c6ecc9b
Showing 1 changed file with 9 additions and 0 deletions.
9 changes: 9 additions & 0 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,16 @@ func (r *reader) run(ctx context.Context, offset int64, join *sync.WaitGroup) {
switch offset, err = r.read(ctx, offset, conn); err {
case nil:
errcount = 0
case NotLeaderForPartition:
r.withLogger(func(log *log.Logger) {
log.Printf("failed to read from current broker for partition %d of %s at offset %d, not the leader", r.partition, r.topic, offset)
})

conn.Close()

// The next call to .initialize will re-establish a connection to the proper
// partition leader.
break readLoop
case RequestTimedOut:
// Timeout on the kafka side, this can be safely retried.
errcount = 0
Expand Down

0 comments on commit c6ecc9b

Please sign in to comment.