
Kafka consumer - lag is always 1 per partition, receiving duplicate last message per partition #1380

abhijeetsi1 opened this issue Aug 10, 2020 · 8 comments

@abhijeetsi1

abhijeetsi1 commented Aug 10, 2020

Description

I have subscribed to a topic with 4 partitions. The consumer receives a duplicate of the last message on each partition: the consumer has read everything in the topic, yet it shows a lag of 1 per partition, and when I restart the consumer the last message is received again. There are 3 messages per partition and nothing is being produced to the topic, yet the behavior persists.
Please advise.

How to reproduce

Offsets are committed from another thread every 10 seconds, if there is anything to commit.
_consumer.Subscribe(topics);
var commitTask = Task.Run(() => { CommitThread(_consumer, pc); }, _options.CancellationToken);
while (!IsCancelled(0))
{
    try
    {
        var consumeResult = _consumer.Consume(_options.CancellationToken);
        if (consumeResult.IsPartitionEOF) continue;

        AddMessageForProcessing(consumeResult);
    }
    catch (ConsumeException consumeException)
    {
        LogKafkaException(_consumer, consumeException);
    }
}

// Here, processor holds all successfully read offsets to be committed.

private void CommitOffsets(IConsumer<string, T> consumer, KafkaParallelProcessor<T, S> processor)
{
    var offsets = processor.GetOffsetsToCommit();
    if (!offsets.Any()) return;

    try
    {
        try
        {
            foreach (var topicPartitionOffset in offsets)
            {
                consumer.StoreOffset(topicPartitionOffset);
            }

            processor.SaveCommittedOffsets(offsets);

            _logger.LogInformation($"Committed: {offsets.FormatOffsets()}");
        }
        catch (KafkaException e)
        {
            _logger.LogInformation($"Unable to commit offsets. Reason: {e.Error.Reason}\nOffsets: {offsets.FormatOffsets()}");
            throw;
        }
    }
    catch (Exception e)
    {
        _logger.LogError($"Unable to commit offsets: {offsets.FormatOffsets()}", e);
    }
}

Checklist

Please provide the following information:

  • A complete (i.e. we can run it), minimal program demonstrating the problem. No need to supply a project file.
  • Confluent.Kafka nuget version: 1.4.3
  • Apache Kafka version.
  • Client configuration:
      "group.id": "Test_13",
      "bootstrap.servers": "servers",
      "session.timeout.ms": "120000",
      "heartbeat.interval.ms": "15000",
      "api.version.request": "true",
      "fetch.message.max.bytes": "102400",
      "log.connection.close": "false",
      "auto.offset.reset": "earliest",
      "enable.auto.commit": true,
      "enable.auto.offset.store": false
  • Operating system: Windows
  • Provide logs (with "debug" : "..." as necessary in configuration).
  • Provide broker log excerpts.
  • Critical issue: Yes
@jerrylzy

jerrylzy commented Aug 11, 2020

You enabled auto-commit. It's expected to receive duplicates. You can read more here.

BTW, I don't think you need to store the offset manually as you are not doing anything differently from auto offset storing.
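
(For context, here is the configuration combination being referred to, as a minimal ConsumerConfig sketch; the property names are the strongly typed Confluent.Kafka equivalents of the settings reported above.)

var config = new ConsumerConfig
{
    GroupId = "Test_13",
    BootstrapServers = "servers",
    // Background auto-commit is on, so whatever has been placed in the
    // offset store is committed periodically on the consumer's behalf.
    EnableAutoCommit = true,
    // Offsets only enter the store through explicit StoreOffset calls.
    EnableAutoOffsetStore = false,
    AutoOffsetReset = AutoOffsetReset.Earliest
};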

@abhijeetsi1
Author

I did try disabling auto-commit and committing manually from a different thread using consumer.CommitOffsets(offsets); I still get a duplicate message when the consumer restarts. I don't want duplicate messages to be processed. How do I solve this scenario?

@mhowlett
Contributor

I didn't look at your code, but this behavior is often due to a misunderstanding of what committed offsets represent: a committed offset is interpreted as the next message to consume, i.e. last consumed offset + 1.
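
(A minimal sketch of that rule applied to a manual commit; consumeResult stands for the ConsumeResult returned by Consume, and the + 1 is the point.)

// A committed offset names the NEXT message to consume, so commit
// (and store) the last consumed offset + 1, never the offset itself.
consumer.Commit(new List<TopicPartitionOffset>
{
    new TopicPartitionOffset(consumeResult.TopicPartition, consumeResult.Offset + 1)
});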

@krisger

krisger commented Aug 12, 2020

I'm experiencing a similar issue. Everything is fine when each message is handled separately and its offset committed per message.
But when handling, for example, a batch of 100 messages and committing only the highest offset per partition, the issue appears.

Flow:

  1. Consumer commits the highest offset for each partition
  2. No messages left to consume
  3. Restart application
  4. It re-consumes the last message on every partition (with 3 partitions, 3 messages are re-consumed)
  5. If you repeat step 3, step 4 always occurs, unless new messages have arrived for the consumer; then it starts again from step 1.

Batch handling mostly takes under 10 seconds.
Consumer settings:

EnableAutoCommit = false,
AutoOffsetReset = AutoOffsetReset.Earliest,
EnablePartitionEof = true,
HeartbeatIntervalMs = 10 secs
SessionTimeoutMs = 5 mins
MaxPollIntervalMs = 2 hours
BootstrapServers = "servers"

For committing multiple messages I use this method:
void Commit(IEnumerable<TopicPartitionOffset> offsets);
I get no exception; I commit in a while loop with try/catch, and if I get an exception I retry committing the same offsets.

I also tried committing each message separately in a foreach with this method:
void Commit(ConsumeResult<TKey, TValue> result);
The same thing happened.

Only when one message is handled at a time and committed one by one does an app restart not repeat the last message.
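
(A sketch of what the last consumed + 1 rule looks like for the batch case described above; batch is assumed to be the list of ConsumeResults handled in the current batch.)

// Commit the highest offset seen on each partition, plus one, so the
// committed offset points at the next message rather than the last one.
var toCommit = batch
    .GroupBy(r => r.TopicPartition)
    .Select(g => new TopicPartitionOffset(g.Key, g.Max(r => r.Offset.Value) + 1))
    .ToList();

consumer.Commit(toCommit);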

@alex-basiuk

How are you getting the offsets in var offsets = processor.GetOffsetsToCommit();?
You need to increment the offset manually if you use the StoreOffset(TopicPartitionOffset) overload.
Have a look at

public void StoreOffset(ConsumeResult<TKey, TValue> result)
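
(The difference between the two overloads, sketched; consumeResult is the ConsumeResult in hand.)

// The ConsumeResult overload applies the + 1 internally:
consumer.StoreOffset(consumeResult);

// The TopicPartitionOffset overload stores exactly the offset it is given,
// so the increment has to be done by the caller:
consumer.StoreOffset(new TopicPartitionOffset(
    consumeResult.TopicPartition, consumeResult.Offset + 1));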

@lomholdt

This was exactly my problem. Setting the committed offset to the last consumed message + 1, as @mhowlett explained, fixed it. Thanks!

@mgbeokwereb

Can you update StoreOffset(TopicPartitionOffset) to increment the offset, as the other methods do? Is there some issue with doing so? We recently noticed the same lag of 1. We fixed this internally, but would like to know the reasoning behind the other consumer methods incrementing before commit and storage.

@sazarubin

Ran into the same issue. It is really surprising to see different behaviour between two overloads of the same method.

Changing this difference in behaviour would be breaking and could lead to data loss, so the only way out is to document it. I will make a PR.
