Skip to content

Commit

Permalink
add support for all kafka error codes
Browse files Browse the repository at this point in the history
  • Loading branch information
Achille Roussel committed May 31, 2017
1 parent 1ae0b59 commit 9fd3208
Show file tree
Hide file tree
Showing 6 changed files with 402 additions and 151 deletions.
56 changes: 28 additions & 28 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,6 @@ func (c *Conn) Seek(offset int64, whence int) (int64, error) {
}
switch whence {
case 0, 1, 2:
return 0, nil
default:
return 0, fmt.Errorf("the whence value has to be 0, 1, or 2 (whence = %d)", whence)
}
Expand All @@ -216,6 +215,7 @@ func (c *Conn) Seek(offset int64, whence int) (int64, error) {
}

first, last, err := c.ReadOffsets()
fmt.Println("first =", first, "last =", last)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -244,16 +244,22 @@ func (c *Conn) Seek(offset int64, whence int) (int64, error) {
//
// Read satisfies the io.Reader interface.
func (c *Conn) Read(b []byte) (int, error) {
batch, err := c.ReadBatch(1)
if err != nil {
return 0, err
}
n, err := batch.Read(b)
offset, err := c.Seek(c.Offset())
if err != nil {
batch.Close()
return 0, err
}
return n, batch.Close()
_ = offset
var n int
err = c.readOperation(
func(id int32) error {
return nil
},
func(size int) error {
return nil
},
)

return n, err
}

// ReadAt reads the message at the given absolute offset, returning the number
Expand Down Expand Up @@ -296,7 +302,7 @@ func (c *Conn) ReadBatchAt(size int, offset int64) (*BatchReader, error) {

c.wlock.Lock()
c.conn.SetWriteDeadline(deadline)
err := c.writeRequest(fetchRequest, v1, id, nil) // TODO
err := c.writeRequest(fetchRequestKey, v1, id, nil) // TODO
c.wlock.Unlock()

if err != nil {
Expand All @@ -317,39 +323,33 @@ func (c *Conn) ReadOffsets() (first int64, last int64, err error) {

err = c.writeOperation(
func(id int32) error {
return c.writeRequest(offsetRequest, v1, id, listOffsetRequest{
Topics: []listOffsetRequestTopic{{
return c.writeRequest(offsetRequestKey, v1, id, listOffsetRequestV1{
ReplicaID: -1,
Topics: []listOffsetRequestTopicV1{{
TopicName: c.topic,
Partitions: []listOffsetRequestPartition{
Partitions: []listOffsetRequestPartitionV1{
{Partition: c.partition, Time: -2},
{Partition: c.partition, Time: -1},
},
}},
})
},
func(size int) error {
var res []listOffsetResponse
var res []listOffsetResponseV1
if err := c.readResponse(size, &res); err != nil {
return err
}
fmt.Printf("%#v\n", res)
for _, r := range res {
if r.TopicName != c.topic {
continue
}
for _, p := range r.PartitionOffsets {
if p.Partition != c.partition {
continue
}
if p.ErrorCode != 0 {
return Error(p.ErrorCode)
}
for _, offset := range p.Offsets {
if first < 0 || offset < first {
first = offset
}
if last < 0 || offset > last {
last = offset
}
if first < 0 || p.Offset < first {
first = p.Offset
}
if last < 0 || p.Offset > last {
last = p.Offset
}
}
}
Expand Down Expand Up @@ -379,7 +379,7 @@ func (c *Conn) ReadPartitions(topics ...string) (partitions []Partition, err err

err = c.readOperation(
func(id int32) error {
return c.writeRequest(metadataRequest, v0, id, topicMetadataRequest(topics))
return c.writeRequest(metadataRequestKey, v0, id, topicMetadataRequest(topics))
},
func(size int) error {
var res metadataResponse
Expand Down Expand Up @@ -448,7 +448,7 @@ func (c *Conn) Write(b []byte) (int, error) {

err := c.writeOperation(
func(id int32) error {
return c.writeRequest(produceRequest, v2, id, produceRequestType{
return c.writeRequest(produceRequestKey, v2, id, produceRequest{
RequiredAcks: -1,
Timeout: 10000,
Topics: []produceRequestTopic{{
Expand Down
88 changes: 83 additions & 5 deletions conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package kafka
import (
"context"
"fmt"
"math/rand"
"net"
"sync/atomic"
"strconv"
"testing"
"time"
)
Expand All @@ -20,12 +21,30 @@ func TestConn(t *testing.T) {
},

{
scenario: "write messages to kafka",
scenario: "ensure the initial offset of a connection is the first offset",
function: testConnFirstOffset,
},

{
scenario: "write a single message to kafka",
function: testConnWrite,
},
}

id := int32(0)
{
scenario: "ensure the connection can seek to the first offset",
function: testConnSeekFirstOffset,
},

{
scenario: "ensure the connection can seek to the last offset",
function: testConnSeekLastOffset,
},

{
scenario: "ensure the connection can seek to a random offset",
function: testConnSeekRandomOffset,
},
}

for _, test := range tests {
testFunc := test.function
Expand All @@ -35,7 +54,7 @@ func TestConn(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

topic := fmt.Sprintf("kafka-go-%02d", atomic.AddInt32(&id, 1))
topic := fmt.Sprintf("kafka-go-%d", rand.Int63())

conn, err := (&Dialer{
Resolver: &net.Resolver{},
Expand All @@ -55,6 +74,14 @@ func testConnClose(t *testing.T, conn *Conn) {
}
}

func testConnFirstOffset(t *testing.T, conn *Conn) {
offset, whence := conn.Offset()

if offset != 0 && whence != 0 {
t.Error("bad first offset:", offset, whence)
}
}

func testConnWrite(t *testing.T, conn *Conn) {
b := []byte("Hello World!")
n, err := conn.Write(b)
Expand All @@ -67,3 +94,54 @@ func testConnWrite(t *testing.T, conn *Conn) {
t.Error("bad length returned by (*Conn).Write:", n)
}
}

func testConnSeekFirstOffset(t *testing.T, conn *Conn) {
for i := 0; i != 10; i++ {
if _, err := conn.Write([]byte(strconv.Itoa(i))); err != nil {
t.Fatal(err)
}
}

offset, err := conn.Seek(0, 0)
if err != nil {
t.Error(err)
}

if offset != 0 {
t.Error("bad offset:", offset)
}
}

func testConnSeekLastOffset(t *testing.T, conn *Conn) {
for i := 0; i != 10; i++ {
if _, err := conn.Write([]byte(strconv.Itoa(i))); err != nil {
t.Fatal(err)
}
}

offset, err := conn.Seek(0, 2)
if err != nil {
t.Error(err)
}

if offset != 10 {
t.Error("bad offset:", offset)
}
}

func testConnSeekRandomOffset(t *testing.T, conn *Conn) {
for i := 0; i != 10; i++ {
if _, err := conn.Write([]byte(strconv.Itoa(i))); err != nil {
t.Fatal(err)
}
}

offset, err := conn.Seek(3, 1)
if err != nil {
t.Error(err)
}

if offset != 3 {
t.Error("bad offset:", offset)
}
}
33 changes: 15 additions & 18 deletions dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,30 +133,27 @@ func (d *Dialer) LookupLeader(ctx context.Context, network string, address strin
errch := make(chan error, 1)

go func() {
var partitions []Partition
var err error

for attempt := 1; true; attempt++ {
partitions, err = c.ReadPartitions(topic)
if err == nil {
break
}
if isTemporary(err) {
for attempt := 0; true; attempt++ {
if attempt != 0 {
sleep(ctx, backoff(attempt, time.Second, time.Minute))
continue
}
errch <- err
return
}

for _, p := range partitions {
if p.ID == partition {
brkch <- p.Leader
partitions, err := c.ReadPartitions(topic)
if err != nil {
if isTemporary(err) {
continue
}
errch <- err
return
}
}

errch <- UnknownTopicOrPartition
for _, p := range partitions {
if p.ID == partition {
brkch <- p.Leader
return
}
}
}
}()

var brk Broker
Expand Down
Loading

0 comments on commit 9fd3208

Please sign in to comment.