diff --git a/batch.go b/batch.go index 8987438fb..78cd03b6e 100644 --- a/batch.go +++ b/batch.go @@ -17,15 +17,16 @@ import ( // // Batches are safe to use concurrently from multiple goroutines. type Batch struct { - mutex sync.Mutex - conn *Conn - lock *sync.Mutex - reader *bufio.Reader - deadline time.Time - throttle time.Duration - remain int - offset int64 - err error + mutex sync.Mutex + conn *Conn + lock *sync.Mutex + reader *bufio.Reader + deadline time.Time + throttle time.Duration + remain int + offset int64 + highWaterMark int64 + err error } // Throttle gives the throttling duration applied by the kafka server on the @@ -34,6 +35,11 @@ func (batch *Batch) Throttle() time.Duration { return batch.throttle } +// HighWaterMark returns the current highest watermark in a partition. +func (batch *Batch) HighWaterMark() int64 { + return batch.highWaterMark +} + // Offset returns the offset of the next message in the batch. func (batch *Batch) Offset() int64 { batch.mutex.Lock() diff --git a/conn.go b/conn.go index 36fec0a62..408d12f1a 100644 --- a/conn.go +++ b/conn.go @@ -338,16 +338,17 @@ func (c *Conn) ReadBatch(minBytes int, maxBytes int) *Batch { return &Batch{err: err} } - throttle, remain, err := readFetchResponseHeader(&c.rbuf, size) + throttle, highWaterMark, remain, err := readFetchResponseHeader(&c.rbuf, size) return &Batch{ - conn: c, - reader: &c.rbuf, - deadline: adjustedDeadline, - throttle: duration(throttle), - lock: lock, - remain: remain, - offset: offset, - err: err, + conn: c, + reader: &c.rbuf, + deadline: adjustedDeadline, + throttle: duration(throttle), + lock: lock, + remain: remain, + offset: offset, + highWaterMark: highWaterMark, + err: err, } } diff --git a/conn_test.go b/conn_test.go index 7f2b459f3..ad0cc6cf0 100644 --- a/conn_test.go +++ b/conn_test.go @@ -164,6 +164,11 @@ func TestConn(t *testing.T) { scenario: "reading messages from an empty partition should timeout after reaching the deadline", function: 
testConnReadEmptyWithDeadline, }, + + { + scenario: "write batch of messages and read the highest offset (watermark)", + function: testConnReadWatermarkFromBatch, + }, } const ( @@ -365,6 +370,34 @@ func testConnWriteBatchReadSequentially(t *testing.T, conn *Conn) { } } +func testConnReadWatermarkFromBatch(t *testing.T, conn *Conn) { + if _, err := conn.WriteMessages(makeTestSequence(10)...); err != nil { + t.Fatal(err) + } + + const minBytes = 1 + const maxBytes = 10e6 // 10 MB + + value := make([]byte, 10e3) // 10 KB + + batch := conn.ReadBatch(minBytes, maxBytes) + + for i := 0; i < 10; i++ { + _, err := batch.Read(value) + if err != nil { + if err = batch.Close(); err != nil { + t.Fatalf("error trying to read batch message: %s", err) + } + } + + if batch.HighWaterMark() != 10 { + t.Fatal("expected highest offset (watermark) to be 10") + } + } + + batch.Close() +} + func testConnWriteReadConcurrently(t *testing.T, conn *Conn) { const N = 1000 var msgs = make([]string, N) diff --git a/read.go b/read.go index 52de6bb92..5923be2dd 100644 --- a/read.go +++ b/read.go @@ -215,7 +215,7 @@ func readSlice(r *bufio.Reader, sz int, v reflect.Value) (int, error) { return sz, nil } -func readFetchResponseHeader(r *bufio.Reader, size int) (throttle int32, remain int, err error) { +func readFetchResponseHeader(r *bufio.Reader, size int) (throttle int32, watermark int64, remain int, err error) { var n int32 var p struct { Partition int32 @@ -273,6 +273,7 @@ func readFetchResponseHeader(r *bufio.Reader, size int) (throttle int32, remain return } + watermark = p.HighwaterMarkOffset return } diff --git a/reader.go b/reader.go index daac6a8c3..f232eb2e9 100644 --- a/reader.go +++ b/reader.go @@ -28,6 +28,7 @@ type Reader struct { cancel context.CancelFunc version int64 offset int64 + lag int64 dirty bool closed bool } @@ -167,13 +168,13 @@ func (r *Reader) ReadMessage(ctx context.Context) (msg Message, err error) { case <-ctx.Done(): err = ctx.Err() return - case m := <-r.msgs: if 
m.version >= version { r.mutex.Lock() if version == r.version { r.offset = m.message.Offset + 1 } + r.lag = m.watermark - r.offset r.mutex.Unlock() msg = m.message return @@ -203,6 +204,15 @@ func (r *Reader) Offset() int64 { return offset } +// Lag returns the difference between the highest offset in a Kafka partition and the reader's +// current offset. This can be used as a queue-depth indicator. +func (r *Reader) Lag() int64 { + r.mutex.Lock() + lag := r.lag + r.mutex.Unlock() + return lag +} + // SetOffset changes the offset from which the next batch of messages will be // read. // @@ -238,8 +248,9 @@ type reader struct { } type readerMessage struct { - version int64 - message Message + version int64 + message Message + watermark int64 } type readerError struct { @@ -371,8 +382,8 @@ func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, err break } - if err = r.sendMessage(ctx, msg); err != nil { - batch.Close() + if err = r.sendMessage(ctx, msg, batch.HighWaterMark()); err != nil { + err = batch.Close() break } @@ -382,9 +393,9 @@ func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, err return offset, err } -func (r *reader) sendMessage(ctx context.Context, msg Message) error { +func (r *reader) sendMessage(ctx context.Context, msg Message, watermark int64) error { select { - case r.msgs <- readerMessage{version: r.version, message: msg, watermark: watermark}: + case r.msgs <- readerMessage{version: r.version, message: msg, watermark: watermark}: return nil case <-ctx.Done(): return ctx.Err()