
[Host.Kafka] Leverage autocommit to achieve faster consumption #131

Open

zarusz (Owner) commented Oct 26, 2022

Currently, the Kafka provider uses a manual commit approach (every N messages, every M seconds).
We should also give the option to let Kafka perform auto commits. This should significantly increase consumption speed.
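
For reference, in the underlying Confluent.Kafka client, auto commit is driven by a few ConsumerConfig settings; a minimal sketch of what enabling it could look like (broker address, group id and interval below are placeholders):

using Confluent.Kafka;

// Sketch only: let the Kafka client commit offsets in the background,
// instead of the provider issuing manual commits every N messages / M seconds.
var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",  // placeholder
    GroupId = "my-consumer-group",        // placeholder
    EnableAutoCommit = true,              // client commits periodically in the background
    AutoCommitIntervalMs = 5000,          // auto commit cadence in milliseconds
    // EnableAutoOffsetStore = false,     // optionally control which offsets get stored for auto commit
};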

themiken commented Sep 4, 2024

Hi @zarusz!

Currently, messages are being auto-committed by the consumer. However, I need to handle the commit manually because my consumer is encountering an exception during message processing. Despite the error, the library is still committing the offset, which is not the desired behavior.

Is there a way to disable the auto-commit feature and manually commit the messages only after successful processing?

Please correct me if anything I described above is inaccurate.

zarusz (Owner) commented Sep 4, 2024

So yes, the Kafka provider commits and moves forward, which is not desired in your case.

This is a standardized way to tap into error handling (in case you haven't seen it):
https://github.com/zarusz/SlimMessageBus/blob/master/docs/intro.md#error-handling

However, what is the desired error handling? If not committing and moving forward, then what else?

I could expose something similar to what exists for RabbitMQ, if that would give you a bit more control:
https://github.com/zarusz/SlimMessageBus/blob/master/docs/provider_rabbitmq.md#custom-consumer-error-handler

See option 1 (new feature). Options 2 and 3 should be achievable now.

public class CustomKafkaConsumerErrorHandler<T>(IMessageBus Bus) : IKafkaConsumerErrorHandler<T>
{
    // Inject needed dependencies via the constructor

    public async Task<bool> OnHandleError(T message, Func<Task> retry, IConsumerContext consumerContext, Exception exception)
    {
        // Option 1: New feature to stop the consumer
        // consumerContext.StopConsumer();

        // Option 2: Retry once more
        // await retry();

        // Option 3: Forward the message to another topic
        // await Bus.Publish(message, path: "errors");

        return true; // signify the error has been handled; the offset will advance to the next message
    }
}
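
If we go this route, registration would presumably follow the same open-generic DI pattern as the existing consumer error handlers; a sketch, treating the proposed IKafkaConsumerErrorHandler<T> above as an assumed (not yet released) interface:

// Illustrative registration on an IServiceCollection, assuming the proposed
// IKafkaConsumerErrorHandler<T> follows the same open-generic registration
// pattern as the other providers' error handlers.
services.AddTransient(typeof(IKafkaConsumerErrorHandler<>), typeof(CustomKafkaConsumerErrorHandler<>));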

Let me know.

jarb46 commented Sep 10, 2024

How can I perform a manual commit with the SlimMessageBus.Host.Kafka library, given that the EnableAutoCommit property is set to false in the CreateConsumer method of the KafkaGroupConsumer class? Additionally, what should be done so that the OnStart method of the KafkaGroupConsumer class does not fire? We experimented with setting EnableAutoCommit to true, but OnStart is still triggered, which still processes the message and commits its offset.

zarusz (Owner) commented Sep 11, 2024

How can I perform a manual commit with the SlimMessageBus.Host.Kafka library, given that the EnableAutoCommit property is set to false in the CreateConsumer method of the KafkaGroupConsumer class?

The current version of the provider only supports manual commits managed by the library, which can be configured to happen every N messages or after every T interval. Looking at the docs right now, this is not documented, so I will add it soon.

The consumer implementation does override EnableAutoCommit here to take control over when to commit.
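
For example, the commit cadence can be configured per consumer along these lines (illustrative; the CheckpointEvery / CheckpointAfter builder method names are from memory, so double-check them against the docs):

// Sketch of configuring the library-managed commit cadence on a Kafka consumer.
mbb.Consume<SomeMessage>(x => x
    .Topic("some-topic")
    .WithConsumer<SomeConsumer>()
    .KafkaGroup("some-group")
    .CheckpointEvery(100)                        // commit after every 100 processed messages
    .CheckpointAfter(TimeSpan.FromSeconds(60))); // or after 60 seconds, whichever comes first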

Additionally, what should be done so that the OnStart method of the KafkaGroupConsumer class does not fire? We experimented with setting EnableAutoCommit to true, but OnStart is still triggered, which still processes the message and commits its offset.

By default, when the bus starts it will auto-start all consumers.
Whether consumers should auto-start, and how to start or stop them at runtime, can be controlled; see here.

What I am getting from this conversation, including the earlier feedback from @themiken, is that we need these features:

  • user-driven manual commits (right now commits are manual, but controlled by the library every N messages / T seconds)
  • improved error handling along the lines of what I proposed here.

This ticket was originally about enabling auto commit controlled purely by Kafka (to increase throughput).

Please help me understand, given what I've explained above, what is missing feature-wise, and I can look at adding it. We'd have to iterate on the proposed API in this ticket a few times to get it right, and then I could follow up with an enhancement.

jarb46 commented Sep 11, 2024

Thank you very much in advance for answering our questions.

To give better context: our problem is the loss of messages during consumption, either because an error occurs in our application layer or because an error occurs in the SlimMessageBus library.

In our tests, referencing the library as a project, we identified that in the ConsumerLoop method of the KafkaGroupConsumer class the instruction "OnPartitionEndReached(consumeResult.TopicPartitionOffset);" is executed, which commits the offset of partitions that have not yet been processed and therefore loses those messages; the same loop then executes "await OnMessage(consumeResult).ConfigureAwait(false);", which processes the message at that offset and commits it.

To avoid this, we changed the EnablePartitionEof property in the CreateConsumer method of the KafkaGroupConsumer class to false, so that "OnPartitionEndReached(consumeResult.TopicPartitionOffset);" is never executed. This would be easier to control if EnablePartitionEof were configurable by the user.

Additionally, with that change applied and around 5 events/messages queued, we found that "await OnMessage(consumeResult).ConfigureAwait(false);" generated commits for only some of the messages, not all 5, even though all of them were processed without incident in our application layer. Following up on this, we found that after processing a message the commit was not issued, because the Increment method of the CheckpointTrigger class returned false. This caused an event at an already-processed offset to be consumed again without a commit, duplicating events/messages in the application.
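
For clarity, these are the two Confluent.Kafka consumer flags involved in the experiment described above (values shown only to illustrate what we changed, not the library's configuration):

using Confluent.Kafka;

// The provider sets EnableAutoCommit = false; we additionally toggled EnablePartitionEof
// to false so that the OnPartitionEndReached path is never executed.
var config = new ConsumerConfig
{
    EnableAutoCommit = false,
    EnablePartitionEof = false,
};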

We would appreciate your advice on how to achieve correct commit generation without losing events/messages, whether that control is held by the library or by the user.

zarusz (Owner) commented Sep 11, 2024

@jarb46 thanks for reporting and the detail provided.

I have a few additional questions:

  • What version of SMB.Host.Kafka are you using?
  • Did you pull the latest transitive dependency Confluent.Kafka? I noticed there was a breaking change introduced between 2.3.0 and the latest version that could cause this issue (might be unrelated).

Let me analyze the problem here on my side.

Also as part of #300, I clarified the documentation.

As a side note, there is Source Link support, which should allow you to step into the library source.

zarusz (Owner) commented Sep 12, 2024

@jarb46 I've identified the issue around the PartitionEOF and fixed it in #301.
There is a preview version if you want to try it: https://www.nuget.org/packages/SlimMessageBus.Host.Kafka/2.5.3-rc1

I need to address a second part of the problem, so I will report here once I have an update.

zarusz (Owner) commented Sep 18, 2024

@jarb46 the issue with PartitionEOF has been resolved.
Moreover, as mentioned earlier, the commit settings are now documented.
The Kafka client has been updated to the latest version.

Here is the latest release - please give it a try!
https://github.com/zarusz/SlimMessageBus/releases/tag/2.5.3
https://www.nuget.org/packages/SlimMessageBus.Host.Kafka/2.5.3

themiken commented
Thank you so much for your effort, @zarusz!
At the moment, we are testing the changes.
We will keep you updated.

themiken commented Oct 24, 2024

Hi @zarusz! We tested the new version across several services and observed the following:

There is a mismatch in the offset commit when processing messages. The consumer group offset is incorrectly shifted to the next number when performing the commit, causing a lag that does not correctly reflect the processing of the last message. Despite this behavior, no messages are lost; all messages are processed correctly.

  1. A new topic is created with a replication factor of 2 and 10 partitions to simulate a production environment.
    (screenshot)

  2. The service starts with the log level set to DEBUG. The log shows the registration of consumers and the creation of a consumer group with 10 partitions, corresponding to the number of partitions in the topic. Additionally, it is observed that the initial offset of each partition is 0, indicating that there are no previous messages in the topic.

     [Product] [14:41:26 INF] [Creating producers for Main bus...] [] {"Category": "Event"} <s:SlimMessageBus.Host.Kafka.KafkaMessageBus>
     [Product] [14:41:27 DBG] [Producer settings: ["[bootstrap.servers, mykafkabroker.com:9094]", "[linger.ms, 1]", "[socket.nagle.disable, True]"]] [] {"Category": "Event"} <s:SlimMessageBus.Host.Kafka.KafkaMessageBus>
     [Product] [14:41:27 INF] [Starting consumers for Main bus...] [] {"Category": "Event"} <s:SlimMessageBus.Host.MessageBusBase>
     [Product] [14:41:27 INF] [Creating consumers for Main bus...] [] {"Category": "Event"} <s:SlimMessageBus.Host.MessageBusBase>
     [Product] [14:41:27 INF] [Creating consumer group ProductUpdatedDomainEventGroup] [] {"Category": "Event"} <s:SlimMessageBus.Host.Kafka.KafkaMessageBus>
     [Product] [14:41:27 INF] [Creating for Group: ProductUpdatedDomainEventGroup, Topics: ProductUpdatedDomainEventTopic] [] {"Category": "Event"} <s:SlimMessageBus.Host.Kafka.KafkaGroupConsumer>
     [Product] [14:41:27 INF] [Started consumers for Main bus] [] {"Category": "Event"} <s:SlimMessageBus.Host.MessageBusBase>
     [Product] [14:41:27 INF] [The DomainEventBackgroundService was initialize] [] {"Category": "Event"} <s:Siigo.Core.MessageBus.Services.DomainEventBackgroundService>
     [Product] [14:41:27 INF] [Group [ProductUpdatedDomainEventGroup]: Subscribing to topics: ProductUpdatedDomainEventTopic] [] {"Category": "Event"} <s:SlimMessageBus.Host.Kafka.KafkaGroupConsumer>
     [Product] [14:41:27 INF] [Group [ProductUpdatedDomainEventGroup]: Consumer loop started] [] {"Category": "Event"} <s:SlimMessageBus.Host.Kafka.KafkaGroupConsumer>
     [Product] [14:41:27 DBG] [Load provider connection strategy: MultiTenant] [] {"Category": "Event"} <s:Siigo.Core.Provider.Providers.ProviderBase>
     [Product] [14:41:27 DBG] [The connection SiigoRedisCore registered successfully] [] {"Category": "Event"} <s:Siigo.Core.DistributedCache.Redis.RedisConnectionFactory>
     [Product] [14:41:28 WRN] [Overriding address(es) 'http://localhost:5000'. Binding to endpoints defined via IConfiguration and/or UseKestrel() instead.] [] {"Category": "Event"} <s:Microsoft.AspNetCore.Server.Kestrel>
     [Product] [14:41:36 DBG] [Group [ProductUpdatedDomainEventGroup]: Assigned partition, Topic: ProductUpdatedDomainEventTopic, Partition: [0]] [] {"Category": "Event"} <s:SlimMessageBus.Host.Kafka.KafkaGroupConsumer>
     [Product] [14:41:36 INF] [Creating consumer for Group: ProductUpdatedDomainEventGroup, Topic: ProductUpdatedDomainEventTopic, Partition: [0]] [] {"Category": "Event"} <s:SlimMessageBus.Host.Kafka.KafkaPartitionConsumer>
     [Product] [14:41:36 DBG] [Group [ProductUpdatedDomainEventGroup]: Assigned partition, Topic: ProductUpdatedDomainEventTopic, Partition: [1]] [] {"Category": "Event"} <s:SlimMessageBus.Host.Kafka.KafkaGroupConsumer>
     [Product] [14:41:36 INF] [Creating consumer for Group: ProductUpdatedDomainEventGroup, Topic: ProductUpdatedDomainEventTopic, Partition: [1]] [] {"Category": "Event"} <s:SlimMessageBus.Host.Kafka.KafkaPartitionConsumer>
     [Product] [14:41:36 DBG] [Group [ProductUpdatedDomainEventGroup]: Assigned partition, Topic: ProductUpdatedDomainEventTopic, Partition: [2]] [] {"Category": "Event"} <s:SlimMessageBus.Host.Kafka.KafkaGroupConsumer>
     [Product] [14:41:36 INF] [Creating consumer for Group: ProductUpdatedDomainEventGroup, Topic: ProductUpdatedDomainEventTopic, Partition: [2]] [] {"Category": "Event"} <s:SlimMessageBus.Host.Kafka.KafkaPartitionConsumer>
     [Product] [14:41:36 DBG] [Group [ProductUpdatedDomainEventGroup]: Assigned partition, Topic: ProductUpdatedDomainEventTopic, Partition: [3]] [] {"Category": "Event"} <s:SlimMessageBus.Host.Kafka.KafkaGroupConsumer>
     [Product] [14:41:36 INF] [Creating consumer for Group: ProductUpdatedDomainEventGroup, Topic: ProductUpdatedDomainEventTopic, Partition: [3]] [] {"Category": "Event"} <s:SlimMessageBus.Host.Kafka.KafkaPartitionConsumer>
     [Product] [14:41:36 DBG] [Group [ProductUpdatedDomainEventGroup]: Assigned partition, Topic: ProductUpdatedDomainEventTopic, Partition: [4]] [] {"Category": "Event"} <s:SlimMessageBus.Host.Kafka.KafkaGroupConsumer>
     [Product] [14:41:36 INF] [Creating consumer for Group: ProductUpdatedDomainEventGroup, Topic: ProductUpdatedDomainEventTopic, Partition: [4]] [] {"Category": "Event"} <s:SlimMessageBus.Host.Kafka.KafkaPartitionConsumer>
     [Product] [14:41:36 DBG] [Group [ProductUpdatedDomainEventGroup]: Assigned partition, Topic: ProductUpdatedDomainEventTopic, Partition: [5]] [] {"Category": "Event"} <s:SlimMessageBus.Host.Kafka.KafkaGroupConsumer>
     [Product] [14:41:36 INF] [Creating consumer for Group: ProductUpdatedDomainEventGroup, Topic: ProductUpdatedDomainEventTopic, Partition: [5]] [] {"Category": "Event"} <s:SlimMessageBus.Host.Kafka.KafkaPartitionConsumer>
     [Product] [14:41:36 DBG] [Group [ProductUpdatedDomainEventGroup]: Assigned partition, Topic: ProductUpdatedDomainEventTopic, Partition: [6]] [] {"Category": "Event"} <s:SlimMessageBus.Host.Kafka.KafkaGroupConsumer>
     [Product] [14:41:36 INF] [Creating consumer for Group: ProductUpdatedDomainEventGroup, Topic: ProductUpdatedDomainEventTopic, Partition: [6]] [] {"Category": "Event"} <s:SlimMessageBus.Host.Kafka.KafkaPartitionConsumer>
     [Product] [14:41:36 DBG] [Group [ProductUpdatedDomainEventGroup]: Assigned partition, Topic: ProductUpdatedDomainEventTopic, Partition: [7]] [] {"Category": "Event"} <s:SlimMessageBus.Host.Kafka.KafkaGroupConsumer>
     [Product] [14:41:36 INF] [Creating consumer for Group: ProductUpdatedDomainEventGroup, Topic: ProductUpdatedDomainEventTopic, Partition: [7]] [] {"Category": "Event"} <s:SlimMessageBus.Host.Kafka.KafkaPartitionConsumer>
     [Product] [14:41:36 DBG] [Group [ProductUpdatedDomainEventGroup]: Assigned partition, Topic: ProductUpdatedDomainEventTopic, Partition: [8]] [] {"Category": "Event"} <s:SlimMessageBus.Host.Kafka.KafkaGroupConsumer>
     [Product] [14:41:36 INF] [Creating consumer for Group: ProductUpdatedDomainEventGroup, Topic: ProductUpdatedDomainEventTopic, Partition: [8]] [] {"Category": "Event"} <s:SlimMessageBus.Host.Kafka.KafkaPartitionConsumer>
     [Product] [14:41:36 DBG] [Group [ProductUpdatedDomainEventGroup]: Assigned partition, Topic: ProductUpdatedDomainEventTopic, Partition: [9]] [] {"Category": "Event"} <s:SlimMessageBus.Host.Kafka.KafkaGroupConsumer>
     [Product] [14:41:36 INF] [Creating consumer for Group: ProductUpdatedDomainEventGroup, Topic: ProductUpdatedDomainEventTopic, Partition: [9]] [] {"Category": "Event"} <s:SlimMessageBus.Host.Kafka.KafkaPartitionConsumer>
     [Product] [14:41:37 DBG] [Group [ProductUpdatedDomainEventGroup]: Reached end of partition, Topic: ProductUpdatedDomainEventTopic, Partition: [2], Offset: 0] [] {"Category": "Event"} <s:SlimMessageBus.Host.Kafka.KafkaGroupConsumer>
     [Product] [14:41:37 DBG] [Group [ProductUpdatedDomainEventGroup]: Reached end of partition, Topic: ProductUpdatedDomainEventTopic, Partition: [9], Offset: 0] [] {"Category": "Event"} <s:SlimMessageBus.Host.Kafka.KafkaGroupConsumer>
     [Product] [14:41:37 DBG] [Group [ProductUpdatedDomainEventGroup]: Reached end of partition, Topic: ProductUpdatedDomainEventTopic, Partition: [1], Offset: 0] [] {"Category": "Event"} <s:SlimMessageBus.Host.Kafka.KafkaGroupConsumer>
     [Product] [14:41:37 DBG] [Group [ProductUpdatedDomainEventGroup]: Reached end of partition, Topic: ProductUpdatedDomainEventTopic, Partition: [8], Offset: 0] [] {"Category": "Event"} <s:SlimMessageBus.Host.Kafka.KafkaGroupConsumer>
     [Product] [14:41:37 DBG] [Group [ProductUpdatedDomainEventGroup]: Reached end of partition, Topic: ProductUpdatedDomainEventTopic, Partition: [5], Offset: 0] [] {"Category": "Event"} <s:SlimMessageBus.Host.Kafka.KafkaGroupConsumer>
     [Product] [14:41:38 DBG] [Group [ProductUpdatedDomainEventGroup]: Reached end of partition, Topic: ProductUpdatedDomainEventTopic, Partition: [7], Offset: 0] [] {"Category": "Event"} <s:SlimMessageBus.Host.Kafka.KafkaGroupConsumer>
     [Product] [14:41:38 DBG] [Group [ProductUpdatedDomainEventGroup]: Reached end of partition, Topic: ProductUpdatedDomainEventTopic, Partition: [4], Offset: 0] [] {"Category": "Event"} <s:SlimMessageBus.Host.Kafka.KafkaGroupConsumer>
     [Product] [14:41:38 DBG] [Group [ProductUpdatedDomainEventGroup]: Reached end of partition, Topic: ProductUpdatedDomainEventTopic, Partition: [6], Offset: 0] [] {"Category": "Event"} <s:SlimMessageBus.Host.Kafka.KafkaGroupConsumer>
     [Product] [14:41:38 DBG] [Group [ProductUpdatedDomainEventGroup]: Reached end of partition, Topic: ProductUpdatedDomainEventTopic, Partition: [3], Offset: 0] [] {"Category": "Event"} <s:SlimMessageBus.Host.Kafka.KafkaGroupConsumer>
     [Product] [14:41:38 DBG] [Group [ProductUpdatedDomainEventGroup]: Reached end of partition, Topic: ProductUpdatedDomainEventTopic, Partition: [0], Offset: 0] [] {"Category": "Event"} <s:SlimMessageBus.Host.Kafka.KafkaGroupConsumer>
    
  3. A test message is sent to the topic, which is received and processed correctly by the microservice. However, when committing the message, the service reports that it has reached the end of the partition at offset 1, even though the processed message has an offset of 0. The lag shows a discrepancy of 1 between the processed message’s offset (0) and the new end offset (1).

(screenshot)

[Product] [15:49:44 DBG] [No message type header was present, defaulting to the only declared message type Siigo.MyService.Domain.Aggregates.Product.Events.ProductUpdatedDomainEvent] [] {"Category": "Event"} <s:SlimMessageBus.Host.MessageProcessor>
[Product] [15:49:44 DBG] [Type Siigo.MyService.Domain.Aggregates.Product.Events.ProductUpdatedDomainEvent deserialized from JSON {"Id": 0} to Siigo.MyService.Domain.Aggregates.Product.Events.ProductUpdatedDomainEvent] [] {"Category": "Event"} <s:SlimMessageBus.Host.Serialization.Json.JsonMessageSerializer>
[Product] [15:49:44 DBG] [Creating message scope for Siigo.MyService.Domain.Aggregates.Product.Events.ProductUpdatedDomainEvent of type Siigo.MyService.Domain.Aggregates.Product.Events.ProductUpdatedDomainEvent] [] {"Category": "Event"} <s:SlimMessageBus.Host.MessageBusBase>
[Product] [15:49:44 INF] [Message receive!!] [] {"Category": "Event"} <s:Siigo.MyService.Domain.Aggregates.Product.Events.ProductUpdatedDomainEvent>
[Product] [15:49:44 DBG] [Group [ProductUpdatedDomainEventGroup]: Reached end of partition, Topic: ProductUpdatedDomainEventTopic, Partition: [0], Offset: 1] [] {"Category": "Event"} <s:SlimMessageBus.Host.Kafka.KafkaGroupConsumer>
[Product] [15:49:44 DBG] [Group [ProductUpdatedDomainEventGroup]: Commit at Offset: 0, Partition: [0], Topic: ProductUpdatedDomainEventTopic] [] {"Category": "Event"} <s:SlimMessageBus.Host.Kafka.KafkaPartitionConsumer>
[Product] [15:49:44 DBG] [Group [ProductUpdatedDomainEventGroup]: Commit Offset, Topic: ProductUpdatedDomainEventTopic, Partition: [0], Offset: 0] [] {"Category": "Event"} <s:SlimMessageBus.Host.Kafka.KafkaGroupConsumer>

Screenshot from 2024-10-24 15-52-23

Screenshot from 2024-10-24 15-53-05

  4. Four additional messages are sent, all processed correctly. However, the offset continues to exhibit the same erroneous behavior: the commit is performed at the previous offset, causing the lag to increase. The last message has an offset of 4, but the system reports an end offset of 5.

(screenshot)

[Product] [15:54:32 DBG] [Group [ProductUpdatedDomainEventGroup]: Received message with Topic: ProductUpdatedDomainEventTopic, Partition: [0], Offset: 2, payload size: 9] [] {"Category": "Event"} <s:SlimMessageBus.Host.Kafka.KafkaGroupConsumer>
[Product] [15:54:32 DBG] [No message type header was present, defaulting to the only declared message type Siigo.MyService.Domain.Aggregates.Product.Events.ProductUpdatedDomainEvent] [] {"Category": "Event"} <s:SlimMessageBus.Host.MessageProcessor>
[Product] [15:54:32 DBG] [Type Siigo.MyService.Domain.Aggregates.Product.Events.ProductUpdatedDomainEvent deserialized from JSON {"Id": 2} to Siigo.MyService.Domain.Aggregates.Product.Events.ProductUpdatedDomainEvent] [] {"Category": "Event"} <s:SlimMessageBus.Host.Serialization.Json.JsonMessageSerializer>
[Product] [15:54:32 DBG] [Creating message scope for Siigo.MyService.Domain.Aggregates.Product.Events.ProductUpdatedDomainEvent of type Siigo.MyService.Domain.Aggregates.Product.Events.ProductUpdatedDomainEvent] [] {"Category": "Event"} <s:SlimMessageBus.Host.MessageBusBase>
[Product] [15:54:32 INF] [Message receive!!] [] {"Category": "Event"} <s:Siigo.MyService.Domain.Aggregates.Product.Events.ProductUpdatedDomainEvent>
[Product] [15:54:33 DBG] [Group [ProductUpdatedDomainEventGroup]: Reached end of partition, Topic: ProductUpdatedDomainEventTopic, Partition: [0], Offset: 3] [] {"Category": "Event"} <s:SlimMessageBus.Host.Kafka.KafkaGroupConsumer>
[Product] [15:54:33 DBG] [Group [ProductUpdatedDomainEventGroup]: Commit at Offset: 2, Partition: [0], Topic: ProductUpdatedDomainEventTopic] [] {"Category": "Event"} <s:SlimMessageBus.Host.Kafka.KafkaPartitionConsumer>
[Product] [15:54:33 DBG] [Group [ProductUpdatedDomainEventGroup]: Commit Offset, Topic: ProductUpdatedDomainEventTopic, Partition: [0], Offset: 2] [] {"Category": "Event"} <s:SlimMessageBus.Host.Kafka.KafkaGroupConsumer>
[Product] [15:54:40 DBG] [Group [ProductUpdatedDomainEventGroup]: Received message with Topic: ProductUpdatedDomainEventTopic, Partition: [0], Offset: 3, payload size: 9] [] {"Category": "Event"} <s:SlimMessageBus.Host.Kafka.KafkaGroupConsumer>
[Product] [15:54:40 DBG] [No message type header was present, defaulting to the only declared message type Siigo.MyService.Domain.Aggregates.Product.Events.ProductUpdatedDomainEvent] [] {"Category": "Event"} <s:SlimMessageBus.Host.MessageProcessor>
[Product] [15:54:40 DBG] [Type Siigo.MyService.Domain.Aggregates.Product.Events.ProductUpdatedDomainEvent deserialized from JSON {"Id": 3} to Siigo.MyService.Domain.Aggregates.Product.Events.ProductUpdatedDomainEvent] [] {"Category": "Event"} <s:SlimMessageBus.Host.Serialization.Json.JsonMessageSerializer>
[Product] [15:54:40 DBG] [Creating message scope for Siigo.MyService.Domain.Aggregates.Product.Events.ProductUpdatedDomainEvent of type Siigo.MyService.Domain.Aggregates.Product.Events.ProductUpdatedDomainEvent] [] {"Category": "Event"} <s:SlimMessageBus.Host.MessageBusBase>
[Product] [15:54:40 INF] [Message receive!!] [] {"Category": "Event"} <s:Siigo.MyService.Domain.Aggregates.Product.Events.ProductUpdatedDomainEvent>
[Product] [15:54:40 DBG] [Group [ProductUpdatedDomainEventGroup]: Reached end of partition, Topic: ProductUpdatedDomainEventTopic, Partition: [0], Offset: 4] [] {"Category": "Event"} <s:SlimMessageBus.Host.Kafka.KafkaGroupConsumer>
[Product] [15:54:40 DBG] [Group [ProductUpdatedDomainEventGroup]: Commit at Offset: 3, Partition: [0], Topic: ProductUpdatedDomainEventTopic] [] {"Category": "Event"} <s:SlimMessageBus.Host.Kafka.KafkaPartitionConsumer>
[Product] [15:54:40 DBG] [Group [ProductUpdatedDomainEventGroup]: Commit Offset, Topic: ProductUpdatedDomainEventTopic, Partition: [0], Offset: 3] [] {"Category": "Event"} <s:SlimMessageBus.Host.Kafka.KafkaGroupConsumer>
[Product] [15:54:48 DBG] [Group [ProductUpdatedDomainEventGroup]: Received message with Topic: ProductUpdatedDomainEventTopic, Partition: [0], Offset: 4, payload size: 9] [] {"Category": "Event"} <s:SlimMessageBus.Host.Kafka.KafkaGroupConsumer>
[Product] [15:54:48 DBG] [No message type header was present, defaulting to the only declared message type Siigo.MyService.Domain.Aggregates.Product.Events.ProductUpdatedDomainEvent] [] {"Category": "Event"} <s:SlimMessageBus.Host.MessageProcessor>
[Product] [15:54:48 DBG] [Type Siigo.MyService.Domain.Aggregates.Product.Events.ProductUpdatedDomainEvent deserialized from JSON {"Id": 4} to Siigo.MyService.Domain.Aggregates.Product.Events.ProductUpdatedDomainEvent] [] {"Category": "Event"} <s:SlimMessageBus.Host.Serialization.Json.JsonMessageSerializer>
[Product] [15:54:48 DBG] [Creating message scope for Siigo.MyService.Domain.Aggregates.Product.Events.ProductUpdatedDomainEvent of type Siigo.MyService.Domain.Aggregates.Product.Events.ProductUpdatedDomainEvent] [] {"Category": "Event"} <s:SlimMessageBus.Host.MessageBusBase>
[Product] [15:54:48 INF] [Message receive!!] [] {"Category": "Event"} <s:Siigo.MyService.Domain.Aggregates.Product.Events.ProductUpdatedDomainEvent>
[Product] [15:54:48 DBG] [Group [ProductUpdatedDomainEventGroup]: Reached end of partition, Topic: ProductUpdatedDomainEventTopic, Partition: [0], Offset: 5] [] {"Category": "Event"} <s:SlimMessageBus.Host.Kafka.KafkaGroupConsumer>
[Product] [15:54:48 DBG] [Group [ProductUpdatedDomainEventGroup]: Commit at Offset: 4, Partition: [0], Topic: ProductUpdatedDomainEventTopic] [] {"Category": "Event"} <s:SlimMessageBus.Host.Kafka.KafkaPartitionConsumer>
[Product] [15:54:48 DBG] [Group [ProductUpdatedDomainEventGroup]: Commit Offset, Topic: ProductUpdatedDomainEventTopic, Partition: [0], Offset: 4] [] {"Category": "Event"} <s:SlimMessageBus.Host.Kafka.KafkaGroupConsumer>

This behavior is consistent across all partitions of the topic. When adding new consumers to the group, it is evident that the lag remains equal to (# partitions x 1). In production environments, this results in cumulative lag across all partitions, although no messages are lost.

The offset commit mismatch seems to be due to incorrect handling of offsets in the consumer group, causing an apparent lag without data loss. This behavior affects lag monitoring in production environments, potentially causing confusion about the actual state of message processing.

zarusz (Owner) commented Oct 25, 2024

@themiken The offset reported by Kafka in the Partition EOF event ("Reached end of partition") indicates the next offset that will be assigned when a message arrives after it. It is not the value we should be committing; we should commit the offset of the last message that was processed. So, what SMB does on the Partition EOF event is commit the last processed message offset (on that topic-partition), not the next offset reported by the EOF event. That was actually the bug I fixed last time, which caused messages to be skipped.

For example, when starting with a brand-new topic, the EOF event will report 0, meaning that if a message arrives it will get offset 0.
Then, when the first message arrives, it actually has offset 0; if no subsequent messages arrive, an EOF event with offset 1 is reported, and SMB issues a commit for the last processed offset (on that topic-partition), which is 0 in this case.
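
Restating that example as a timeline on a single partition of a brand-new topic:

    Event                         Offset reported / committed
    EOF on the empty partition    EOF reports 0 (the next message would get offset 0)
    First message arrives         message at offset 0 is processed
    EOF after that message        EOF reports 1 (the next message would get offset 1)
    SMB commit                    last processed offset = 0 is committed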

So it works properly.
Unless you see a message with the same offset being processed twice?

themiken commented
Correct, I understand the purpose of Partition EOF better now.

But given that behavior, there shouldn't be any lag, right?

This is exactly the behavior I'm experiencing: messages are processed perfectly, but there is a lag of 1 on each partition.

In this example, a service with 10 consumers and 10 partitions has a lag of 10 (all messages have been processed, with no loss of messages/offsets or duplicates).

Screenshot from 2024-10-25 15-44-25

Screenshot from 2024-10-25 15-44-37

(screenshot)

zarusz added a commit that referenced this issue Oct 26, 2024
Signed-off-by: Tomasz Maruszak <maruszaktomasz@gmail.com>
zarusz (Owner) commented Oct 26, 2024

@themiken perhaps this warrants creating another issue, as we've gone off track from the original intent of this ticket.

Reading this:

The difference between log-end-offset and current-offset is called consumer lag.

A small constant lag value often exists in a healthy cluster and is nothing to worry about. However, a continuously increasing lag value or a sudden spike in lag value often indicates a problem.

In your example, the constant lag of 1 made me dig into this more, and I found there is some inconsistency in the Confluent.Kafka API.

I have created an RC release for you to try out now:
https://www.nuget.org/packages/SlimMessageBus.Host.Kafka/2.5.4-rc2

That release also pulls in the latest Confluent.Kafka lib.

Let me know your findings.
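
For context on why a constant lag of 1 can show up: Kafka's convention is that the committed offset points at the next message to consume, i.e. last processed offset + 1, and lag tooling compares that against the log end offset. A sketch with Confluent.Kafka of committing under that convention (illustrative only; not necessarily the exact change made in the RC):

using Confluent.Kafka;

// Illustrative only: commit "last processed + 1" so the committed offset points at the
// next message to consume, which is what consumer lag is measured against.
void CommitProcessed(IConsumer<Ignore, byte[]> consumer, ConsumeResult<Ignore, byte[]> lastProcessed)
{
    var next = new TopicPartitionOffset(
        lastProcessed.TopicPartition,
        new Offset(lastProcessed.Offset.Value + 1));

    consumer.Commit(new[] { next });
}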

zarusz added a commit that referenced this issue Oct 26, 2024
Signed-off-by: Tomasz Maruszak <maruszaktomasz@gmail.com>
zarusz added a commit that referenced this issue Oct 26, 2024
…nt lib, remove cloudkarafka
Signed-off-by: Tomasz Maruszak <maruszaktomasz@gmail.com>