Skip to content

Commit

Permalink
automatically discover first/last offset in client fetch requests (se…
Browse files Browse the repository at this point in the history
  • Loading branch information
Achille authored Apr 23, 2021
1 parent 8131dae commit 9a279b7
Showing 1 changed file with 39 additions and 2 deletions.
41 changes: 39 additions & 2 deletions fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ type FetchRequest struct {
// Address of the kafka broker to send the request to.
Addr net.Addr

// Topic, partition, and offset to retrieve records from.
// Topic, partition, and offset to retrieve records from. The offset may be
// one of the special FirstOffset or LastOffset constants, in which case the
// request will automatically discover the first or last offset of the
// partition and submit the request for these.
Topic string
Partition int
Offset int64
Expand Down Expand Up @@ -90,6 +93,40 @@ func (c *Client) Fetch(ctx context.Context, req *FetchRequest) (*FetchResponse,
timeout = maxWait
}

offset := req.Offset
switch offset {
case FirstOffset, LastOffset:
topic, partition := req.Topic, req.Partition

r, err := c.ListOffsets(ctx, &ListOffsetsRequest{
Addr: req.Addr,
Topics: map[string][]OffsetRequest{
topic: []OffsetRequest{{
Partition: partition,
Timestamp: offset,
}},
},
})
if err != nil {
return nil, fmt.Errorf("kafka.(*Client).Fetch: %w", err)
}

for _, p := range r.Topics[topic] {
if p.Partition == partition {
if p.Error != nil {
return nil, fmt.Errorf("kafka.(*Client).Fetch: %w", p.Error)
}
switch offset {
case FirstOffset:
offset = p.FirstOffset
case LastOffset:
offset = p.LastOffset
}
break
}
}
}

m, err := c.roundTrip(ctx, req.Addr, &fetchAPI.Request{
ReplicaID: -1,
MaxWaitTime: milliseconds(timeout),
Expand All @@ -103,7 +140,7 @@ func (c *Client) Fetch(ctx context.Context, req *FetchRequest) (*FetchResponse,
Partitions: []fetchAPI.RequestPartition{{
Partition: int32(req.Partition),
CurrentLeaderEpoch: -1,
FetchOffset: req.Offset,
FetchOffset: offset,
LogStartOffset: -1,
PartitionMaxBytes: int32(req.MaxBytes),
}},
Expand Down

0 comments on commit 9a279b7

Please sign in to comment.