forked from segmentio/kafka-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path: reader_test.go
134 lines (115 loc) · 2.87 KB
/
reader_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package kafka
import (
"context"
"math/rand"
"strconv"
"testing"
"time"
)
// TestReader drives every reader scenario as its own parallel subtest.
//
// Each subtest gets a context with a 30-second timeout and a fresh Reader
// pointed at the broker on localhost:9092, using a topic name obtained from
// makeTopic and a MaxWait of 500ms. The Reader is closed and the context is
// canceled when the subtest returns.
func TestReader(t *testing.T) {
	t.Parallel()

	type readerCase struct {
		name string
		run  func(*testing.T, context.Context, *Reader)
	}

	cases := []readerCase{
		{
			name: "calling Read with a context that has been canceled should return an error",
			run:  testReaderReadCanceled,
		},
		{
			name: "all messages of the stream should be made available when calling ReadMessage repeatedly",
			run:  testReaderReadMessages,
		},
		{
			name: "setting the offset to an invalid value should return an error on the next Read call",
			run:  testReaderSetInvalidOffset,
		},
		{
			name: "setting the offset to random values should return the expected messages when Read is called",
			run:  testReaderSetRandomOffset,
		},
	}

	for idx := range cases {
		// Copy the case so the parallel closure below does not observe a
		// later iteration's value.
		tc := cases[idx]

		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()

			ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
			defer cancel()

			reader := NewReader(ReaderConfig{
				Brokers: []string{"localhost:9092"},
				Topic:   makeTopic(),
				MaxWait: 500 * time.Millisecond,
			})
			defer reader.Close()

			tc.run(t, ctx, reader)
		})
	}
}
// testReaderReadCanceled checks that ReadMessage returns context.Canceled
// when it is called with a context that has already been canceled.
func testReaderReadCanceled(t *testing.T, ctx context.Context, r *Reader) {
	ctx, cancel := context.WithCancel(ctx)
	cancel() // cancel before reading so the call starts with a dead context

	if _, err := r.ReadMessage(ctx); err != context.Canceled {
		// Spell out the expectation: a bare t.Error(err) would print only
		// "<nil>" if the read unexpectedly succeeded.
		t.Errorf("ReadMessage with a canceled context: got error %v, want %v", err, context.Canceled)
	}
}
// testReaderReadMessages writes the sequence 0..N-1 to the reader's topic via
// prepareReader, then checks that N consecutive ReadMessage calls return those
// values in order. The test stops at the first failure.
func testReaderReadMessages(t *testing.T, ctx context.Context, r *Reader) {
	const N = 1000
	prepareReader(t, ctx, r, makeTestSequence(N)...)

	for i := 0; i != N; i++ {
		m, err := r.ReadMessage(ctx)
		if err != nil {
			t.Errorf("reading message at index %d: %v", i, err)
			return
		}

		// A payload that is not a decimal integer must fail the test rather
		// than silently parse as 0 (which would wrongly pass at index 0).
		v, err := strconv.Atoi(string(m.Value))
		if err != nil {
			t.Errorf("message at index %d has a non-numeric value %q: %v", i, m.Value, err)
			return
		}

		if v != i {
			t.Error("message at index", i, "has wrong value:", v)
			return
		}
	}
}
// testReaderSetInvalidOffset sets the reader's offset to 42 without writing
// any messages first and checks that the next ReadMessage call fails.
//
// NOTE(review): this presumes the topic returned by makeTopic starts out
// empty, so that offset 42 is out of range — confirm against makeTopic.
func testReaderSetInvalidOffset(t *testing.T, ctx context.Context, r *Reader) {
	r.SetOffset(42)

	if _, err := r.ReadMessage(ctx); err == nil {
		// err is nil on this path, so logging it would only print "<nil>";
		// state what was expected instead.
		t.Error("ReadMessage after setting an invalid offset (42) succeeded, want an error")
	}
}
// testReaderSetRandomOffset writes the sequence 0..N-1 to the reader's topic
// via prepareReader, then performs 2*N rounds of: pick a random offset in
// [0, N), seek to it with SetOffset, read one message, and check that the
// message's value equals the offset. The test stops at the first failure.
//
// NOTE(review): comparing the value to the offset presumes the topic was empty
// before prepareReader ran, so that offset k holds value k — confirm against
// makeTopic.
func testReaderSetRandomOffset(t *testing.T, ctx context.Context, r *Reader) {
	const N = 10
	prepareReader(t, ctx, r, makeTestSequence(N)...)

	for i := 0; i != 2*N; i++ {
		offset := rand.Intn(N)
		r.SetOffset(int64(offset))

		m, err := r.ReadMessage(ctx)
		if err != nil {
			t.Errorf("reading message at offset %d: %v", offset, err)
			return
		}

		// A payload that is not a decimal integer must fail the test rather
		// than silently parse as 0 (which would wrongly pass at offset 0).
		v, err := strconv.Atoi(string(m.Value))
		if err != nil {
			t.Errorf("message at offset %d has a non-numeric value %q: %v", offset, m.Value, err)
			return
		}

		if v != offset {
			t.Error("message at offset", offset, "has wrong value:", v)
			return
		}
	}
}
// makeTestSequence returns n messages whose values are the decimal strings
// "0", "1", ..., n-1, in that order. Only the Value field is set.
func makeTestSequence(n int) []Message {
	seq := make([]Message, n)
	for idx := range seq {
		// Every slot starts as a zero Message, so filling in Value alone
		// yields the same result as assigning a whole literal.
		seq[idx].Value = []byte(strconv.Itoa(idx))
	}
	return seq
}
// prepareReader writes msgs to the topic and partition taken from r's
// configuration, so that the reader under test has data to consume.
//
// It dials the partition leader through the broker at localhost:9092, writes
// all messages in a single WriteMessages call, and closes the connection
// before returning. Any dial or write error aborts the test via t.Fatal.
func prepareReader(t *testing.T, ctx context.Context, r *Reader, msgs ...Message) {
	cfg := r.Config()

	leader, dialErr := DialLeader(ctx, "tcp", "localhost:9092", cfg.Topic, cfg.Partition)
	if dialErr != nil {
		t.Fatal(dialErr)
	}
	defer leader.Close()

	_, writeErr := leader.WriteMessages(msgs...)
	if writeErr != nil {
		t.Fatal(writeErr)
	}
}