Skip to content

Commit

Permalink
Internally, kafka-go offset is consistent with itself. Unfortunately,…
Browse files Browse the repository at this point in the history
… not with the rest of the world. The commit offset should be the offset of the next message to read and NOT the last message read. Verified by running sarama and kafka-go sequentially to verify they picked up each others offsets. (segmentio#62)

Seeing as you were working in commits, I thought I would hop on your commit.
  • Loading branch information
savaki authored and achille-roussel committed Jan 10, 2018
1 parent 7619f5f commit 9066af4
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 10 deletions.
2 changes: 1 addition & 1 deletion commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func makeCommit(msg Message) commit {
return commit{
topic: msg.Topic,
partition: msg.Partition,
offset: msg.Offset,
offset: msg.Offset + 1,
}
}

Expand Down
22 changes: 22 additions & 0 deletions commit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package kafka

import "testing"

func TestMakeCommit(t *testing.T) {
msg := Message{
Topic: "blah",
Partition: 1,
Offset: 2,
}

commit := makeCommit(msg)
if commit.topic != msg.Topic {
t.Errorf("bad topic: expected %v; got %v", msg.Topic, commit.topic)
}
if commit.partition != msg.Partition {
t.Errorf("bad partition: expected %v; got %v", msg.Partition, commit.partition)
}
if commit.offset != msg.Offset+1 {
t.Errorf("expected committed offset to be 1 greater than msg offset")
}
}
3 changes: 0 additions & 3 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,9 +483,6 @@ func (r *Reader) fetchOffsets(subs map[string][]int32) (map[int]int64, error) {
for _, partition := range partitions {
if partition == pr.Partition {
offset := pr.Offset
if offset >= 0 {
offset++ // advance to next offset
}
offsetsByPartition[int(partition)] = offset
}
}
Expand Down
12 changes: 6 additions & 6 deletions reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1081,16 +1081,16 @@ func TestOffsetStash(t *testing.T) {
Given: offsetStash{},
Messages: []Message{newMessage(0, 0)},
Expected: offsetStash{
topic: {0: 0},
topic: {0: 1},
},
},
"ignores earlier offsets": {
Given: offsetStash{
topic: {0: 1},
topic: {0: 2},
},
Messages: []Message{newMessage(0, 0)},
Expected: offsetStash{
topic: {0: 1},
topic: {0: 2},
},
},
"uses latest offset": {
Expand All @@ -1101,7 +1101,7 @@ func TestOffsetStash(t *testing.T) {
newMessage(0, 1),
},
Expected: offsetStash{
topic: {0: 3},
topic: {0: 4},
},
},
"uses latest offset, across multiple topics": {
Expand All @@ -1115,8 +1115,8 @@ func TestOffsetStash(t *testing.T) {
},
Expected: offsetStash{
topic: {
0: 3,
1: 6,
0: 4,
1: 7,
},
},
},
Expand Down

0 comments on commit 9066af4

Please sign in to comment.