Skip to content

Commit

Permalink
add writer tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Achille Roussel committed Jun 21, 2017
1 parent b2e5925 commit 0ea8d5d
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 2 deletions.
2 changes: 2 additions & 0 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,8 @@ func (c *Conn) ReadPartitions(topics ...string) (partitions []Partition, err err
return err
}

fmt.Printf("%#v\n", res)

brokers := make(map[int32]Broker, len(res.Brokers))
for _, b := range res.Brokers {
brokers[b.NodeID] = Broker{
Expand Down
4 changes: 3 additions & 1 deletion dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ func (d *Dialer) LookupLeader(ctx context.Context, network string, address strin
}
}
}

errch <- UnknownTopicOrPartition
}()

var brk Broker
Expand Down Expand Up @@ -220,7 +222,7 @@ type Resolver interface {
}

func sleep(ctx context.Context, duration time.Duration) bool {
if duration != 0 {
if duration == 0 {
select {
default:
return true
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ kafka:
- "9092:9092"
environment:
KAFKA_VERSION: "0.10.1.0"
KAFKA_CREATE_TOPICS: "events:2:1,test:1:1"
KAFKA_CREATE_TOPICS: "test-writer-0:3:1,test-writer-1:3:1"
KAFKA_ADVERTISED_HOST_NAME: "localhost"
KAFKA_ADVERTISED_PORT: "9092"
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
Expand Down
132 changes: 132 additions & 0 deletions writer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package kafka

import (
"context"
"io"
"testing"
"time"
)

func TestWriter(t *testing.T) {
tests := []struct {
scenario string
function func(*testing.T)
}{
{
scenario: "closing a writer right after creating it returns promptly with no error",
function: testWriterClose,
},

{
scenario: "writing 1 message through a writer using round-robin balancing produces 1 message to the first partition",
function: testWriterRoundRobin1,
},
}

t.Parallel()
// Kafka takes a while to create the initial topics and partitions...
time.Sleep(15 * time.Second)

for _, test := range tests {
testFunc := test.function
t.Run(test.scenario, func(t *testing.T) {
t.Parallel()
testFunc(t)
})
}
}

func newTestWriter(config WriterConfig) *Writer {
config.Brokers = []string{"localhost:9092"}
return NewWriter(config)
}

func testWriterClose(t *testing.T) {
w := newTestWriter(WriterConfig{
Topic: "test-writer-0",
})

if err := w.Close(); err != nil {
t.Error(err)
}
}

func testWriterRoundRobin1(t *testing.T) {
const topic = "test-writer-1"

offset, err := readOffset(topic, 0)
if err != nil {
t.Fatal(err)
}

w := newTestWriter(WriterConfig{
Topic: topic,
Balancer: &RoundRobin{},
})
defer w.Close()

if err := w.WriteMessages(context.Background(), Message{
Value: []byte("Hello World!"),
}); err != nil {
t.Error(err)
return
}

msgs, err := readPartition(topic, 0, offset)

if err != nil {
t.Error("error reading partition", err)
return
}

if len(msgs) != 1 {
t.Error("bad messages in partition", msgs)
return
}

for _, m := range msgs {
if string(m.Value) != "Hello World!" {
t.Error("bad messages in partition", msgs)
break
}
}
}

func readOffset(topic string, partition int) (offset int64, err error) {
var conn *Conn

if conn, err = DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition); err != nil {
return
}
defer conn.Close()

offset, err = conn.ReadLastOffset()
return
}

func readPartition(topic string, partition int, offset int64) (msgs []Message, err error) {
var conn *Conn

if conn, err = DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition); err != nil {
return
}
defer conn.Close()

conn.Seek(offset, 1)
conn.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
batch := conn.ReadBatch(0, 1000000000)
defer batch.Close()

for {
var msg Message

if msg, err = batch.ReadMessage(); err != nil {
if err == io.EOF {
err = nil
}
return
}

msgs = append(msgs, msg)
}
}

0 comments on commit 0ea8d5d

Please sign in to comment.