Kafka consumer - lag is always 1 per partition, receiving duplicate last message per partition #1380
Comments
You enabled auto-commit. It's expected to receive duplicates. You can read more here. BTW, I don't think you need to store the offset manually, as you are not doing anything different from automatic offset storing.
I did try disabling auto-commit and committing manually from a different thread using consumer.CommitOffsets(offsets); I'm still getting a duplicate message when the consumer restarts. I don't want duplicate messages to be processed. How do I solve this scenario?
I didn't look at your code, but this behavior is often due to a misunderstanding of what committed offsets represent - committed offsets are interpreted as the next message to consume, i.e. last consumed message + 1.
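To illustrate the point above, here is a minimal sketch of committing "last consumed message + 1" with the Confluent.Kafka .NET client (`consumer`, `cancellationToken` are illustrative names, not from the issue):

```csharp
// After processing a message, commit the NEXT offset to consume,
// i.e. the consumed offset + 1. Committing the consumed offset as-is
// leaves a reported lag of 1 and redelivers that message on restart.
var result = consumer.Consume(cancellationToken);

consumer.Commit(new[]
{
    new TopicPartitionOffset(result.TopicPartition, result.Offset.Value + 1)
});

// Alternatively, the Commit(ConsumeResult) overload performs the +1 for you:
// consumer.Commit(result);
```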
I'm experiencing a similar issue. Everything works fine only when the offset is committed per message and each message is handled separately. Flow:
Batch handling mostly takes under 10 seconds.
For committing multiple messages I use this method; I also tried committing all the messages one by one in a foreach with the same method. Only when one message is handled and committed at a time does an app restart not repeat the last message.
How are you getting offsets in
This was exactly my problem. Setting the committed offset to last consumed message + 1 as @mhowlett explained fixed it. Thanks!
Can you update StoreOffset to increment the offset, as the other methods do? Is there some issue with doing so? We recently noticed the same lag of 1. We fixed this internally, but would like to know the reasoning behind the other consumer methods incrementing before commit and storage.
Ran into the same issue. It is really surprising to see different behaviour in two overloads of the same method. Changing this difference in behaviour would be breaking and could lead to data loss, which means the only way out is to document it. I will make a PR.
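For reference, a sketch of the overload difference discussed above, assuming `enable.auto.offset.store = false` (variable names are illustrative):

```csharp
// StoreOffset(ConsumeResult<K,V>) stores consumeResult.Offset + 1 internally,
// so the next commit advances past the consumed message:
consumer.StoreOffset(consumeResult);

// StoreOffset(TopicPartitionOffset) stores exactly the offset passed in,
// so the caller must add 1 themselves to avoid a permanent lag of 1:
consumer.StoreOffset(new TopicPartitionOffset(
    consumeResult.TopicPartition, consumeResult.Offset.Value + 1));
```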
Description
I have subscribed to a topic with 4 partitions. The consumer is receiving a duplicate of the last message per partition. The consumer has read everything in the topic, yet it shows a lag of 1 per partition, and when I restart the consumer the last message is received again. There are 3 messages per partition in the topic and nothing is being produced to it, yet the behavior persists.
Please suggest.
How to reproduce
Commit the offsets in another thread every 10 seconds, if there is something to commit.
_consumer.Subscribe(topics);
var commitTask = Task.Run(() => { CommitThread(_consumer, pc); }, _options.CancellationToken);
while (!IsCancelled(0))
{
    try
    {
        var consumeResult = _consumer.Consume(_options.CancellationToken);
        if (consumeResult.IsPartitionEOF) continue;
        // here the processor holds all the successfully read offsets to be committed
        // ... (message handling elided in the issue)
    }
    catch (OperationCanceledException)
    {
        break;
    }
}

private void CommitOffsets(IConsumer<string, T> consumer, KafkaParallelProcessor<T, S> processor)
{
    var offsets = processor.GetOffsetsToCommit();
    if (!offsets.Any()) return;
    // ... (commit logic elided in the issue)
}
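Assuming `GetOffsetsToCommit()` returns the last-consumed offset per partition (its implementation is elided in the issue), a minimal sketch of the fix is to increment each offset before committing, per the explanation that committed offsets mean "next message to consume":

```csharp
private void CommitOffsets(IConsumer<string, T> consumer, KafkaParallelProcessor<T, S> processor)
{
    var offsets = processor.GetOffsetsToCommit();
    if (!offsets.Any()) return;

    // Commit last-consumed + 1 per partition; committing the consumed
    // offset as-is leaves a lag of 1 and redelivers it on restart.
    consumer.Commit(offsets
        .Select(o => new TopicPartitionOffset(o.TopicPartition, o.Offset.Value + 1))
        .ToList());
}
```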
Checklist
Please provide the following information:

Client configuration:
"bootstrap.servers": "servers",
"session.timeout.ms": "120000",
"heartbeat.interval.ms": "15000",
"api.version.request": "true",
"fetch.message.max.bytes": "102400",
"log.connection.close": "false",
"auto.offset.reset": "earliest",
"enable.auto.commit": true,
"enable.auto.offset.store": false
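The same configuration expressed as a Confluent.Kafka `ConsumerConfig` would look roughly like the sketch below (property mapping assumed from the librdkafka names; note `fetch.message.max.bytes` is an alias of `max.partition.fetch.bytes`):

```csharp
var config = new ConsumerConfig
{
    BootstrapServers = "servers",
    SessionTimeoutMs = 120000,
    HeartbeatIntervalMs = 15000,
    ApiVersionRequest = true,
    MaxPartitionFetchBytes = 102400,   // alias of fetch.message.max.bytes
    LogConnectionClose = false,
    AutoOffsetReset = AutoOffsetReset.Earliest,
    EnableAutoCommit = true,           // background commit of *stored* offsets
    EnableAutoOffsetStore = false      // caller stores offsets via StoreOffset
};
```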