Skip to content

Commit

Permalink
Add max idle conn timeout for Writer. (segmentio#348)
Browse files Browse the repository at this point in the history
* add max idle conn timeout

* Update writer.go

Co-Authored-By: Achille <achille.roussel@gmail.com>

* remove tests

* rename ConnMaxIdleTimeout to IdleTimeout

* Use idleConnDeadline

* remove extra white space
  • Loading branch information
sagarkrkv authored and Achille committed Sep 9, 2019
1 parent 531c50b commit 0748d0d
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 12 deletions.
16 changes: 16 additions & 0 deletions writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ type WriterConfig struct {
// The default is to refresh partitions every 15 seconds.
RebalanceInterval time.Duration

// Connections that were idle for this duration will not be reused.
//
// Defaults to 9 minutes.
IdleConnTimeout time.Duration

// Number of acknowledges from partition replicas required before receiving
// a response to a produce request (default to -1, which means to wait for
// all replicas).
Expand Down Expand Up @@ -248,6 +253,9 @@ func NewWriter(config WriterConfig) *Writer {
if config.RebalanceInterval == 0 {
config.RebalanceInterval = 15 * time.Second
}
if config.IdleConnTimeout == 0 {
config.IdleConnTimeout = 9 * time.Minute
}

w := &Writer{
config: config,
Expand Down Expand Up @@ -556,6 +564,7 @@ type writer struct {
maxMessageBytes int
batchTimeout time.Duration
writeTimeout time.Duration
idleConnTimeout time.Duration
dialer *Dialer
msgs chan writerMessage
join sync.WaitGroup
Expand All @@ -575,6 +584,7 @@ func newWriter(partition int, config WriterConfig, stats *writerStats) *writer {
maxMessageBytes: config.BatchBytes,
batchTimeout: config.BatchTimeout,
writeTimeout: config.WriteTimeout,
idleConnTimeout: config.IdleConnTimeout,
dialer: config.Dialer,
msgs: make(chan writerMessage, config.QueueCapacity),
stats: stats,
Expand Down Expand Up @@ -624,6 +634,7 @@ func (w *writer) run() {
var resch = make([](chan<- error), 0, w.batchSize)
var lastMsg writerMessage
var batchSizeBytes int
var idleConnDeadline time.Time

defer func() {
if conn != nil {
Expand Down Expand Up @@ -684,6 +695,10 @@ func (w *writer) run() {
}
batchTimerRunning = false
}
if conn != nil && time.Now().After(idleConnDeadline) {
conn.Close()
conn = nil
}
if len(batch) == 0 {
continue
}
Expand All @@ -694,6 +709,7 @@ func (w *writer) run() {
conn = nil
}
}
idleConnDeadline = time.Now().Add(w.idleConnTimeout)
for i := range batch {
batch[i] = Message{}
}
Expand Down
23 changes: 11 additions & 12 deletions writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,33 +205,32 @@ func testWriterMaxBytes(t *testing.T) {
return
}

firstMsg :=[]byte("Hello World!")
firstMsg := []byte("Hello World!")
secondMsg := []byte("LeftOver!")
msgs := []Message{
{
Value: firstMsg,
},
{
Value: secondMsg,
},

{
Value: firstMsg,
},
{
Value: secondMsg,
},
}
if err := w.WriteMessages(context.Background(),msgs...) ; err == nil {
if err := w.WriteMessages(context.Background(), msgs...); err == nil {
t.Error("expected error")
return
} else if err != nil {
switch e := err.(type) {
case MessageTooLargeError:
if string(e.Message.Value) != string(firstMsg) {
t.Errorf("unxpected returned message. Expected: %s, Got %s",firstMsg, e.Message.Value)
t.Errorf("unxpected returned message. Expected: %s, Got %s", firstMsg, e.Message.Value)
return
}
if len(e.Remaining) != 1 {
t.Error("expected remaining errors; found none")
return
}
if string(e.Remaining[0].Value) != string(secondMsg){
t.Errorf("unxpected returned message. Expected: %s, Got %s",secondMsg, e.Message.Value)
if string(e.Remaining[0].Value) != string(secondMsg) {
t.Errorf("unxpected returned message. Expected: %s, Got %s", secondMsg, e.Message.Value)
return
}
default:
Expand Down

0 comments on commit 0748d0d

Please sign in to comment.