Skip to content

Commit

Permalink
add more tests + update consul-go
Browse files Browse the repository at this point in the history
  • Loading branch information
thehydroimpulse committed May 23, 2017
1 parent bb13d57 commit 6723f87
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 16 deletions.
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ kafka:
KAFKA_ADVERTISED_HOST_NAME: "localhost"
KAFKA_ADVERTISED_PORT: "9092"
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"

zookeeper:
image: wurstmeister/zookeeper
Expand Down
92 changes: 79 additions & 13 deletions kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,22 @@ package kafka

import (
"context"
"fmt"
"testing"
"time"

"github.com/Shopify/sarama"
"github.com/google/uuid"
)

func produce(t *testing.T, msg []byte) int64 {
type Producer func(t *testing.T, msg []byte) int64

func produce(t *testing.T, topic string, msg []byte) int64 {
producer := newProducer(t)
defer producer.Close()

message := sarama.ProducerMessage{
Topic: "test",
Topic: topic,
Value: sarama.ByteEncoder(msg),
}

Expand All @@ -25,6 +29,12 @@ func produce(t *testing.T, msg []byte) int64 {
return offset
}

func makeProducer(t *testing.T, topic string) func(t *testing.T, msg []byte) int64 {
return func(t *testing.T, msg []byte) int64 {
return produce(t, topic, msg)
}
}

func newProducer(t *testing.T) sarama.SyncProducer {
conf := sarama.NewConfig()
conf.Version = sarama.V0_10_0_0
Expand All @@ -46,7 +56,7 @@ func newProducer(t *testing.T) sarama.SyncProducer {
func TestReader(t *testing.T) {
tests := []struct {
scenario string
test func(t *testing.T, ctx context.Context, reader Reader)
test func(t *testing.T, ctx context.Context, reader Reader, producer Producer)
}{
{
"should close without errors",
Expand Down Expand Up @@ -77,13 +87,20 @@ func TestReader(t *testing.T) {
"seek and read newest offset",
seekReadNewestOffset,
},

{
"sequentially consume a reader",
testReaderConsume,
},
}

for _, test := range tests {
t.Run(test.scenario, func(t *testing.T) {
topic := uuid.New().String()

config := ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: "test",
Topic: topic,
Partition: 0,
RequestMaxWaitTime: 100 * time.Millisecond,
RequestMinBytes: 100,
Expand All @@ -97,19 +114,21 @@ func TestReader(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

test.test(t, ctx, reader)
producer := makeProducer(t, topic)

test.test(t, ctx, reader, producer)
})
}
}

func closeNoErrors(t *testing.T, ctx context.Context, reader Reader) {
func closeNoErrors(t *testing.T, ctx context.Context, reader Reader, producer Producer) {
err := reader.Close()
if err != nil {
t.Fatalf("unexpected error while closing the reader: %s", err.Error())
}
}

func readCancelContext(t *testing.T, ctx context.Context, reader Reader) {
func readCancelContext(t *testing.T, ctx context.Context, reader Reader, producer Producer) {
ctx, cancel := context.WithCancel(ctx)
cancel()

Expand All @@ -119,8 +138,8 @@ func readCancelContext(t *testing.T, ctx context.Context, reader Reader) {
}
}

func seekNewestOffset(t *testing.T, ctx context.Context, reader Reader) {
offset := produce(t, []byte("foobar xxx"))
func seekNewestOffset(t *testing.T, ctx context.Context, reader Reader, producer Producer) {
offset := producer(t, []byte("foobar xxx"))

newOffset, err := reader.Seek(ctx, -1)
if err != nil {
Expand All @@ -134,7 +153,7 @@ func seekNewestOffset(t *testing.T, ctx context.Context, reader Reader) {
reader.Close()
}

func seekOldestOffset(t *testing.T, ctx context.Context, reader Reader) {
func seekOldestOffset(t *testing.T, ctx context.Context, reader Reader, producer Producer) {
offset, err := reader.Seek(ctx, -2)
if err != nil {
t.Fatalf("seek returned an error: %s", err.Error())
Expand All @@ -147,7 +166,9 @@ func seekOldestOffset(t *testing.T, ctx context.Context, reader Reader) {
reader.Close()
}

func seekReadOldestOffset(t *testing.T, ctx context.Context, reader Reader) {
func seekReadOldestOffset(t *testing.T, ctx context.Context, reader Reader, producer Producer) {
offset := producer(t, []byte("foobar xxx"))

_, err := reader.Seek(ctx, -2)
if err != nil {
t.Fatalf("seek returned an error: %s", err.Error())
Expand All @@ -162,11 +183,15 @@ func seekReadOldestOffset(t *testing.T, ctx context.Context, reader Reader) {
t.Fatalf("unexpected message value")
}

if offset != msg.Offset {
t.Fatalf("offsets do not match")
}

reader.Close()
}

func seekReadNewestOffset(t *testing.T, ctx context.Context, reader Reader) {
lastOffset := produce(t, []byte("hello 123"))
func seekReadNewestOffset(t *testing.T, ctx context.Context, reader Reader, producer Producer) {
lastOffset := producer(t, []byte("hello 123"))

_, err := reader.Seek(ctx, -1)
if err != nil {
Expand Down Expand Up @@ -194,3 +219,44 @@ func seekReadNewestOffset(t *testing.T, ctx context.Context, reader Reader) {

reader.Close()
}

// testReaderConsume produces a batch of messages and then consumes them
// sequentially, checking that (a) seeking to the current offset is a no-op
// that yields the same cursor, and (b) every message comes back in order
// with the payload that was produced.
func testReaderConsume(t *testing.T, ctx context.Context, reader Reader, producer Producer) {
	// Produce a fixed batch of uniquely-numbered messages.
	msgs := make([][]byte, 168)
	for i := range msgs {
		msgs[i] = []byte(fmt.Sprintf("consume job/%03d", i))
		producer(t, msgs[i])
	}

	cur := reader.Offset()

	for i := range msgs {
		// Seeking to the position we are already at must not move the cursor.
		// NOTE: offsets are int64, so they are logged with %d (the previous
		// %q verb rendered them as quoted runes, which was unreadable).
		if cur2, err := reader.Seek(ctx, cur); err != nil {
			t.Error("seeking to the current position failed:", err)
			return
		} else if cur != cur2 {
			t.Error("seeking to the current position should have produced the same cursor:")
			t.Logf("expected: %d", cur)
			t.Logf("found: %d", cur2)
			return
		}

		msg, err := reader.Read(ctx)
		if err != nil {
			t.Errorf("reading the msg at %d from the stream failed: %s", cur, err)
			return
		}

		if string(msgs[i]) != string(msg.Value) {
			t.Errorf("the msg returned at %d doesn't match:", cur)
			t.Logf("expected: %#v", string(msgs[i]))
			t.Logf("found: %#v", string(msg.Value))
			return
		}

		// Advance the cursor past the message we just consumed.
		cur = reader.Offset() + 1
	}

	// if _, err := reader.Read(ctx); err != centrifuge.EOS {
	// 	t.Error("reading from the stream after all jobs were consumed should have returned centrifuge.EOS, got:", err)
	// }
}
6 changes: 3 additions & 3 deletions vendor/vendor.json
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,10 @@
"revisionTime": "2017-05-03T02:48:19Z"
},
{
"checksumSHA1": "S3rMIE6XMyghZFKRHCbmnsa2OtA=",
"checksumSHA1": "mIrn8/XHw4wY6/dcLc8sn1MxpxI=",
"path": "github.com/segmentio/consul-go",
"revision": "216e08fa5e3b81f2bdd7f47f447ec5f301ed1ced",
"revisionTime": "2017-05-22T03:35:38Z"
"revision": "4c6bbe1223d2fc0ebad0b28e39761aef2ff9d3cf",
"revisionTime": "2017-05-23T17:43:25Z"
},
{
"checksumSHA1": "PoXqLiPO6C5Z2gzS6nm1LIhYfMA=",
Expand Down

0 comments on commit 6723f87

Please sign in to comment.