package kafka
import (
"context"
"fmt"
"io"
"math/rand"
"sort"
"sync"
"time"
)
// The Writer type provides the implementation of a producer of kafka messages
// that automatically distributes messages across partitions of a single topic
// using a configurable balancing policy.
//
// Instances of Writer are safe to use concurrently from multiple goroutines.
type Writer struct {
// config is the configuration received by NewWriter, with defaults applied
// to the fields that were left at their zero value.
config WriterConfig
// mutex guards closed. WriteMessages holds the read side while it enqueues
// messages on msgs; Close takes the write side before closing the channels.
mutex sync.RWMutex
closed bool
// join tracks the dispatch goroutine started by NewWriter as well as the
// goroutines spawned to shut partition writers down. Close waits on it.
join sync.WaitGroup
// msgs is the queue of messages waiting to be dispatched to a partition
// writer. It is closed by Close, which makes the dispatch goroutine exit.
msgs chan writerMessage
// done is closed by Close; WriteMessages selects on it while waiting
// between two delivery attempts.
done chan struct{}
// writer stats are all made of atomic values, no need for synchronization.
// Use a pointer to ensure 64-bit alignment of the values.
stats *writerStats
}
// WriterConfig is a configuration type used to create new instances of Writer.
//
// Unless stated otherwise, a field left at its zero value is replaced by the
// documented default when the configuration is passed to NewWriter.
type WriterConfig struct {
// The list of brokers used to discover the partitions available on the
// kafka cluster.
//
// This field is required, attempting to create a writer with an empty list
// of brokers will panic.
Brokers []string
// The topic that the writer will produce messages to.
//
// This field is required, attempting to create a writer with an empty topic
// will panic.
Topic string
// The dialer used by the writer to establish connections to the kafka
// cluster.
//
// If nil, the default dialer is used instead.
Dialer *Dialer
// The balancer used to distribute messages across partitions.
//
// The default is to use a round-robin distribution.
Balancer Balancer
// Limit on how many attempts will be made to deliver a message.
//
// The default is to try at most 10 times.
MaxAttempts int
// A hint on the capacity of the writer's internal message queue. The same
// capacity is used for the queue of every partition writer.
//
// The default is to use a queue capacity of 100 messages.
QueueCapacity int
// Limit on how many messages will be buffered before being sent to a
// partition.
//
// The default is to use a target batch size of 100 messages.
BatchSize int
// Time limit on how often incomplete message batches will be flushed to
// kafka. Partition writers check for an expired batch every tenth of this
// duration.
//
// The default is to flush at least every second.
BatchTimeout time.Duration
// Timeout for read operations performed by the Writer. It is applied as
// the read deadline when the list of partitions is fetched.
//
// Defaults to 10 seconds.
ReadTimeout time.Duration
// Timeout for write operations performed by the Writer. It is applied as
// the write deadline of every batch sent to a partition.
//
// Defaults to 10 seconds.
WriteTimeout time.Duration
// This interval defines how often the list of partitions is refreshed from
// kafka. It allows the writer to automatically handle when new partitions
// are added to a topic.
//
// The default is to refresh partitions every 15 seconds.
RebalanceInterval time.Duration
// Number of acknowledges from partition replicas required before receiving
// a response to a produce request (default to -1, which means to wait for
// all replicas).
//
// NOTE(review): NewWriter does not assign a default to this field; the value
// (0 when unset) is handed unchanged to Conn.SetRequiredAcks by the partition
// writer, so the effective default depends on how Conn treats it - confirm.
RequiredAcks int
// Setting this flag to true makes WriteMessages return as soon as the
// messages have been handed to the writer's internal queue, without waiting
// for the outcome of the writes. The call can still block while that queue
// is full. It also means that errors are ignored since the caller will not
// receive the returned value. Use this only if you don't care about
// guarantees of whether the messages were written to kafka.
Async bool
// newPartitionWriter builds the writer in charge of a single partition.
// NewWriter falls back to newWriter when it is nil; the field is unexported,
// so only code in this package can substitute another implementation.
newPartitionWriter func(partition int, config WriterConfig, stats *writerStats) partitionWriter
}
// WriterStats is a data structure returned by a call to Writer.Stats that
// exposes details about the behavior of the writer.
type WriterStats struct {
// Counters, read from the writer's internal counters by Writer.Stats.
Dials int64 `metric:"kafka.writer.dial.count" type:"counter"`
Writes int64 `metric:"kafka.writer.write.count" type:"counter"`
Messages int64 `metric:"kafka.writer.message.count" type:"counter"`
Bytes int64 `metric:"kafka.writer.message.bytes" type:"counter"`
Rebalances int64 `metric:"kafka.writer.rebalance.count" type:"counter"`
Errors int64 `metric:"kafka.writer.error.count" type:"counter"`
// Summaries of durations and sizes observed by the writer.
DialTime DurationStats `metric:"kafka.writer.dial.seconds"`
WriteTime DurationStats `metric:"kafka.writer.write.seconds"`
WaitTime DurationStats `metric:"kafka.writer.wait.seconds"`
Retries SummaryStats `metric:"kafka.writer.retries.count"`
BatchSize SummaryStats `metric:"kafka.writer.batch.size"`
// Gauges copied from the writer's configuration.
MaxAttempts int64 `metric:"kafka.writer.attempts.max" type:"gauge"`
MaxBatchSize int64 `metric:"kafka.writer.batch.max" type:"gauge"`
BatchTimeout time.Duration `metric:"kafka.writer.batch.timeout" type:"gauge"`
ReadTimeout time.Duration `metric:"kafka.writer.read.timeout" type:"gauge"`
WriteTimeout time.Duration `metric:"kafka.writer.write.timeout" type:"gauge"`
RebalanceInterval time.Duration `metric:"kafka.writer.rebalance.interval" type:"gauge"`
RequiredAcks int64 `metric:"kafka.writer.acks.required" type:"gauge"`
Async bool `metric:"kafka.writer.async" type:"gauge"`
// Length and capacity of the writer's internal message queue at the time
// Stats was called.
QueueLength int64 `metric:"kafka.writer.queue.length" type:"gauge"`
QueueCapacity int64 `metric:"kafka.writer.queue.capacity" type:"gauge"`
// Tags: the client ID of the configured dialer and the topic written to.
ClientID string `tag:"client_id"`
Topic string `tag:"topic"`
}
// writerStats is a struct that contains statistics on a writer. A single
// instance is shared between a Writer and all of its partition writers.
//
// Since atomic is used to mutate the statistics, the values must be 64-bit
// aligned. This is easily accomplished by always allocating this struct
// directly (i.e. using a pointer to the struct).
// See https://golang.org/pkg/sync/atomic/#pkg-note-BUG
type writerStats struct {
// Event counters.
dials counter
writes counter
messages counter
bytes counter
rebalances counter
errors counter
// Summaries of observed durations (dialTime, writeTime, waitTime) and
// quantities (retries, batchSize).
dialTime summary
writeTime summary
waitTime summary
retries summary
batchSize summary
}
// NewWriter creates and returns a new Writer configured with config.
//
// The function panics if config.Brokers or config.Topic is empty. Every other
// field left at its zero value is replaced by its default: DefaultDialer, a
// round-robin balancer, 10 attempts, a queue capacity and a batch size of 100
// messages, a 1 second batch timeout, 10 second read and write timeouts and a
// 15 second rebalance interval. RequiredAcks and Async are used as given.
//
// The returned writer owns a background goroutine; call Close to stop it and
// flush the messages that are still buffered.
func NewWriter(config WriterConfig) *Writer {
	if len(config.Brokers) == 0 {
		panic("cannot create a kafka writer with an empty list of brokers")
	}

	if len(config.Topic) == 0 {
		panic("cannot create a kafka writer with an empty topic")
	}

	if config.Dialer == nil {
		config.Dialer = DefaultDialer
	}

	if config.Balancer == nil {
		config.Balancer = &RoundRobin{}
	}

	if config.newPartitionWriter == nil {
		config.newPartitionWriter = func(partition int, config WriterConfig, stats *writerStats) partitionWriter {
			return newWriter(partition, config, stats)
		}
	}

	if config.MaxAttempts == 0 {
		config.MaxAttempts = 10
	}

	if config.QueueCapacity == 0 {
		config.QueueCapacity = 100
	}

	if config.BatchSize == 0 {
		config.BatchSize = 100
	}

	if config.BatchTimeout == 0 {
		config.BatchTimeout = 1 * time.Second
	}

	if config.ReadTimeout == 0 {
		config.ReadTimeout = 10 * time.Second
	}

	if config.WriteTimeout == 0 {
		config.WriteTimeout = 10 * time.Second
	}

	if config.RebalanceInterval == 0 {
		config.RebalanceInterval = 15 * time.Second
	}

	w := &Writer{
		config: config,
		msgs:   make(chan writerMessage, config.QueueCapacity),
		done:   make(chan struct{}),
		stats: &writerStats{
			dialTime:  makeSummary(),
			writeTime: makeSummary(),
			waitTime:  makeSummary(),
			retries:   makeSummary(),
			// batchSize is a summary like the four fields above and is
			// observed on every batch write, so it is initialised the same
			// way instead of being left as a zero value.
			batchSize: makeSummary(),
		},
	}

	// The dispatch goroutine is accounted for in join so that Close can wait
	// for it to exit.
	w.join.Add(1)
	go w.run()
	return w
}
// WriteMessages writes a batch of messages to the kafka topic configured on this
// writer.
//
// Unless the writer was configured to write messages asynchronously, the method
// blocks until all messages have been written, or until the maximum number of
// attempts was reached.
//
// When the method returns an error, there's no way to know yet which messages
// have succeeded or failed.
//
// The context passed as first argument may also be used to asynchronously
// cancel the operation. Note that in this case there are no guarantees made on
// whether messages were written to kafka. The program should assume that the
// whole batch failed and re-write the messages later (which could then cause
// duplicates).
//
// The method returns io.ErrClosedPipe if the writer is closed, ctx.Err() if the
// context is done first, and otherwise the last error reported for a message
// that could not be delivered (nil when everything was written).
func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error {
	if len(msgs) == 0 {
		return nil
	}

	// One slot per message: the results of an attempt always fit in the
	// buffer, so the goroutines reporting them never block on this channel,
	// even when this call has already returned.
	var res = make(chan error, len(msgs))
	var err error

	t0 := time.Now()

	for attempt := 0; attempt < w.config.MaxAttempts; attempt++ {
		// The read lock is held while enqueuing so that Close cannot close
		// w.msgs in the middle of a send.
		w.mutex.RLock()

		if w.closed {
			w.mutex.RUnlock()
			return io.ErrClosedPipe
		}

		for _, msg := range msgs {
			select {
			case w.msgs <- writerMessage{
				msg: msg,
				res: res,
			}:
			case <-ctx.Done():
				w.mutex.RUnlock()
				return ctx.Err()
			}
		}

		w.mutex.RUnlock()

		// In async mode the results are never collected.
		if w.config.Async {
			break
		}

		var retry []Message

		for i := 0; i != len(msgs); i++ {
			select {
			case e := <-res:
				if e != nil {
					if we, ok := e.(*writerError); ok {
						// A writerError carries the message it relates to,
						// which is what makes a retry possible.
						w.stats.retries.observe(1)
						retry, err = append(retry, we.msg), we.err
					} else {
						err = e
					}
				}
			case <-ctx.Done():
				return ctx.Err()
			}
		}

		if msgs = retry; len(msgs) == 0 {
			break
		}

		// This was the last attempt: no retry will follow, so return the
		// error right away instead of sleeping through a backoff first.
		if attempt == w.config.MaxAttempts-1 {
			break
		}

		timer := time.NewTimer(backoff(attempt+1, 100*time.Millisecond, 1*time.Second))
		select {
		case <-timer.C:
			// Another attempt follows (the last one exits above), so the
			// error of this attempt is cleared before retrying.
			err = nil
		case <-ctx.Done():
			err = ctx.Err()
		case <-w.done:
			err = io.ErrClosedPipe
		}
		timer.Stop()

		if err != nil {
			break
		}
	}

	t1 := time.Now()
	w.stats.writeTime.observeDuration(t1.Sub(t0))

	return err
}
// Stats returns a snapshot of the writer stats since the last time the method
// was called, or since the writer was created if it is called for the first
// time.
//
// Counters and summaries are read through their snapshot methods; the
// remaining fields mirror the writer's configuration and the current state of
// its message queue.
//
// A typical use of this method is to spawn a goroutine that will periodically
// call Stats on a kafka writer and report the metrics to a stats collection
// system.
func (w *Writer) Stats() WriterStats {
	var (
		live = w.stats
		cfg  = &w.config
		out  WriterStats
	)

	// Counters.
	out.Dials = live.dials.snapshot()
	out.Writes = live.writes.snapshot()
	out.Messages = live.messages.snapshot()
	out.Bytes = live.bytes.snapshot()
	out.Rebalances = live.rebalances.snapshot()
	out.Errors = live.errors.snapshot()

	// Summaries.
	out.DialTime = live.dialTime.snapshotDuration()
	out.WriteTime = live.writeTime.snapshotDuration()
	out.WaitTime = live.waitTime.snapshotDuration()
	out.Retries = live.retries.snapshot()
	out.BatchSize = live.batchSize.snapshot()

	// Configuration gauges.
	out.MaxAttempts = int64(cfg.MaxAttempts)
	out.MaxBatchSize = int64(cfg.BatchSize)
	out.BatchTimeout = cfg.BatchTimeout
	out.ReadTimeout = cfg.ReadTimeout
	out.WriteTimeout = cfg.WriteTimeout
	out.RebalanceInterval = cfg.RebalanceInterval
	out.RequiredAcks = int64(cfg.RequiredAcks)
	out.Async = cfg.Async

	// Queue state.
	out.QueueLength = int64(len(w.msgs))
	out.QueueCapacity = int64(cap(w.msgs))

	// Tags.
	out.ClientID = cfg.Dialer.ClientID
	out.Topic = cfg.Topic

	return out
}
// Close flushes all buffered messages and closes the writer. The call to Close
// aborts any concurrent calls to WriteMessages, which then return with the
// io.ErrClosedPipe error.
//
// Close may be called more than once: only the first call closes the writer's
// channels, but every call waits for the background goroutines to finish. The
// returned error is always nil.
func (w *Writer) Close() (err error) {
	func() {
		w.mutex.Lock()
		defer w.mutex.Unlock()

		if w.closed {
			return
		}

		w.closed = true
		close(w.msgs)
		close(w.done)
	}()

	// The lock is released before waiting, the goroutines being joined do
	// not need it to terminate.
	w.join.Wait()
	return nil
}
// run is the dispatch loop of the Writer, started in its own goroutine by
// NewWriter.
//
// It keeps one partition writer per known partition, refreshes the partition
// list on the first iteration and after every tick of the rebalance ticker,
// and forwards each queued message to the partition picked by the balancer.
// While no partition is known, messages are answered with a writerError. The
// loop exits once w.msgs is closed, after asking every partition writer to
// close.
func (w *Writer) run() {
	defer w.join.Done()

	rebalanceTicker := time.NewTicker(w.config.RebalanceInterval)
	defer rebalanceTicker.Stop()

	var (
		needRebalance = true
		active        = make(map[int]partitionWriter)
		known         []int
		err           error
	)

	for {
		if needRebalance {
			needRebalance = false
			w.stats.rebalances.observe(1)

			// On failure the previous partitions and writers are kept, and
			// err remembers why the refresh failed.
			var latest []int
			if latest, err = w.partitions(); err == nil {
				// Partitions that disappeared: shut their writer down.
				for _, gone := range diffp(known, latest) {
					w.close(active[gone])
					delete(active, gone)
				}
				// Partitions that appeared: start a writer for each.
				for _, added := range diffp(latest, known) {
					active[added] = w.open(added)
				}
				known = latest
			}
		}

		select {
		case <-rebalanceTicker.C:
			needRebalance = true

		case wm, ok := <-w.msgs:
			switch {
			case !ok:
				// The queue was closed: release every partition writer.
				for _, pw := range active {
					w.close(pw)
				}
				return

			case len(known) != 0:
				target := w.config.Balancer.Balance(wm.msg, known...)
				active[target].messages() <- wm

			default:
				// No partitions were found because the topic doesn't exist.
				if err == nil {
					err = fmt.Errorf("failed to find any partitions for topic %s", w.config.Topic)
				}
				wm.res <- &writerError{msg: wm.msg, err: err}
			}
		}
	}
}
// partitions asks the configured brokers, in random order, for the partitions
// of the writer's topic and returns their IDs sorted in increasing order.
//
// The first broker that answers wins. When every broker fails, the returned
// slice is nil and err is the error of the last broker that was tried.
func (w *Writer) partitions() (partitions []int, err error) {
	for _, addr := range shuffledStrings(w.config.Brokers) {
		var conn *Conn
		if conn, err = w.config.Dialer.Dial("tcp", addr); err != nil {
			continue
		}

		var found []Partition
		conn.SetReadDeadline(time.Now().Add(w.config.ReadTimeout))
		found, err = conn.ReadPartitions(w.config.Topic)
		conn.Close()

		if err != nil {
			continue
		}

		partitions = make([]int, 0, len(found))
		for _, p := range found {
			partitions = append(partitions, p.ID)
		}
		break
	}

	// diffp relies on binary search, hence the ordering.
	sort.Ints(partitions)
	return partitions, err
}
// open creates the partition writer in charge of partition, using the factory
// stored in the configuration and the stats shared by the whole Writer.
func (w *Writer) open(partition int) partitionWriter {
	factory := w.config.newPartitionWriter
	return factory(partition, w.config, w.stats)
}
// close shuts writer down from a separate goroutine so the caller is not held
// up by it. The goroutine is registered in w.join, which lets Close wait for
// the shutdown to complete.
func (w *Writer) close(writer partitionWriter) {
	w.join.Add(1)

	go func(pw partitionWriter) {
		pw.close()
		w.join.Done()
	}(writer)
}
// diffp returns, in their original order, the values of new that are absent
// from old. The result is nil when there is no such value.
//
// Membership is tested with a binary search, so old must be sorted in
// increasing order.
func diffp(new []int, old []int) (diff []int) {
	for _, candidate := range new {
		pos := sort.SearchInts(old, candidate)
		if pos < len(old) && old[pos] == candidate {
			// Present in both lists, not part of the difference.
			continue
		}
		diff = append(diff, candidate)
	}
	return diff
}
// partitionWriter is the interface through which the Writer talks to the
// component in charge of a single partition. The default implementation is
// the writer type; the factory used to build instances is the unexported
// newPartitionWriter field of WriterConfig.
type partitionWriter interface {
// messages returns the channel on which the Writer's dispatch loop sends
// the messages assigned to the partition.
messages() chan<- writerMessage
// close shuts the partition writer down. The Writer calls it from a
// dedicated goroutine when the partition is no longer listed or when the
// Writer itself is closed.
close()
}
// writer is the default partitionWriter: it batches the messages received on
// msgs and writes them to a single partition of a topic.
type writer struct {
// Settings copied from the WriterConfig by newWriter.
brokers []string
topic string
partition int
requiredAcks int
batchSize int
batchTimeout time.Duration
writeTimeout time.Duration
dialer *Dialer
// msgs is the queue of messages waiting to be batched; closing it makes
// the run loop flush what is left and exit.
msgs chan writerMessage
// join tracks the run goroutine so that close can wait for it.
join sync.WaitGroup
// stats is shared with the Writer that created this partition writer.
stats *writerStats
}
// newWriter creates the writer in charge of partition and starts its batching
// goroutine. Settings are copied from config; stats is the structure shared
// with the parent Writer. The queue of the returned writer has the capacity
// given by config.QueueCapacity.
func newWriter(partition int, config WriterConfig, stats *writerStats) *writer {
	pw := new(writer)

	pw.brokers = config.Brokers
	pw.topic = config.Topic
	pw.partition = partition
	pw.requiredAcks = config.RequiredAcks
	pw.batchSize = config.BatchSize
	pw.batchTimeout = config.BatchTimeout
	pw.writeTimeout = config.WriteTimeout
	pw.dialer = config.Dialer
	pw.msgs = make(chan writerMessage, config.QueueCapacity)
	pw.stats = stats

	// Registered before the goroutine starts so that close can wait for it.
	pw.join.Add(1)
	go pw.run()

	return pw
}
// close stops the partition writer: closing msgs makes the run loop flush the
// batch it is holding and return, and the call blocks until that goroutine
// has exited. It must be called at most once, closing msgs twice would panic.
func (w *writer) close() {
close(w.msgs)
w.join.Wait()
}
// messages returns the send-only side of the queue from which the run loop
// reads the messages to batch.
func (w *writer) messages() chan<- writerMessage {
return w.msgs
}
// run is the batching loop of a partition writer, started in its own goroutine
// by newWriter.
//
// Messages received on w.msgs are accumulated together with the channel on
// which their result must be reported. The batch is written when it reaches
// w.batchSize, when more than w.batchTimeout elapsed since the previous flush
// (checked every tenth of that timeout), or when w.msgs is closed, in which
// case the loop exits after the final flush. The connection is kept between
// flushes and dropped after a failed write.
func (w *writer) run() {
	defer w.join.Done()

	ticker := time.NewTicker(w.batchTimeout / 10)
	defer ticker.Stop()

	var (
		conn        *Conn
		done        bool
		batch       = make([]Message, 0, w.batchSize)
		resch       = make([](chan<- error), 0, w.batchSize)
		lastFlushAt = time.Now()
	)

	defer func() {
		if conn != nil {
			conn.Close()
		}
	}()

	for !done {
		flush := false

		select {
		case now := <-ticker.C:
			flush = now.Sub(lastFlushAt) > w.batchTimeout

		case wm, open := <-w.msgs:
			if open {
				batch = append(batch, wm.msg)
				resch = append(resch, wm.res)
				flush = len(batch) >= w.batchSize
			} else {
				// The queue was closed: flush what is left, then stop.
				done, flush = true, true
			}
		}

		if !flush {
			continue
		}

		lastFlushAt = time.Now()

		if len(batch) == 0 {
			continue
		}

		var err error
		conn, err = w.write(conn, batch, resch)
		if err != nil && conn != nil {
			// Start over with a fresh connection on the next flush.
			conn.Close()
			conn = nil
		}

		// Drop the references held by the backing arrays before reusing
		// them for the next batch.
		for i := range batch {
			batch[i] = Message{}
		}
		for i := range resch {
			resch[i] = nil
		}
		batch, resch = batch[:0], resch[:0]
	}
}
// dial connects to the leader of the writer's partition, trying the brokers in
// random order until one attempt succeeds. A successful dial is recorded in
// the stats and the connection is configured with the required acks before
// being returned. When every broker fails, err is the error of the last
// attempt.
func (w *writer) dial() (conn *Conn, err error) {
	for _, addr := range shuffledStrings(w.brokers) {
		began := time.Now()

		conn, err = w.dialer.DialLeader(context.Background(), "tcp", addr, w.topic, w.partition)
		if err != nil {
			continue
		}

		took := time.Since(began)
		w.stats.dials.observe(1)
		w.stats.dialTime.observeDuration(took)
		conn.SetRequiredAcks(w.requiredAcks)
		return conn, nil
	}

	return conn, err
}
// write sends batch over conn, dialing a new connection first when conn is
// nil, and reports the outcome of every message on the channel found at the
// same index in resch: nil on success, a *writerError wrapping the message
// and the failure otherwise.
//
// It returns the connection that was used along with the write error, or a
// nil connection and the dial error when no connection could be established.
func (w *writer) write(conn *Conn, batch []Message, resch [](chan<- error)) (ret *Conn, err error) {
	w.stats.writes.observe(1)

	// reportFailure counts the error and notifies every waiting caller.
	reportFailure := func(cause error) {
		w.stats.errors.observe(1)
		for i, res := range resch {
			res <- &writerError{msg: batch[i], err: cause}
		}
	}

	if conn == nil {
		if conn, err = w.dial(); err != nil {
			reportFailure(err)
			return nil, err
		}
	}

	start := time.Now()
	conn.SetWriteDeadline(time.Now().Add(w.writeTimeout))

	_, err = conn.WriteMessages(batch...)
	if err == nil {
		for _, m := range batch {
			w.stats.messages.observe(1)
			w.stats.bytes.observe(int64(len(m.Key) + len(m.Value)))
		}
		for _, res := range resch {
			res <- nil
		}
	} else {
		reportFailure(err)
	}

	w.stats.waitTime.observeDuration(time.Since(start))
	w.stats.batchSize.observe(int64(len(batch)))

	return conn, err
}
// writerMessage is the unit of work that travels through the writer's queues:
// a message together with the channel on which its outcome must be reported.
type writerMessage struct {
// msg is the message to write.
msg Message
// res receives nil once the message was written, or a *writerError when
// the write failed.
res chan<- error
}
// writerError pairs a failure with the message that could not be written, so
// that the receiver of the error knows which message to retry.
type writerError struct {
	msg Message
	err error
}

// Cause returns the error wrapped by we.
func (we *writerError) Cause() error {
	return we.err
}

// Error returns the message of the wrapped error.
func (we *writerError) Error() string {
	return we.err.Error()
}

// Temporary reports the result of isTemporary for the wrapped error.
func (we *writerError) Temporary() bool {
	return isTemporary(we.err)
}

// Timeout reports the result of isTimeout for the wrapped error.
func (we *writerError) Timeout() bool {
	return isTimeout(we.err)
}
// shuffledStrings returns a new slice holding the elements of list in a
// pseudo-random order. The input slice is left untouched.
//
// The permutation is drawn from the package-level shuffler while holding
// shufflerMutex, since a *rand.Rand built with rand.New is not safe for
// concurrent use.
func shuffledStrings(list []string) []string {
	out := append(make([]string, 0, len(list)), list...)

	shufflerMutex.Lock()
	defer shufflerMutex.Unlock()

	for i := 0; i < len(out); i++ {
		j := shuffler.Intn(i + 1)
		out[i], out[j] = out[j], out[i]
	}

	return out
}

// State of the random generator used by shuffledStrings: shufflerMutex
// serializes access to shuffler, which is seeded with the current Unix time
// in seconds when the package is initialized.
var (
	shufflerMutex sync.Mutex
	shuffler      = rand.New(rand.NewSource(time.Now().Unix()))
)