Skip to content

Commit

Permalink
add high offset (watermark) and reader lag support
Browse files Browse the repository at this point in the history
  • Loading branch information
thehydroimpulse committed Jun 5, 2017
1 parent 51a0752 commit 7795237
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 26 deletions.
24 changes: 15 additions & 9 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@ import (
//
// Batches are safe to use concurrently from multiple goroutines.
type Batch struct {
mutex sync.Mutex
conn *Conn
lock *sync.Mutex
reader *bufio.Reader
deadline time.Time
throttle time.Duration
remain int
offset int64
err error
mutex sync.Mutex
conn *Conn
lock *sync.Mutex
reader *bufio.Reader
deadline time.Time
throttle time.Duration
remain int
offset int64
highWaterMark int64
err error
}

// Throttle gives the throttling duration applied by the kafka server on the
Expand All @@ -34,6 +35,11 @@ func (batch *Batch) Throttle() time.Duration {
return batch.throttle
}

// HighWaterMark returns the high watermark offset of the partition, as
// reported in the header of the fetch response that produced this batch.
func (batch *Batch) HighWaterMark() int64 {
return batch.highWaterMark
}

// Offset returns the offset of the next message in the batch.
func (batch *Batch) Offset() int64 {
batch.mutex.Lock()
Expand Down
19 changes: 10 additions & 9 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,16 +338,17 @@ func (c *Conn) ReadBatch(minBytes int, maxBytes int) *Batch {
return &Batch{err: err}
}

throttle, remain, err := readFetchResponseHeader(&c.rbuf, size)
throttle, highWaterMark, remain, err := readFetchResponseHeader(&c.rbuf, size)
return &Batch{
conn: c,
reader: &c.rbuf,
deadline: adjustedDeadline,
throttle: duration(throttle),
lock: lock,
remain: remain,
offset: offset,
err: err,
conn: c,
reader: &c.rbuf,
deadline: adjustedDeadline,
throttle: duration(throttle),
lock: lock,
remain: remain,
offset: offset,
highWaterMark: highWaterMark,
err: err,
}
}

Expand Down
33 changes: 33 additions & 0 deletions conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,11 @@ func TestConn(t *testing.T) {
scenario: "reading messages from an empty partition should timeout after reaching the deadline",
function: testConnReadEmptyWithDeadline,
},

{
scenario: "write batch of messages and read the highest offset (watermark)",
function: testConnReadWatermarkFromBatch,
},
}

const (
Expand Down Expand Up @@ -365,6 +370,34 @@ func testConnWriteBatchReadSequentially(t *testing.T, conn *Conn) {
}
}

// testConnReadWatermarkFromBatch writes ten messages to the connection, reads
// them back through a single batch, and verifies after every read that the
// batch reports a high watermark of 10.
func testConnReadWatermarkFromBatch(t *testing.T, conn *Conn) {
	if _, err := conn.WriteMessages(makeTestSequence(10)...); err != nil {
		t.Fatal(err)
	}

	const minBytes = 1
	const maxBytes = 10e6 // 10 MB

	value := make([]byte, 10e3) // 10 KB

	batch := conn.ReadBatch(minBytes, maxBytes)

	for i := 0; i < 10; i++ {
		if _, err := batch.Read(value); err != nil {
			// Best-effort cleanup: the read failure is the error worth
			// reporting, so a secondary Close error is deliberately dropped.
			batch.Close()
			t.Fatalf("error trying to read batch message: %s", err)
		}

		// The accessor is declared as HighWaterMark on Batch.
		if batch.HighWaterMark() != 10 {
			batch.Close() // best-effort cleanup before failing the test
			t.Fatal("expected highest offset (watermark) to be 10")
		}
	}

	if err := batch.Close(); err != nil {
		t.Fatalf("error closing batch: %s", err)
	}
}

func testConnWriteReadConcurrently(t *testing.T, conn *Conn) {
const N = 1000
var msgs = make([]string, N)
Expand Down
3 changes: 2 additions & 1 deletion read.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func readSlice(r *bufio.Reader, sz int, v reflect.Value) (int, error) {
return sz, nil
}

func readFetchResponseHeader(r *bufio.Reader, size int) (throttle int32, remain int, err error) {
func readFetchResponseHeader(r *bufio.Reader, size int) (throttle int32, watermark int64, remain int, err error) {
var n int32
var p struct {
Partition int32
Expand Down Expand Up @@ -273,6 +273,7 @@ func readFetchResponseHeader(r *bufio.Reader, size int) (throttle int32, remain
return
}

watermark = p.HighwaterMarkOffset
return
}

Expand Down
25 changes: 18 additions & 7 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type Reader struct {
cancel context.CancelFunc
version int64
offset int64
lag int64
dirty bool
closed bool
}
Expand Down Expand Up @@ -167,13 +168,13 @@ func (r *Reader) ReadMessage(ctx context.Context) (msg Message, err error) {
case <-ctx.Done():
err = ctx.Err()
return

case m := <-r.msgs:
if m.version >= version {
r.mutex.Lock()
if version == r.version {
r.offset = m.message.Offset + 1
}
r.lag = m.watermark - r.offset
r.mutex.Unlock()
msg = m.message
return
Expand Down Expand Up @@ -203,6 +204,15 @@ func (r *Reader) Offset() int64 {
return offset
}

// Lag reports how far the reader is behind the head of its Kafka partition,
// measured as the partition's highest offset minus the reader's current
// offset. The value is suitable as a queue-depth indicator.
func (r *Reader) Lag() int64 {
	r.mutex.Lock()
	defer r.mutex.Unlock()
	return r.lag
}

// SetOffset changes the offset from which the next batch of messages will be
// read.
//
Expand Down Expand Up @@ -238,8 +248,9 @@ type reader struct {
}

type readerMessage struct {
version int64
message Message
version int64
message Message
watermark int64
}

type readerError struct {
Expand Down Expand Up @@ -371,8 +382,8 @@ func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, err
break
}

if err = r.sendMessage(ctx, msg); err != nil {
batch.Close()
if err = r.sendMessage(ctx, msg, batch.HighWatermark()); err != nil {
err = batch.Close()
break
}

Expand All @@ -382,9 +393,9 @@ func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, err
return offset, err
}

func (r *reader) sendMessage(ctx context.Context, msg Message) error {
func (r *reader) sendMessage(ctx context.Context, msg Message, watermark int64) error {
select {
case r.msgs <- readerMessage{version: r.version, message: msg}:
case r.msgs <- readerMessage{version: r.version, message: msg, watermark: watermark}:
return nil
case <-ctx.Done():
return ctx.Err()
Expand Down

0 comments on commit 7795237

Please sign in to comment.