Merge remote-tracking branch 'origin/master' into 0.4
Achille Roussel committed Sep 5, 2020
2 parents 8766085 + ff55ecc commit 6bd4c78
Showing 6 changed files with 100 additions and 13 deletions.
29 changes: 25 additions & 4 deletions conn.go
@@ -104,6 +104,14 @@ type ReadBatchConfig struct {
// ReadUncommitted makes all records visible. With ReadCommitted only
// non-transactional and committed records are visible.
IsolationLevel IsolationLevel

// MaxWait is the amount of time the broker will wait while trying to hit the
// min/max byte targets. This setting is independent of any network-level
// timeouts or deadlines.
//
// For backward compatibility, when this field is left zero, kafka-go will
// infer the max wait from the connection's read deadline.
MaxWait time.Duration
}

type IsolationLevel int8
@@ -773,7 +781,20 @@ func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch {

id, err := c.doRequest(&c.rdeadline, func(deadline time.Time, id int32) error {
now := time.Now()
deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
var timeout time.Duration
if cfg.MaxWait > 0 {
// explicitly-configured case: no changes are made to the deadline,
// and the timeout is sent exactly as specified.
timeout = cfg.MaxWait
} else {
// default case: use the original logic to adjust the conn's
// deadline.
deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
timeout = deadlineToTimeout(deadline, now)
}
// save this variable outside of the closure for later use in detecting
// truncated messages.
adjustedDeadline = deadline
switch fetchVersion {
case v10:
return c.wb.writeFetchRequestV10(
@@ -784,7 +805,7 @@ func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch {
offset,
cfg.MinBytes,
cfg.MaxBytes+int(c.fetchMinSize),
deadlineToTimeout(deadline, now),
timeout,
int8(cfg.IsolationLevel),
)
case v5:
@@ -796,7 +817,7 @@ func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch {
offset,
cfg.MinBytes,
cfg.MaxBytes+int(c.fetchMinSize),
deadlineToTimeout(deadline, now),
timeout,
int8(cfg.IsolationLevel),
)
default:
@@ -808,7 +829,7 @@ func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch {
offset,
cfg.MinBytes,
cfg.MaxBytes+int(c.fetchMinSize),
deadlineToTimeout(deadline, now),
timeout,
)
}
})
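The new MaxWait field bounds how long the broker itself waits to accumulate MinBytes, independently of the connection's read deadline; leaving it at zero keeps the old behavior of inferring the wait from the deadline. A minimal sketch of reading a batch with an explicit max wait, assuming a broker at localhost:9092 and a topic named my-topic (both placeholders):

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", "my-topic", 0)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// The read deadline still bounds the whole request on the client side;
	// MaxWait only caps how long the broker waits for MinBytes to accumulate.
	conn.SetReadDeadline(time.Now().Add(10 * time.Second))

	batch := conn.ReadBatchWith(kafka.ReadBatchConfig{
		MinBytes: 1,
		MaxBytes: 10e6,
		MaxWait:  500 * time.Millisecond,
	})
	defer batch.Close()

	buf := make([]byte, 10e3)
	for {
		n, err := batch.Read(buf)
		if err != nil {
			break
		}
		fmt.Println(string(buf[:n]))
	}
}

With MaxWait left at zero, the fetch timeout is still derived from the read deadline, so existing callers keep their previous behavior.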
51 changes: 51 additions & 0 deletions conn_test.go
@@ -185,6 +185,11 @@ func TestConn(t *testing.T) {
function: testConnReadWatermarkFromBatch,
},

{
scenario: "read a batch using explicit max wait time",
function: testConnReadBatchWithMaxWait,
},

{
scenario: "describe groups retrieves all groups when no groupID specified",
function: testConnDescribeGroupRetrievesAllGroups,
@@ -540,6 +545,52 @@ func testConnReadWatermarkFromBatch(t *testing.T, conn *Conn) {
batch.Close()
}

func testConnReadBatchWithMaxWait(t *testing.T, conn *Conn) {
if _, err := conn.WriteMessages(makeTestSequence(10)...); err != nil {
t.Fatal(err)
}

const maxBytes = 10e6 // 10 MB

value := make([]byte, 10e3) // 10 KB

cfg := ReadBatchConfig{
MinBytes: maxBytes, // use max for both so that we hit max wait time
MaxBytes: maxBytes,
MaxWait: 500 * time.Millisecond,
}

// set a read deadline so the batch will succeed.
conn.SetDeadline(time.Now().Add(time.Second))
batch := conn.ReadBatchWith(cfg)

for i := 0; i < 10; i++ {
_, err := batch.Read(value)
if err != nil {
if err = batch.Close(); err != nil {
t.Fatalf("error trying to read batch message: %s", err)
}
}

if batch.HighWaterMark() != 10 {
t.Fatal("expected highest offset (watermark) to be 10")
}
}

batch.Close()

// reset the offset and ensure that the conn deadline takes precedence over
// the max wait
conn.Seek(0, SeekAbsolute)
conn.SetDeadline(time.Now().Add(50 * time.Millisecond))
batch = conn.ReadBatchWith(cfg)
if err := batch.Err(); err == nil {
t.Fatal("should have timed out, but got no error")
} else if netErr, ok := err.(net.Error); !ok || !netErr.Timeout() {
t.Fatalf("should have timed out, but got: %v", err)
}
}

func waitForCoordinator(t *testing.T, conn *Conn, groupID string) {
// ensure that kafka has allocated a group coordinator. oddly, the issue doesn't
// appear to happen if kafka has been running for a while.
24 changes: 18 additions & 6 deletions dialer.go
@@ -18,6 +18,13 @@ type Dialer struct {
// Unique identifier for client connections established by this Dialer.
ClientID string

// Optionally specifies the function that the dialer uses to establish
// network connections. If nil, net.(*Dialer).DialContext is used instead.
//
// When DialFunc is set, LocalAddr, DualStack, FallbackDelay, and KeepAlive
// are ignored.
DialFunc func(ctx context.Context, network string, address string) (net.Conn, error)

// Timeout is the maximum amount of time a dial will wait for a connect to
// complete. If Deadline is also set, it may fail earlier.
//
@@ -329,12 +336,17 @@ func (d *Dialer) dialContext(ctx context.Context, network string, address string
}
}

conn, err := (&net.Dialer{
LocalAddr: d.LocalAddr,
DualStack: d.DualStack,
FallbackDelay: d.FallbackDelay,
KeepAlive: d.KeepAlive,
}).DialContext(ctx, network, address)
dial := d.DialFunc
if dial == nil {
dial = (&net.Dialer{
LocalAddr: d.LocalAddr,
DualStack: d.DualStack,
FallbackDelay: d.FallbackDelay,
KeepAlive: d.KeepAlive,
}).DialContext
}

conn, err := dial(ctx, network, address)
if err != nil {
return nil, err
}
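The new DialFunc hook lets a caller take over connection establishment entirely, for example to route through a proxy or add instrumentation; when it is set, LocalAddr, DualStack, FallbackDelay, and KeepAlive are ignored. A sketch of wiring in a custom dial function, assuming a plain net.Dialer underneath (the broker address is a placeholder):

package main

import (
	"context"
	"log"
	"net"
	"time"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	inner := &net.Dialer{Timeout: 3 * time.Second}

	dialer := &kafka.Dialer{
		ClientID: "example-client",
		DialFunc: func(ctx context.Context, network, address string) (net.Conn, error) {
			// Address rewriting, tunnelling, or metrics would go here; this
			// sketch just delegates to the standard dialer.
			return inner.DialContext(ctx, network, address)
		},
	}

	conn, err := dialer.DialContext(context.Background(), "tcp", "localhost:9092")
	if err != nil {
		log.Fatal(err)
	}
	conn.Close()
}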
2 changes: 1 addition & 1 deletion example_consumergroup_test.go
@@ -46,7 +46,7 @@ func ExampleConsumerGroupParallelReaders() {
case kafka.ErrGenerationEnded:
// generation has ended. commit offsets. in a real app,
// offsets would be committed periodically.
gen.CommitOffsets(map[string]map[int]int64{"my-topic": {partition: offset}})
gen.CommitOffsets(map[string]map[int]int64{"my-topic": {partition: offset + 1}})
return
case nil:
fmt.Printf("received message %s/%d/%d : %s\n", msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
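The committed value changes from offset to offset + 1 because Kafka stores the position of the next record the group should read, not the last record handled; committing the bare offset would re-deliver the final message after a rebalance or restart. Restated with a comment, reusing the gen, partition, and offset names from the example above:

// Commit the offset *after* the last processed record so the group
// resumes at the next message rather than re-reading this one.
gen.CommitOffsets(map[string]map[int]int64{"my-topic": {partition: offset + 1}})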
6 changes: 5 additions & 1 deletion reader.go
@@ -685,7 +685,11 @@ func (r *Reader) Close() error {
// The method returns io.EOF to indicate that the reader has been closed.
//
// If consumer groups are used, ReadMessage will automatically commit the
// offset when called.
// offset when called. Note that this could result in an offset being committed
// before the message is fully processed.
//
// If more fine-grained control of when offsets are committed is required, it
// is recommended to use FetchMessage with CommitMessages instead.
func (r *Reader) ReadMessage(ctx context.Context) (Message, error) {
m, err := r.FetchMessage(ctx)
if err != nil {
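For the fine-grained pattern the comment points to, here is a minimal sketch of committing only after a message has been processed, assuming a broker at localhost:9092, a consumer group named example-group, and a topic named my-topic (all placeholders):

package main

import (
	"context"
	"fmt"
	"log"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"},
		GroupID: "example-group",
		Topic:   "my-topic",
	})
	defer r.Close()

	ctx := context.Background()
	for {
		// FetchMessage does not commit; the offset only advances once the
		// message has been handled and CommitMessages succeeds.
		msg, err := r.FetchMessage(ctx)
		if err != nil {
			log.Println(err)
			return
		}
		fmt.Printf("processing %s/%d/%d: %s\n", msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
		if err := r.CommitMessages(ctx, msg); err != nil {
			log.Println(err)
			return
		}
	}
}

A crash between FetchMessage and CommitMessages means the message is delivered again rather than silently dropped, which is usually the safer failure mode.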
1 change: 0 additions & 1 deletion writer.go
@@ -636,7 +636,6 @@ func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error {
werr[i] = batch.err
}
}

return werr
}
