Skip to content

Commit

Permalink
add support for reading the first and last offsets of a topic/partition
Browse files Browse the repository at this point in the history
  • Loading branch information
Achille Roussel committed May 31, 2017
1 parent 9fd3208 commit cc7fc22
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 99 deletions.
112 changes: 66 additions & 46 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,6 @@ func (c *Conn) Seek(offset int64, whence int) (int64, error) {
}

first, last, err := c.ReadOffsets()
fmt.Println("first =", first, "last =", last)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -252,7 +251,19 @@ func (c *Conn) Read(b []byte) (int, error) {
var n int
err = c.readOperation(
func(id int32) error {
return nil
return c.writeRequest(fetchRequest, v1, id, fetchRequestV1{
ReplicaID: -1,
MaxWaitTime: 100,
MinBytes: 1,
Topics: []fetchRequestTopicV1{{
TopicName: c.topic,
Partitions: []fetchRequestPartitionV1{{
Partition: c.partition,
FetchOffset: offset,
MaxBytes: int32(len(b) + 1),
}},
}},
})
},
func(size int) error {
return nil
Expand Down Expand Up @@ -302,7 +313,7 @@ func (c *Conn) ReadBatchAt(size int, offset int64) (*BatchReader, error) {

c.wlock.Lock()
c.conn.SetWriteDeadline(deadline)
err := c.writeRequest(fetchRequestKey, v1, id, nil) // TODO
err := c.writeRequest(fetchRequest, v1, id, nil) // TODO
c.wlock.Unlock()

if err != nil {
Expand All @@ -319,48 +330,57 @@ func (c *Conn) ReadBatchAt(size int, offset int64) (*BatchReader, error) {
// ReadOffsets returns the absolute first and last offsets of the topic used by
// the connection.
func (c *Conn) ReadOffsets() (first int64, last int64, err error) {
first, last = -1, -1
type result struct {
off int64
err error
}

err = c.writeOperation(
func(id int32) error {
return c.writeRequest(offsetRequestKey, v1, id, listOffsetRequestV1{
ReplicaID: -1,
Topics: []listOffsetRequestTopicV1{{
TopicName: c.topic,
Partitions: []listOffsetRequestPartitionV1{
{Partition: c.partition, Time: -2},
{Partition: c.partition, Time: -1},
},
}},
})
},
func(size int) error {
var res []listOffsetResponseV1
if err := c.readResponse(size, &res); err != nil {
return err
}
fmt.Printf("%#v\n", res)
for _, r := range res {
for _, p := range r.PartitionOffsets {
if p.ErrorCode != 0 {
return Error(p.ErrorCode)
}
if first < 0 || p.Offset < first {
first = p.Offset
}
if last < 0 || p.Offset > last {
last = p.Offset
resch := make(chan result, 2)
fetch := func(time int64) {
off := int64(0)
err = c.writeOperation(
func(id int32) error {
return c.writeRequest(offsetRequest, v1, id, listOffsetRequestV1{
ReplicaID: -1,
Topics: []listOffsetRequestTopicV1{{
TopicName: c.topic,
Partitions: []listOffsetRequestPartitionV1{{Partition: c.partition, Time: time}},
}},
})
},
func(size int) error {
var res []listOffsetResponseV1
if err := c.readResponse(size, &res); err != nil {
return err
}
for _, r := range res {
for _, p := range r.PartitionOffsets {
if p.ErrorCode != 0 {
return Error(p.ErrorCode)
} else {
off = p.Offset
return nil
}
}
}
}
return nil
},
)

if err == nil && (first < 0 || last < 0) {
err = UnknownTopicOrPartition
return UnknownTopicOrPartition
},
)
resch <- result{off, err}
}

// We have to submit two different requests to fetch the first and last
// offsets because kafka refuses requests that ask for multiple offsets
// on the same topic and partition.
go fetch(-1)
go fetch(-2)

res1 := <-resch
res2 := <-resch

first = minInt64(res1.off, res2.off)
last = maxInt64(res1.off, res2.off)
err = coalesceErrors(res1.err, res2.err)
return
}

Expand All @@ -379,10 +399,10 @@ func (c *Conn) ReadPartitions(topics ...string) (partitions []Partition, err err

err = c.readOperation(
func(id int32) error {
return c.writeRequest(metadataRequestKey, v0, id, topicMetadataRequest(topics))
return c.writeRequest(metadataRequest, v0, id, topicMetadataRequestV0(topics))
},
func(size int) error {
var res metadataResponse
var res metadataResponseV0
if err := c.readResponse(size, &res); err != nil {
return err
}
Expand Down Expand Up @@ -448,12 +468,12 @@ func (c *Conn) Write(b []byte) (int, error) {

err := c.writeOperation(
func(id int32) error {
return c.writeRequest(produceRequestKey, v2, id, produceRequest{
return c.writeRequest(produceRequest, v2, id, produceRequestV2{
RequiredAcks: -1,
Timeout: 10000,
Topics: []produceRequestTopic{{
Topics: []produceRequestTopicV2{{
TopicName: c.topic,
Partitions: []produceRequestPartition{{
Partitions: []produceRequestPartitionV2{{
Partition: c.partition,
MessageSetSize: sizeof(set),
MessageSet: set,
Expand All @@ -462,7 +482,7 @@ func (c *Conn) Write(b []byte) (int, error) {
})
},
func(size int) error {
var res produceResponse
var res produceResponseV2
if err := c.readResponse(size, &res); err != nil {
return err
}
Expand Down
6 changes: 5 additions & 1 deletion conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ import (
"time"
)

func init() {
rand.Seed(time.Now().UnixNano())
}

func TestConn(t *testing.T) {
tests := []struct {
scenario string
Expand Down Expand Up @@ -54,7 +58,7 @@ func TestConn(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

topic := fmt.Sprintf("kafka-go-%d", rand.Int63())
topic := fmt.Sprintf("kafka-go-%016x", rand.Int63())

conn, err := (&Dialer{
Resolver: &net.Resolver{},
Expand Down
2 changes: 1 addition & 1 deletion dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (d *Dialer) LookupLeader(ctx context.Context, network string, address strin
go func() {
for attempt := 0; true; attempt++ {
if attempt != 0 {
sleep(ctx, backoff(attempt, time.Second, time.Minute))
sleep(ctx, backoff(attempt, 100*time.Millisecond, 10*time.Second))
}

partitions, err := c.ReadPartitions(topic)
Expand Down
9 changes: 9 additions & 0 deletions error.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,3 +337,12 @@ func isTemporary(err error) bool {
})
return ok && e.Temporary()
}

func coalesceErrors(err ...error) error {
for _, e := range err {
if e != nil {
return e
}
}
return nil
}
Loading

0 comments on commit cc7fc22

Please sign in to comment.