Describe the bug
Consider the following scenario ([P] = publisher, [R] = reader):
[P] testtopic is created (empty)
[R] a new Reader is created with StartOffset: LastOffset
=> a new ConsumerGroup is initialized with map[{topic:testtopic partition:0}:-1]
=> a new reader starts, seeks to offset 0, and starts polling
[P] publishes a message to testtopic [the message with offset 0 becomes available]
[R] tries to read the message and gets a connection error; it retries and fails N times in a row
[R] reestablishes the connection
[R] gets the offset from the ConsumerGroup: -1, which means "LastOffset"
[R] starts a reader, which seeks to offset 1
[R] polls from offset 1
At this point the message with offset 0 is lost.
In effect, every message published before the first successful CommitMessages is lost.
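The sequence above can be replayed in a toy model that needs no broker. The sketch below is not kafka-go code: `partition`, `resolveStart`, and `simulate` are hypothetical names, and the one assumption taken from this report is that a stored offset of -1 is resolved against the current end of the log every time the reader is (re)initialized.

```go
package main

// lastOffset mirrors the -1 sentinel that the log prints as "last offset".
const lastOffset int64 = -1

// partition is a toy stand-in for one topic partition. The number of stored
// messages is also the offset the next published message will receive.
type partition struct {
	messages []string
}

// publish appends a message and returns the offset it was assigned.
func (p *partition) publish(value string) int64 {
	p.messages = append(p.messages, value)
	return int64(len(p.messages) - 1)
}

// resolveStart converts the offset stored for the group into an absolute one.
// The sentinel is resolved against the log end at the moment of the call.
func resolveStart(stored int64, p *partition) int64 {
	if stored == lastOffset {
		return int64(len(p.messages))
	}
	return stored
}

// simulate replays the scenario and reports both resolved start offsets plus
// the offsets that fall between them and are therefore never fetched.
func simulate() (first, second int64, skipped []int64) {
	p := &partition{}
	stored := lastOffset // nothing has been committed for the group yet

	first = resolveStart(stored, p) // empty topic, so the reader seeks to 0
	p.publish("m0")                 // offset 0 becomes available

	// Reads fail and the reader is rebuilt. No commit happened, so the
	// group still holds the sentinel and it is resolved a second time.
	second = resolveStart(stored, p)

	for off := first; off < second; off++ {
		skipped = append(skipped, off)
	}
	return first, second, skipped
}
```

In this model the gap between the two resolved offsets is exactly the set of messages published while the reader was failing, which is the loss described above.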
Kafka Version
kafka-go v0.4.47
To Reproduce
The issue was captured while running a large system under a deterministic distributed simulator.
I am not sure how to construct a hermetic test for this, as I am not familiar with your code.
Expected Behavior
Message with offset 0 is read and processed, at least once.
Observed Behavior
The first few messages are lost:
...
00:00:53.23 info kafka-go@v0.4.47/consumergroup.go:952 joined group READERCONSUMERGROUP as member READER@READER (github.com/segmentio/kafka-go)-6dab7dfe-da33-4878-a70a-d1cf303436c9 in generation 3
00:00:53.23 info kafka-go@v0.4.47/consumergroup.go:1012 selected as leader for group, READERCONSUMERGROUP
00:00:53.27 info kafka-go@v0.4.47/consumergroup.go:1040 using 'range' balancer to assign group, READERCONSUMERGROUP
00:00:53.27 info kafka-go@v0.4.47/consumergroup.go:1042 found member: READER@READER (github.com/segmentio/kafka-go)-6dab7dfe-da33-4878-a70a-d1cf303436c9/[]byte(nil)
00:00:53.27 info kafka-go@v0.4.47/consumergroup.go:1045 found topic/partition: TOPICNAME/0
00:00:53.27 info kafka-go@v0.4.47/consumergroup.go:966 assigned member/topic/partitions READER@READER (github.com/segmentio/kafka-go)-6dab7dfe-da33-4878-a70a-d1cf303436c9/TOPICNAME/[0]
00:00:53.27 info kafka-go@v0.4.47/consumergroup.go:973 joinGroup succeeded for response, READERCONSUMERGROUP. generationID=3, memberID=READER@READER (github.com/segmentio/kafka-go)-6dab7dfe-da33-4878-a70a-d1cf303436c9
00:00:53.27 info kafka-go@v0.4.47/consumergroup.go:806 Joined group READERCONSUMERGROUP as member READER@READER (github.com/segmentio/kafka-go)-6dab7dfe-da33-4878-a70a-d1cf303436c9 in generation 3
00:00:53.27 info kafka-go@v0.4.47/consumergroup.go:1139 Syncing 1 assignments for generation 3 as member READER@READER (github.com/segmentio/kafka-go)-6dab7dfe-da33-4878-a70a-d1cf303436c9
00:00:53.29 info kafka-go@v0.4.47/consumergroup.go:1104 sync group finished for group, READERCONSUMERGROUP
00:00:53.31 info kafka-go@v0.4.47/reader.go:141 subscribed to topics and partitions: map[{topic:TOPICNAME partition:0}:-1]
00:00:53.31 info kafka-go@v0.4.47/consumergroup.go:467 started heartbeat for group, READERCONSUMERGROUP [3s]
00:00:53.31 info kafka-go@v0.4.47/reader.go:1272 initializing kafka reader for partition 0 of TOPICNAME starting at offset last offset
00:00:53.31 info kafka-go@v0.4.47/reader.go:274 started commit for group READERCONSUMERGROUP
00:00:53.43 info kafka-go@v0.4.47/reader.go:1473 the kafka reader for partition 0 of TOPICNAME is seeking to offset 0
00:00:53.57 info kafka-go@v0.4.47/reader.go:1375 no messages received from kafka within the allocated time for partition 0 of TOPICNAME at offset 0: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
...
00:03:00.06 info kafka-go@v0.4.47/reader.go:1427 the kafka reader got an unknown error reading partition 0 of TOPICNAME at offset 0: read tcp 11.0.0.4:36119->11.0.0.5:9093: i/o timeout
00:03:00.16 info kafka-go@v0.4.47/reader.go:1272 initializing kafka reader for partition 0 of TOPICNAME starting at offset 0
00:03:00.28 info kafka-go@v0.4.47/reader.go:1473 the kafka reader for partition 0 of TOPICNAME is seeking to offset 0
00:03:00.42 info kafka-go@v0.4.47/reader.go:1375 no messages received from kafka within the allocated time for partition 0 of TOPICNAME at offset 0: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
00:03:00.52 info kafka-go@v0.4.47/reader.go:1427 the kafka reader got an unknown error reading partition 0 of TOPICNAME at offset 0: read tcp 11.0.0.4:47346->11.0.0.5:9093: i/o timeout
00:03:00.62 info kafka-go@v0.4.47/reader.go:1272 initializing kafka reader for partition 0 of TOPICNAME starting at offset 0
00:03:07.74 info kafka-go@v0.4.47/reader.go:1473 the kafka reader for partition 0 of TOPICNAME is seeking to offset 0
00:03:07.88 info kafka-go@v0.4.47/reader.go:1427 the kafka reader got an unknown error reading partition 0 of TOPICNAME at offset 0: read tcp 11.0.0.4:27880->11.0.0.5:9093: i/o timeout
00:03:07.98 info kafka-go@v0.4.47/reader.go:1272 initializing kafka reader for partition 0 of TOPICNAME starting at offset 0
00:03:09.1 info kafka-go@v0.4.47/reader.go:1473 the kafka reader for partition 0 of TOPICNAME is seeking to offset 0
00:03:09.24 info kafka-go@v0.4.47/reader.go:1427 the kafka reader got an unknown error reading partition 0 of TOPICNAME at offset 0: read tcp 11.0.0.4:31747->11.0.0.5:9093: i/o timeout
00:03:09.34 info kafka-go@v0.4.47/reader.go:1272 initializing kafka reader for partition 0 of TOPICNAME starting at offset 0
00:03:13.48 info kafka-go@v0.4.47/reader.go:1473 the kafka reader for partition 0 of TOPICNAME is seeking to offset 0
00:03:17.64 info kafka-go@v0.4.47/reader.go:1427 the kafka reader got an unknown error reading partition 0 of TOPICNAME at offset 0: read tcp 11.0.0.4:51137->11.0.0.5:9093: i/o timeout
00:03:17.74 info kafka-go@v0.4.47/reader.go:1272 initializing kafka reader for partition 0 of TOPICNAME starting at offset 0
00:03:24.86 info kafka-go@v0.4.47/reader.go:1473 the kafka reader for partition 0 of TOPICNAME is seeking to offset 0
00:03:28 info kafka-go@v0.4.47/reader.go:1427 the kafka reader got an unknown error reading partition 0 of TOPICNAME at offset 0: read tcp 11.0.0.4:49546->11.0.0.5:9093: i/o timeout
00:03:28.1 info kafka-go@v0.4.47/reader.go:1272 initializing kafka reader for partition 0 of TOPICNAME starting at offset 0
00:03:31.24 info kafka-go@v0.4.47/reader.go:1473 the kafka reader for partition 0 of TOPICNAME is seeking to offset 0
00:03:31.31 info kafka-go@v0.4.47/consumergroup.go:470 stopped heartbeat for group READERCONSUMERGROUP
00:03:31.31 info kafka-go@v0.4.47/reader.go:277 stopped commit for group READERCONSUMERGROUP
00:03:38.18 info kafka-go@v0.4.47/reader.go:1302 error initializing the kafka reader for partition 0 of TOPICNAME: read tcp 11.0.0.4:58618->11.0.0.5:9093: i/o timeout
00:03:40.3 info kafka-go@v0.4.47/consumergroup.go:952 joined group READERCONSUMERGROUP as member READER@READER (github.com/segmentio/kafka-go)-6dab7dfe-da33-4878-a70a-d1cf303436c9 in generation 4
...
00:03:46.42 info kafka-go@v0.4.47/reader.go:141 subscribed to topics and partitions: map[{topic:TOPICNAME partition:0}:-1]
^^^ still -1 in the ConsumerGroup
00:03:46.42 info kafka-go@v0.4.47/consumergroup.go:467 started heartbeat for group, READERCONSUMERGROUP [3s]
00:03:46.42 info kafka-go@v0.4.47/reader.go:1272 initializing kafka reader for partition 0 of TOPICNAME starting at offset last offset
00:03:46.42 info kafka-go@v0.4.47/reader.go:274 started commit for group READERCONSUMERGROUP
00:03:54.59 info kafka-go@v0.4.47/reader.go:1473 the kafka reader for partition 0 of TOPICNAME is seeking to offset 2
^^^ At this point the messages at offsets 0 and 1 are lost
00:03:57.53 info kafka-go@v0.4.47/reader.go:1302 error initializing the kafka reader for partition 0 of TOPICNAME: read tcp 11.0.0.4:48613->11.0.0.5:9093: i/o timeout
00:03:57.63 info kafka-go@v0.4.47/reader.go:1272 initializing kafka reader for partition 0 of TOPICNAME starting at offset last offset
00:03:58.75 info kafka-go@v0.4.47/reader.go:1473 the kafka reader for partition 0 of TOPICNAME is seeking to offset 3
^^^ At this point the message at offset 2 is also lost
00:03:58.89 info kafka-go@v0.4.47/reader.go:1427 the kafka reader got an unknown error reading partition 0 of TOPICNAME at offset 3: read tcp 11.0.0.4:60875->11.0.0.5:9093: i/o timeout
00:03:58.99 info kafka-go@v0.4.47/reader.go:1272 initializing kafka reader for partition 0 of TOPICNAME starting at offset 3
00:04:00.13 info kafka-go@v0.4.47/reader.go:1473 the kafka reader for partition 0 of TOPICNAME is seeking to offset 3
00:04:00.27 info kafka-go@v0.4.47/reader.go:1375 no messages received from kafka within the allocated time for partition 0 of TOPICNAME at offset 4: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
00:04:00.27 info reader/reader.go:143 FetchMessage result: {Topic:TOPICNAME Partition:0 Offset:3 HighWaterMark:4 Key:[] Value:....}
^^^ the first message delivered to the app comes from offset 3
00:04:00.37 info kafka-go@v0.4.47/reader.go:1427 the kafka reader got an unknown error reading partition 0 of TOPICNAME at offset 4: read tcp 11.0.0.4:22278->11.0.0.5:9093: i/o timeout
...
I am not sure how good this approach is, but the following seems to fix the behaviour: when the resolved initial offset differs from the requested one, push it to the parent Reader and commit it from FetchMessage itself:
diff -u ~/go/pkg/mod/cache/download/github.com/segmentio/kafka-go/@v/github.com/segmentio/kafka-go@v0.4.47/reader.go ./reader.go
--- ~/go/pkg/mod/cache/download/github.com/segmentio/kafka-go/@v/github.com/segmentio/kafka-go@v0.4.47/reader.go 1979-12-31 00:00:00.000000000 +0100
+++ ./reader.go 2024-08-14 13:09:29.231441428 +0200
@@ -835,6 +835,16 @@
 				return Message{}, io.EOF
 			}
+			// "initial offset" notification
+			if m.error == nil && m.version == version && m.message.HighWaterMark==-1 && m.watermark == -1 {
+				if r.useConsumerGroup() {
+					if err := r.CommitMessages(ctx, m.message); err != nil {
+						return Message{}, err
+					}
+				}
+				continue
+			}
+
 			if m.version >= version {
 				r.mutex.Lock()
@@ -1312,6 +1322,14 @@
 		// Now we're sure to have an absolute offset number, may anything happen
 		// to the connection we know we'll want to restart from this offset.
+		if offset != start {
+			// If we had to seek from non-absolute offset, inform Reader about initial point
+			// So it get remembered in case of reconnection.
+			if err = r.sendMessage(ctx, Message{Topic:r.topic, Partition:r.partition, Offset:start-1, HighWaterMark:-1}, -1); err != nil {
+				conn.Close()
+				return
+			}
+		}
 		offset = start
 		errcount := 0
This is only an idea for a fix. I have verified that it resolves the issue, but I believe the code can and should be cleaner and properly tested :), so I don't have a ready-to-use patch here.
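The idea behind the patch can be sketched in a small stand-alone model: once a relative start offset has been resolved to an absolute one, record it so that a reconnection does not resolve it again. This is not the kafka-go implementation; `groupState` and `start` are hypothetical names, and the model assumes the only relevant state is the offset stored for one partition.

```go
package main

// lastOffset mirrors the -1 sentinel that the log prints as "last offset".
const lastOffset int64 = -1

// groupState is a toy stand-in for the offset a consumer group stores for
// one partition.
type groupState struct {
	committed int64
}

// start resolves the stored offset against the current log end. When the
// stored value is the sentinel, the resolved absolute offset is written back
// so that a later reconnection resumes from the same position.
func (g *groupState) start(logEnd int64) int64 {
	if g.committed == lastOffset {
		g.committed = logEnd
	}
	return g.committed
}
```

With the resolved offset pinned on the first call, a second call made after more messages have been published still returns the original position, so nothing published in between is skipped.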