[Host.Kafka] Leverage autocommit to achieve faster consumption #131
Hi @zarusz!!! Currently, messages are being auto-committed by the consumer. However, I need to handle the commit manually, because my consumer encounters an exception during message processing. Despite the error, the library still commits the offset, which is not the desired behavior. Is there a way to disable the auto-commit feature and commit the messages manually, only after successful processing? Correct me if anything I said above is wrong.
So yes, the Kafka provider moves forward, which is not desired in your case. This is the standardized way to tap into error handling (in case you haven't seen it):

However, what is the desired error handling? If not "auto commit and move forward", then what else? I could expose something similar to what RabbitMQ has, if that would give you a bit more control. See option 1 (new feature) in the snippet below; options 2 and 3 should be achievable now.

```cs
public class CustomKafkaConsumerErrorHandler<T>(IMessageBus bus) : IKafkaConsumerErrorHandler<T>
{
    // Inject any other needed dependencies via the constructor
    public async Task<bool> OnHandleError(T message, Func<Task> retry, IConsumerContext consumerContext, Exception exception)
    {
        // Option 1: New feature to stop the consumer
        // consumerContext.StopConsumer();

        // Option 2: Retry once more
        // await retry();

        // Option 3: Forward the message to another topic
        // await bus.Publish(message, path: "errors");

        return true; // signifies the error has been handled; the offset will advance to the next message
    }
}
```

Let me know.
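For completeness, a minimal sketch of how such a handler could be wired up with the Microsoft DI container. The open-generic registration is my assumption of how SMB resolves error handlers from the container; adjust to your setup:

```cs
using Microsoft.Extensions.DependencyInjection;

var services = new ServiceCollection();

// Assumption: SMB resolves IKafkaConsumerErrorHandler<T> from the DI container;
// the open-generic registration covers every consumed message type T.
services.AddTransient(typeof(IKafkaConsumerErrorHandler<>), typeof(CustomKafkaConsumerErrorHandler<>));
```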
How can I perform a manual commit with the "SlimMessageBus.Host.Kafka" library, given that the "EnableAutoCommit" property is set to "false" in the "CreateConsumer" method of the "KafkaGroupConsumer" class? Additionally, what should be done so that the "OnStart" method of the "KafkaGroupConsumer" class does not fire? We experimented with changing the "EnableAutoCommit" property to "true", but the "OnStart" method is still triggered, which causes the message to be processed and its offset committed.
The current version of the provider only supports manual commits managed by the library, which you can set to happen every N messages or after every T interval (see the sketch below). Looking at the docs right now, this is not documented, so I will add it soon. The consumer implementation does override the
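For reference, a sketch of how that library-managed checkpointing is configured on a consumer, per my reading of the SMB Kafka docs (the topic, message, and consumer names here are placeholders):

```cs
// mbb is the SMB MessageBusBuilder
mbb.Consume<OrderEvent>(x => x
    .Topic("orders")                             // placeholder topic name
    .WithConsumer<OrderEventConsumer>()          // placeholder consumer type
    .KafkaGroup("consumer-group-1")              // placeholder consumer group
    .CheckpointEvery(100)                        // commit offsets every 100 processed messages...
    .CheckpointAfter(TimeSpan.FromSeconds(60))); // ...or after 60 seconds, whichever comes first
```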
By default, when the bus starts it will auto-start all consumers. What I am getting from this conversation, including earlier feedback from @themiken, is that we need these features:
This ticket was originally about enabling auto commit controlled purely by Kafka (to increase throughput). Given what I've explained above, please help me understand what is missing feature-wise, and I can look at adding it. We'd have to iterate on the proposed API in this ticket a few times to get it right, and then I could follow up with an enhancement (see the sketch below for the start/stop part).
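As a sketch of the consumer start/stop idea: SMB exposes `IConsumerControl` (implemented by the bus) for starting and stopping consumers at runtime. The wrapper below is purely illustrative, and the namespace and member names are per my reading of the current API, so treat them as assumptions:

```cs
using System.Threading.Tasks;
using SlimMessageBus.Host; // namespace may differ by SMB version

// Illustrative wrapper: pause consumption (e.g. after repeated processing
// errors) and resume it later, via SMB's IConsumerControl.
public class ConsumerLifecycle(IConsumerControl consumerControl)
{
    public Task PauseAsync() => consumerControl.Stop();   // stops the consumers
    public Task ResumeAsync() => consumerControl.Start(); // starts them again
}
```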
Thank you very much in advance for answering our questions. To better explain our problem: it all relates to the loss of messages during consumption, either because an error occurs in our application layer or because an error occurs in the "SlimMessageBus" library.

In our tests (referencing the library as a project), we identified that in the "ConsumerLoop" method of the "KafkaGroupConsumer" class, the instruction "OnPartitionEndReached(consumeResult.TopicPartitionOffset);" is executed. This commits the offsets of partitions that have not been processed yet, and therefore loses those messages; only afterwards does the same loop execute "await OnMessage(consumeResult).ConfigureAwait(false);", which processes the message at the identified offset and commits it. To avoid this, we changed the value of the "EnablePartitionEof" property in the "CreateConsumer" method of the "KafkaGroupConsumer" class to "false", which prevents "OnPartitionEndReached(consumeResult.TopicPartitionOffset);" from executing. This could be controlled by the user if the "EnablePartitionEof" property were configurable, so that "OnPartitionEndReached(consumeResult.TopicPartitionOffset);" is not executed.

Additionally, we found that with the setting above and around 5 events/messages in the queue, "await OnMessage(consumeResult).ConfigureAwait(false);" generates commits for only some of the messages, not all 5, even though all of them are processed without incident in our application layer. Following up on this, we found that after our application processed the message at a given offset, the subsequent commit was not issued, because the "Increment" method of the "CheckpointTrigger" class returned "false", preventing the commit. This caused an event at an already-processed offset to be consumed again, still without a commit, duplicating events/messages in the application.

We would appreciate your advice on achieving correct commit generation, without losing events/messages, whether the control is held by the library or by the user.
@jarb46 thanks for reporting and for the detail provided. I have a few additional questions:
Let me analyze the problem on my side. Also, as part of #300, I clarified the documentation. As a side note, there is Source Link support, which should allow you to step into the library source.
@jarb46 I've identified the issue around the PartitionEOF and fixed it in #301. I need to address a second part of the problem, so I will report here once I have an update. |
@jarb46 the issue with PartitionEOF has been resolved. Here is the latest release - please give it a try! |
Thank you so much for your effort, @zarusz!!!
Hi @zarusz! We conducted tests with the new version across different services and observed the following: there is a mismatch in the offset commit when processing messages. The consumer group offset is incorrectly shifted to the next number when performing the commit, causing a lag that does not correctly reflect the processing of the last message. Despite this behavior, no messages are lost; all messages are processed correctly.
This behavior is consistent across all partitions of the topic. When adding new consumers to the group, it is evident that the lag remains equal to (# partitions × 1). In production environments this results in a cumulative lag across all partitions, although no messages are lost. The offset commit mismatch seems to be due to incorrect handling of offsets in the consumer group, causing an apparent lag without data loss. This behavior affects lag monitoring in production environments, potentially causing confusion about the actual state of message processing.
@themiken The offset reported by Kafka in the Partition EOF ("reached end of partition") event indicates the next offset that will be assigned if a message arrives after it. It is not the value we should be committing; we should be committing based on the last message that was actually processed. So what SMB does on the Partition EOF event is commit the offset of the last processed message (on the topic-partition), not the next index reported in the Partition EOF event. This was in fact the bug I fixed last time, which caused messages to be skipped. For example, when starting with a brand-new topic, the EOF event will report 0, meaning that if a message arrives it will get index 0. So it works properly.
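To make the offset arithmetic concrete with plain Confluent.Kafka (the processing call is a placeholder):

```cs
using System.Threading;
using Confluent.Kafka;

static void ConsumeOnce(IConsumer<Ignore, byte[]> consumer, CancellationToken ct)
{
    var result = consumer.Consume(ct);

    if (result.IsPartitionEOF)
    {
        // EOF reports the offset the *next* message would get - 0 on a brand-new
        // topic. Committing this value as-is could skip a future message; the
        // commit must be derived from the last *processed* message instead.
        return;
    }

    // Process(result.Message);  // placeholder for application processing
    consumer.Commit(result);     // commits result.Offset + 1, i.e. "next to read"
}
```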
@themiken perhaps this warrants another issue to be created, as we've gone off track from the original intent of this ticket. Reading this:
In your example, the constant lag of 1 made me dig into this more, and I found there is some inconsistency in the API usage with Confluent.Kafka. I have created an RC release for you to try out now. With that, it also uses the latest Confluent.Kafka lib. Let me know your findings.
Currently, the Kafka provider uses a manual commit approach (every N messages or every M seconds).
We should also offer the option to let Kafka perform the auto commits. This should significantly increase consumption speed.
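For reference, this is the standard Confluent.Kafka configuration such a feature would map to: background auto-commit for speed, combined with manual offset *store* so that failed messages are not marked as consumed (a sketch; the broker address and group id are placeholders):

```cs
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092", // placeholder
    GroupId = "consumer-group-1",        // placeholder
    // Let the Kafka client commit offsets in the background (cheaper than
    // issuing a synchronous commit per message or per batch)...
    EnableAutoCommit = true,
    AutoCommitIntervalMs = 5000,
    // ...but only auto-commit offsets the application has explicitly stored
    // after successful processing.
    EnableAutoOffsetStore = false,
};

using var consumer = new ConsumerBuilder<Ignore, byte[]>(config).Build();
// After successfully processing a ConsumeResult r:
//   consumer.StoreOffset(r);  // marks r as processed; auto-commit picks it up
```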