Skip to content

Commit

Permalink
Merge pull request segmentio#6 from segmentio/reader-improvements
Browse files Browse the repository at this point in the history
reader improvements
  • Loading branch information
achille-roussel authored Jun 2, 2017
2 parents 4243aa1 + 5178862 commit 51a0752
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 41 deletions.
8 changes: 5 additions & 3 deletions circle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ test:
--volume ${PWD}:/go/src/github.com/${CIRCLE_PROJECT_USERNAME}/${CIRCLE_PROJECT_REPONAME}
--workdir /go/src/github.com/${CIRCLE_PROJECT_USERNAME}/${CIRCLE_PROJECT_REPONAME}
segment/golang:latest
go.test='sleep 30 && govendor test -v -race -cover .'
go.bench='true' # skip on CI
go.test='sleep 10 && govendor test -v -covermode atomic .'
go.bench='true'
# - sleep to give kafka time to start
# - disable the race detector because there's too much concurrency going on in the tests and it takes too long to complete
# - disable benchmarks because they rely on having kafka running
17 changes: 15 additions & 2 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,16 +357,29 @@ func (c *Conn) ReadOffset(t time.Time) (int64, error) {
return c.readOffset(timestamp(t))
}

// ReadFirstOffset reports the first offset available on the connection.
//
// It delegates to readOffset with the special position -2, which in the Kafka
// offset-listing API stands for the earliest available offset.
func (c *Conn) ReadFirstOffset() (int64, error) {
	const firstOffset = -2
	return c.readOffset(firstOffset)
}

// ReadLastOffset reports the last offset available on the connection.
//
// It delegates to readOffset with the special position -1, which in the Kafka
// offset-listing API stands for the latest offset.
func (c *Conn) ReadLastOffset() (int64, error) {
	const lastOffset = -1
	return c.readOffset(lastOffset)
}

// ReadOffsets returns the absolute first and last offsets of the topic used by
// the connection.
func (c *Conn) ReadOffsets() (first int64, last int64, err error) {
// We have to submit two different requests to fetch the first and last
// offsets because kafka refuses requests that ask for multiple offsets
// on the same topic and partition.
if last, err = c.readOffset(-1); err != nil {
if first, err = c.ReadFirstOffset(); err != nil {
return
}
if last, err = c.ReadLastOffset(); err != nil {
first = 0 // don't leak the value on error
return
}
first, err = c.readOffset(-2)
return
}

Expand Down
8 changes: 8 additions & 0 deletions dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,14 @@ type Resolver interface {
}

// sleep waits for the given duration unless ctx is done first.
//
// A zero duration takes a fast path that allocates no timer: it returns true
// right away, or false if ctx is already done. Any other duration arms a
// timer and waits on it in the select below (the tail of that select is not
// visible in this diff hunk).
func sleep(ctx context.Context, duration time.Duration) bool {
	// Only a zero duration may skip the timer. The guard used to read
	// `duration != 0`, which made every real sleep return immediately and
	// turned the callers' back-off into a busy loop.
	if duration == 0 {
		select {
		default:
			return true
		case <-ctx.Done():
			return false
		}
	}
	timer := time.NewTimer(duration)
	defer timer.Stop()
	select {
Expand Down
66 changes: 49 additions & 17 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,13 @@ func (r *reader) run(ctx context.Context, offset int64) {
}

conn, start, err := r.initialize(ctx, offset)
if err != nil {
switch err {
case nil:
case OffsetOutOfRange:
// This would happen if the requested offset is past the last
// offset on the partition leader. In that case we're just going
// to retry later hoping that enough data has been produced.
default:
// Wait 4 attempts before reporting the first errors, this helps
// mitigate situations where the kafka server is temporarily
// unavailable.
Expand All @@ -284,42 +290,68 @@ func (r *reader) run(ctx context.Context, offset int64) {
// to the connection we know we'll want to restart from this offset.
offset = start

errcount := 0
readLoop:
for {
if !sleep(ctx, backoff(errcount, 100*time.Millisecond, time.Second)) {
conn.Close()
return
}

switch offset, err = r.read(ctx, offset, conn); err {
case nil:
case RequestTimedOut:
case context.Canceled: // another reader has taken over
case nil, RequestTimedOut:
// Timeout on the kafka side, this can be safely retried.
errcount = 0
continue
case OffsetOutOfRange:
// We may be reading past the last offset, will retry later.
case context.Canceled:
// Another reader has taken over, we can safely quit.
conn.Close()
return
default:
conn.Close()
break readLoop
if _, ok := err.(Error); ok {
r.sendError(ctx, err)
} else {
conn.Close()
break readLoop
}
}

errcount++
}
}
}

func (r *reader) initialize(ctx context.Context, offset int64) (conn *Conn, start int64, err error) {
var whence int

if offset < 0 {
offset = 0
} else {
whence = 1
}
for i := 0; i != len(r.brokers) && conn == nil; i++ {
var broker = r.brokers[i]
var first int64

for _, broker := range r.brokers {
if conn, err = r.dialer.DialLeader(ctx, "tcp", broker, r.topic, r.partition); err != nil {
continue
}

if start, err = conn.Seek(offset, whence); err != nil {
// This deadline controls how long the offset negotiation may take.
conn.SetDeadline(time.Now().Add(10 * time.Second))

if first, err = conn.ReadFirstOffset(); err != nil {
conn.Close()
continue
conn = nil
break
}

// In case the reader was configured with an offset before the first
// offset, skip directly to the first offset.
if offset < first {
offset = first
}

break
if start, err = conn.Seek(offset, 1); err != nil {
conn.Close()
conn = nil
break
}
}

return
Expand Down
28 changes: 9 additions & 19 deletions reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,6 @@ func TestReader(t *testing.T) {
function: testReaderReadMessages,
},

{
scenario: "setting the offset to an invalid value should return an error on the next Read call",
function: testReaderSetInvalidOffset,
},

{
scenario: "setting the offset to random values should return the expected messages when Read is called",
function: testReaderSetRandomOffset,
Expand All @@ -45,9 +40,10 @@ func TestReader(t *testing.T) {
defer cancel()

r := NewReader(ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: makeTopic(),
MaxWait: 500 * time.Millisecond,
Brokers: []string{"localhost:9092"},
Topic: makeTopic(),
MinBytes: 1,
MaxBytes: 10e6,
})
defer r.Close()
testFunc(t, ctx, r)
Expand All @@ -68,12 +64,15 @@ func testReaderReadMessages(t *testing.T, ctx context.Context, r *Reader) {
const N = 1000
prepareReader(t, ctx, r, makeTestSequence(N)...)

var offset int64

for i := 0; i != N; i++ {
m, err := r.ReadMessage(ctx)
if err != nil {
t.Error(err)
t.Error("reading message at offset", offset, "failed:", err)
return
}
offset = m.Offset + 1
v, _ := strconv.Atoi(string(m.Value))
if v != i {
t.Error("message at index", i, "has wrong value:", v)
Expand All @@ -82,15 +81,6 @@ func testReaderReadMessages(t *testing.T, ctx context.Context, r *Reader) {
}
}

func testReaderSetInvalidOffset(t *testing.T, ctx context.Context, r *Reader) {
r.SetOffset(42)

_, err := r.ReadMessage(ctx)
if err == nil {
t.Error(err)
}
}

func testReaderSetRandomOffset(t *testing.T, ctx context.Context, r *Reader) {
const N = 10
prepareReader(t, ctx, r, makeTestSequence(N)...)
Expand All @@ -100,7 +90,7 @@ func testReaderSetRandomOffset(t *testing.T, ctx context.Context, r *Reader) {
r.SetOffset(int64(offset))
m, err := r.ReadMessage(ctx)
if err != nil {
t.Error(err)
t.Error("seeking to offset", offset, "failed:", err)
return
}
v, _ := strconv.Atoi(string(m.Value))
Expand Down

0 comments on commit 51a0752

Please sign in to comment.