From 15fd5e09fb6d0948c8d4e1d43ffa2bda409473f9 Mon Sep 17 00:00:00 2001 From: Achille Roussel Date: Fri, 29 Sep 2017 01:44:42 -0700 Subject: [PATCH] add reader stats --- reader.go | 170 +++++++++++++++++++++++++++++++++++++++++++++++-- reader_test.go | 69 ++++++++++++++++++++ stats.go | 148 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 382 insertions(+), 5 deletions(-) create mode 100644 stats.go diff --git a/reader.go b/reader.go index 8739045f0..ec67eecae 100644 --- a/reader.go +++ b/reader.go @@ -6,6 +6,7 @@ import ( "io" "log" "math" + "strconv" "sync" "time" ) @@ -30,10 +31,14 @@ type Reader struct { mutex sync.Mutex join sync.WaitGroup cancel context.CancelFunc + stop context.CancelFunc version int64 offset int64 lag int64 closed bool + + // reader stats are all made of atomic values, no need for synchronization. + stats readerStats } // ReaderConfig is a configuration object used to create new instances of @@ -64,11 +69,59 @@ type ReaderConfig struct { // of messages from kafka. MaxWait time.Duration + // ReadLagInterval sets the frequency at which the reader lag is updated. + // Setting this field to a negative value disables lag reporting. + ReadLagInterval time.Duration + // If not nil, specifies a logger used to report internal changes within the // reader. Logger *log.Logger } +// ReaderStats is a data structure returned by a call to Reader.Stats that exposes +// details about the behavior of the reader. 
+type ReaderStats struct { + Dials int64 `metric:"kafka.reader.dial.count" type:"counter"` + Fetches int64 `metric:"kafka.reader.fetch.count" type:"counter"` + Messages int64 `metric:"kafka.reader.message.count" type:"counter"` + Bytes int64 `metric:"kafka.reader.message.bytes" type:"counter"` + Rebalances int64 `metric:"kafka.reader.rebalance.count" type:"counter"` + Timeouts int64 `metric:"kafka.reader.timeout.count" type:"counter"` + Errors int64 `metric:"kafka.reader.error.count" type:"counter"` + + DialTime DurationStats `metric:"kafka.reader.dial.seconds"` + ReadTime DurationStats `metric:"kafka.reader.read.seconds"` + WaitTime DurationStats `metric:"kafka.reader.wait.seconds"` + + Offset int64 `metric:"kafka.reader.offset" type:"gauge"` + Lag int64 `metric:"kafka.reader.lag" type:"gauge"` + MinBytes int64 `metric:"kafka.reader.fetch_bytes.min" type:"gauge"` + MaxBytes int64 `metric:"kafka.reader.fetch_bytes.max" type:"gauge"` + MaxWait int64 `metric:"kafka.reader.fetch_wait.max" type:"gauge"` + QueueLength int64 `metric:"kafka.reader.queue.length" type:"gauge"` + QueueCapacity int64 `metric:"kafka.reader.queue.capacity" type:"gauge"` + + ClientID string `tag:"client_id"` + Topic string `tag:"topic"` + Partition string `tag:"partition"` +} + +type readerStats struct { + dials counter + fetches counter + messages counter + bytes counter + rebalances counter + timeouts counter + errors counter + dialTime durationStats + readTime durationStats + waitTime durationStats + offset gauge + lag gauge + partition string +} + // NewReader creates and returns a new Reader configured with config. 
func NewReader(config ReaderConfig) *Reader { if len(config.Brokers) == 0 { @@ -111,16 +164,34 @@ func NewReader(config ReaderConfig) *Reader { config.MaxWait = 10 * time.Second } + if config.ReadLagInterval == 0 { + config.ReadLagInterval = 1 * time.Minute + } + if config.QueueCapacity == 0 { config.QueueCapacity = 100 } - return &Reader{ + ctx, stop := context.WithCancel(context.Background()) + + r := &Reader{ config: config, msgs: make(chan readerMessage, config.QueueCapacity), cancel: func() {}, + stop: stop, offset: firstOffset, + stats: readerStats{ + // Generate the string representation of the partition number only + // once when the reader is created. + partition: strconv.Itoa(config.Partition), + }, } + + if config.ReadLagInterval > 0 { + go r.readLag(ctx) + } + + return r } // Config returns the reader's configuration. @@ -137,6 +208,7 @@ func (r *Reader) Close() error { r.mutex.Unlock() r.cancel() + r.stop() r.join.Wait() if !closed { @@ -303,12 +375,69 @@ func (r *Reader) SetOffset(offset int64) error { return err } +// Stats returns a snapshot of the reader stats since the last time the method +// was called, or since the reader was created if it is called for the first +// time. +// +// A typical use of this method is to spawn a goroutine that will periodically +// call Stats on a kafka reader and report the metrics to a stats collection +// system. 
+func (r *Reader) Stats() ReaderStats { + return ReaderStats{ + Dials: r.stats.dials.snapshot(), + Fetches: r.stats.fetches.snapshot(), + Messages: r.stats.messages.snapshot(), + Bytes: r.stats.bytes.snapshot(), + Rebalances: r.stats.rebalances.snapshot(), + Timeouts: r.stats.timeouts.snapshot(), + Errors: r.stats.errors.snapshot(), + DialTime: r.stats.dialTime.snapshot(), + ReadTime: r.stats.readTime.snapshot(), + WaitTime: r.stats.waitTime.snapshot(), + Offset: r.stats.offset.snapshot(), + Lag: r.stats.lag.snapshot(), + MinBytes: int64(r.config.MinBytes), + MaxBytes: int64(r.config.MaxBytes), + QueueLength: int64(len(r.msgs)), + QueueCapacity: int64(cap(r.msgs)), + ClientID: r.config.Dialer.ClientID, + Topic: r.config.Topic, + Partition: r.stats.partition, + } +} + func (r *Reader) withLogger(do func(*log.Logger)) { if r.config.Logger != nil { do(r.config.Logger) } } +func (r *Reader) readLag(ctx context.Context) { + ticker := time.NewTicker(r.config.ReadLagInterval) + defer ticker.Stop() + + for { + timeout, cancel := context.WithTimeout(ctx, r.config.ReadLagInterval/2) + lag, err := r.ReadLag(timeout) + cancel() + + if err != nil { + r.stats.errors.observe(1) + r.withLogger(func(log *log.Logger) { + log.Printf("kafka reader failed to read lag of partition %d of %s", r.config.Partition, r.config.Topic) + }) + } else { + r.stats.lag.observe(lag) + } + + select { + case <-ticker.C: + case <-ctx.Done(): + return + } + } +} + func (r *Reader) start() { ctx, cancel := context.WithCancel(context.Background()) @@ -328,6 +457,7 @@ func (r *Reader) start() { maxWait: r.config.MaxWait, version: r.version, msgs: r.msgs, + stats: &r.stats, }).run(ctx, r.offset, &r.join) } @@ -345,6 +475,7 @@ type reader struct { maxWait time.Duration version int64 msgs chan<- readerMessage + stats *readerStats } type readerMessage struct { @@ -397,6 +528,7 @@ func (r *reader) run(ctx context.Context, offset int64, join *sync.WaitGroup) { if attempt >= 3 { r.sendError(ctx, err) } else { + 
r.stats.errors.observe(1) r.withLogger(func(log *log.Logger) { log.Printf("error initializing the kafka reader for partition %d of %s:", r.partition, r.topic, err) }) @@ -433,6 +565,7 @@ func (r *reader) run(ctx context.Context, offset int64, join *sync.WaitGroup) { // The next call to .initialize will re-establish a connection to the proper // partition leader. + r.stats.rebalances.observe(1) break readLoop case RequestTimedOut: // Timeout on the kafka side, this can be safely retried. @@ -440,6 +573,7 @@ func (r *reader) run(ctx context.Context, offset int64, join *sync.WaitGroup) { r.withLogger(func(log *log.Logger) { log.Printf("no messages received from kafka within the allocated time for partition %d of %s at offset %d", r.partition, r.topic, offset) }) + r.stats.timeouts.observe(1) continue case OffsetOutOfRange: // We may be reading past the last offset, will retry later. @@ -454,6 +588,10 @@ func (r *reader) run(ctx context.Context, offset int64, join *sync.WaitGroup) { if _, ok := err.(Error); ok { r.sendError(ctx, err) } else { + r.withLogger(func(log *log.Logger) { + log.Printf("the kafka reader got an unknown error reading partition %d of %s at offset %d", r.partition, r.topic, offset) + }) + r.stats.errors.observe(1) conn.Close() break readLoop } @@ -470,7 +608,13 @@ func (r *reader) initialize(ctx context.Context, offset int64) (conn *Conn, star var first int64 var last int64 - if conn, err = r.dialer.DialLeader(ctx, "tcp", broker, r.topic, r.partition); err != nil { + t0 := time.Now() + conn, err = r.dialer.DialLeader(ctx, "tcp", broker, r.topic, r.partition) + t1 := time.Now() + r.stats.dials.observe(1) + r.stats.dialTime.observe(t1.Sub(t0)) + + if err != nil { continue } @@ -508,9 +652,18 @@ func (r *reader) initialize(ctx context.Context, offset int64) (conn *Conn, star } func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, error) { - conn.SetReadDeadline(time.Now().Add(r.maxWait)) + r.stats.fetches.observe(1) + 
r.stats.offset.observe(offset) + + t0 := time.Now() + conn.SetReadDeadline(t0.Add(r.maxWait)) + batch := conn.ReadBatch(r.minBytes, r.maxBytes) - conn.SetReadDeadline(time.Now().Add(10 * time.Second)) + highWaterMark := batch.HighWaterMark() + + t1 := time.Now() + r.stats.waitTime.observe(t1.Sub(t0)) + conn.SetReadDeadline(t1.Add(10 * time.Second)) var msg Message var err error @@ -521,14 +674,21 @@ func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, err break } - if err = r.sendMessage(ctx, msg, batch.HighWaterMark()); err != nil { + r.stats.messages.observe(1) + r.stats.bytes.observe(int64(len(msg.Key)) + int64(len(msg.Value))) + + if err = r.sendMessage(ctx, msg, highWaterMark); err != nil { err = batch.Close() break } offset = msg.Offset + 1 + r.stats.offset.observe(offset) + r.stats.lag.observe(highWaterMark - offset) } + t2 := time.Now() + r.stats.readTime.observe(t2.Sub(t1)) return offset, err } diff --git a/reader_test.go b/reader_test.go index f3ed65d9d..b4dde4194 100644 --- a/reader_test.go +++ b/reader_test.go @@ -40,6 +40,11 @@ func TestReader(t *testing.T) { scenario: "calling ReadLag returns the current lag of a reader", function: testReaderReadLag, }, + + { + scenario: "calling Stats returns accurate stats about the reader", + function: testReaderStats, + }, } for _, test := range tests { @@ -153,6 +158,70 @@ func testReaderReadLag(t *testing.T, ctx context.Context, r *Reader) { } } +func testReaderStats(t *testing.T, ctx context.Context, r *Reader) { + const N = 10 + prepareReader(t, ctx, r, makeTestSequence(N)...) + + var offset int64 + var bytes int64 + + for i := 0; i != N; i++ { + m, err := r.ReadMessage(ctx) + if err != nil { + t.Error("reading message at offset", offset, "failed:", err) + return + } + offset = m.Offset + 1 + bytes += int64(len(m.Key) + len(m.Value)) + } + + stats := r.Stats() + + // First verify that metrics with unpredictable values are not zero. 
+ if stats.DialTime == (DurationStats{}) { + t.Error("no dial time reported by reader stats") + } + if stats.ReadTime == (DurationStats{}) { + t.Error("no read time reported by reader stats") + } + if stats.WaitTime == (DurationStats{}) { + t.Error("no wait time reported by reader stats") + } + if len(stats.Topic) == 0 { + t.Error("empty topic in reader stats") + } + + // Then compare all remaining metrics. + expect := ReaderStats{ + Dials: 1, + Fetches: 1, + Messages: 10, + Bytes: 10, + Rebalances: 0, + Timeouts: 0, + Errors: 0, + DialTime: stats.DialTime, + ReadTime: stats.ReadTime, + WaitTime: stats.WaitTime, + Offset: 10, + Lag: 0, + MinBytes: 1, + MaxBytes: 10000000, + MaxWait: 0, + QueueLength: 0, + QueueCapacity: 100, + ClientID: "", + Topic: stats.Topic, + Partition: "0", + } + + if stats != expect { + t.Error("bad stats:") + t.Log("expected:", expect) + t.Log("found: ", stats) + } +} + func makeTestSequence(n int) []Message { msgs := make([]Message, n) for i := 0; i != n; i++ { diff --git a/stats.go b/stats.go new file mode 100644 index 000000000..474b67228 --- /dev/null +++ b/stats.go @@ -0,0 +1,148 @@ +package kafka + +import ( + "sync/atomic" + "time" + "unsafe" +) + +// DurationStats is a data structure that carries a summary of observed duration +// values. The average, minimum, and maximum are reported. 
+type DurationStats struct { + Avg time.Duration `metric:"avg" type:"gauge"` + Min time.Duration `metric:"min" type:"gauge"` + Max time.Duration `metric:"max" type:"gauge"` +} + +type durationStats struct { + min minimum + max maximum + sum counter + count counter +} + +func makeDurationStats() durationStats { + return durationStats{ + min: -1, + max: -1, + } +} + +func (d *durationStats) observe(v time.Duration) { + d.min.observe(int64(v)) + d.max.observe(int64(v)) + d.sum.observe(int64(v)) + d.count.observe(1) +} + +func (d *durationStats) snapshot() DurationStats { + min := d.min.snapshot() + max := d.max.snapshot() + sum := d.sum.snapshot() + count := d.count.snapshot() + return DurationStats{ + Avg: time.Duration(float64(sum) / float64(count)), + Min: time.Duration(min), + Max: time.Duration(max), + } +} + +// counter is an atomic incrementing counter which gets reset on snapshot. +type counter int64 + +func (c *counter) ptr() *int64 { + return (*int64)(unsafe.Pointer(c)) +} + +func (c *counter) observe(v int64) { + atomic.AddInt64(c.ptr(), v) +} + +func (c *counter) snapshot() int64 { + p := c.ptr() + v := atomic.LoadInt64(p) + atomic.AddInt64(p, -v) + return v +} + +// gauge is an atomic integer that may be set to any arbitrary value, the value +// does not change after a snapshot. +type gauge int64 + +func (g *gauge) ptr() *int64 { + return (*int64)(unsafe.Pointer(g)) +} + +func (g *gauge) observe(v int64) { + atomic.StoreInt64(g.ptr(), v) +} + +func (g *gauge) snapshot() int64 { + return atomic.LoadInt64(g.ptr()) +} + +// minimum is an atomic integral type that keeps track of the minimum of all +// values that it observed between snapshots. 
+type minimum int64 + +func (m *minimum) ptr() *int64 { + return (*int64)(unsafe.Pointer(m)) +} + +func (m *minimum) observe(v int64) { + for { + ptr := m.ptr() + min := atomic.LoadInt64(ptr) + + if min >= 0 && min <= v { + break + } + + if atomic.CompareAndSwapInt64(ptr, min, v) { + break + } + } +} + +func (m *minimum) snapshot() int64 { + p := m.ptr() + v := atomic.LoadInt64(p) + atomic.CompareAndSwapInt64(p, v, -1) + if v < 0 { + v = 0 + } + return v +} + +// maximum is an atomic integral type that keeps track of the maximum of all +// values that it observed between snapshots. +type maximum int64 + +func (m *maximum) ptr() *int64 { + return (*int64)(unsafe.Pointer(m)) +} + +func (m *maximum) observe(v int64) { + for { + ptr := m.ptr() + max := atomic.LoadInt64(ptr) + + if max >= 0 && max >= v { + break + } + + if atomic.CompareAndSwapInt64(ptr, max, v) { + break + } + } +} + +func (m *maximum) snapshot() int64 { + p := m.ptr() + v := atomic.LoadInt64(p) + atomic.CompareAndSwapInt64(p, v, -1) + if v < 0 { + v = 0 + } + return v +}