Skip to content

Commit

Permalink
Add MaxAttempts to Reader (segmentio#264)
Browse files Browse the repository at this point in the history
  • Loading branch information
VictorDenisov authored and Achille committed May 3, 2019
1 parent eda7f37 commit b4a5812
Showing 1 changed file with 12 additions and 1 deletion.
13 changes: 12 additions & 1 deletion reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1001,6 +1001,11 @@ type ReaderConfig struct {
// ReadUncommitted makes all records visible. With ReadCommitted only
// non-transactional and committed records are visible.
IsolationLevel IsolationLevel

// Limit of how many attempts will be made before delivering the error.
//
// The default is to try 3 times.
MaxAttempts int
}

// Validate method validates ReaderConfig properties.
Expand Down Expand Up @@ -1179,6 +1184,10 @@ func NewReader(config ReaderConfig) *Reader {
config.QueueCapacity = 100
}

if config.MaxAttempts == 0 {
config.MaxAttempts = 3
}

// when configured as a consumer group; stats should report a partition of -1
readerStatsPartition := config.Partition
if config.GroupID != "" {
Expand Down Expand Up @@ -1659,6 +1668,7 @@ func (r *Reader) start(offsetsByPartition map[int]int64) {
msgs: r.msgs,
stats: r.stats,
isolationLevel: r.config.IsolationLevel,
maxAttempts: r.config.MaxAttempts,
}).run(ctx, offset)
}(ctx, partition, offset, &r.join)
}
Expand All @@ -1681,6 +1691,7 @@ type reader struct {
msgs chan<- readerMessage
stats *readerStats
isolationLevel IsolationLevel
maxAttempts int
}

type readerMessage struct {
Expand Down Expand Up @@ -1729,7 +1740,7 @@ func (r *reader) run(ctx context.Context, offset int64) {
// Wait 4 attempts before reporting the first errors, this helps
// mitigate situations where the kafka server is temporarily
// unavailable.
if attempt >= 3 {
if attempt >= r.maxAttempts {
r.sendError(ctx, err)
} else {
r.stats.errors.observe(1)
Expand Down

0 comments on commit b4a5812

Please sign in to comment.