package kafka

import (
    "context"
    "fmt"
    "sync"
    "sync/atomic"
    "time"

    "github.com/Shopify/sarama"
    "github.com/pkg/errors"
)

const (
    // OffsetNewest positions the reader at the end of the partition.
    OffsetNewest int64 = -1
    // OffsetOldest positions the reader at the beginning of the partition.
    OffsetOldest int64 = -2
)

// ReaderConfig is the configuration used by NewReader.
type ReaderConfig struct {
    Brokers   []string
    Topic     string
    Partition int

    // Kafka fetch requests wait until either `RequestMaxWaitTime` elapses or
    // at least `RequestMinBytes` of data are available, whichever happens
    // first, and never return more than `RequestMaxBytes`.
    RequestMaxWaitTime time.Duration
    RequestMinBytes    int
    RequestMaxBytes    int
}

type kafkaReader struct {
    client    sarama.Client
    partition int32
    topic     string

    // Channels used by the async task.
    events chan Message
    errors chan error

    // Wait until the async task has finished. This is used to ensure that
    // the task won't be sending on the channels and that we can close them.
    asyncWait sync.WaitGroup

    // Whether an async task has been spawned.
    spawned bool

    // Cancels the async task.
    cancel context.CancelFunc

    // The current offset, updated by `Read`.
    offset int64

    // The partition's high watermark offset, updated atomically by the
    // async task and used to compute `Lag`.
    highmark int64

    // Kafka fetch configuration.
    maxWaitTime time.Duration
    minBytes    int
    maxBytes    int
}

// NewReader creates a new Kafka reader for the given topic and partition.
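//
// A minimal usage sketch. The broker address, topic name, and tuning values
// are illustrative assumptions, not defaults:
//
//    r, err := NewReader(ReaderConfig{
//        Brokers:            []string{"localhost:9092"},
//        Topic:              "events",
//        Partition:          0,
//        RequestMaxWaitTime: 250 * time.Millisecond,
//        RequestMinBytes:    1,
//        RequestMaxBytes:    1 << 20, // ~1 MB cap per fetch
//    })
//    if err != nil {
//        log.Fatal(err)
//    }
//    defer r.Close()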
func NewReader(config ReaderConfig) (Reader, error) {
    // Validate the configuration before establishing any connections so that
    // an invalid config doesn't leak a client.
    if config.Partition < 0 {
        return nil, errors.New("partition cannot be negative")
    }
    if config.RequestMinBytes < 0 {
        return nil, errors.New("minimum bytes for requests cannot be negative")
    }
    if config.RequestMaxBytes < 0 {
        return nil, errors.New("maximum bytes for requests cannot be negative")
    }
    // A MaxBytes of 0 would make the task spin continuously without
    // actually returning data.
    if config.RequestMaxBytes == 0 {
        config.RequestMaxBytes = 1000000
    }
    conf := sarama.NewConfig()
    conf.Version = sarama.V0_10_1_0
    client, err := sarama.NewClient(config.Brokers, conf)
    if err != nil {
        return nil, err
    }
    return &kafkaReader{
        topic:     config.Topic,
        partition: int32(config.Partition),
        client:    client,
        offset:    0,

        // Async task fields.
        events: make(chan Message),
        errors: make(chan error),

        // Provide a noop context-canceling function.
        cancel:  func() {},
        spawned: false,

        // Request-level parameters.
        maxWaitTime: config.RequestMaxWaitTime,
        minBytes:    config.RequestMinBytes,
        maxBytes:    config.RequestMaxBytes,
    }, nil
}

// Lag returns the difference between the partition's high watermark offset
// and the reader's current offset.
func (kafka *kafkaReader) Lag() int64 {
    return atomic.LoadInt64(&kafka.highmark) - kafka.offset
}

// Read returns the next message from the underlying asynchronous task that
// fetches from Kafka. `Read` blocks until a message is ready to be read; the
// asynchronous task in turn blocks until `Read` is called.
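//
// A hedged sketch of a typical read loop (`r` is a Reader positioned with
// `Seek`; `ctx` is assumed to be supplied by the caller):
//
//    for {
//        msg, err := r.Read(ctx)
//        if err != nil {
//            return err
//        }
//        fmt.Printf("offset %d: %s\n", msg.Offset, msg.Value)
//    }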
func (kafka *kafkaReader) Read(ctx context.Context) (Message, error) {
    select {
    case <-ctx.Done():
        return Message{}, ctx.Err()
    case msg := <-kafka.events:
        // Update the local offset to the current message.
        kafka.offset = msg.Offset
        return msg, nil
    case err := <-kafka.errors:
        return Message{}, err
    }
}

// closeAsync cancels the asynchronous task, drains the events/errors channels
// so the goroutine isn't left blocked, and waits for the task to finish.
func (kafka *kafkaReader) closeAsync() {
    kafka.cancel()
    // Reference the channels locally to avoid having to synchronize within
    // the goroutine.
    events := kafka.events
    errors := kafka.errors
    // Avoid blocking the async goroutine by emptying the channels.
    go func() {
        for range events {
        }
        for range errors {
        }
    }()
    // Wait for the async task to finish canceling. The channels cannot be
    // closed until the task has returned, otherwise it may panic.
    kafka.asyncWait.Wait()
    close(events)
    close(errors)
    // Re-establish the channels for future use.
    kafka.events = make(chan Message)
    kafka.errors = make(chan error)
}

// Seek spawns an asynchronous task that reads messages from Kafka starting at
// the given offset. The offset can be a real Kafka offset or one of the
// special values OffsetNewest (-1) and OffsetOldest (-2):
//
//    OffsetOldest (-2) starts at the beginning of the partition.
//    OffsetNewest (-1) starts at the end of the partition.
//
// Calling `Seek` again cancels the previous task and creates a new one.
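//
// A hedged sketch that starts reading from the beginning of the partition:
//
//    offset, err := r.Seek(ctx, OffsetOldest)
//    if err != nil {
//        return err
//    }
//    // `offset` now holds the resolved, concrete starting offset.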
func (kafka *kafkaReader) Seek(ctx context.Context, offset int64) (int64, error) {
    select {
    case <-ctx.Done():
        return 0, ctx.Err()
    default:
    }
    // Close the previous async task before seeking so the channels don't
    // carry partial data from an earlier Seek operation.
    if kafka.spawned {
        kafka.closeAsync()
    }
    offset, err := kafka.getOffset(ctx, offset)
    if err != nil {
        return 0, err
    }
    asyncCtx, cancel := context.WithCancel(ctx)
    kafka.cancel = cancel
    kafka.asyncWait.Add(1)
    go kafka.fetchMessagesAsync(asyncCtx, kafka.events, kafka.errors, offset)
    kafka.spawned = true
    return offset, nil
}

// getOffset resolves the real offset from Kafka. A non-negative offset is
// returned unchanged. If the offset is OffsetNewest or OffsetOldest, a call
// is made to Kafka to fetch the concrete offset.
func (kafka *kafkaReader) getOffset(ctx context.Context, offset int64) (int64, error) {
    if offset != OffsetNewest && offset != OffsetOldest {
        return offset, nil
    }
    errs := make(chan error)
    val := make(chan int64)
    broker, err := kafka.getLeader(ctx)
    if err != nil {
        return 0, err
    }
    go func() {
        request := &sarama.OffsetRequest{Version: 1}
        request.AddBlock(kafka.topic, kafka.partition, offset, 1)
        offsetResponse, err := broker.GetAvailableOffsets(request)
        if err != nil {
            errs <- errors.Wrap(err, "failed to fetch available offsets")
            return
        }
        block := offsetResponse.GetBlock(kafka.topic, kafka.partition)
        if block == nil {
            errs <- errors.New("fetching available offsets returned no blocks")
            return
        }
        if block.Err != sarama.ErrNoError {
            errs <- errors.Wrap(block.Err, "fetching available offsets failed")
            return
        }
        // The broker returns the offset of the next message to be produced,
        // so step back by one to point at the last existing message. An
        // empty partition reports 0 and is returned as-is.
        if block.Offset != 0 {
            val <- block.Offset - 1
        } else {
            val <- block.Offset
        }
    }()
    select {
    case <-ctx.Done():
        return 0, ctx.Err()
    case err := <-errs:
        return 0, err
    case res := <-val:
        return res, nil
    }
}

// Offset returns the current offset. It returns `0` if `Seek` hasn't been
// called.
func (kafka *kafkaReader) Offset() int64 {
    return kafka.offset
}

// getMessages issues a single fetch request to the partition's leader.
func (kafka *kafkaReader) getMessages(ctx context.Context, offset int64) (*sarama.FetchResponse, error) {
    errs := make(chan error)
    val := make(chan *sarama.FetchResponse)
    broker, err := kafka.getLeader(ctx)
    if err != nil {
        return nil, err
    }
    go func() {
        // The request waits for at most `maxWaitTime` or until at least
        // `minBytes` of data are available, whichever happens first.
        // `MaxWaitTime` is expressed in milliseconds on the wire.
        request := sarama.FetchRequest{
            MaxWaitTime: int32(kafka.maxWaitTime / time.Millisecond),
            MinBytes:    int32(kafka.minBytes),
        }
        request.AddBlock(kafka.topic, kafka.partition, offset, int32(kafka.maxBytes))
        res, err := broker.Fetch(&request)
        if err != nil {
            errs <- errors.Wrap(err, "kafka reader failed to fetch a block")
        } else {
            val <- res
        }
    }()
    select {
    case <-ctx.Done():
        return nil, ctx.Err()
    case err := <-errs:
        return nil, err
    case res := <-val:
        return res, nil
    }
}

// getLeader finds the broker that leads the given partition. Failure to fetch
// the leader is the result of either an invalid topic/partition or an
// unavailable broker/leader, e.g. while a leader election is in progress (and
// thus the leader has changed).
func (kafka *kafkaReader) getLeader(ctx context.Context) (*sarama.Broker, error) {
    errs := make(chan error)
    val := make(chan *sarama.Broker)
    go func() {
        broker, err := kafka.client.Leader(kafka.topic, kafka.partition)
        if err != nil {
            errs <- errors.Wrap(err, "failed to find leader for topic/partition")
        } else {
            val <- broker
        }
    }()
    select {
    case <-ctx.Done():
        return nil, ctx.Err()
    case err := <-errs:
        return nil, err
    case broker := <-val:
        return broker, nil
    }
}

// fetchMessagesAsync is the internal asynchronous task that fetches messages
// from Kafka and sends them to the internal channel.
func (kafka *kafkaReader) fetchMessagesAsync(ctx context.Context, eventsCh chan<- Message, errorsCh chan<- error, offset int64) {
    defer kafka.asyncWait.Done()
    for {
        res, err := kafka.getMessages(ctx, offset)
        if err == context.Canceled {
            return
        } else if err != nil {
            errorsCh <- err
            continue
        }
        partition := res.GetBlock(kafka.topic, kafka.partition)
        if partition == nil {
            errorsCh <- fmt.Errorf("kafka topic/partition is invalid (topic: %s, partition: %d)", kafka.topic, kafka.partition)
            continue
        }
        // Possible errors: https://godoc.org/github.com/Shopify/sarama#KError
        if partition.Err != sarama.ErrNoError {
            errorsCh <- errors.Wrap(partition.Err, "kafka block returned an error")
            continue
        }
        // Update the high watermark offset so the reader's lag can be
        // computed.
        atomic.StoreInt64(&kafka.highmark, partition.HighWaterMarkOffset)
        // Bump the current offset to the last offset in the message set. The
        // new offset will be used the next time we fetch a block from Kafka.
        //
        // This doesn't commit the offset in any way; it only allows the
        // iterator to continue to make progress.
        msgSet := partition.MsgSet.Messages
        if len(msgSet) == 0 {
            continue
        }
        // Kafka serves messages straight from disk and may return a partial
        // message at the end of the set. Consumers need to prune it and
        // re-request it.
        if partition.MsgSet.PartialTrailingMessage {
            // Remove the trailing message. The next fetch will return it
            // in full.
            msgSet = msgSet[:len(msgSet)-1]
        }
        for _, msg := range msgSet {
            // Hand the message to the iterator. This blocks if the consumer
            // is slow or isn't calling `Read`, which keeps the Kafka reader
            // in sync with the consumer.
            eventsCh <- Message{
                Offset: msg.Offset,
                Key:    msg.Msg.Key,
                Value:  msg.Msg.Value,
            }
            // Update the offset for the next batch of messages.
            offset = msg.Offset + 1
        }
    }
}

// Close shuts down the async task and the Kafka client.
func (kafka *kafkaReader) Close() (err error) {
    kafka.closeAsync()
    return kafka.client.Close()
}