Skip to content

Commit

Permalink
add ReadOffset + zero-allocation ReadOffsets
Browse files Browse the repository at this point in the history
  • Loading branch information
Achille Roussel committed May 31, 2017
1 parent 4725792 commit f86ba28
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 77 deletions.
106 changes: 48 additions & 58 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,68 +420,62 @@ func (c *Conn) ReadBatchAt(size int, offset int64) (*BatchReader, error) {
return &BatchReader{conn: c, id: id}, nil
}

// ReadOffset returns the offset of the first message with a timestamp equal or
// greater to t.
func (c *Conn) ReadOffset(t time.Time) (int64, error) {
return c.readOffset(timeToTimestamp(t))
}

// ReadOffsets returns the absolute first and last offsets of the topic used by
// the connection.
func (c *Conn) ReadOffsets() (first int64, last int64, err error) {
type result struct {
off int64
err error
}

resch := make(chan result, 2)
fetch := func(ptime int64) {
off := int64(0)
err := c.writeOperation(
func(deadline time.Time, id int32) error {
return c.writeRequest(offsetRequest, v1, id, listOffsetRequestV1{
ReplicaID: -1,
Topics: []listOffsetRequestTopicV1{{
TopicName: c.topic,
Partitions: []listOffsetRequestPartitionV1{{Partition: c.partition, Time: ptime}},
}},
})
},
func(deadline time.Time, size int) error {
return expectZeroSize(streamArray(&c.rbuf, size, func(r *bufio.Reader, size int) (int, error) {
// We skip the topic name because we've made a request for
// a single topic.
size, err := discardString(r, size)
if err != nil {
return size, err
}

// Reading the array of partitions, there will be only one
// partition which gives the offset we're looking for.
return streamArray(r, size, func(r *bufio.Reader, size int) (int, error) {
var p partitionOffsetV1
size, err := read(r, size, &p)
if err != nil {
return size, err
}
if p.ErrorCode != 0 {
return size, Error(p.ErrorCode)
}
off = p.Offset
return size, nil
})
}))
},
)
resch <- result{off, err}
}

// We have to submit two different requests to fetch the first and last
// offsets because kafka refuses requests that ask for multiple offsets
// on the same topic and partition.
go fetch(-1)
go fetch(-2)
if last, err = c.readOffset(-1); err != nil {
return
}
first, err = c.readOffset(-2)
return
}

res1 := <-resch
res2 := <-resch
func (c *Conn) readOffset(t int64) (offset int64, err error) {
err = c.writeOperation(
func(deadline time.Time, id int32) error {
return c.writeRequest(offsetRequest, v1, id, listOffsetRequestV1{
ReplicaID: -1,
Topics: []listOffsetRequestTopicV1{{
TopicName: c.topic,
Partitions: []listOffsetRequestPartitionV1{{Partition: c.partition, Time: t}},
}},
})
},
func(deadline time.Time, size int) error {
return expectZeroSize(streamArray(&c.rbuf, size, func(r *bufio.Reader, size int) (int, error) {
// We skip the topic name because we've made a request for
// a single topic.
size, err := discardString(r, size)
if err != nil {
return size, err
}

first = minInt64(res1.off, res2.off)
last = maxInt64(res1.off, res2.off)
err = coalesceErrors(res1.err, res2.err)
// Reading the array of partitions, there will be only one
// partition which gives the offset we're looking for.
return streamArray(r, size, func(r *bufio.Reader, size int) (int, error) {
var p partitionOffsetV1
size, err := read(r, size, &p)
if err != nil {
return size, err
}
if p.ErrorCode != 0 {
return size, Error(p.ErrorCode)
}
offset = p.Offset
return size, nil
})
}))
},
)
return
}

Expand Down Expand Up @@ -719,7 +713,3 @@ type timeout struct{}
func (*timeout) Error() string { return "kafka operation timeout" }
func (*timeout) Timeout() bool { return true }
func (*timeout) Temporary() bool { return false }

func timestamp() int64 {
return time.Now().UnixNano() / 1e6 // ns => ms
}
9 changes: 0 additions & 9 deletions error.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,12 +337,3 @@ func isTemporary(err error) bool {
})
return ok && e.Temporary()
}

func coalesceErrors(err ...error) error {
for _, e := range err {
if e != nil {
return e
}
}
return nil
}
19 changes: 9 additions & 10 deletions protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"hash/crc32"
"io"
"reflect"
"time"
)

type apiKey int16
Expand Down Expand Up @@ -798,16 +799,14 @@ func sizeofSlice(v reflect.Value) (size int32) {
return
}

func minInt64(a int64, b int64) int64 {
if a < b {
return a
}
return b
func timestamp() int64 {
return timeToTimestamp(time.Now())
}

func maxInt64(a int64, b int64) int64 {
if a > b {
return a
}
return b
func timeToTimestamp(t time.Time) int64 {
return t.UnixNano() / int64(time.Millisecond)
}

func timestampToTime(t int64) time.Time {
return time.Unix(t/1000, (t%1000)*int64(time.Millisecond))
}

0 comments on commit f86ba28

Please sign in to comment.