[Host.Kafka] Fix and improve commit logic #301

Merged: 1 commit, Sep 17, 2024
[Host.Kafka] Fix how PartitionEOF offset is committed
[Host.Kafka] Upgrade to the latest Confluent.Kafka .NET library.
[Host.Kafka] Extend the Kafka docs.
[Host.Kafka] Use Testcontainers for the Kafka integration tests.

Signed-off-by: Tomasz Maruszak <maruszaktomasz@gmail.com>
zarusz committed Sep 17, 2024
commit 9e4e400591cd051ed7232629ce64d0985d27abc8
18 changes: 10 additions & 8 deletions .github/workflows/build.yml
@@ -52,6 +52,7 @@ jobs:
working-directory: ./src

- name: Install Coverlet
#if: false
run: find . -name "*.Test.csproj" | xargs -t -I {} dotnet add {} package coverlet.collector
working-directory: ./src

@@ -119,10 +120,10 @@ jobs:
azure_eventhub_connectionstring: ${{ secrets.azure_eventhub_connectionstring }}
azure_storagecontainer_connectionstring: ${{ secrets.azure_storagecontainer_connectionstring }}

kafka_brokers: ${{ secrets.kafka_brokers }}
kafka_username: ${{ secrets.kafka_username }}
kafka_password: ${{ secrets.kafka_password }}
kafka_secure: ${{ secrets.kafka_secure }}
#kafka_brokers: ${{ secrets.kafka_brokers }}
#kafka_username: ${{ secrets.kafka_username }}
#kafka_password: ${{ secrets.kafka_password }}
#kafka_secure: ${{ secrets.kafka_secure }}

_mqtt_server: ${{ secrets.mqtt_server }}
_mqtt_port: ${{ secrets.mqtt_port }}
@@ -137,10 +138,10 @@ jobs:
sqlserver_connectionstring: ${{ secrets.sqlserver_connectionstring }}

# Connects to the local Test Containers
_kafka_brokers: localhost:9092
_kafka_username: user
_kafka_password: password
_kafka_secure: false
kafka_brokers: localhost:9092
kafka_username: user
kafka_password: password
kafka_secure: false

mqtt_server: localhost
mqtt_port: 1883
@@ -192,6 +193,7 @@ jobs:
name: .NET Tests
path: ./test-results/*.trx
reporter: dotnet-trx
fail-on-error: false

- name: Copy NuGet packages
shell: bash
36 changes: 33 additions & 3 deletions docs/provider_kafka.md
@@ -14,6 +14,7 @@ Please read the [Introduction](intro.md) before reading this provider documentation
- [Consumers](#consumers)
- [Offset Commit](#offset-commit)
- [Consumer Error Handling](#consumer-error-handling)
- [Debugging](#debugging)
- [Deployment](#deployment)

## Underlying client
@@ -22,9 +23,9 @@

When troubleshooting or fine-tuning, it is worth reading the `librdkafka` and `confluent-kafka-dotnet` docs:

- [Introduction](https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md
- [Broker version compatibility](https://github.com/edenhill/librdkafka/wiki/Broker-version-compatibility)
- [Using SSL with librdkafka](https://github.com/edenhill/librdkafka/wiki/Using-SSL-with-librdkafka)
- [Introduction](https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md)
- [Broker version compatibility](https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#broker-version-compatibility)
- [Using SSL with librdkafka](https://github.com/confluentinc/librdkafka/wiki/Using-SSL-with-librdkafka)

## Configuration properties

@@ -246,6 +247,35 @@ The error handler can perform the following actions:

If no custom error handler is provided, the provider logs the exception and moves on to process the next message.

### Debugging

Kafka uses a sophisticated protocol for partition assignment:

- Partition assignments may change due to factors like rebalancing.
- A running consumer instance might not receive any partitions if there are more consumers than partitions for a given topic.

To better understand what's happening, enable `Debug` level logging for the provider's logger categories, such as `SlimMessageBus.Host.Kafka.KafkaGroupConsumer`.
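
For example, with the standard `Microsoft.Extensions.Logging` host setup, the category filter can be raised to `Debug` as in this minimal sketch (the generic-host bootstrap is an assumption here; with Serilog the equivalent would be a `MinimumLevel.Override` for the same category):

```cs
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

var builder = Host.CreateApplicationBuilder(args);

// Raise only the Kafka provider's logger categories to Debug;
// the rest of the application keeps its configured minimum level.
builder.Logging.AddFilter("SlimMessageBus.Host.Kafka", LogLevel.Debug);

var host = builder.Build();
```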

At this logging level, you can track the lifecycle events of the consumer group:

```
[00:03:06 INF] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Subscribing to topics: 4p5ma6io-test-ping
[00:03:06 INF] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Consumer loop started
[00:03:12 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Assigned partition, Topic: 4p5ma6io-test-ping, Partition: [0]
[00:03:12 INF] SlimMessageBus.Host.Kafka.KafkaPartitionConsumer Creating consumer for Group: subscriber, Topic: 4p5ma6io-test-ping, Partition: [0]
[00:03:12 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Assigned partition, Topic: 4p5ma6io-test-ping, Partition: [1]
[00:03:12 INF] SlimMessageBus.Host.Kafka.KafkaPartitionConsumer Creating consumer for Group: subscriber, Topic: 4p5ma6io-test-ping, Partition: [1]
...
[00:03:15 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Received message with Topic: 4p5ma6io-test-ping, Partition: [1], Offset: 98578, payload size: 57
[00:03:15 INF] SlimMessageBus.Host.Kafka.Test.KafkaMessageBusIt.PingConsumer Got message 073 on topic 4p5ma6io-test-ping.
[00:03:15 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Received message with Topic: 4p5ma6io-test-ping, Partition: [1], Offset: 98579, payload size: 57
[00:03:15 INF] SlimMessageBus.Host.Kafka.Test.KafkaMessageBusIt.PingConsumer Got message 075 on topic 4p5ma6io-test-ping.
[00:03:16 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Reached end of partition, Topic: 4p5ma6io-test-ping, Partition: [0], Offset: 100403
[00:03:16 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Commit Offset, Topic: 4p5ma6io-test-ping, Partition: [0], Offset: 100402
[00:03:16 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Reached end of partition, Topic: 4p5ma6io-test-ping, Partition: [1], Offset: 98580
[00:03:16 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Commit Offset, Topic: 4p5ma6io-test-ping, Partition: [1], Offset: 98579
```

## Deployment

The `librdkafka` distribution for Windows requires the [Visual C++ Redistributable for 2013](https://www.microsoft.com/en-US/download/details.aspx?id=40784) to be installed on the server.
43 changes: 37 additions & 6 deletions docs/provider_kafka.t.md
@@ -14,6 +14,7 @@ Please read the [Introduction](intro.md) before reading this provider documentation
- [Consumers](#consumers)
- [Offset Commit](#offset-commit)
- [Consumer Error Handling](#consumer-error-handling)
- [Debugging](#debugging)
- [Deployment](#deployment)

## Underlying client
@@ -22,13 +23,13 @@

When troubleshooting or fine-tuning, it is worth reading the `librdkafka` and `confluent-kafka-dotnet` docs:

- [Introduction](https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md
- [Broker version compatibility](https://github.com/edenhill/librdkafka/wiki/Broker-version-compatibility)
- [Using SSL with librdkafka](https://github.com/edenhill/librdkafka/wiki/Using-SSL-with-librdkafka)
- [Introduction](https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md)
- [Broker version compatibility](https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#broker-version-compatibility)
- [Using SSL with librdkafka](https://github.com/confluentinc/librdkafka/wiki/Using-SSL-with-librdkafka)

## Configuration properties

Producer, consumer and global configuration properties are described [here](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md).
Producer, consumer and global configuration properties are described [here](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md).
The configuration on the underlying Kafka client can be adjusted like so:

```cs
@@ -52,7 +53,7 @@ services.AddSlimMessageBus(mbb =>

### Minimizing message latency

There is a good description [here](https://github.com/edenhill/librdkafka/wiki/How-to-decrease-message-latency) on improving the latency by applying producer/consumer settings on librdkafka. Here is how you enter the settings using SlimMessageBus:
There is a good description [here](https://github.com/confluentinc/librdkafka/wiki/How-to-decrease-message-latency) on improving the latency by applying producer/consumer settings on librdkafka. Here is how you enter the settings using SlimMessageBus:

```cs
services.AddSlimMessageBus(mbb =>
@@ -210,7 +211,8 @@ mbb

### Offset Commit

In the current Kafka provider implementation, SMB handles the manual commit of topic-partition offsets for the consumer. This configuration is controlled through the following methods on the consumer builder (see the example after this list):

- `CheckpointEvery(int)` – Commits the offset after a specified number of processed messages.
- `CheckpointAfter(TimeSpan)` – Commits the offset after a specified time interval.
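
Below is a minimal sketch of how these checkpoint settings plug into the consumer builder (the `Consume`/`Topic`/`KafkaGroup`/`WithConsumer` calls mirror the other examples in these docs; `PingMessage`, `PingConsumer`, and the topic/group names are placeholders):

```cs
services.AddSlimMessageBus(mbb =>
{
    mbb.Consume<PingMessage>(x => x
        .Topic("some-topic")                       // placeholder topic name
        .KafkaGroup("some-consumer-group")         // placeholder consumer group
        .WithConsumer<PingConsumer>()
        .CheckpointEvery(100)                      // commit the offset after every 100 processed messages
        .CheckpointAfter(TimeSpan.FromSeconds(60)) // or after a 60 second interval
    );
});
```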
@@ -236,6 +238,35 @@ The error handler can perform the following actions:

If no custom error handler is provided, the provider logs the exception and moves on to process the next message.

### Debugging

Kafka uses a sophisticated protocol for partition assignment:

- Partition assignments may change due to factors like rebalancing.
- A running consumer instance might not receive any partitions if there are more consumers than partitions for a given topic.

To better understand what's happening, enable `Debug` level logging for the provider's logger categories, such as `SlimMessageBus.Host.Kafka.KafkaGroupConsumer`.

At this logging level, you can track the lifecycle events of the consumer group:

```
[00:03:06 INF] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Subscribing to topics: 4p5ma6io-test-ping
[00:03:06 INF] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Consumer loop started
[00:03:12 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Assigned partition, Topic: 4p5ma6io-test-ping, Partition: [0]
[00:03:12 INF] SlimMessageBus.Host.Kafka.KafkaPartitionConsumer Creating consumer for Group: subscriber, Topic: 4p5ma6io-test-ping, Partition: [0]
[00:03:12 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Assigned partition, Topic: 4p5ma6io-test-ping, Partition: [1]
[00:03:12 INF] SlimMessageBus.Host.Kafka.KafkaPartitionConsumer Creating consumer for Group: subscriber, Topic: 4p5ma6io-test-ping, Partition: [1]
...
[00:03:15 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Received message with Topic: 4p5ma6io-test-ping, Partition: [1], Offset: 98578, payload size: 57
[00:03:15 INF] SlimMessageBus.Host.Kafka.Test.KafkaMessageBusIt.PingConsumer Got message 073 on topic 4p5ma6io-test-ping.
[00:03:15 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Received message with Topic: 4p5ma6io-test-ping, Partition: [1], Offset: 98579, payload size: 57
[00:03:15 INF] SlimMessageBus.Host.Kafka.Test.KafkaMessageBusIt.PingConsumer Got message 075 on topic 4p5ma6io-test-ping.
[00:03:16 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Reached end of partition, Topic: 4p5ma6io-test-ping, Partition: [0], Offset: 100403
[00:03:16 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Commit Offset, Topic: 4p5ma6io-test-ping, Partition: [0], Offset: 100402
[00:03:16 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Reached end of partition, Topic: 4p5ma6io-test-ping, Partition: [1], Offset: 98580
[00:03:16 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Commit Offset, Topic: 4p5ma6io-test-ping, Partition: [1], Offset: 98579
```

## Deployment

The `librdkafka` distribution for Windows requires the [Visual C++ Redistributable for 2013](https://www.microsoft.com/en-US/download/details.aspx?id=40784) to be installed on the server.
2 changes: 1 addition & 1 deletion src/Host.Plugin.Properties.xml
@@ -4,7 +4,7 @@
<Import Project="Common.NuGet.Properties.xml" />

<PropertyGroup>
<Version>2.5.3-rc1</Version>
<Version>2.5.3-rc2</Version>
</PropertyGroup>

</Project>
10 changes: 4 additions & 6 deletions src/Infrastructure/docker-compose.yml
@@ -1,4 +1,4 @@
version: '3.4'
version: "3.4"

services:
zookeeper:
@@ -16,13 +16,13 @@ services:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_CREATE_TOPICS: "user-test-ping:2:1,user-test-echo:2:1"
KAFKA_CREATE_TOPICS: "user-test-ping:2:1,user-test-echo:2:1,user-test-echo-resp:2:1"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
depends_on:
- zookeeper
networks:
- slim

mqtt:
container_name: slim.mqtt
image: eclipse-mosquitto:2.0.18
@@ -74,16 +74,14 @@ services:
- "11002:11002"
networks:
- slim

nats:
container_name: slim.nats
image: nats:2.10
ports:
- 4222:4222
networks:
- slim


networks:
slim: {}

@@ -11,7 +11,7 @@ public interface IKafkaPartitionConsumer : IDisposable

void OnPartitionAssigned(TopicPartition partition);
Task OnMessage(ConsumeResult message);
void OnPartitionEndReached(TopicPartitionOffset offset);
void OnPartitionEndReached();
void OnPartitionRevoked();

void OnClose();
36 changes: 21 additions & 15 deletions src/SlimMessageBus.Host.Kafka/Consumer/KafkaGroupConsumer.cs
@@ -11,14 +11,14 @@
private Task _consumerTask;
private CancellationTokenSource _consumerCts;

public KafkaMessageBus MessageBus { get; }
public KafkaMessageBusSettings ProviderSettings { get; }
public string Group { get; }
public IReadOnlyCollection<string> Topics { get; }

public KafkaGroupConsumer(KafkaMessageBus messageBus, string group, IReadOnlyCollection<string> topics, Func<TopicPartition, IKafkaCommitController, IKafkaPartitionConsumer> processorFactory)
: base(messageBus.LoggerFactory.CreateLogger<KafkaGroupConsumer>())
public KafkaGroupConsumer(ILoggerFactory loggerFactory, KafkaMessageBusSettings providerSettings, string group, IReadOnlyCollection<string> topics, Func<TopicPartition, IKafkaCommitController, IKafkaPartitionConsumer> processorFactory)
: base(loggerFactory.CreateLogger<KafkaGroupConsumer>())
{
MessageBus = messageBus;
ProviderSettings = providerSettings ?? throw new ArgumentNullException(nameof(providerSettings));
Group = group ?? throw new ArgumentNullException(nameof(group));
Topics = topics ?? throw new ArgumentNullException(nameof(topics));

@@ -58,17 +58,17 @@
var config = new ConsumerConfig
{
GroupId = group,
BootstrapServers = MessageBus.ProviderSettings.BrokerList
BootstrapServers = ProviderSettings.BrokerList
};
MessageBus.ProviderSettings.ConsumerConfig(config);
ProviderSettings.ConsumerConfig(config);

// ToDo: add support for auto commit

Check warning on line 65 in src/SlimMessageBus.Host.Kafka/Consumer/KafkaGroupConsumer.cs (GitHub Actions / build): Complete the task associated to this 'TODO' comment. (https://rules.sonarsource.com/csharp/RSPEC-1135)
config.EnableAutoCommit = false;
// Notify when we reach EoF, so that we can do a manual commit
config.EnablePartitionEof = true;
config.EnablePartitionEof = true;

var consumer = MessageBus.ProviderSettings.ConsumerBuilderFactory(config)
.SetStatisticsHandler((_, json) => OnStatistics(json))
var consumer = ProviderSettings.ConsumerBuilderFactory(config)
.SetStatisticsHandler((_, json) => OnStatistics(json))
.SetPartitionsAssignedHandler((_, partitions) => OnPartitionAssigned(partitions))
.SetPartitionsRevokedHandler((_, partitions) => OnPartitionRevoked(partitions))
.SetOffsetsCommittedHandler((_, offsets) => OnOffsetsCommitted(offsets))
@@ -120,7 +120,7 @@
}
catch (ConsumeException e)
{
var pollRetryInterval = MessageBus.ProviderSettings.ConsumerPollRetryInterval;
var pollRetryInterval = ProviderSettings.ConsumerPollRetryInterval;

Logger.LogError(e, "Group [{Group}]: Error occurred while polling new messages (will retry in {RetryInterval}) - {Reason}", Group, pollRetryInterval, e.Error.Reason);
await Task.Delay(pollRetryInterval, _consumerCts.Token).ConfigureAwait(false);
@@ -128,13 +128,13 @@
}
}
catch (OperationCanceledException)
{

Check warning on line 131 in src/SlimMessageBus.Host.Kafka/Consumer/KafkaGroupConsumer.cs (GitHub Actions / build): Either remove or fill this block of code. (https://rules.sonarsource.com/csharp/RSPEC-108)
}

Logger.LogInformation("Group [{Group}]: Unsubscribing from topics", Group);
_consumer.Unsubscribe();

if (MessageBus.ProviderSettings.EnableCommitOnBusStop)
if (ProviderSettings.EnableCommitOnBusStop)
{
OnClose();
}
@@ -201,7 +201,7 @@
Logger.LogDebug("Group [{Group}]: Reached end of partition, Topic: {Topic}, Partition: {Partition}, Offset: {Offset}", Group, offset.Topic, offset.Partition, offset.Offset);

var processor = _processors[offset.TopicPartition];
processor.OnPartitionEndReached(offset);
processor.OnPartitionEndReached();
}

protected async virtual ValueTask OnMessage(ConsumeResult message)
@@ -212,15 +212,21 @@
await processor.OnMessage(message).ConfigureAwait(false);
}

protected virtual void OnOffsetsCommitted(CommittedOffsets e)
protected internal virtual void OnOffsetsCommitted(CommittedOffsets e)
{
if (e.Error.IsError || e.Error.IsFatal)
{
Logger.LogWarning("Group [{Group}]: Failed to commit offsets: [{Offsets}], error: {error}", Group, string.Join(", ", e.Offsets), e.Error.Reason);
if (Logger.IsEnabled(LogLevel.Warning))
{
Logger.LogWarning("Group [{Group}]: Failed to commit offsets: [{Offsets}], error: {ErrorMessage}", Group, string.Join(", ", e.Offsets), e.Error.Reason);
}
}
else
{
Logger.LogTrace("Group [{Group}]: Successfully committed offsets: [{Offsets}]", Group, string.Join(", ", e.Offsets));
if (Logger.IsEnabled(LogLevel.Debug))
{
Logger.LogDebug("Group [{Group}]: Successfully committed offsets: [{Offsets}]", Group, string.Join(", ", e.Offsets));
}
}
}

@@ -235,7 +241,7 @@

protected virtual void OnStatistics(string json)
{
Logger.LogTrace("Group [{Group}]: Statistics: {statistics}", Group, json);

Check warning on line 244 in src/SlimMessageBus.Host.Kafka/Consumer/KafkaGroupConsumer.cs (GitHub Actions / build): Use PascalCase for named placeholders. (https://rules.sonarsource.com/csharp/RSPEC-6678)
}

#region Implementation of IKafkaCoordinator