Add WriteBackoffMin/Max config to Writer (segmentio#1015)
* Added Writer config options to set the min and max delay between retries (see the usage sketch below). No changes to the logic or default values.

* Empty commit

* Update writer.go

Co-authored-by: Achille <achille@segment.com>
halkar and Achille authored Oct 28, 2022
1 parent 029f34c commit cf40a01
Showing 2 changed files with 75 additions and 28 deletions.
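For context, a minimal sketch of how a caller might set the new fields. The broker address, topic, message contents, and the specific durations are illustrative assumptions, not part of the commit; leaving both fields at zero keeps the previous hard-coded 100ms / 1s behaviour.

```go
package main

import (
	"context"
	"log"
	"time"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	// Illustrative values: widen the retry backoff window so a flaky broker
	// is retried less aggressively. Zero values fall back to 100ms / 1s.
	w := &kafka.Writer{
		Addr:            kafka.TCP("localhost:9092"),
		Topic:           "example-topic",
		WriteBackoffMin: 250 * time.Millisecond,
		WriteBackoffMax: 5 * time.Second,
	}
	defer w.Close()

	err := w.WriteMessages(context.Background(),
		kafka.Message{Key: []byte("key"), Value: []byte("value")},
	)
	if err != nil {
		log.Fatal("failed to write messages: ", err)
	}
}
```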
86 changes: 58 additions & 28 deletions writer.go
@@ -113,6 +113,18 @@ type Writer struct {
// The default is to try at most 10 times.
MaxAttempts int

// WriteBackoffMin optionally sets the smallest amount of time the writer waits before
// it attempts to write a batch of messages
//
// Default: 100ms
WriteBackoffMin time.Duration

// WriteBackoffMax optionally sets the maximum amount of time the writer waits before
// it attempts to write a batch of messages
//
// Default: 1s
WriteBackoffMax time.Duration

// Limit on how many messages will be buffered before being sent to a
// partition.
//
@@ -360,13 +372,15 @@ type WriterStats struct
BatchSize SummaryStats `metric:"kafka.writer.batch.size"`
BatchBytes SummaryStats `metric:"kafka.writer.batch.bytes"`

MaxAttempts int64 `metric:"kafka.writer.attempts.max" type:"gauge"`
MaxBatchSize int64 `metric:"kafka.writer.batch.max" type:"gauge"`
BatchTimeout time.Duration `metric:"kafka.writer.batch.timeout" type:"gauge"`
ReadTimeout time.Duration `metric:"kafka.writer.read.timeout" type:"gauge"`
WriteTimeout time.Duration `metric:"kafka.writer.write.timeout" type:"gauge"`
RequiredAcks int64 `metric:"kafka.writer.acks.required" type:"gauge"`
Async bool `metric:"kafka.writer.async" type:"gauge"`
MaxAttempts int64 `metric:"kafka.writer.attempts.max" type:"gauge"`
WriteBackoffMin time.Duration `metric:"kafka.writer.backoff.min" type:"gauge"`
WriteBackoffMax time.Duration `metric:"kafka.writer.backoff.max" type:"gauge"`
MaxBatchSize int64 `metric:"kafka.writer.batch.max" type:"gauge"`
BatchTimeout time.Duration `metric:"kafka.writer.batch.timeout" type:"gauge"`
ReadTimeout time.Duration `metric:"kafka.writer.read.timeout" type:"gauge"`
WriteTimeout time.Duration `metric:"kafka.writer.write.timeout" type:"gauge"`
RequiredAcks int64 `metric:"kafka.writer.acks.required" type:"gauge"`
Async bool `metric:"kafka.writer.async" type:"gauge"`

Topic string `tag:"topic"`

@@ -759,6 +773,20 @@ func (w *Writer) maxAttempts() int {
return 10
}

func (w *Writer) writeBackoffMin() time.Duration {
if w.WriteBackoffMin > 0 {
return w.WriteBackoffMin
}
return 100 * time.Millisecond
}

func (w *Writer) writeBackoffMax() time.Duration {
if w.WriteBackoffMax > 0 {
return w.WriteBackoffMax
}
return 1 * time.Second
}

func (w *Writer) batchSize() int {
if w.BatchSize > 0 {
return w.BatchSize
@@ -829,26 +857,28 @@ func (w *Writer) stats() *writerStats {
func (w *Writer) Stats() WriterStats {
stats := w.stats()
return WriterStats{
Dials: stats.dials.snapshot(),
Writes: stats.writes.snapshot(),
Messages: stats.messages.snapshot(),
Bytes: stats.bytes.snapshot(),
Errors: stats.errors.snapshot(),
DialTime: stats.dialTime.snapshotDuration(),
BatchTime: stats.batchTime.snapshotDuration(),
WriteTime: stats.writeTime.snapshotDuration(),
WaitTime: stats.waitTime.snapshotDuration(),
Retries: stats.retries.snapshot(),
BatchSize: stats.batchSize.snapshot(),
BatchBytes: stats.batchSizeBytes.snapshot(),
MaxAttempts: int64(w.MaxAttempts),
MaxBatchSize: int64(w.BatchSize),
BatchTimeout: w.BatchTimeout,
ReadTimeout: w.ReadTimeout,
WriteTimeout: w.WriteTimeout,
RequiredAcks: int64(w.RequiredAcks),
Async: w.Async,
Topic: w.Topic,
Dials: stats.dials.snapshot(),
Writes: stats.writes.snapshot(),
Messages: stats.messages.snapshot(),
Bytes: stats.bytes.snapshot(),
Errors: stats.errors.snapshot(),
DialTime: stats.dialTime.snapshotDuration(),
BatchTime: stats.batchTime.snapshotDuration(),
WriteTime: stats.writeTime.snapshotDuration(),
WaitTime: stats.waitTime.snapshotDuration(),
Retries: stats.retries.snapshot(),
BatchSize: stats.batchSize.snapshot(),
BatchBytes: stats.batchSizeBytes.snapshot(),
MaxAttempts: int64(w.MaxAttempts),
WriteBackoffMin: w.WriteBackoffMin,
WriteBackoffMax: w.WriteBackoffMax,
MaxBatchSize: int64(w.BatchSize),
BatchTimeout: w.BatchTimeout,
ReadTimeout: w.ReadTimeout,
WriteTimeout: w.WriteTimeout,
RequiredAcks: int64(w.RequiredAcks),
Async: w.Async,
Topic: w.Topic,
}
}
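The two new gauges make the configured backoff visible through Stats(). A small illustrative helper follows; the function and package names are assumptions for the example. Note that Stats() copies the raw struct fields, so an unset backoff reports as 0 even though the writer falls back to the 100ms / 1s defaults internally.

```go
package kafkaexample

import (
	"log"

	kafka "github.com/segmentio/kafka-go"
)

// reportBackoff logs the retry-backoff gauges added to WriterStats by this
// commit. An unset WriteBackoffMin/Max shows up as 0 here because Stats()
// copies the raw fields rather than the defaulted values.
func reportBackoff(w *kafka.Writer) {
	s := w.Stats()
	log.Printf("write backoff: min=%s max=%s", s.WriteBackoffMin, s.WriteBackoffMax)
}
```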

@@ -1066,7 +1096,7 @@ func (ptw *partitionWriter) writeBatch(batch *writeBatch) {
// guarantees to abort, but may be better to avoid long wait times
// on close.
//
delay := backoff(attempt, 100*time.Millisecond, 1*time.Second)
delay := backoff(attempt, ptw.w.writeBackoffMin(), ptw.w.writeBackoffMax())
ptw.w.withLogger(func(log Logger) {
log.Printf("backing off %s writing %d messages to %s (partition: %d)", delay, len(batch.msgs), key.topic, key.partition)
})
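The backoff helper called above is not part of this diff; the sketch below is a hypothetical stand-in that assumes the delay doubles with each attempt and is clamped between the new min and max, which is how the call site passes writeBackoffMin()/writeBackoffMax().

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// backoffSketch is a hypothetical approximation of the unexported backoff
// helper used above: exponential growth by attempt, clamped to [min, max].
func backoffSketch(attempt int, min, max time.Duration) time.Duration {
	d := time.Duration(float64(min) * math.Pow(2, float64(attempt)))
	if d > max {
		d = max
	}
	if d < min {
		d = min
	}
	return d
}

func main() {
	// With the defaults (100ms, 1s) the delays grow 200ms, 400ms, 800ms, 1s, 1s.
	for attempt := 1; attempt <= 5; attempt++ {
		fmt.Println(attempt, backoffSketch(attempt, 100*time.Millisecond, time.Second))
	}
}
```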
17 changes: 17 additions & 0 deletions writer_test.go
@@ -170,6 +170,10 @@ func TestWriter(t *testing.T) {
scenario: "writing a message with SASL Plain authentication",
function: testWriterSasl,
},
{
scenario: "test default configuration values",
function: testWriterDefaults,
},
}

for _, test := range tests {
@@ -818,6 +822,19 @@ func testWriterSasl(t *testing.T) {
}
}

func testWriterDefaults(t *testing.T) {
w := &Writer{}
defer w.Close()

if w.writeBackoffMin() != 100*time.Millisecond {
t.Error("Incorrect default min write backoff delay")
}

if w.writeBackoffMax() != 1*time.Second {
t.Error("Incorrect default max write backoff delay")
}
}

type staticBalancer struct {
partition int
}
