Skip to content

Commit

Permalink
add reader stats
Browse files Browse the repository at this point in the history
  • Loading branch information
Achille Roussel committed Sep 29, 2017
1 parent c6ecc9b commit 15fd5e0
Show file tree
Hide file tree
Showing 3 changed files with 382 additions and 5 deletions.
170 changes: 165 additions & 5 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"log"
"math"
"strconv"
"sync"
"time"
)
Expand All @@ -30,10 +31,14 @@ type Reader struct {
mutex sync.Mutex
join sync.WaitGroup
cancel context.CancelFunc
stop context.CancelFunc
version int64
offset int64
lag int64
closed bool

// reader stats are all made of atomic values, no need for synchronization.
stats readerStats
}

// ReaderConfig is a configuration object used to create new instances of
Expand Down Expand Up @@ -64,11 +69,59 @@ type ReaderConfig struct {
// of messages from kafka.
MaxWait time.Duration

// ReadLagInterval sets the frequency at which the reader lag is updated.
// Setting this field to a negative value disables lag reporting.
ReadLagInterval time.Duration

// If not nil, specifies a logger used to report internal changes within the
// reader.
Logger *log.Logger
}

// ReaderStats is a data structure returned by a call to Reader.Stats that exposes
// details about the behavior of the reader.
//
// Each field carries struct tags: `metric` holds the metric name, `type` the
// metric kind (counter or gauge), and `tag` marks fields that identify the
// reader rather than measure it.
// NOTE(review): the tags are presumably consumed through reflection by a stats
// reporting library; the consumer is not visible in this file.
type ReaderStats struct {
	// Counters of events observed by the reader.
	Dials      int64 `metric:"kafka.reader.dial.count" type:"counter"`
	Fetches    int64 `metric:"kafka.reader.fetch.count" type:"counter"`
	Messages   int64 `metric:"kafka.reader.message.count" type:"counter"`
	Bytes      int64 `metric:"kafka.reader.message.bytes" type:"counter"`
	Rebalances int64 `metric:"kafka.reader.rebalance.count" type:"counter"`
	Timeouts   int64 `metric:"kafka.reader.timeout.count" type:"counter"`
	Errors     int64 `metric:"kafka.reader.error.count" type:"counter"`

	// Time spent dialing brokers, reading messages out of a fetched batch,
	// and waiting for a batch to become available.
	DialTime DurationStats `metric:"kafka.reader.dial.seconds"`
	ReadTime DurationStats `metric:"kafka.reader.read.seconds"`
	WaitTime DurationStats `metric:"kafka.reader.wait.seconds"`

	// Point-in-time values describing the reader's position, configuration
	// and message queue.
	Offset        int64 `metric:"kafka.reader.offset" type:"gauge"`
	Lag           int64 `metric:"kafka.reader.lag" type:"gauge"`
	MinBytes      int64 `metric:"kafka.reader.fetch_bytes.min" type:"gauge"`
	MaxBytes      int64 `metric:"kafka.reader.fetch_bytes.max" type:"gauge"`
	MaxWait       int64 `metric:"kafka.reader.fetch_wait.max" type:"gauge"`
	QueueLength   int64 `metric:"kafka.reader.queue.length" type:"gauge"`
	QueueCapacity int64 `metric:"kafka.reader.queue.capacity" type:"gauge"`

	// Tags identifying which reader the stats belong to.
	ClientID  string `tag:"client_id"`
	Topic     string `tag:"topic"`
	Partition string `tag:"partition"`
}

// readerStats holds the live, internal counterparts of the values exposed
// through ReaderStats. Reader.Stats reads each field with snapshot(); the
// reader goroutines update them with observe().
// NOTE(review): the Reader struct comment states these are atomic values that
// need no extra synchronization; the counter, gauge and durationStats types
// are defined outside this view, so confirm there.
type readerStats struct {
// Incremented once per DialLeader attempt.
dials counter
// Incremented once per call to reader.read.
fetches counter
// Incremented once per message read from a batch.
messages counter
// Increased by len(Key)+len(Value) of each message read.
bytes counter
// Incremented when the read loop breaks out to re-establish a connection to
// the partition leader.
rebalances counter
// Incremented on RequestTimedOut.
timeouts counter
// Incremented on initialization failures, unknown read errors and lag
// read failures.
errors counter
// Duration of each DialLeader call.
dialTime durationStats
// Duration spent reading the messages of a batch.
readTime durationStats
// Duration spent waiting for a batch to be returned.
waitTime durationStats
// Offset of the next message to read.
offset gauge
// Last observed lag (high water mark minus offset, or the ReadLag result).
lag gauge
// String form of the partition number, computed once in NewReader.
partition string
}

// NewReader creates and returns a new Reader configured with config.
func NewReader(config ReaderConfig) *Reader {
if len(config.Brokers) == 0 {
Expand Down Expand Up @@ -111,16 +164,34 @@ func NewReader(config ReaderConfig) *Reader {
config.MaxWait = 10 * time.Second
}

if config.ReadLagInterval == 0 {
config.ReadLagInterval = 1 * time.Minute
}

if config.QueueCapacity == 0 {
config.QueueCapacity = 100
}

return &Reader{
ctx, stop := context.WithCancel(context.Background())

r := &Reader{
config: config,
msgs: make(chan readerMessage, config.QueueCapacity),
cancel: func() {},
stop: stop,
offset: firstOffset,
stats: readerStats{
// Generate the string representation of the partition number only
// once when the reader is created.
partition: strconv.Itoa(config.Partition),
},
}

if config.ReadLagInterval > 0 {
go r.readLag(ctx)
}

return r
}

// Config returns the reader's configuration.
Expand All @@ -137,6 +208,7 @@ func (r *Reader) Close() error {
r.mutex.Unlock()

r.cancel()
r.stop()
r.join.Wait()

if !closed {
Expand Down Expand Up @@ -303,12 +375,69 @@ func (r *Reader) SetOffset(offset int64) error {
return err
}

// Stats reports what the reader has been doing. Counters and timings come from
// snapshot() on the reader's internal stats, so each call covers the period
// since the previous call (or since the reader was created on the first call).
//
// It is meant to be polled: for instance from a goroutine that periodically
// calls Stats and forwards the result to a metrics collection system.
//
// NOTE(review): ReaderStats.MaxWait is left at its zero value here; confirm
// whether it should mirror the reader's MaxWait configuration.
func (r *Reader) Stats() ReaderStats {
	var (
		internal = &r.stats
		config   = &r.config
		snap     ReaderStats
	)

	// Event counters.
	snap.Dials = internal.dials.snapshot()
	snap.Fetches = internal.fetches.snapshot()
	snap.Messages = internal.messages.snapshot()
	snap.Bytes = internal.bytes.snapshot()
	snap.Rebalances = internal.rebalances.snapshot()
	snap.Timeouts = internal.timeouts.snapshot()
	snap.Errors = internal.errors.snapshot()

	// Timing distributions.
	snap.DialTime = internal.dialTime.snapshot()
	snap.ReadTime = internal.readTime.snapshot()
	snap.WaitTime = internal.waitTime.snapshot()

	// Position of the reader in the partition.
	snap.Offset = internal.offset.snapshot()
	snap.Lag = internal.lag.snapshot()

	// Static configuration and current state of the message queue.
	snap.MinBytes = int64(config.MinBytes)
	snap.MaxBytes = int64(config.MaxBytes)
	snap.QueueLength = int64(len(r.msgs))
	snap.QueueCapacity = int64(cap(r.msgs))

	// Identifying tags.
	snap.ClientID = config.Dialer.ClientID
	snap.Topic = config.Topic
	snap.Partition = internal.partition

	return snap
}

// withLogger invokes do with the reader's configured logger. When no logger
// was configured the callback is never called, so callers can build log
// messages inside it without checking for nil themselves.
func (r *Reader) withLogger(do func(*log.Logger)) {
	logger := r.config.Logger
	if logger == nil {
		return
	}
	do(logger)
}

// readLag periodically refreshes the reader's lag gauge until ctx is
// cancelled.
//
// The lag is read once immediately and then once per ReadLagInterval. Each
// read is bounded by half the interval so a slow broker cannot make
// consecutive reads overlap. Failures are counted in the error stats and
// logged together with their cause. A failure caused by ctx being cancelled
// (the reader shutting down) is not reported as an error; the loop just
// exits.
func (r *Reader) readLag(ctx context.Context) {
	ticker := time.NewTicker(r.config.ReadLagInterval)
	defer ticker.Stop()

	for {
		timeout, cancel := context.WithTimeout(ctx, r.config.ReadLagInterval/2)
		lag, err := r.ReadLag(timeout)
		cancel()

		switch {
		case err == nil:
			r.stats.lag.observe(lag)
		case ctx.Err() != nil:
			// The parent context was cancelled: this is a shutdown, not a
			// failure worth counting or logging.
			return
		default:
			r.stats.errors.observe(1)
			r.withLogger(func(logger *log.Logger) {
				logger.Printf("kafka reader failed to read lag of partition %d of %s: %v", r.config.Partition, r.config.Topic, err)
			})
		}

		select {
		case <-ticker.C:
		case <-ctx.Done():
			return
		}
	}
}

func (r *Reader) start() {
ctx, cancel := context.WithCancel(context.Background())

Expand All @@ -328,6 +457,7 @@ func (r *Reader) start() {
maxWait: r.config.MaxWait,
version: r.version,
msgs: r.msgs,
stats: &r.stats,
}).run(ctx, r.offset, &r.join)
}

Expand All @@ -345,6 +475,7 @@ type reader struct {
maxWait time.Duration
version int64
msgs chan<- readerMessage
stats *readerStats
}

type readerMessage struct {
Expand Down Expand Up @@ -397,6 +528,7 @@ func (r *reader) run(ctx context.Context, offset int64, join *sync.WaitGroup) {
if attempt >= 3 {
r.sendError(ctx, err)
} else {
r.stats.errors.observe(1)
r.withLogger(func(log *log.Logger) {
log.Printf("error initializing the kafka reader for partition %d of %s:", r.partition, r.topic, err)
})
Expand Down Expand Up @@ -433,13 +565,15 @@ func (r *reader) run(ctx context.Context, offset int64, join *sync.WaitGroup) {

// The next call to .initialize will re-establish a connection to the proper
// partition leader.
r.stats.rebalances.observe(1)
break readLoop
case RequestTimedOut:
// Timeout on the kafka side, this can be safely retried.
errcount = 0
r.withLogger(func(log *log.Logger) {
log.Printf("no messages received from kafka within the allocated time for partition %d of %s at offset %d", r.partition, r.topic, offset)
})
r.stats.timeouts.observe(1)
continue
case OffsetOutOfRange:
// We may be reading past the last offset, will retry later.
Expand All @@ -454,6 +588,10 @@ func (r *reader) run(ctx context.Context, offset int64, join *sync.WaitGroup) {
if _, ok := err.(Error); ok {
r.sendError(ctx, err)
} else {
r.withLogger(func(log *log.Logger) {
log.Printf("the kafka reader got an unknown error reading partition %d of %s at offset %d", r.partition, r.topic, offset)
})
r.stats.errors.observe(1)
conn.Close()
break readLoop
}
Expand All @@ -470,7 +608,13 @@ func (r *reader) initialize(ctx context.Context, offset int64) (conn *Conn, star
var first int64
var last int64

if conn, err = r.dialer.DialLeader(ctx, "tcp", broker, r.topic, r.partition); err != nil {
t0 := time.Now()
conn, err = r.dialer.DialLeader(ctx, "tcp", broker, r.topic, r.partition)
t1 := time.Now()
r.stats.dials.observe(1)
r.stats.dialTime.observe(t1.Sub(t0))

if err != nil {
continue
}

Expand Down Expand Up @@ -508,9 +652,18 @@ func (r *reader) initialize(ctx context.Context, offset int64) (conn *Conn, star
}

func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, error) {
conn.SetReadDeadline(time.Now().Add(r.maxWait))
r.stats.fetches.observe(1)
r.stats.offset.observe(offset)

t0 := time.Now()
conn.SetReadDeadline(t0.Add(r.maxWait))

batch := conn.ReadBatch(r.minBytes, r.maxBytes)
conn.SetReadDeadline(time.Now().Add(10 * time.Second))
highWaterMark := batch.HighWaterMark()

t1 := time.Now()
r.stats.waitTime.observe(t1.Sub(t0))
conn.SetReadDeadline(t1.Add(10 * time.Second))

var msg Message
var err error
Expand All @@ -521,14 +674,21 @@ func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, err
break
}

if err = r.sendMessage(ctx, msg, batch.HighWaterMark()); err != nil {
r.stats.messages.observe(1)
r.stats.bytes.observe(int64(len(msg.Key)) + int64(len(msg.Value)))

if err = r.sendMessage(ctx, msg, highWaterMark); err != nil {
err = batch.Close()
break
}

offset = msg.Offset + 1
r.stats.offset.observe(offset)
r.stats.lag.observe(highWaterMark - offset)
}

t2 := time.Now()
r.stats.readTime.observe(t2.Sub(t1))
return offset, err
}

Expand Down
69 changes: 69 additions & 0 deletions reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ func TestReader(t *testing.T) {
scenario: "calling ReadLag returns the current lag of a reader",
function: testReaderReadLag,
},

{
scenario: "calling Stats returns accurate stats about the reader",
function: testReaderStats,
},
}

for _, test := range tests {
Expand Down Expand Up @@ -153,6 +158,70 @@ func testReaderReadLag(t *testing.T, ctx context.Context, r *Reader) {
}
}

// testReaderStats writes N messages, reads them all back through the reader
// and verifies that Reader.Stats reports values consistent with what was read.
//
// Timing stats cannot be predicted, so they are only checked for being
// non-zero and then copied into the expected value. The message count, byte
// count and offset are derived from the messages actually read rather than
// hard-coded, so the test stays correct if the test sequence changes.
func testReaderStats(t *testing.T, ctx context.Context, r *Reader) {
	const N = 10
	prepareReader(t, ctx, r, makeTestSequence(N)...)

	var offset int64
	var bytes int64

	for i := 0; i != N; i++ {
		m, err := r.ReadMessage(ctx)
		if err != nil {
			t.Error("reading message at offset", offset, "failed:", err)
			return
		}
		offset = m.Offset + 1
		bytes += int64(len(m.Key) + len(m.Value))
	}

	stats := r.Stats()

	// First verify that metrics with unpredictable values are not zero.
	if stats.DialTime == (DurationStats{}) {
		t.Error("no dial time reported by reader stats")
	}
	if stats.ReadTime == (DurationStats{}) {
		t.Error("no read time reported by reader stats")
	}
	if stats.WaitTime == (DurationStats{}) {
		t.Error("no wait time reported by reader stats")
	}
	if len(stats.Topic) == 0 {
		t.Error("empty topic in reader stats")
	}

	// Then compare all remaining metrics.
	expect := ReaderStats{
		Dials:         1,
		Fetches:       1,
		Messages:      N,
		Bytes:         bytes,
		Rebalances:    0,
		Timeouts:      0,
		Errors:        0,
		DialTime:      stats.DialTime,
		ReadTime:      stats.ReadTime,
		WaitTime:      stats.WaitTime,
		Offset:        offset,
		Lag:           0,
		MinBytes:      1,
		MaxBytes:      10000000,
		MaxWait:       0,
		QueueLength:   0,
		QueueCapacity: 100,
		ClientID:      "",
		Topic:         stats.Topic,
		Partition:     "0",
	}

	if stats != expect {
		t.Error("bad stats:")
		t.Log("expected:", expect)
		t.Log("found:   ", stats)
	}
}

func makeTestSequence(n int) []Message {
msgs := make([]Message, n)
for i := 0; i != n; i++ {
Expand Down
Loading

0 comments on commit 15fd5e0

Please sign in to comment.