cleanup in preparation of batch reader implementation
Achille Roussel committed May 31, 2017
1 parent 369023d commit 3258cb9
Showing 8 changed files with 752 additions and 679 deletions.
12 changes: 12 additions & 0 deletions batch.go
@@ -0,0 +1,12 @@
package kafka

import "sync"

type Batch struct {
conn *Conn
lock *sync.Mutex
}

func (batch *Batch) Read(b []byte) (int, error) {
return 0, nil
}
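
Since the stub's Read method already has the io.Reader signature, *Batch satisfies io.Reader as-is. A minimal in-package sketch of what that enables (hypothetical, not part of this commit; drainBatch is a made-up helper):

package kafka

import (
	"io"
	"io/ioutil"
)

// Compile-time check: the Read stub above already makes *Batch an io.Reader.
var _ io.Reader = (*Batch)(nil)

// drainBatch is a hypothetical helper. ioutil.ReadAll keeps calling Read until
// it returns io.EOF, so this only terminates once a real implementation
// replaces the (0, nil) stub.
func drainBatch(batch *Batch) ([]byte, error) {
	return ioutil.ReadAll(batch)
}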
178 changes: 87 additions & 91 deletions conn.go
@@ -66,6 +66,7 @@ type Conn struct {
topic string
partition int32
correlationID int32
fetchMaxBytes int32
fetchMinSize int32
}

@@ -106,27 +107,30 @@ func NewConnWith(conn net.Conn, config ConnConfig) *Conn {
panic(fmt.Sprintf("invalid partition number: %d", config.Partition))
}

return &Conn{
c := &Conn{
conn: conn,
rbuf: *bufio.NewReader(conn),
wbuf: *bufio.NewWriter(conn),
clientID: config.ClientID,
topic: config.Topic,
partition: int32(config.Partition),
offset: -2,
}

// The fetch request needs to ask for a MaxBytes value that is at least
// enough to load the control data of the response. To avoid having to
// recompute it on every read, it is cached here in the Conn value.
fetchMinSize: sizeof(fetchResponseV1{
Topics: []fetchResponseTopicV1{{
TopicName: config.Topic,
Partitions: []fetchResponsePartitionV1{{
Partition: int32(config.Partition),
}},
// The fetch request needs to ask for a MaxBytes value that is at least
// enough to load the control data of the response. To avoid having to
// recompute it on every read, it is cached here in the Conn value.
c.fetchMinSize = sizeof(fetchResponseV1{
Topics: []fetchResponseTopicV1{{
TopicName: config.Topic,
Partitions: []fetchResponsePartitionV1{{
Partition: int32(config.Partition),
MessageSet: messageSet{{}},
}},
}),
}
}},
})
c.fetchMaxBytes = math.MaxInt32 - c.fetchMinSize
return c
}

// Close closes the kafka connection.
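
To make the bookkeeping above concrete: fetchMinSize is the encoded size of a fetch response that carries only the topic/partition control data and an empty message set, and fetchMaxBytes is the largest payload budget that can be added to it without overflowing the request's int32 MaxBytes field. ReadAt (below) clamps the caller's buffer length against fetchMaxBytes and asks for fetchMinSize plus that many bytes. A small sketch of the arithmetic, with a hypothetical helper name (only the logic comes from the diff):

package kafka

import "math"

// fetchRequestMaxBytes mirrors the clamping done in ReadAt: request the
// control data (fetchMinSize) plus at most as many payload bytes as fit in the
// caller's buffer, keeping the sum below math.MaxInt32.
func fetchRequestMaxBytes(fetchMinSize int32, bufLen int) int32 {
	fetchMaxBytes := int32(math.MaxInt32) - fetchMinSize

	maxBytes := bufLen
	if maxBytes > int(fetchMaxBytes) {
		maxBytes = int(fetchMaxBytes)
	}
	return fetchMinSize + int32(maxBytes)
}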
@@ -276,111 +280,99 @@ func (c *Conn) Read(b []byte) (int, error) {
//
// Unlike Read, the method doesn't modify the offset of the connection.
func (c *Conn) ReadAt(b []byte, offset int64) (int, int64, error) {
maxBytes := len(b)
maxFetch := int(c.fetchMaxBytes)
if maxBytes > maxFetch {
maxBytes = maxFetch
}

var adjustedDeadline time.Time
var ok bool
var n int
var err = c.readOperation(
func(deadline time.Time, id int32) error {
now := time.Now()
adjustedDeadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
return c.writeRequest(fetchRequest, v1, id, fetchRequestV1{
adj := adjustDeadlineForRTT(deadline, now, defaultRTT)
err := c.writeRequest(fetchRequest, v1, id, fetchRequestV1{
ReplicaID: -1,
MinBytes: 1,
MaxWaitTime: milliseconds(deadlineToTimeout(adjustedDeadline, now)),
MaxWaitTime: milliseconds(deadlineToTimeout(adj, now)),
Topics: []fetchRequestTopicV1{{
TopicName: c.topic,
Partitions: []fetchRequestPartitionV1{{
Partition: c.partition,
MaxBytes: c.fetchMinSize + int32(len(b)),
MaxBytes: c.fetchMinSize + int32(maxBytes),
FetchOffset: offset,
}},
}},
})
adjustedDeadline = adj
return err
},
func(deadline time.Time, size int) error {
// Skipping the throttle time since there's no way to report this
// information with the Read method.
size, err := discardInt32(&c.rbuf, size)
var r = &c.rbuf

_, size, err := readFetchResponseHeader(r, size)
if err != nil {
return err
}

// Consume the stream of fetched topic and partition, we skip the
// topic name because we sent the request for a single topic.
return expectZeroSize(streamArray(&c.rbuf, size, func(r *bufio.Reader, size int) (int, error) {
size, err := discardString(&c.rbuf, size)
if err != nil {
return size, err
}

// As an "optimization" kafka truncates the returned response
// after producing MaxBytes, which could then cause the code to
// return errShortRead.
// Because we read at least c.fetchMinSize bytes we should be
// able to decode all the control values for the first message,
// and errShortRead should only happen when reading the message
// key or value.
// I'm not sure this is rock solid and there may be some weird
// edge cases...
// There's just so much we can do with questionable design.
return streamArray(r, size, func(r *bufio.Reader, size int) (int, error) {
// Partition header, followed by the message set.
var p struct {
Partition int32
ErrorCode int16
HighwaterMarkOffset int64
MessageSetSize int32
}
for size != 0 {
var moffset int64

size, err := read(r, size, &p)
if err != nil {
return ignoreShortRead(size, err)
}
if p.ErrorCode != 0 {
return size, Error(p.ErrorCode)
}

done := false

for size != 0 && err == nil {
var msgOffset int64
var msgSize int32

if msgOffset, msgSize, size, err = readMessageOffsetAndSize(r, size); err != nil {
continue
if moffset, _, _, size, err = readMessage(r, size,
func(r *bufio.Reader, size int, nbytes int) (int, error) {
if nbytes > size {
nbytes = size
}

if done {
// If the fetch returned any trailing messages we
// just discard them all because we only load one
// message into the read buffer.
size, err = discardN(r, size, int(msgSize))
continue
return discardN(r, size, nbytes)
},
func(r *bufio.Reader, size int, nbytes int) (int, error) {
n = nbytes // return value of the ReadAt method
if nbytes > size {
nbytes = size
}
done = true

// Message header, followed by the key and value.
if _, _, size, err = readMessageHeader(r, size); err != nil {
continue
if nbytes > len(b) {
nbytes = len(b)
}
if n, size, err = readMessageBytes(r, size, b); err != nil {
continue
}
if n, size, err = readMessageBytes(r, size, b); err != nil {
continue
}
offset = msgOffset
}
nbytes, err := io.ReadFull(r, b[:nbytes])
return size - nbytes, err
},
); err != nil {
break
}

return ignoreShortRead(size, err)
})
}))
// When the messages are compressed kafka may return messages at
// an earlier offset than the one that was requested, apparently
// it's the client's responsibility to ignore those.
if moffset >= offset {
offset, ok = moffset, true
// There may be trailing bytes if more than one message was
// returned, in which case we simply ignore them all.
size, err = discardN(r, size, size)
break
}
}

// As an "optimization" kafka truncates the returned response
// after producing MaxBytes, which could then cause the code to
// return errShortRead.
// Because we read at least c.fetchMinSize bytes we should be
// able to decode all the control values for the first message,
// and errShortRead should only happen when reading the message
// key or value.
// I'm not sure this is rock solid and there may be some weird
// edge cases...
// There's just so much we can do with questionable design.
if err == errShortRead {
size, err = discardN(r, size, size)
}
return expectZeroSize(size, err)
},
)

if err == nil {
if n > len(b) {
n, err = len(b), io.ErrShortBuffer
} else if n == 0 && time.Now().After(adjustedDeadline) {
if n == 0 && time.Now().After(adjustedDeadline) {
// Because we use the adjusted deadline we could end up returning
// before the actual deadline occurred. This is necessary otherwise
// timing out the connection for read could end up leaving it in an
@@ -389,6 +381,10 @@ func (c *Conn) ReadAt(b []byte, offset int64) (int, int64, error) {
// the connection open, the trade off being to lose precision on the
// read deadline management.
err = RequestTimedOut
} else if !ok {
n, err = 0, io.ErrShortBuffer
} else if n > len(b) {
n, err = len(b), io.ErrShortBuffer
}
}

@@ -430,7 +426,7 @@ func (c *Conn) readOffset(t int64) (offset int64, err error) {
})
},
func(deadline time.Time, size int) error {
return expectZeroSize(streamArray(&c.rbuf, size, func(r *bufio.Reader, size int) (int, error) {
return expectZeroSize(readArrayWith(&c.rbuf, size, func(r *bufio.Reader, size int) (int, error) {
// We skip the topic name because we've made a request for
// a single topic.
size, err := discardString(r, size)
@@ -440,7 +436,7 @@ func (c *Conn) readOffset(t int64) (offset int64, err error) {

// Reading the array of partitions, there will be only one
// partition which gives the offset we're looking for.
return streamArray(r, size, func(r *bufio.Reader, size int) (int, error) {
return readArrayWith(r, size, func(r *bufio.Reader, size int) (int, error) {
var p partitionOffsetV1
size, err := read(r, size, &p)
if err != nil {
@@ -578,7 +574,7 @@ func (c *Conn) WriteMessages(msgs ...Message) (int, error) {
})
},
func(deadline time.Time, size int) error {
return expectZeroSize(streamArray(&c.rbuf, size, func(r *bufio.Reader, size int) (int, error) {
return expectZeroSize(readArrayWith(&c.rbuf, size, func(r *bufio.Reader, size int) (int, error) {
// Skip the topic, we've produced the message to only one topic,
// no need to waste resources loading it in memory.
size, err := discardString(r, size)
Expand All @@ -588,7 +584,7 @@ func (c *Conn) WriteMessages(msgs ...Message) (int, error) {

// Read the list of partitions, there should be only one since
// we've produced a message to a single partition.
size, err = streamArray(r, size, func(r *bufio.Reader, size int) (int, error) {
size, err = readArrayWith(r, size, func(r *bufio.Reader, size int) (int, error) {
var p produceResponsePartitionV2
size, err := read(r, size, &p)
if err == nil && p.ErrorCode != 0 {
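
For context, here is a hedged usage sketch of the connection API this file touches. NewConnWith, ConnConfig, ReadAt and Close all appear in the diff; the import path, broker address, topic name and buffer size are assumptions:

package main

import (
	"fmt"
	"log"
	"net"

	// Import path assumed; the diff only shows "package kafka".
	kafka "github.com/segmentio/kafka-go"
)

func main() {
	nc, err := net.Dial("tcp", "localhost:9092") // broker address made up
	if err != nil {
		log.Fatal(err)
	}

	c := kafka.NewConnWith(nc, kafka.ConnConfig{
		ClientID:  "example-client",
		Topic:     "example-topic",
		Partition: 0,
	})
	defer c.Close()

	// ReadAt fetches a single message at the requested offset without moving
	// the connection's own offset; it returns the number of bytes copied into
	// buf and the offset of the message that was actually read.
	buf := make([]byte, 10e3)
	n, off, err := c.ReadAt(buf, 0)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("read %d bytes from offset %d: %q\n", n, off, buf[:n])
}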
93 changes: 93 additions & 0 deletions discard.go
@@ -0,0 +1,93 @@
package kafka

import (
"bufio"
"fmt"
"reflect"
)

func discardN(r *bufio.Reader, sz int, n int) (int, error) {
n, err := r.Discard(n)
return sz - n, err
}

func discardInt8(r *bufio.Reader, sz int) (int, error) {
return discardN(r, sz, 1)
}

func discardInt16(r *bufio.Reader, sz int) (int, error) {
return discardN(r, sz, 2)
}

func discardInt32(r *bufio.Reader, sz int) (int, error) {
return discardN(r, sz, 4)
}

func discardInt64(r *bufio.Reader, sz int) (int, error) {
return discardN(r, sz, 8)
}

func discardString(r *bufio.Reader, sz int) (int, error) {
return readStringWith(r, sz, func(r *bufio.Reader, sz int, len int16) (int, error) {
return discardN(r, sz, int(len))
})
}

func discardBytes(r *bufio.Reader, sz int) (int, error) {
return readBytesWith(r, sz, func(r *bufio.Reader, sz int, len int32) (int, error) {
return discardN(r, sz, int(len))
})
}

func discard(r *bufio.Reader, sz int, a interface{}) (int, error) {
switch a.(type) {
case int8:
return discardInt8(r, sz)
case int16:
return discardInt16(r, sz)
case int32:
return discardInt32(r, sz)
case int64:
return discardInt64(r, sz)
case string:
return discardString(r, sz)
case []byte:
return discardBytes(r, sz)
}
switch v := reflect.ValueOf(a); v.Kind() {
case reflect.Struct:
return discardStruct(r, sz, v)
case reflect.Slice:
return discardSlice(r, sz, v)
default:
panic(fmt.Sprintf("unsupported type: %T", a))
}
}

func discardStruct(r *bufio.Reader, sz int, v reflect.Value) (int, error) {
var err error
for i, n := 0, v.NumField(); i != n; i++ {
if sz, err = discard(r, sz, v.Field(i)); err != nil {
break
}
}
return sz, err
}

func discardSlice(r *bufio.Reader, sz int, v reflect.Value) (int, error) {
var zero = reflect.Zero(v.Type().Elem())
var err error
var len int32

if sz, err = readInt32(r, sz, &len); err != nil {
return sz, err
}

for n := int(len); n > 0; n-- {
if sz, err = discard(r, sz, zero); err != nil {
break
}
}

return sz, err
}
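
All of these helpers follow the package's remaining-size convention: they take the number of bytes still unread in the current response, consume some of them from the bufio.Reader, and return the new remainder, which callers eventually feed into expectZeroSize. A minimal in-package sketch of that contract (hypothetical file; only discardInt32, discardInt64 and the convention itself come from the diff):

package kafka

import (
	"bufio"
	"bytes"
)

// discardSizesSketch shows the size bookkeeping: each helper consumes bytes
// from r and returns how much of the original budget is left. With a 12-byte
// payload holding one int32 and one int64, the remainder ends at 0.
func discardSizesSketch() int {
	raw := make([]byte, 12)
	r := bufio.NewReader(bytes.NewReader(raw))
	sz := len(raw)

	sz, _ = discardInt32(r, sz) // 12 -> 8
	sz, _ = discardInt64(r, sz) //  8 -> 0
	return sz
}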
(Diffs for the remaining 5 changed files are not shown.)
