Skip to content

Commit

Permalink
Set offset by timestamp. (segmentio#221)
Browse files Browse the repository at this point in the history
Ray Jenkins authored Mar 1, 2019
1 parent 33ba772 commit b8e28f8
Showing 2 changed files with 77 additions and 3 deletions.
32 changes: 32 additions & 0 deletions reader.go
Original file line number Diff line number Diff line change
@@ -1483,6 +1483,38 @@ func (r *Reader) SetOffset(offset int64) error {
return err
}

// SetOffsetAt changes the offset from which the next batch of messages will be
// read given the timestamp t.
//
// The method fails if the unable to connect partition leader, or unable to read the offset
// given the ts, or if the reader has been closed.
func (r *Reader) SetOffsetAt(ctx context.Context, t time.Time) error {
r.mutex.Lock()
if r.closed {
r.mutex.Unlock()
return io.ErrClosedPipe
}
r.mutex.Unlock()

for _, broker := range r.config.Brokers {
conn, err := r.config.Dialer.DialLeader(ctx, "tcp", broker, r.config.Topic, r.config.Partition)
if err != nil {
continue
}

deadline, _ := ctx.Deadline()
conn.SetDeadline(deadline)
offset, err := conn.ReadOffset(t)
conn.Close()
if err != nil {
return err
}

return r.SetOffset(offset)
}
return fmt.Errorf("error setting offset for timestamp %+v", t)
}

// Stats returns a snapshot of the reader stats since the last time the method
// was called, or since the reader was created if it is called for the first
// time.
48 changes: 45 additions & 3 deletions reader_test.go
Original file line number Diff line number Diff line change
@@ -38,6 +38,11 @@ func TestReader(t *testing.T) {
function: testReaderSetRandomOffset,
},

{
scenario: "setting the offset by TimeStamp",
function: testReaderSetOffsetAt,
},

{
scenario: "calling Lag returns the lag of the last message read from kafka",
function: testReaderLag,
@@ -163,6 +168,43 @@ func testReaderSetRandomOffset(t *testing.T, ctx context.Context, r *Reader) {
}
}

func testReaderSetOffsetAt(t *testing.T, ctx context.Context, r *Reader) {
// We make 2 batches of messages here with a brief 2 second pause
// to ensure messages 0...9 will be written a few seconds before messages 10...19
// We'll then fetch the timestamp for message offset 10 and use that timestamp to set
// our reader
const N = 10
prepareReader(t, ctx, r, makeTestSequence(N)...)
time.Sleep(time.Second * 2)
prepareReader(t, ctx, r, makeTestSequence(N)...)

var ts time.Time
for i := 0; i < N*2; i++ {
m, err := r.ReadMessage(ctx)
if err != nil {
t.Error("error reading message", err)
}
// grab the time for the 10th message
if i == 10 {
ts = m.Time
}
}

err := r.SetOffsetAt(ctx, ts)
if err != nil {
t.Fatal("error setting offset by timestamp", err)
}

m, err := r.ReadMessage(context.Background())
if err != nil {
t.Fatal("error reading message", err)
}

if m.Offset != 10 {
t.Errorf("expected offset of 10, received offset %d", m.Offset)
}
}

func testReaderLag(t *testing.T, ctx context.Context, r *Reader) {
const N = 5
prepareReader(t, ctx, r, makeTestSequence(N)...)
@@ -1137,9 +1179,10 @@ func testReaderConsumerGroupReadContentAcrossPartitions(t *testing.T, ctx contex

// Build a struct to implement the ReadPartitions interface.
type MockConnWatcher struct {
count int
count int
partitions [][]Partition
}

func (m *MockConnWatcher) ReadPartitions(topics ...string) (partitions []Partition, err error) {
partitions = m.partitions[m.count]
// cap the count at len(partitions) -1 so ReadPartitions doesn't even go out of bounds
@@ -1182,12 +1225,11 @@ func testReaderConsumerGroupRebalanceOnPartitionAdd(t *testing.T, ctx context.Co
r.config.PartitionWatchInterval = watchTime
rg.Go(r.partitionWatcher(conn))
rg.Wait()
if time.Now().Sub(now).Seconds() > r.config.PartitionWatchInterval.Seconds() * 4 {
if time.Now().Sub(now).Seconds() > r.config.PartitionWatchInterval.Seconds()*4 {
t.Error("partitionWatcher didn't see update")
}
}


func testReaderConsumerGroupRebalance(t *testing.T, ctx context.Context, r *Reader) {
r2 := NewReader(r.config)
defer r.Close()

0 comments on commit b8e28f8

Please sign in to comment.