Skip to content

Commit

Permalink
add comment explaining the policy around errShortRead when fetching m…
Browse files Browse the repository at this point in the history
…essages
  • Loading branch information
Achille Roussel committed May 31, 2017
1 parent f86ba28 commit 9af7077
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 6 deletions.
21 changes: 16 additions & 5 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,18 @@ func (c *Conn) ReadAt(b []byte, offset int64) (int, int64, error) {
if err != nil {
return size, err
}
return ignoreShortRead(streamArray(r, size, func(r *bufio.Reader, size int) (int, error) {

// As an "optimization" kafka truncates the returned response
// after producing MaxBytes, which could then cause the code to
// return errShortRead.
// Because we read at least c.fetchMinSize bytes we should be
// able to decode all the control values for the first message,
// and errShortRead should only happen when reading the message
// key or value.
// I'm not sure this is rock solid and there may be some weird
// edge cases...
// There's just so much we can do with questionable design.
return streamArray(r, size, func(r *bufio.Reader, size int) (int, error) {
// Partition header, followed by the message set.
var p struct {
Partition int32
Expand All @@ -333,7 +344,7 @@ func (c *Conn) ReadAt(b []byte, offset int64) (int, int64, error) {

size, err := read(r, size, &p)
if err != nil {
return size, err
return ignoreShortRead(size, err)
}
if p.ErrorCode != 0 {
return size, Error(p.ErrorCode)
Expand Down Expand Up @@ -362,7 +373,7 @@ func (c *Conn) ReadAt(b []byte, offset int64) (int, int64, error) {
if _, _, size, err = readMessageHeader(r, size); err != nil {
continue
}
if _, size, err = readMessageBytes(r, size, b); err != nil {
if n, size, err = readMessageBytes(r, size, b); err != nil {
continue
}
if n, size, err = readMessageBytes(r, size, b); err != nil {
Expand All @@ -371,8 +382,8 @@ func (c *Conn) ReadAt(b []byte, offset int64) (int, int64, error) {
offset = msgOffset
}

return size, err
}))
return ignoreShortRead(size, err)
})
}))
},
)
Expand Down
1 change: 0 additions & 1 deletion protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,6 @@ func readMessageBytes(r *bufio.Reader, sz int, b []byte) (n int, remain int, err
var count int

if remain, err = readInt32(r, sz, &bytesLen); err != nil {
fmt.Println(err)
return
}

Expand Down

0 comments on commit 9af7077

Please sign in to comment.