From 8bf1a5c63b8aacbba2c8c91d522c77ca37b7f2bf Mon Sep 17 00:00:00 2001
From: Tomasz Maruszak
Date: Wed, 11 Sep 2024 23:56:19 +0200
Subject: [PATCH] [Host.Kafka] Fix how PartitionEOF offset is committed

[Host.Kafka] Upgrade to the latest Confluent.Kafka .NET library.
[Host.Kafka] Extend the Kafka docs.
[Host.Kafka] Use test containers for Kafka integration tests.

Signed-off-by: Tomasz Maruszak
---
 .github/workflows/build.yml                   |  18 +--
 docs/provider_kafka.md                        |  36 +++++-
 docs/provider_kafka.t.md                      |  43 +++++++-
 src/Host.Plugin.Properties.xml                |   2 +-
 src/Infrastructure/docker-compose.yml         |  10 +-
 .../Consumer/IKafkaPartitionConsumer.cs       |   2 +-
 .../Consumer/KafkaGroupConsumer.cs            |  36 +++---
 .../Consumer/KafkaPartitionConsumer.cs        |  38 +++++--
 .../KafkaMessageBus.cs                        |  12 +-
 .../SlimMessageBus.Host.Kafka.csproj          |   6 +-
 .../Consumer/KafkaGroupConsumerTests.cs       |  46 ++++++++
 .../KafkaPartitionConsumerForConsumersTest.cs |  14 +--
 .../KafkaPartitionConsumerForResponsesTest.cs |  15 ++-
 .../GlobalUsings.cs                           |   2 +-
 .../KafkaMessageBusIt.cs                      | 104 ++++++------------
 .../appsettings.json                          |   2 +
 .../OutboxTests.cs                            |   9 +-
 17 files changed, 247 insertions(+), 148 deletions(-)
 create mode 100644 src/Tests/SlimMessageBus.Host.Kafka.Test/Consumer/KafkaGroupConsumerTests.cs

diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 42da06df..2656ad51 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -52,6 +52,7 @@ jobs:
         working-directory: ./src

       - name: Install Coverlet
+        #if: false
        run: find . -name "*.Test.csproj" | xargs -t -I {} dotnet add {} package coverlet.collector
        working-directory: ./src

@@ -119,10 +120,10 @@ jobs:
          azure_eventhub_connectionstring: ${{ secrets.azure_eventhub_connectionstring }}
          azure_storagecontainer_connectionstring: ${{ secrets.azure_storagecontainer_connectionstring }}

-         kafka_brokers: ${{ secrets.kafka_brokers }}
-         kafka_username: ${{ secrets.kafka_username }}
-         kafka_password: ${{ secrets.kafka_password }}
-         kafka_secure: ${{ secrets.kafka_secure }}
+         #kafka_brokers: ${{ secrets.kafka_brokers }}
+         #kafka_username: ${{ secrets.kafka_username }}
+         #kafka_password: ${{ secrets.kafka_password }}
+         #kafka_secure: ${{ secrets.kafka_secure }}

          _mqtt_server: ${{ secrets.mqtt_server }}
          _mqtt_port: ${{ secrets.mqtt_port }}
@@ -137,10 +138,10 @@ jobs:
          sqlserver_connectionstring: ${{ secrets.sqlserver_connectionstring }}

          # Connects to the local Test Containers
-         _kafka_brokers: localhost:9092
-         _kafka_username: user
-         _kafka_password: password
-         _kafka_secure: false
+         kafka_brokers: localhost:9092
+         kafka_username: user
+         kafka_password: password
+         kafka_secure: false

          mqtt_server: localhost
          mqtt_port: 1883
@@ -192,6 +193,7 @@ jobs:
          name: .NET Tests
          path: ./test-results/*.trx
          reporter: dotnet-trx
+         fail-on-error: false

       - name: Copy NuGet packages
        shell: bash

diff --git a/docs/provider_kafka.md b/docs/provider_kafka.md
index c2fa1737..e6fc15cc 100644
--- a/docs/provider_kafka.md
+++ b/docs/provider_kafka.md
@@ -14,6 +14,7 @@ Please read the [Introduction](intro.md) before reading this provider documentat
 - [Consumers](#consumers)
   - [Offset Commit](#offset-commit)
   - [Consumer Error Handling](#consumer-error-handling)
+  - [Debugging](#debugging)
 - [Deployment](#deployment)

 ## Underlying client

@@ -22,9 +23,9 @@ The SMB Kafka implementation uses [confluent-kafka-dotnet](https://github.com/co

 When troubleshooting or fine tuning it is worth reading the `librdkafka` and `confluent-kafka-dotnet` docs:

-- [Introduction](https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md
-- [Broker version compatibility](https://github.com/edenhill/librdkafka/wiki/Broker-version-compatibility)
-- [Using SSL with librdkafka](https://github.com/edenhill/librdkafka/wiki/Using-SSL-with-librdkafka)
+- [Introduction](https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md)
+- [Broker version compatibility](https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#broker-version-compatibility)
+- [Using SSL with librdkafka](https://github.com/confluentinc/librdkafka/wiki/Using-SSL-with-librdkafka)

 ## Configuration properties

@@ -246,6 +247,35 @@ The error handler can perform the following actions:

 If no custom error handler is provided, the provider logs the exception and moves on to process the next message.

+### Debugging
+
+Kafka uses a sophisticated protocol for partition assignment:
+
+- Partition assignments may change due to factors like rebalancing.
+- A running consumer instance might not receive any partitions if there are more consumers than partitions for a given topic.
+
+To better understand what's happening, you can enable `Debug` level logging for the relevant logger category, such as `SlimMessageBus.Host.Kafka.KafkaGroupConsumer`.
+
+At this logging level, you can track the lifecycle events of the consumer group:
+
+```
+[00:03:06 INF] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Subscribing to topics: 4p5ma6io-test-ping
+[00:03:06 INF] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Consumer loop started
+[00:03:12 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Assigned partition, Topic: 4p5ma6io-test-ping, Partition: [0]
+[00:03:12 INF] SlimMessageBus.Host.Kafka.KafkaPartitionConsumer Creating consumer for Group: subscriber, Topic: 4p5ma6io-test-ping, Partition: [0]
+[00:03:12 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Assigned partition, Topic: 4p5ma6io-test-ping, Partition: [1]
+[00:03:12 INF] SlimMessageBus.Host.Kafka.KafkaPartitionConsumer Creating consumer for Group: subscriber, Topic: 4p5ma6io-test-ping, Partition: [1]
+...
+[00:03:15 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Received message with Topic: 4p5ma6io-test-ping, Partition: [1], Offset: 98578, payload size: 57
+[00:03:15 INF] SlimMessageBus.Host.Kafka.Test.KafkaMessageBusIt.PingConsumer Got message 073 on topic 4p5ma6io-test-ping.
+[00:03:15 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Received message with Topic: 4p5ma6io-test-ping, Partition: [1], Offset: 98579, payload size: 57
+[00:03:15 INF] SlimMessageBus.Host.Kafka.Test.KafkaMessageBusIt.PingConsumer Got message 075 on topic 4p5ma6io-test-ping.
+[00:03:16 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Reached end of partition, Topic: 4p5ma6io-test-ping, Partition: [0], Offset: 100403
+[00:03:16 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Commit Offset, Topic: 4p5ma6io-test-ping, Partition: [0], Offset: 100402
+[00:03:16 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Reached end of partition, Topic: 4p5ma6io-test-ping, Partition: [1], Offset: 98580
+[00:03:16 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Commit Offset, Topic: 4p5ma6io-test-ping, Partition: [1], Offset: 98579
+```
+
 ## Deployment

 The `librdkafka` distribution for Windows requires [Visual C++ Redistributable for 2013](https://www.microsoft.com/en-US/download/details.aspx?id=40784) installed on the server.
 More information can be found [here](https://www.microsoft.com/en-US/download/details.aspx?id=40784).
\ No newline at end of file
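The Debugging section added above keys off the `SlimMessageBus.Host.Kafka.KafkaGroupConsumer` logger category. A minimal sketch of raising only that category to `Debug` with Microsoft.Extensions.Logging (the console provider and the minimum levels here are illustrative assumptions, not part of this patch):

```cs
using Microsoft.Extensions.Logging;

// Sketch: keep the rest of the bus at Information, but surface partition
// assignment, partition EOF and offset commit events from the consumer group.
using var loggerFactory = LoggerFactory.Create(builder => builder
    .AddConsole() // assumed sink; any logging provider works
    .SetMinimumLevel(LogLevel.Information)
    .AddFilter("SlimMessageBus.Host.Kafka.KafkaGroupConsumer", LogLevel.Debug));
```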
diff --git a/docs/provider_kafka.t.md b/docs/provider_kafka.t.md
index e8c6a893..56e2fde3 100644
--- a/docs/provider_kafka.t.md
+++ b/docs/provider_kafka.t.md
@@ -14,6 +14,7 @@ Please read the [Introduction](intro.md) before reading this provider documentat
 - [Consumers](#consumers)
   - [Offset Commit](#offset-commit)
   - [Consumer Error Handling](#consumer-error-handling)
+  - [Debugging](#debugging)
 - [Deployment](#deployment)

 ## Underlying client

@@ -22,13 +23,13 @@ The SMB Kafka implementation uses [confluent-kafka-dotnet](https://github.com/co

 When troubleshooting or fine tuning it is worth reading the `librdkafka` and `confluent-kafka-dotnet` docs:

-- [Introduction](https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md
-- [Broker version compatibility](https://github.com/edenhill/librdkafka/wiki/Broker-version-compatibility)
-- [Using SSL with librdkafka](https://github.com/edenhill/librdkafka/wiki/Using-SSL-with-librdkafka)
+- [Introduction](https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md)
+- [Broker version compatibility](https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#broker-version-compatibility)
+- [Using SSL with librdkafka](https://github.com/confluentinc/librdkafka/wiki/Using-SSL-with-librdkafka)

 ## Configuration properties

-Producer, consumer and global configuration properties are described [here](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md).
+Producer, consumer and global configuration properties are described [here](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md).

 The configuration on the underlying Kafka client can be adjusted like so:

 ```cs
 services.AddSlimMessageBus(mbb =>
@@ -52,7 +53,7 @@ services.AddSlimMessageBus(mbb =>

 ### Minimizing message latency

-There is a good description [here](https://github.com/edenhill/librdkafka/wiki/How-to-decrease-message-latency) on improving the latency by applying producer/consumer settings on librdkafka. Here is how you enter the settings using SlimMessageBus:
+There is a good description [here](https://github.com/confluentinc/librdkafka/wiki/How-to-decrease-message-latency) on improving the latency by applying producer/consumer settings on librdkafka. Here is how you enter the settings using SlimMessageBus:

 ```cs
 services.AddSlimMessageBus(mbb =>
@@ -210,7 +211,8 @@ mbb

 ### Offset Commit

-In the current Kafka provider implementation, SMB handles the manual commit of topic-partition offsets for the consumer. This configuration is controlled through the following methods on the consumer builder:
+In the current Kafka provider implementation, SMB handles the manual commit of topic-partition offsets for the consumer.
+This configuration is controlled through the following methods on the consumer builder:

 - `CheckpointEvery(int)` – Commits the offset after a specified number of processed messages.
 - `CheckpointAfter(TimeSpan)` – Commits the offset after a specified time interval.
@@ -236,6 +238,35 @@ The error handler can perform the following actions:

 If no custom error handler is provided, the provider logs the exception and moves on to process the next message.

+### Debugging
+
+Kafka uses a sophisticated protocol for partition assignment:
+
+- Partition assignments may change due to factors like rebalancing.
+- A running consumer instance might not receive any partitions if there are more consumers than partitions for a given topic.
+
+To better understand what's happening, you can enable `Debug` level logging for the relevant logger category, such as `SlimMessageBus.Host.Kafka.KafkaGroupConsumer`.
+
+At this logging level, you can track the lifecycle events of the consumer group:
+
+```
+[00:03:06 INF] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Subscribing to topics: 4p5ma6io-test-ping
+[00:03:06 INF] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Consumer loop started
+[00:03:12 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Assigned partition, Topic: 4p5ma6io-test-ping, Partition: [0]
+[00:03:12 INF] SlimMessageBus.Host.Kafka.KafkaPartitionConsumer Creating consumer for Group: subscriber, Topic: 4p5ma6io-test-ping, Partition: [0]
+[00:03:12 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Assigned partition, Topic: 4p5ma6io-test-ping, Partition: [1]
+[00:03:12 INF] SlimMessageBus.Host.Kafka.KafkaPartitionConsumer Creating consumer for Group: subscriber, Topic: 4p5ma6io-test-ping, Partition: [1]
+...
+[00:03:15 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Received message with Topic: 4p5ma6io-test-ping, Partition: [1], Offset: 98578, payload size: 57
+[00:03:15 INF] SlimMessageBus.Host.Kafka.Test.KafkaMessageBusIt.PingConsumer Got message 073 on topic 4p5ma6io-test-ping.
+[00:03:15 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Received message with Topic: 4p5ma6io-test-ping, Partition: [1], Offset: 98579, payload size: 57
+[00:03:15 INF] SlimMessageBus.Host.Kafka.Test.KafkaMessageBusIt.PingConsumer Got message 075 on topic 4p5ma6io-test-ping.
+[00:03:16 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Reached end of partition, Topic: 4p5ma6io-test-ping, Partition: [0], Offset: 100403
+[00:03:16 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Commit Offset, Topic: 4p5ma6io-test-ping, Partition: [0], Offset: 100402
+[00:03:16 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Reached end of partition, Topic: 4p5ma6io-test-ping, Partition: [1], Offset: 98580
+[00:03:16 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Commit Offset, Topic: 4p5ma6io-test-ping, Partition: [1], Offset: 98579
+```
+
 ## Deployment

 The `librdkafka` distribution for Windows requires [Visual C++ Redistributable for 2013](https://www.microsoft.com/en-US/download/details.aspx?id=40784) installed on the server.
 More information can be found [here](https://www.microsoft.com/en-US/download/details.aspx?id=40784).
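To complement the Offset Commit section above, a minimal sketch of the two checkpoint settings on a consumer; the message type, topic and group names are placeholders, and the values mirror the ones used in the integration test later in this patch:

```cs
using System;
using Microsoft.Extensions.DependencyInjection;
using SlimMessageBus.Host;
using SlimMessageBus.Host.Kafka;

public record PingMessage; // placeholder message type

public static class KafkaBusSetup
{
    public static void Configure(IServiceCollection services)
    {
        services.AddSlimMessageBus(mbb =>
        {
            mbb.Consume<PingMessage>(x => x
                .Topic("some-topic")       // placeholder topic
                .KafkaGroup("some-group")  // placeholder consumer group
                .CheckpointEvery(100)                        // commit the offset after 100 processed messages
                .CheckpointAfter(TimeSpan.FromSeconds(10))); // or after a 10 second interval
        });
    }
}
```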
diff --git a/src/Host.Plugin.Properties.xml b/src/Host.Plugin.Properties.xml
index c9dc3943..86fe0a43 100644
--- a/src/Host.Plugin.Properties.xml
+++ b/src/Host.Plugin.Properties.xml
@@ -4,7 +4,7 @@


-    <Version>2.5.3-rc1</Version>
+    <Version>2.5.3-rc2</Version>


\ No newline at end of file

diff --git a/src/Infrastructure/docker-compose.yml b/src/Infrastructure/docker-compose.yml
index e9af6726..037928ad 100644
--- a/src/Infrastructure/docker-compose.yml
+++ b/src/Infrastructure/docker-compose.yml
@@ -1,4 +1,4 @@
-version: '3.4'
+version: "3.4"

 services:
   zookeeper:
@@ -16,13 +16,13 @@ services:
       - "9092:9092"
     environment:
       KAFKA_ADVERTISED_HOST_NAME: localhost
-      KAFKA_CREATE_TOPICS: "user-test-ping:2:1,user-test-echo:2:1"
+      KAFKA_CREATE_TOPICS: "user-test-ping:2:1,user-test-echo:2:1,user-test-echo-resp:2:1"
       KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
     depends_on:
       - zookeeper
     networks:
       - slim
-  
+
   mqtt:
     container_name: slim.mqtt
     image: eclipse-mosquitto:2.0.18
@@ -74,7 +74,7 @@ services:
       - "11002:11002"
     networks:
       - slim
-  
+
   nats:
     container_name: slim.nats
     image: nats:2.10
     ports:
       - 4222:4222
     networks:
       - slim
-
 networks:
   slim: {}
-
\ No newline at end of file

diff --git a/src/SlimMessageBus.Host.Kafka/Consumer/IKafkaPartitionConsumer.cs b/src/SlimMessageBus.Host.Kafka/Consumer/IKafkaPartitionConsumer.cs
index 401abcda..acd083bd 100644
--- a/src/SlimMessageBus.Host.Kafka/Consumer/IKafkaPartitionConsumer.cs
+++ b/src/SlimMessageBus.Host.Kafka/Consumer/IKafkaPartitionConsumer.cs
@@ -11,7 +11,7 @@ public interface IKafkaPartitionConsumer : IDisposable
     void OnPartitionAssigned(TopicPartition partition);
     Task OnMessage(ConsumeResult<Ignore, byte[]> message);

-    void OnPartitionEndReached(TopicPartitionOffset offset);
+    void OnPartitionEndReached();
     void OnPartitionRevoked();

     void OnClose();

diff --git a/src/SlimMessageBus.Host.Kafka/Consumer/KafkaGroupConsumer.cs b/src/SlimMessageBus.Host.Kafka/Consumer/KafkaGroupConsumer.cs
index 3fa0e413..5d36c4de 100644
--- a/src/SlimMessageBus.Host.Kafka/Consumer/KafkaGroupConsumer.cs
+++ b/src/SlimMessageBus.Host.Kafka/Consumer/KafkaGroupConsumer.cs
@@ -11,14 +11,14 @@ public class KafkaGroupConsumer : AbstractConsumer, IKafkaCommitController
     private Task _consumerTask;
     private CancellationTokenSource _consumerCts;

-    public KafkaMessageBus MessageBus { get; }
+    public KafkaMessageBusSettings ProviderSettings { get; }
     public string Group { get; }
     public IReadOnlyCollection<string> Topics { get; }

-    public KafkaGroupConsumer(KafkaMessageBus messageBus, string group, IReadOnlyCollection<string> topics, Func<TopicPartition, IKafkaCommitController, IKafkaPartitionConsumer> processorFactory)
-        : base(messageBus.LoggerFactory.CreateLogger<KafkaGroupConsumer>())
+    public KafkaGroupConsumer(ILoggerFactory loggerFactory, KafkaMessageBusSettings providerSettings, string group, IReadOnlyCollection<string> topics, Func<TopicPartition, IKafkaCommitController, IKafkaPartitionConsumer> processorFactory)
+        : base(loggerFactory.CreateLogger<KafkaGroupConsumer>())
     {
-        MessageBus = messageBus;
+        ProviderSettings = providerSettings ?? throw new ArgumentNullException(nameof(providerSettings));
         Group = group ?? throw new ArgumentNullException(nameof(group));
         Topics = topics ?? throw new ArgumentNullException(nameof(topics));
@@ -58,17 +58,17 @@ protected IConsumer<Ignore, byte[]> CreateConsumer(string group)
         var config = new ConsumerConfig
         {
             GroupId = group,
-            BootstrapServers = MessageBus.ProviderSettings.BrokerList
+            BootstrapServers = ProviderSettings.BrokerList
         };
-        MessageBus.ProviderSettings.ConsumerConfig(config);
+        ProviderSettings.ConsumerConfig(config);

         // ToDo: add support for auto commit
         config.EnableAutoCommit = false;
         // Notify when we reach EoF, so that we can do a manual commit
-        config.EnablePartitionEof = true;
+        config.EnablePartitionEof = true;

-        var consumer = MessageBus.ProviderSettings.ConsumerBuilderFactory(config)
-            .SetStatisticsHandler((_, json) => OnStatistics(json))
+        var consumer = ProviderSettings.ConsumerBuilderFactory(config)
+            .SetStatisticsHandler((_, json) => OnStatistics(json))
             .SetPartitionsAssignedHandler((_, partitions) => OnPartitionAssigned(partitions))
             .SetPartitionsRevokedHandler((_, partitions) => OnPartitionRevoked(partitions))
             .SetOffsetsCommittedHandler((_, offsets) => OnOffsetsCommitted(offsets))
@@ -120,7 +120,7 @@ protected async virtual Task ConsumerLoop()
             }
             catch (ConsumeException e)
             {
-                var pollRetryInterval = MessageBus.ProviderSettings.ConsumerPollRetryInterval;
+                var pollRetryInterval = ProviderSettings.ConsumerPollRetryInterval;

                 Logger.LogError(e, "Group [{Group}]: Error occurred while polling new messages (will retry in {RetryInterval}) - {Reason}", Group, pollRetryInterval, e.Error.Reason);
                 await Task.Delay(pollRetryInterval, _consumerCts.Token).ConfigureAwait(false);
@@ -134,7 +134,7 @@ protected async virtual Task ConsumerLoop()
             Logger.LogInformation("Group [{Group}]: Unsubscribing from topics", Group);
             _consumer.Unsubscribe();

-            if (MessageBus.ProviderSettings.EnableCommitOnBusStop)
+            if (ProviderSettings.EnableCommitOnBusStop)
             {
                 OnClose();
             }
@@ -201,7 +201,7 @@ protected virtual void OnPartitionEndReached(TopicPartitionOffset offset)
         Logger.LogDebug("Group [{Group}]: Reached end of partition, Topic: {Topic}, Partition: {Partition}, Offset: {Offset}", Group, offset.Topic, offset.Partition, offset.Offset);

         var processor = _processors[offset.TopicPartition];
-        processor.OnPartitionEndReached(offset);
+        processor.OnPartitionEndReached();
     }

     protected async virtual ValueTask OnMessage(ConsumeResult<Ignore, byte[]> message)
@@ -212,15 +212,21 @@ protected async virtual ValueTask OnMessage(ConsumeResult<Ignore, byte[]> messag
         await processor.OnMessage(message).ConfigureAwait(false);
     }

-    protected virtual void OnOffsetsCommitted(CommittedOffsets e)
+    protected internal virtual void OnOffsetsCommitted(CommittedOffsets e)
     {
         if (e.Error.IsError || e.Error.IsFatal)
         {
-            Logger.LogWarning("Group [{Group}]: Failed to commit offsets: [{Offsets}], error: {error}", Group, string.Join(", ", e.Offsets), e.Error.Reason);
+            if (Logger.IsEnabled(LogLevel.Warning))
+            {
+                Logger.LogWarning("Group [{Group}]: Failed to commit offsets: [{Offsets}], error: {ErrorMessage}", Group, string.Join(", ", e.Offsets), e.Error.Reason);
+            }
         }
         else
         {
-            Logger.LogTrace("Group [{Group}]: Successfully committed offsets: [{Offsets}]", Group, string.Join(", ", e.Offsets));
+            if (Logger.IsEnabled(LogLevel.Debug))
+            {
+                Logger.LogDebug("Group [{Group}]: Successfully committed offsets: [{Offsets}]", Group, string.Join(", ", e.Offsets));
+            }
         }
     }
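The `OnOffsetsCommitted` rewrite above wraps each log call in an `IsEnabled` check, so the `string.Join` over the offsets only runs when that level is actually on. The same pattern in isolation (the helper and its names are illustrative, not part of the patch):

```cs
using System.Collections.Generic;
using Microsoft.Extensions.Logging;

public static class GuardedLogging
{
    // Illustrative helper: the string.Join allocation and formatting happen
    // only when Debug logging is enabled on this logger.
    public static void LogCommittedOffsets(ILogger logger, string group, IEnumerable<string> offsets)
    {
        if (logger.IsEnabled(LogLevel.Debug))
        {
            logger.LogDebug("Group [{Group}]: Successfully committed offsets: [{Offsets}]",
                group, string.Join(", ", offsets));
        }
    }
}
```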
diff --git a/src/SlimMessageBus.Host.Kafka/Consumer/KafkaPartitionConsumer.cs b/src/SlimMessageBus.Host.Kafka/Consumer/KafkaPartitionConsumer.cs
index 22d75b08..1a8fa618 100644
--- a/src/SlimMessageBus.Host.Kafka/Consumer/KafkaPartitionConsumer.cs
+++ b/src/SlimMessageBus.Host.Kafka/Consumer/KafkaPartitionConsumer.cs
@@ -37,13 +37,16 @@ protected KafkaPartitionConsumer(ILoggerFactory loggerFactory, AbstractConsumerS
         _commitController = commitController;
         _messageProcessor = messageProcessor;

-        // ToDo: Add support for Kafka driven automatic commit
+        // ToDo: Add support for Kafka driven automatic commit (https://github.com/zarusz/SlimMessageBus/issues/131)
         CheckpointTrigger = CreateCheckpointTrigger();
     }

     private ICheckpointTrigger CreateCheckpointTrigger()
     {
-        var f = new CheckpointTriggerFactory(LoggerFactory, (configuredCheckpoints) => $"The checkpoint settings ({nameof(BuilderExtensions.CheckpointAfter)} and {nameof(BuilderExtensions.CheckpointEvery)}) across all the consumers that use the same Topic {TopicPartition.Topic} and Group {Group} must be the same (found settings are: {string.Join(", ", configuredCheckpoints)})");
+        var f = new CheckpointTriggerFactory(
+            LoggerFactory,
+            (configuredCheckpoints) => $"The checkpoint settings ({nameof(BuilderExtensions.CheckpointAfter)} and {nameof(BuilderExtensions.CheckpointEvery)}) across all the consumers that use the same Topic {TopicPartition.Topic} and Group {Group} must be the same (found settings are: {string.Join(", ", configuredCheckpoints)})");
+
         return f.Create(ConsumerSettings);
     }

@@ -101,11 +104,22 @@ public async Task OnMessage(ConsumeResult<Ignore, byte[]> message)
             _lastOffset = message.TopicPartitionOffset;

             var messageHeaders = message.ToHeaders(_headerSerializer);
+
+            // Log in trace level all the message headers converted to string
+            if (_logger.IsEnabled(LogLevel.Trace))
+            {
+                foreach (var header in messageHeaders)
+                {
+                    _logger.LogTrace("Group [{Group}]: Topic: {Topic}, Partition: {Partition}, Offset: {Offset}, Message Header: {HeaderKey}={HeaderValue}", Group, message.TopicPartitionOffset.Topic, message.TopicPartitionOffset.Partition, message.TopicPartitionOffset.Offset, header.Key, header.Value);
+                }
+            }
+
             var r = await _messageProcessor.ProcessMessage(message, messageHeaders, cancellationToken: _cancellationTokenSource.Token).ConfigureAwait(false);
             if (r.Exception != null)
             {
-                // ToDo: Retry logic
-                // The OnMessageFaulted was called at this point by the MessageProcessor.
+                // The IKafkaConsumerErrorHandler and OnMessageFaulted was called at this point by the MessageProcessor.
+                // We can only log and move to the next message, as the error handling is done by the MessageProcessor.
+                LogError(r.Exception, message);
             }

             if (CheckpointTrigger != null && CheckpointTrigger.Increment())
@@ -114,20 +128,20 @@ public async Task OnMessage(ConsumeResult<Ignore, byte[]> message)
             }
         }
         catch (Exception e)
-        {
-            _logger.LogError(e, "Group [{Group}]: Error occurred while consuming a message at Topic: {Topic}, Partition: {Partition}, Offset: {Offset}", Group, message.Topic, message.Partition, message.Offset);
+        {
+            LogError(e, message);
             throw;
         }
-    }
+    }
+
+    private void LogError(Exception e, ConsumeResult<Ignore, byte[]> message)
+        => _logger.LogError(e, "Group [{Group}]: Error occurred while consuming a message at Topic: {Topic}, Partition: {Partition}, Offset: {Offset}, Error: {ErrorMessage}", Group, message.Topic, message.Partition, message.Offset, e.Message);

-    public void OnPartitionEndReached(TopicPartitionOffset offset)
+    public void OnPartitionEndReached()
     {
         if (CheckpointTrigger != null)
         {
-            if (offset != null)
-            {
-                Commit(offset);
-            }
+            Commit(_lastOffset);
         }
     }

diff --git a/src/SlimMessageBus.Host.Kafka/KafkaMessageBus.cs b/src/SlimMessageBus.Host.Kafka/KafkaMessageBus.cs
index 39e9f488..0a6c5580 100644
--- a/src/SlimMessageBus.Host.Kafka/KafkaMessageBus.cs
+++ b/src/SlimMessageBus.Host.Kafka/KafkaMessageBus.cs
@@ -62,7 +62,7 @@ protected override async Task CreateConsumers()
         void AddGroupConsumer(string group, IReadOnlyCollection<string> topics, Func<TopicPartition, IKafkaCommitController, IKafkaPartitionConsumer> processorFactory)
         {
             _logger.LogInformation("Creating consumer group {ConsumerGroup}", group);
-            AddConsumer(new KafkaGroupConsumer(this, group, topics, processorFactory));
+            AddConsumer(new KafkaGroupConsumer(LoggerFactory, ProviderSettings, group, topics, processorFactory));
         }

         IKafkaPartitionConsumer ResponseProcessorFactory(TopicPartition tp, IKafkaCommitController cc) => new KafkaPartitionConsumerForResponses(LoggerFactory, Settings.RequestResponse, Settings.RequestResponse.GetGroup(), tp, cc, this, HeaderSerializer);
@@ -147,8 +147,14 @@ protected override async Task<ProduceToTransportBulkResult<T>> ProduceToTranspor
                 ? GetMessagePartition(producerSettings, messageType, envelope.Message, path)
                 : NoPartition;

-            _logger.LogTrace("Producing message {Message} of type {MessageType}, on topic {Topic}, partition {Partition}, key size {KeySize}, payload size {MessageSize}",
-                envelope.Message, messageType?.Name, path, partition, key?.Length ?? 0, messagePayload?.Length ?? 0);
+            _logger.LogDebug("Producing message {Message} of type {MessageType}, topic {Topic}, partition {Partition}, key size {KeySize}, payload size {MessageSize}, headers count {MessageHeaderCount}",
+                envelope.Message,
+                messageType?.Name,
+                path,
+                partition,
+                key?.Length ?? 0,
+                messagePayload?.Length ?? 0,
+                kafkaMessage.Headers?.Count ?? 0);

             // send the message to topic
             var task = partition == NoPartition
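Because `KafkaGroupConsumer` now takes an `ILoggerFactory` and `KafkaMessageBusSettings` instead of the whole `KafkaMessageBus`, it can be constructed in isolation, which is what the new unit test below exploits. A sketch under that assumption (the broker address, group, topics and the partition-consumer factory are stand-ins):

```cs
using System;
using Confluent.Kafka;
using Microsoft.Extensions.Logging;
using SlimMessageBus.Host.Kafka;

public static class GroupConsumerFactory
{
    // Sketch: wire up the consumer group directly, without a running message bus.
    public static KafkaGroupConsumer Create(
        ILoggerFactory loggerFactory,
        Func<TopicPartition, IKafkaCommitController, IKafkaPartitionConsumer> processorFactory)
    {
        var providerSettings = new KafkaMessageBusSettings("localhost:9092"); // stand-in broker list
        return new KafkaGroupConsumer(
            loggerFactory,
            providerSettings,
            group: "subscriber",            // stand-in consumer group
            topics: new[] { "some-topic" }, // stand-in topics
            processorFactory: processorFactory);
    }
}
```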
diff --git a/src/SlimMessageBus.Host.Kafka/SlimMessageBus.Host.Kafka.csproj b/src/SlimMessageBus.Host.Kafka/SlimMessageBus.Host.Kafka.csproj
index 11bc4c76..389d080d 100644
--- a/src/SlimMessageBus.Host.Kafka/SlimMessageBus.Host.Kafka.csproj
+++ b/src/SlimMessageBus.Host.Kafka/SlimMessageBus.Host.Kafka.csproj
@@ -10,7 +10,7 @@

-
+


@@ -18,4 +18,8 @@



+
+
+
+

diff --git a/src/Tests/SlimMessageBus.Host.Kafka.Test/Consumer/KafkaGroupConsumerTests.cs b/src/Tests/SlimMessageBus.Host.Kafka.Test/Consumer/KafkaGroupConsumerTests.cs
new file mode 100644
index 00000000..4dc38daa
--- /dev/null
+++ b/src/Tests/SlimMessageBus.Host.Kafka.Test/Consumer/KafkaGroupConsumerTests.cs
@@ -0,0 +1,46 @@
+namespace SlimMessageBus.Host.Kafka.Test;
+
+public class KafkaGroupConsumerTests
+{
+    private readonly Mock<ILogger<KafkaGroupConsumer>> _loggerMock = new();
+    private readonly KafkaGroupConsumer _subject;
+
+    public KafkaGroupConsumerTests()
+    {
+        var loggerFactoryMock = new Mock<ILoggerFactory>();
+        loggerFactoryMock.Setup(x => x.CreateLogger(It.IsAny<string>())).Returns(_loggerMock.Object);
+
+        var processorFactoryMock = new Mock<Func<TopicPartition, IKafkaCommitController, IKafkaPartitionConsumer>>();
+
+        var providerSettings = new KafkaMessageBusSettings("host");
+
+        var subjectMock = new Mock<KafkaGroupConsumer>(loggerFactoryMock.Object, providerSettings, "group", new List<string> { "topic" }, processorFactoryMock.Object) { CallBase = true };
+        _subject = subjectMock.Object;
+    }
+
+    [Theory]
+    [InlineData(true)]
+    [InlineData(false)]
+    public void When_OnOffsetsCommitted_Given_Succeeded_Then_ShouldLog(bool debugEnabled)
+    {
+        // arrange
+        _loggerMock.Setup(x => x.IsEnabled(LogLevel.Debug)).Returns(debugEnabled);
+
+        var tpo = new TopicPartitionOffset("topic", 0, 10);
+        var committedOffset = new CommittedOffsets([new TopicPartitionOffsetError(tpo, null)], new Error(ErrorCode.NoError));
+
+        // act
+        _subject.OnOffsetsCommitted(committedOffset);
+
+        // assert
+        _loggerMock.Verify(x => x.Log(LogLevel.Information, It.IsAny<EventId>(), It.IsAny<It.IsAnyType>(), It.IsAny<Exception>(), It.IsAny<Func<It.IsAnyType, Exception, string>>()), Times.Once);
+        _loggerMock.Verify(x => x.IsEnabled(LogLevel.Debug), Times.Once);
+        if (debugEnabled)
+        {
+            _loggerMock.Verify(x => x.Log(LogLevel.Debug, It.IsAny<EventId>(), It.IsAny<It.IsAnyType>(), It.IsAny<Exception>(), It.IsAny<Func<It.IsAnyType, Exception, string>>()), Times.Once);
+        }
+        _loggerMock.VerifyNoOtherCalls();
+    }
+}
\ No newline at end of file

diff --git a/src/Tests/SlimMessageBus.Host.Kafka.Test/Consumer/KafkaPartitionConsumerForConsumersTest.cs b/src/Tests/SlimMessageBus.Host.Kafka.Test/Consumer/KafkaPartitionConsumerForConsumersTest.cs
index 7d08f81a..53fa9eb0 100644
--- a/src/Tests/SlimMessageBus.Host.Kafka.Test/Consumer/KafkaPartitionConsumerForConsumersTest.cs
+++ b/src/Tests/SlimMessageBus.Host.Kafka.Test/Consumer/KafkaPartitionConsumerForConsumersTest.cs
@@ -1,12 +1,6 @@
 namespace SlimMessageBus.Host.Kafka.Test;

-using Confluent.Kafka;
-
-using Microsoft.Extensions.Logging.Abstractions;
-
-using SlimMessageBus.Host;
-
-using ConsumeResult = Confluent.Kafka.ConsumeResult<Confluent.Kafka.Ignore, byte[]>;
+using ConsumeResult = ConsumeResult<Ignore, byte[]>;

 public class KafkaPartitionConsumerForConsumersTest : IDisposable
 {
@@ -41,7 +35,7 @@ public KafkaPartitionConsumerForConsumersTest()

         var headerSerializer = new StringValueSerializer();

-        _subject = new Lazy<KafkaPartitionConsumerForConsumers>(() => new KafkaPartitionConsumerForConsumers(massageBusMock.Bus.LoggerFactory, new[] { _consumerBuilder.ConsumerSettings }, group, _topicPartition, _commitControllerMock.Object, headerSerializer, massageBusMock.Bus));
+        _subject = new Lazy<KafkaPartitionConsumerForConsumers>(() => new KafkaPartitionConsumerForConsumers(massageBusMock.Bus.LoggerFactory, [_consumerBuilder.ConsumerSettings], group, _topicPartition, _commitControllerMock.Object, headerSerializer, massageBusMock.Bus));
     }

     public void Dispose()
@@ -64,7 +58,7 @@ public async Task When_OnPartitionEndReached_Then_ShouldCommit()
         await _subject.Value.OnMessage(message);

         // act
-        _subject.Value.OnPartitionEndReached(message.TopicPartitionOffset);
+        _subject.Value.OnPartitionEndReached();

         // assert
         _commitControllerMock.Verify(x => x.Commit(message.TopicPartitionOffset), Times.Once);
@@ -114,7 +108,7 @@ private ConsumeResult GetSomeMessage(int offsetAdd = 0)
             Topic = _topicPartition.Topic,
             Partition = _topicPartition.Partition,
             Offset = 10 + offsetAdd,
-            Message = new Message<Ignore, byte[]> { Key = null, Value = new byte[] { 10, 20 } },
+            Message = new Message<Ignore, byte[]> { Key = null, Value = [10, 20] },
             IsPartitionEOF = false,
         };
     }

diff --git a/src/Tests/SlimMessageBus.Host.Kafka.Test/Consumer/KafkaPartitionConsumerForResponsesTest.cs b/src/Tests/SlimMessageBus.Host.Kafka.Test/Consumer/KafkaPartitionConsumerForResponsesTest.cs
index 6cd6f6ee..4d426782 100644
--- a/src/Tests/SlimMessageBus.Host.Kafka.Test/Consumer/KafkaPartitionConsumerForResponsesTest.cs
+++ b/src/Tests/SlimMessageBus.Host.Kafka.Test/Consumer/KafkaPartitionConsumerForResponsesTest.cs
@@ -2,9 +2,7 @@

 using System.Text;

-using Confluent.Kafka;
-
-using ConsumeResult = Confluent.Kafka.ConsumeResult<Confluent.Kafka.Ignore, byte[]>;
+using ConsumeResult = ConsumeResult<Ignore, byte[]>;

 public class KafkaPartitionConsumerForResponsesTest : IDisposable
 {
@@ -45,13 +43,17 @@ public void When_NewInstance_Then_TopicPartitionSet()
     }

     [Fact]
-    public void When_OnPartitionEndReached_Then_ShouldCommit()
+    public async Task When_OnPartitionEndReached_Then_ShouldCommit()
     {
         // arrange
         var partition = new TopicPartitionOffset(_topicPartition, new Offset(10));
+        var message = GetSomeMessage();
+
+        _subject.OnPartitionAssigned(_topicPartition);
+        await _subject.OnMessage(message);

         // act
-        _subject.OnPartitionEndReached(partition);
+        _subject.OnPartitionEndReached();

         // assert
         _commitControllerMock.Verify(x => x.Commit(partition), Times.Once);
@@ -125,7 +127,7 @@ private ConsumeResult GetSomeMessage()
             Message = new Message<Ignore, byte[]>
             {
                 Key = null,
-                Value = new byte[] { 10, 20 },
+                Value = [10, 20],
                 Headers = new Headers
                 {
                     { "test-header", Encoding.UTF8.GetBytes("test-value") }
@@ -145,3 +147,4 @@ public void Dispose()

     #endregion
 }
+

diff --git a/src/Tests/SlimMessageBus.Host.Kafka.Test/GlobalUsings.cs b/src/Tests/SlimMessageBus.Host.Kafka.Test/GlobalUsings.cs
index 440af700..1af372fa 100644
--- a/src/Tests/SlimMessageBus.Host.Kafka.Test/GlobalUsings.cs
+++ b/src/Tests/SlimMessageBus.Host.Kafka.Test/GlobalUsings.cs
@@ -3,12 +3,12 @@
 global using FluentAssertions;

 global using Microsoft.Extensions.Logging;
+global using Microsoft.Extensions.Logging.Abstractions;

 global using Moq;

 global using SecretStore;

-global using SlimMessageBus.Host;
 global using SlimMessageBus.Host.Serialization;

 global using Xunit;
diff --git a/src/Tests/SlimMessageBus.Host.Kafka.Test/KafkaMessageBusIt.cs b/src/Tests/SlimMessageBus.Host.Kafka.Test/KafkaMessageBusIt.cs
index ce9531d6..c4e94b73 100644
--- a/src/Tests/SlimMessageBus.Host.Kafka.Test/KafkaMessageBusIt.cs
+++ b/src/Tests/SlimMessageBus.Host.Kafka.Test/KafkaMessageBusIt.cs
@@ -23,10 +23,11 @@ namespace SlimMessageBus.Host.Kafka.Test;
 /// </summary>
 [Trait("Category", "Integration")]
 [Trait("Transport", "Kafka")]
-public class KafkaMessageBusIt : BaseIntegrationTest<KafkaMessageBusIt>
-{
+public class KafkaMessageBusIt(ITestOutputHelper testOutputHelper)
+    : BaseIntegrationTest<KafkaMessageBusIt>(testOutputHelper)
+{
     private const int NumberOfMessages = 77;
-    private string TopicPrefix { get; set; }
+    private string TopicPrefix { get; set; }

     private static void AddSsl(string username, string password, ClientConfig c)
     {
@@ -37,10 +38,6 @@ private static void AddSsl(string username, string password, ClientConfig c)
         c.SaslMechanism = SaslMechanism.ScramSha256;
         c.SslCaLocation = "cloudkarafka_2023-10.pem";
     }
-
-    public KafkaMessageBusIt(ITestOutputHelper testOutputHelper) : base(testOutputHelper)
-    {
-    }

     protected override void SetupServices(ServiceCollection services, IConfigurationRoot configuration)
     {
@@ -60,26 +57,26 @@ protected override void SetupServices(ServiceCollection services, IConfiguration
             cfg.BrokerList = kafkaBrokers;

             cfg.ProducerConfig = (config) =>
             {
+                config.LingerMs = 5; // 5ms
+                config.SocketNagleDisable = true;
+
                 if (kafkaSecure)
                 {
                     AddSsl(kafkaUsername, kafkaPassword, config);
                 }
-
-                config.LingerMs = 5; // 5ms
-                config.SocketNagleDisable = true;
             };
             cfg.ConsumerConfig = (config) =>
             {
+                config.FetchErrorBackoffMs = 1;
+                config.SocketNagleDisable = true;
+                // when the test containers start there is no consumer group yet, so we want to start from the beginning
+                config.AutoOffsetReset = AutoOffsetReset.Earliest;
+
                 if (kafkaSecure)
                 {
                     AddSsl(kafkaUsername, kafkaPassword, config);
                 }
-
-                config.FetchErrorBackoffMs = 1;
-                config.SocketNagleDisable = true;
-
-                config.StatisticsIntervalMs = 500000;
-                config.AutoOffsetReset = AutoOffsetReset.Latest;
             };
         });
         mbb.AddServicesFromAssemblyContaining<PingConsumer>();
@@ -134,13 +131,13 @@ public async Task BasicPubSub()

         var messages = Enumerable
             .Range(0, NumberOfMessages)
-            .Select(i => new PingMessage { Counter = i, Timestamp = DateTime.UtcNow })
+            .Select(i => new PingMessage(DateTime.UtcNow, i))
             .ToList();

         await Task.WhenAll(messages.Select(m => messageBus.Publish(m)));

         stopwatch.Stop();
-        Logger.LogInformation("Published {0} messages in {1}", messages.Count, stopwatch.Elapsed);
+        Logger.LogInformation("Published {MessageCount} messages in {PublishTime}", messages.Count, stopwatch.Elapsed);

         // consume
         stopwatch.Restart();
@@ -148,7 +145,7 @@ public async Task BasicPubSub()
         await consumedMessages.WaitUntilArriving(newMessagesTimeout: 5);
         stopwatch.Stop();

-        Logger.LogInformation("Consumed {0} messages in {1}", consumedMessages.Count, stopwatch.Elapsed);
+        Logger.LogInformation("Consumed {MessageCount} messages in {ConsumedTime}", consumedMessages.Count, stopwatch.Elapsed);

         // assert
@@ -177,7 +174,7 @@ public async Task BasicReqResp()

         AddBusConfiguration(mbb =>
         {
-            var topic = $"{TopicPrefix}test-echo";
+            var topic = $"{TopicPrefix}test-echo";
             mbb
                 .Produce<EchoRequest>(x =>
                 {
                     x.DefaultTopic(topic);
@@ -190,8 +187,8 @@ public async Task BasicReqResp()
                     .WithHandler<EchoRequestHandler>()
                     .KafkaGroup("handler")
                     .Instances(2)
-                    .CheckpointEvery(1000)
-                    .CheckpointAfter(TimeSpan.FromSeconds(60)))
+                    .CheckpointEvery(100)
+                    .CheckpointAfter(TimeSpan.FromSeconds(10)))
                 .ExpectRequestResponses(x =>
                 {
                     x.ReplyToTopic($"{TopicPrefix}test-echo-resp");
@@ -209,14 +206,21 @@ public async Task BasicReqResp()

         var requests = Enumerable
             .Range(0, NumberOfMessages)
-            .Select(i => new EchoRequest { Index = i, Message = $"Echo {i}" })
+            .Select(i => new EchoRequest(i, $"Echo {i}"))
             .ToList();

         var responses = new ConcurrentBag<(EchoRequest, EchoResponse)>();
         await Task.WhenAll(requests.Select(async req =>
         {
-            var resp = await kafkaMessageBus.Send<EchoResponse>(req);
-            responses.Add((req, resp));
+            try
+            {
+                var resp = await kafkaMessageBus.Send<EchoResponse>(req);
+                responses.Add((req, resp));
+            }
+            catch (Exception ex)
+            {
+                Logger.LogError(ex, "Failed to send request {RequestIndex:000}", req.Index);
+            }
         }));

         await responses.WaitUntilArriving(newMessagesTimeout: 5);
@@ -228,72 +232,34 @@ await Task.WhenAll(requests.Select(async req =>

         responses.All(x => x.Item1.Message == x.Item2.Message).Should().BeTrue();
     }

-    private class PingMessage
-    {
-        public DateTime Timestamp { get; set; }
-        public int Counter { get; set; }
-
-        #region Overrides of Object
-
-        public override string ToString() => $"PingMessage(Counter={Counter}, Timestamp={Timestamp})";
-
-        #endregion
-    }
+    private record PingMessage(DateTime Timestamp, int Counter);

     record struct ConsumedMessage(PingMessage Message, int Partition);

     private class PingConsumer(ILogger<PingConsumer> logger, TestEventCollector<ConsumedMessage> messages)
         : IConsumer<PingMessage>, IConsumerWithContext
     {
-        private readonly ILogger _logger = logger;
-        private readonly TestEventCollector<ConsumedMessage> _messages = messages;
-
         public IConsumerContext Context { get; set; }

-        #region Implementation of IConsumer
-
         public Task OnHandle(PingMessage message)
         {
             var transportMessage = Context.GetTransportMessage();
             var partition = transportMessage.TopicPartition.Partition;

-            _messages.Add(new ConsumedMessage(message, partition));
+            messages.Add(new ConsumedMessage(message, partition));

-            _logger.LogInformation("Got message {0:000} on topic {1}.", message.Counter, Context.Path);
+            logger.LogInformation("Got message {MessageCounter:000} on topic {TopicName}.", message.Counter, Context.Path);
             return Task.CompletedTask;
         }
-
-        #endregion
     }

-    private class EchoRequest /*: IRequest*/
-    {
-        public int Index { get; set; }
-        public string Message { get; set; }
-
-        #region Overrides of Object
-
-        public override string ToString() => $"EchoRequest(Index={Index}, Message={Message})";
-
-        #endregion
-    }
+    private record EchoRequest(int Index, string Message);

-    private class EchoResponse
-    {
-        public string Message { get; set; }
-
-        #region Overrides of Object
-
-        public override string ToString() => $"EchoResponse(Message={Message})";
-
-        #endregion
-    }
+    private record EchoResponse(string Message);

     private class EchoRequestHandler : IRequestHandler<EchoRequest, EchoResponse>
     {
-        public Task<EchoResponse> OnHandle(EchoRequest request)
-        {
-            return Task.FromResult(new EchoResponse { Message = request.Message });
-        }
+        public Task<EchoResponse> OnHandle(EchoRequest request)
+            => Task.FromResult(new EchoResponse(request.Message));
     }
 }
\ No newline at end of file

diff --git a/src/Tests/SlimMessageBus.Host.Kafka.Test/appsettings.json b/src/Tests/SlimMessageBus.Host.Kafka.Test/appsettings.json
index f17805a7..066a96f5 100644
--- a/src/Tests/SlimMessageBus.Host.Kafka.Test/appsettings.json
+++ b/src/Tests/SlimMessageBus.Host.Kafka.Test/appsettings.json
@@ -5,6 +5,8 @@
     "Override": {
       "SlimMessageBus": "Information",
       "SlimMessageBus.Host.Kafka.KafkaGroupConsumer": "Debug",
+      "SlimMessageBus.Host.Kafka.KafkaPartitionConsumer": "Verbose",
+      "SlimMessageBus.Host.Kafka.KafkaMessageBus": "Debug",
       "Microsoft": "Warning"
     }
   }

diff --git a/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/OutboxTests.cs b/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/OutboxTests.cs
index c5cefe3c..03b42a3a 100644
--- a/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/OutboxTests.cs
+++ b/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/OutboxTests.cs
@@ -27,7 +27,7 @@ static void AddKafkaSsl(string username, string password, ClientConfig c)

         void ConfigureExternalBus(MessageBusBuilder mbb)
         {
-            var topic = "";
+            var topic = "test-ping";
             if (_testParamBusType == BusType.Kafka)
             {
                 var kafkaBrokers = Secrets.Service.PopulateSecrets(configuration["Kafka:Brokers"]);
@@ -52,9 +52,8 @@ void ConfigureExternalBus(MessageBusBuilder mbb)
                 {
                     config.FetchErrorBackoffMs = 1;
                     config.SocketNagleDisable = true;
-
-                    config.StatisticsIntervalMs = 500000;
-                    config.AutoOffsetReset = AutoOffsetReset.Latest;
+                    // when the test containers start there is no consumer group yet, so we want to start from the beginning
+                    config.AutoOffsetReset = AutoOffsetReset.Earliest;

                     if (kafkaSecure)
                     {
@@ -62,8 +61,6 @@ void ConfigureExternalBus(MessageBusBuilder mbb)
                     }
                 };
             });
-
-            topic = $"{kafkaUsername}-test-ping";
         }

         if (_testParamBusType == BusType.AzureSB)
         {