Also, according to the documentation, ReadMessage commits the message automatically:
If consumer groups are used, ReadMessage will automatically commit the offset when called. Note that this could result in an offset being committed before the message is fully processed.
If more fine-grained control of when offsets are committed is required, it is recommended to use FetchMessage with CommitMessages instead.
func main() {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:   []string{"localhost:9092"},
		Topic:     "my-topic",
		Partition: 0,
		MaxBytes:  10e6, // 10MB
	})
	for {
		m, err := r.ReadMessage(context.Background())
		if err != nil {
			break
		}
		fmt.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
		r.CommitMessages(context.Background(), m) // commit offset
	}
}
I committed the offset, but every time I run the program it starts reading from the beginning again, so the same messages are consumed repeatedly.
=============== First Execution ===============
message at offset 0: = one!
message at offset 1: = two!
message at offset 2: = three!
=============== Second Execution ===============
message at offset 0: = one!
message at offset 1: = two!
message at offset 2: = three!