Skip to content

Commit

Permalink
ReadBatchWith bug fix and new MaxWaitTime option (segmentio#482)
Browse files Browse the repository at this point in the history
This fixes a bug where ReadBatchWith could incorrectly return `io.EOF`
in certain `errShortRead` scenarios.  Since `io.EOF` means the batch
was handled correctly, this could cause consumers to continue to use
a connection that is in a bad state.

This also adds a MaxWait option that can be used to improve the
way that the conn deadline and the fetch request interact.  Instead
of trying to calculate a correct timeout and change the max wait
time, this allows the caller to explicitly configure both the wait
and the deadline independently of one another.
  • Loading branch information
Steve van Loben Sels authored Aug 13, 2020
1 parent b283236 commit 0dd85d9
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 4 deletions.
29 changes: 25 additions & 4 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,14 @@ type ReadBatchConfig struct {
// ReadUncommitted makes all records visible. With ReadCommitted only
// non-transactional and committed records are visible.
IsolationLevel IsolationLevel

// MaxWait is the amount of time for the broker while waiting to hit the
// min/max byte targets. This setting is independent of any network-level
// timeouts or deadlines.
//
// For backward compatibility, when this field is left zero, kafka-go will
// infer the max wait from the connection's read deadline.
MaxWait time.Duration
}

type IsolationLevel int8
Expand Down Expand Up @@ -790,7 +798,20 @@ func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch {

id, err := c.doRequest(&c.rdeadline, func(deadline time.Time, id int32) error {
now := time.Now()
deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
var timeout time.Duration
if cfg.MaxWait > 0 {
// explicitly-configured case: no changes are made to the deadline,
// and the timeout is sent exactly as specified.
timeout = cfg.MaxWait
} else {
// default case: use the original logic to adjust the conn's
// deadline.T
deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
timeout = deadlineToTimeout(deadline, now)
}
// save this variable outside of the closure for later use in detecting
// truncated messages.
adjustedDeadline = deadline
switch fetchVersion {
case v10:
return c.wb.writeFetchRequestV10(
Expand All @@ -801,7 +822,7 @@ func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch {
offset,
cfg.MinBytes,
cfg.MaxBytes+int(c.fetchMinSize),
deadlineToTimeout(deadline, now),
timeout,
int8(cfg.IsolationLevel),
)
case v5:
Expand All @@ -813,7 +834,7 @@ func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch {
offset,
cfg.MinBytes,
cfg.MaxBytes+int(c.fetchMinSize),
deadlineToTimeout(deadline, now),
timeout,
int8(cfg.IsolationLevel),
)
default:
Expand All @@ -825,7 +846,7 @@ func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch {
offset,
cfg.MinBytes,
cfg.MaxBytes+int(c.fetchMinSize),
deadlineToTimeout(deadline, now),
timeout,
)
}
})
Expand Down
51 changes: 51 additions & 0 deletions conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,11 @@ func TestConn(t *testing.T) {
function: testConnReadWatermarkFromBatch,
},

{
scenario: "read a batch using explicit max wait time",
function: testConnReadBatchWithMaxWait,
},

{
scenario: "describe groups retrieves all groups when no groupID specified",
function: testConnDescribeGroupRetrievesAllGroups,
Expand Down Expand Up @@ -542,6 +547,52 @@ func testConnReadWatermarkFromBatch(t *testing.T, conn *Conn) {
batch.Close()
}

func testConnReadBatchWithMaxWait(t *testing.T, conn *Conn) {
if _, err := conn.WriteMessages(makeTestSequence(10)...); err != nil {
t.Fatal(err)
}

const maxBytes = 10e6 // 10 MB

value := make([]byte, 10e3) // 10 KB

cfg := ReadBatchConfig{
MinBytes: maxBytes, // use max for both so that we hit max wait time
MaxBytes: maxBytes,
MaxWait: 500 * time.Millisecond,
}

// set aa read deadline so the batch will succeed.
conn.SetDeadline(time.Now().Add(time.Second))
batch := conn.ReadBatchWith(cfg)

for i := 0; i < 10; i++ {
_, err := batch.Read(value)
if err != nil {
if err = batch.Close(); err != nil {
t.Fatalf("error trying to read batch message: %s", err)
}
}

if batch.HighWaterMark() != 10 {
t.Fatal("expected highest offset (watermark) to be 10")
}
}

batch.Close()

// reset the offset and ensure that the conn deadline takes precedence over
// the max wait
conn.Seek(0, SeekAbsolute)
conn.SetDeadline(time.Now().Add(50 * time.Millisecond))
batch = conn.ReadBatchWith(cfg)
if err := batch.Err(); err == nil {
t.Fatal("should have timed out, but got no error")
} else if netErr, ok := err.(net.Error); !ok || !netErr.Timeout() {
t.Fatalf("should have timed out, but got: %v", err)
}
}

func waitForCoordinator(t *testing.T, conn *Conn, groupID string) {
// ensure that kafka has allocated a group coordinator. oddly, issue doesn't
// appear to happen if the kafka been running for a while.
Expand Down

0 comments on commit 0dd85d9

Please sign in to comment.