Skip to content

Commit

Permalink
[Host.Kafka] Fix how PartitionEOF offset is comitted
Browse files Browse the repository at this point in the history
[Host.Kafka] Upgrade to latest Confluent.Kafka.Net library.
[Host.Kafka] Extend the kafka docs.
[Host.Kafka] Use test containers for Kafka int tests.

Signed-off-by: Tomasz Maruszak <maruszaktomasz@gmail.com>
  • Loading branch information
zarusz committed Sep 17, 2024
1 parent 5ea6856 commit 75da769
Show file tree
Hide file tree
Showing 17 changed files with 250 additions and 149 deletions.
18 changes: 10 additions & 8 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ jobs:
working-directory: ./src

- name: Install Coverlet
#if: false
run: find . -name "*.Test.csproj" | xargs -t -I {} dotnet add {} package coverlet.collector
working-directory: ./src

Expand Down Expand Up @@ -119,10 +120,10 @@ jobs:
azure_eventhub_connectionstring: ${{ secrets.azure_eventhub_connectionstring }}
azure_storagecontainer_connectionstring: ${{ secrets.azure_storagecontainer_connectionstring }}

kafka_brokers: ${{ secrets.kafka_brokers }}
kafka_username: ${{ secrets.kafka_username }}
kafka_password: ${{ secrets.kafka_password }}
kafka_secure: ${{ secrets.kafka_secure }}
#kafka_brokers: ${{ secrets.kafka_brokers }}
#kafka_username: ${{ secrets.kafka_username }}
#kafka_password: ${{ secrets.kafka_password }}
#kafka_secure: ${{ secrets.kafka_secure }}

_mqtt_server: ${{ secrets.mqtt_server }}
_mqtt_port: ${{ secrets.mqtt_port }}
Expand All @@ -137,10 +138,10 @@ jobs:
sqlserver_connectionstring: ${{ secrets.sqlserver_connectionstring }}

# Connects to the local Test Containers
_kafka_brokers: localhost:9092
_kafka_username: user
_kafka_password: password
_kafka_secure: false
kafka_brokers: localhost:9092
kafka_username: user
kafka_password: password
kafka_secure: false

mqtt_server: localhost
mqtt_port: 1883
Expand Down Expand Up @@ -192,6 +193,7 @@ jobs:
name: .NET Tests
path: ./test-results/*.trx
reporter: dotnet-trx
fail-on-error: false

- name: Copy NuGet packages
shell: bash
Expand Down
36 changes: 33 additions & 3 deletions docs/provider_kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Please read the [Introduction](intro.md) before reading this provider documentat
- [Consumers](#consumers)
- [Offset Commit](#offset-commit)
- [Consumer Error Handling](#consumer-error-handling)
- [Debugging](#debugging)
- [Deployment](#deployment)

## Underlying client
Expand All @@ -22,9 +23,9 @@ The SMB Kafka implementation uses [confluent-kafka-dotnet](https://github.com/co

When troubleshooting or fine tuning it is worth reading the `librdkafka` and `confluent-kafka-dotnet` docs:

- [Introduction](https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md
- [Broker version compatibility](https://github.com/edenhill/librdkafka/wiki/Broker-version-compatibility)
- [Using SSL with librdkafka](https://github.com/edenhill/librdkafka/wiki/Using-SSL-with-librdkafka)
- [Introduction](https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md)
- [Broker version compatibility](https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#broker-version-compatibility)
- [Using SSL with librdkafka](https://github.com/confluentinc/librdkafka/wiki/Using-SSL-with-librdkafka)

## Configuration properties

Expand Down Expand Up @@ -246,6 +247,35 @@ The error handler can perform the following actions:

If no custom error handler is provided, the provider logs the exception and moves on to process the next message.

### Debugging

Kafka uses a sophisticated protocol for partition assignment:

- Partition assignments may change due to factors like rebalancing.
- A running consumer instance might not receive any partitions if there are more consumers than partitions for a given topic.

To better understand what's happening, you can enable Debug level logging in your library, such as SlimMessageBus.Host.Kafka.KafkaGroupConsumer.

At this logging level, you can track the lifecycle events of the consumer group:

```
[00:03:06 INF] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Subscribing to topics: 4p5ma6io-test-ping
[00:03:06 INF] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Consumer loop started
[00:03:12 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Assigned partition, Topic: 4p5ma6io-test-ping, Partition: [0]
[00:03:12 INF] SlimMessageBus.Host.Kafka.KafkaPartitionConsumer Creating consumer for Group: subscriber, Topic: 4p5ma6io-test-ping, Partition: [0]
[00:03:12 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Assigned partition, Topic: 4p5ma6io-test-ping, Partition: [1]
[00:03:12 INF] SlimMessageBus.Host.Kafka.KafkaPartitionConsumer Creating consumer for Group: subscriber, Topic: 4p5ma6io-test-ping, Partition: [1]
...
[00:03:15 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Received message with Topic: 4p5ma6io-test-ping, Partition: [1], Offset: 98578, payload size: 57
[00:03:15 INF] SlimMessageBus.Host.Kafka.Test.KafkaMessageBusIt.PingConsumer Got message 073 on topic 4p5ma6io-test-ping.
[00:03:15 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Received message with Topic: 4p5ma6io-test-ping, Partition: [1], Offset: 98579, payload size: 57
[00:03:15 INF] SlimMessageBus.Host.Kafka.Test.KafkaMessageBusIt.PingConsumer Got message 075 on topic 4p5ma6io-test-ping.
[00:03:16 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Reached end of partition, Topic: 4p5ma6io-test-ping, Partition: [0], Offset: 100403
[00:03:16 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Commit Offset, Topic: 4p5ma6io-test-ping, Partition: [0], Offset: 100402
[00:03:16 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Reached end of partition, Topic: 4p5ma6io-test-ping, Partition: [1], Offset: 98580
[00:03:16 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Commit Offset, Topic: 4p5ma6io-test-ping, Partition: [1], Offset: 98579
```

## Deployment

The `librdkafka` distribution for Windows requires [Visual C++ Redistributable for 2013](https://www.microsoft.com/en-US/download/details.aspx?id=40784) installed on the server. More information can be found [here](https://www.microsoft.com/en-US/download/details.aspx?id=40784).
43 changes: 37 additions & 6 deletions docs/provider_kafka.t.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Please read the [Introduction](intro.md) before reading this provider documentat
- [Consumers](#consumers)
- [Offset Commit](#offset-commit)
- [Consumer Error Handling](#consumer-error-handling)
- [Debugging](#debugging)
- [Deployment](#deployment)

## Underlying client
Expand All @@ -22,13 +23,13 @@ The SMB Kafka implementation uses [confluent-kafka-dotnet](https://github.com/co

When troubleshooting or fine tuning it is worth reading the `librdkafka` and `confluent-kafka-dotnet` docs:

- [Introduction](https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md
- [Broker version compatibility](https://github.com/edenhill/librdkafka/wiki/Broker-version-compatibility)
- [Using SSL with librdkafka](https://github.com/edenhill/librdkafka/wiki/Using-SSL-with-librdkafka)
- [Introduction](https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md)
- [Broker version compatibility](https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#broker-version-compatibility)
- [Using SSL with librdkafka](https://github.com/confluentinc/librdkafka/wiki/Using-SSL-with-librdkafka)

## Configuration properties

Producer, consumer and global configuration properties are described [here](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md).
Producer, consumer and global configuration properties are described [here](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md).
The configuration on the underlying Kafka client can be adjusted like so:

```cs
Expand All @@ -52,7 +53,7 @@ services.AddSlimMessageBus(mbb =>

### Minimizing message latency

There is a good description [here](https://github.com/edenhill/librdkafka/wiki/How-to-decrease-message-latency) on improving the latency by applying producer/consumer settings on librdkafka. Here is how you enter the settings using SlimMessageBus:
There is a good description [here](https://github.com/confluentinc/librdkafka/wiki/How-to-decrease-message-latency) on improving the latency by applying producer/consumer settings on librdkafka. Here is how you enter the settings using SlimMessageBus:

```cs
services.AddSlimMessageBus(mbb =>
Expand Down Expand Up @@ -210,7 +211,8 @@ mbb

### Offset Commit

In the current Kafka provider implementation, SMB handles the manual commit of topic-partition offsets for the consumer. This configuration is controlled through the following methods on the consumer builder:
In the current Kafka provider implementation, SMB handles the manual commit of topic-partition offsets for the consumer.Th
is configuration is controlled through the following methods on the consumer builder:

- `CheckpointEvery(int)` – Commits the offset after a specified number of processed messages.
- `CheckpointAfter(TimeSpan)` – Commits the offset after a specified time interval.
Expand All @@ -236,6 +238,35 @@ The error handler can perform the following actions:

If no custom error handler is provided, the provider logs the exception and moves on to process the next message.

### Debugging

Kafka uses a sophisticated protocol for partition assignment:

- Partition assignments may change due to factors like rebalancing.
- A running consumer instance might not receive any partitions if there are more consumers than partitions for a given topic.

To better understand what's happening, you can enable Debug level logging in your library, such as SlimMessageBus.Host.Kafka.KafkaGroupConsumer.

At this logging level, you can track the lifecycle events of the consumer group:

```
[00:03:06 INF] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Subscribing to topics: 4p5ma6io-test-ping
[00:03:06 INF] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Consumer loop started
[00:03:12 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Assigned partition, Topic: 4p5ma6io-test-ping, Partition: [0]
[00:03:12 INF] SlimMessageBus.Host.Kafka.KafkaPartitionConsumer Creating consumer for Group: subscriber, Topic: 4p5ma6io-test-ping, Partition: [0]
[00:03:12 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Assigned partition, Topic: 4p5ma6io-test-ping, Partition: [1]
[00:03:12 INF] SlimMessageBus.Host.Kafka.KafkaPartitionConsumer Creating consumer for Group: subscriber, Topic: 4p5ma6io-test-ping, Partition: [1]
...
[00:03:15 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Received message with Topic: 4p5ma6io-test-ping, Partition: [1], Offset: 98578, payload size: 57
[00:03:15 INF] SlimMessageBus.Host.Kafka.Test.KafkaMessageBusIt.PingConsumer Got message 073 on topic 4p5ma6io-test-ping.
[00:03:15 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Received message with Topic: 4p5ma6io-test-ping, Partition: [1], Offset: 98579, payload size: 57
[00:03:15 INF] SlimMessageBus.Host.Kafka.Test.KafkaMessageBusIt.PingConsumer Got message 075 on topic 4p5ma6io-test-ping.
[00:03:16 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Reached end of partition, Topic: 4p5ma6io-test-ping, Partition: [0], Offset: 100403
[00:03:16 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Commit Offset, Topic: 4p5ma6io-test-ping, Partition: [0], Offset: 100402
[00:03:16 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Reached end of partition, Topic: 4p5ma6io-test-ping, Partition: [1], Offset: 98580
[00:03:16 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Commit Offset, Topic: 4p5ma6io-test-ping, Partition: [1], Offset: 98579
```

## Deployment

The `librdkafka` distribution for Windows requires [Visual C++ Redistributable for 2013](https://www.microsoft.com/en-US/download/details.aspx?id=40784) installed on the server. More information can be found [here](https://www.microsoft.com/en-US/download/details.aspx?id=40784).
2 changes: 1 addition & 1 deletion src/Host.Plugin.Properties.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<Import Project="Common.NuGet.Properties.xml" />

<PropertyGroup>
<Version>2.5.3-rc1</Version>
<Version>2.5.3-rc2</Version>
</PropertyGroup>

</Project>
10 changes: 4 additions & 6 deletions src/Infrastructure/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version: '3.4'
version: "3.4"

services:
zookeeper:
Expand All @@ -16,13 +16,13 @@ services:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_CREATE_TOPICS: "user-test-ping:2:1,user-test-echo:2:1"
KAFKA_CREATE_TOPICS: "user-test-ping:2:1,user-test-echo:2:1,user-test-echo-resp:2:1"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
depends_on:
- zookeeper
networks:
- slim

mqtt:
container_name: slim.mqtt
image: eclipse-mosquitto:2.0.18
Expand Down Expand Up @@ -74,16 +74,14 @@ services:
- "11002:11002"
networks:
- slim

nats:
container_name: slim.nats
image: nats:2.10
ports:
- 4222:4222
networks:
- slim


networks:
slim: {}

Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public interface IKafkaPartitionConsumer : IDisposable

void OnPartitionAssigned(TopicPartition partition);
Task OnMessage(ConsumeResult message);
void OnPartitionEndReached(TopicPartitionOffset offset);
void OnPartitionEndReached();
void OnPartitionRevoked();

void OnClose();
Expand Down
36 changes: 21 additions & 15 deletions src/SlimMessageBus.Host.Kafka/Consumer/KafkaGroupConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ public class KafkaGroupConsumer : AbstractConsumer, IKafkaCommitController
private Task _consumerTask;
private CancellationTokenSource _consumerCts;

public KafkaMessageBus MessageBus { get; }
public KafkaMessageBusSettings ProviderSettings { get; }
public string Group { get; }
public IReadOnlyCollection<string> Topics { get; }

public KafkaGroupConsumer(KafkaMessageBus messageBus, string group, IReadOnlyCollection<string> topics, Func<TopicPartition, IKafkaCommitController, IKafkaPartitionConsumer> processorFactory)
: base(messageBus.LoggerFactory.CreateLogger<KafkaGroupConsumer>())
public KafkaGroupConsumer(ILoggerFactory loggerFactory, KafkaMessageBusSettings providerSettings, string group, IReadOnlyCollection<string> topics, Func<TopicPartition, IKafkaCommitController, IKafkaPartitionConsumer> processorFactory)
: base(loggerFactory.CreateLogger<KafkaGroupConsumer>())
{
MessageBus = messageBus;
ProviderSettings = providerSettings ?? throw new ArgumentNullException(nameof(providerSettings));
Group = group ?? throw new ArgumentNullException(nameof(group));
Topics = topics ?? throw new ArgumentNullException(nameof(topics));

Expand Down Expand Up @@ -58,17 +58,17 @@ protected IConsumer CreateConsumer(string group)
var config = new ConsumerConfig
{
GroupId = group,
BootstrapServers = MessageBus.ProviderSettings.BrokerList
BootstrapServers = ProviderSettings.BrokerList
};
MessageBus.ProviderSettings.ConsumerConfig(config);
ProviderSettings.ConsumerConfig(config);

// ToDo: add support for auto commit
config.EnableAutoCommit = false;
// Notify when we reach EoF, so that we can do a manual commit
config.EnablePartitionEof = true;
config.EnablePartitionEof = true;

var consumer = MessageBus.ProviderSettings.ConsumerBuilderFactory(config)
.SetStatisticsHandler((_, json) => OnStatistics(json))
var consumer = ProviderSettings.ConsumerBuilderFactory(config)
.SetStatisticsHandler((_, json) => OnStatistics(json))
.SetPartitionsAssignedHandler((_, partitions) => OnPartitionAssigned(partitions))
.SetPartitionsRevokedHandler((_, partitions) => OnPartitionRevoked(partitions))
.SetOffsetsCommittedHandler((_, offsets) => OnOffsetsCommitted(offsets))
Expand Down Expand Up @@ -120,7 +120,7 @@ protected async virtual Task ConsumerLoop()
}
catch (ConsumeException e)
{
var pollRetryInterval = MessageBus.ProviderSettings.ConsumerPollRetryInterval;
var pollRetryInterval = ProviderSettings.ConsumerPollRetryInterval;

Logger.LogError(e, "Group [{Group}]: Error occurred while polling new messages (will retry in {RetryInterval}) - {Reason}", Group, pollRetryInterval, e.Error.Reason);
await Task.Delay(pollRetryInterval, _consumerCts.Token).ConfigureAwait(false);
Expand All @@ -134,7 +134,7 @@ protected async virtual Task ConsumerLoop()
Logger.LogInformation("Group [{Group}]: Unsubscribing from topics", Group);
_consumer.Unsubscribe();

if (MessageBus.ProviderSettings.EnableCommitOnBusStop)
if (ProviderSettings.EnableCommitOnBusStop)
{
OnClose();
}
Expand Down Expand Up @@ -201,7 +201,7 @@ protected virtual void OnPartitionEndReached(TopicPartitionOffset offset)
Logger.LogDebug("Group [{Group}]: Reached end of partition, Topic: {Topic}, Partition: {Partition}, Offset: {Offset}", Group, offset.Topic, offset.Partition, offset.Offset);

var processor = _processors[offset.TopicPartition];
processor.OnPartitionEndReached(offset);
processor.OnPartitionEndReached();
}

protected async virtual ValueTask OnMessage(ConsumeResult message)
Expand All @@ -212,15 +212,21 @@ protected async virtual ValueTask OnMessage(ConsumeResult message)
await processor.OnMessage(message).ConfigureAwait(false);
}

protected virtual void OnOffsetsCommitted(CommittedOffsets e)
protected internal virtual void OnOffsetsCommitted(CommittedOffsets e)
{
if (e.Error.IsError || e.Error.IsFatal)
{
Logger.LogWarning("Group [{Group}]: Failed to commit offsets: [{Offsets}], error: {error}", Group, string.Join(", ", e.Offsets), e.Error.Reason);
if (Logger.IsEnabled(LogLevel.Warning))
{
Logger.LogWarning("Group [{Group}]: Failed to commit offsets: [{Offsets}], error: {ErrorMessage}", Group, string.Join(", ", e.Offsets), e.Error.Reason);
}
}
else
{
Logger.LogTrace("Group [{Group}]: Successfully committed offsets: [{Offsets}]", Group, string.Join(", ", e.Offsets));
if (Logger.IsEnabled(LogLevel.Debug))
{
Logger.LogDebug("Group [{Group}]: Successfully committed offsets: [{Offsets}]", Group, string.Join(", ", e.Offsets));
}
}
}

Expand Down
Loading

0 comments on commit 75da769

Please sign in to comment.