Skip to content

Commit

Permalink
feat(writer): add logger config, use to log write errors (segmentio#137)
Browse files Browse the repository at this point in the history
  • Loading branch information
dominicbarnes authored Nov 2, 2018
1 parent 5f048b9 commit f38dc45
Showing 1 changed file with 33 additions and 0 deletions.
33 changes: 33 additions & 0 deletions writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"log"
"math/rand"
"sort"
"sync"
Expand Down Expand Up @@ -110,6 +111,14 @@ type WriterConfig struct {
// Note that messages are allowed to overwrite the compression codec individually.
CompressionCodec

// If not nil, specifies a logger used to report internal changes within the
// writer.
Logger *log.Logger

// ErrorLogger is the logger used to report errors. If nil, the writer falls
// back to using Logger instead.
ErrorLogger *log.Logger

newPartitionWriter func(partition int, config WriterConfig, stats *writerStats) partitionWriter
}

Expand Down Expand Up @@ -511,6 +520,8 @@ type writer struct {
msgs chan writerMessage
join sync.WaitGroup
stats *writerStats
logger *log.Logger
errorLogger *log.Logger
}

func newWriter(partition int, config WriterConfig, stats *writerStats) *writer {
Expand All @@ -525,6 +536,8 @@ func newWriter(partition int, config WriterConfig, stats *writerStats) *writer {
dialer: config.Dialer,
msgs: make(chan writerMessage, config.QueueCapacity),
stats: stats,
logger: config.Logger,
errorLogger: config.ErrorLogger,
}
w.join.Add(1)
go w.run()
Expand All @@ -540,6 +553,20 @@ func (w *writer) messages() chan<- writerMessage {
return w.msgs
}

func (w *writer) withLogger(do func(*log.Logger)) {
if w.logger != nil {
do(w.logger)
}
}

func (w *writer) withErrorLogger(do func(*log.Logger)) {
if w.errorLogger != nil {
do(w.errorLogger)
} else {
w.withLogger(do)
}
}

func (w *writer) run() {
defer w.join.Done()

Expand Down Expand Up @@ -623,6 +650,9 @@ func (w *writer) write(conn *Conn, batch []Message, resch [](chan<- error)) (ret
if conn == nil {
if conn, err = w.dial(); err != nil {
w.stats.errors.observe(1)
w.withErrorLogger(func(logger *log.Logger) {
logger.Printf("error dialing kafka brokers for topic %s (partition %d): %s", w.topic, w.partition, err)
})
for i, res := range resch {
res <- &writerError{msg: batch[i], err: err}
}
Expand All @@ -635,6 +665,9 @@ func (w *writer) write(conn *Conn, batch []Message, resch [](chan<- error)) (ret

if _, err = conn.WriteMessages(batch...); err != nil {
w.stats.errors.observe(1)
w.withErrorLogger(func(logger *log.Logger) {
logger.Printf("error writing messages to %s (partition %d): %s", w.topic, w.partition, err)
})
for i, res := range resch {
res <- &writerError{msg: batch[i], err: err}
}
Expand Down

0 comments on commit f38dc45

Please sign in to comment.