Skip to content

Commit

Permalink
[Host.Kafka] Fix lag of the last message in partition #131, bump clie…
Browse files Browse the repository at this point in the history
…nt lib, remove cloudkarafka

Signed-off-by: Tomasz Maruszak <maruszaktomasz@gmail.com>
  • Loading branch information
zarusz committed Oct 26, 2024
1 parent 2239152 commit 7178332
Show file tree
Hide file tree
Showing 20 changed files with 80 additions and 131 deletions.
2 changes: 1 addition & 1 deletion src/Host.Plugin.Properties.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<Import Project="Common.NuGet.Properties.xml" />

<PropertyGroup>
<Version>2.5.4-rc1</Version>
<Version>2.5.4-rc2</Version>
</PropertyGroup>

</Project>
2 changes: 1 addition & 1 deletion src/Infrastructure/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ services:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_CREATE_TOPICS: "user-test-ping:2:1,user-test-echo:2:1,user-test-echo-resp:2:1"
KAFKA_CREATE_TOPICS: "test-ping:2:1,test-echo:2:1,test-echo-resp:2:1"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
depends_on:
- zookeeper
Expand Down
8 changes: 0 additions & 8 deletions src/Samples/Sample.Simple.ConsoleApp/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -140,14 +140,6 @@ private static void ConfigureMessageBus(MessageBusBuilder mbb, IConfiguration co
var consumerGroup = "consoleapp";
var responseGroup = "consoleapp-1";

if (provider == Provider.Kafka)
{
// Note: We are using the free plan of CloudKarafka to host the Kafka infrastructure. The free plan has a limit on topic you can get free and it requires these topic prefixes.
topicForAddCommand = "4p5ma6io-test-ping";
topicForMultiplyRequest = "4p5ma6io-multiply-request";
topicForResponses = "4p5ma6io-responses";
}

/*
Azure Event Hub setup notes:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,6 @@
<ImplicitUsings>enable</ImplicitUsings>
</PropertyGroup>

<ItemGroup>
<None Include="..\..\cloudkarafka_2023-10.pem" Link="cloudkarafka_2023-10.pem">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
</ItemGroup>
Expand Down
3 changes: 1 addition & 2 deletions src/Samples/Sample.Simple.ConsoleApp/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@
"Kafka": {
"Brokers": "{{kafka_brokers}}",
"Username": "{{kafka_username}}",
"Password": "{{kafka_password}}",
"Secure": "{{mqtt_secure}}"
"Password": "{{kafka_password}}"
},
"Azure": {
"EventHub": {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
namespace SlimMessageBus.Host.Kafka;

public interface IKafkaCommitController
{
{
/// <summary>
/// The offset of the topic-parition that should be commited onto the consumer group
/// </summary>
/// <param name="offset"></param>
void Commit(TopicPartitionOffset offset);
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
namespace SlimMessageBus.Host.Kafka;

using ConsumeResult = Confluent.Kafka.ConsumeResult<Confluent.Kafka.Ignore, byte[]>;
using ConsumeResult = ConsumeResult<Ignore, byte[]>;

/// <summary>
/// The processor of assigned partition (<see cref="TopicPartition"/>).
Expand Down
1 change: 1 addition & 0 deletions src/SlimMessageBus.Host.Kafka/Consumer/KafkaExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
namespace SlimMessageBus.Host.Kafka;

public static class KafkaExtensions
{
public static TopicPartitionOffset AddOffset(this TopicPartitionOffset topicPartitionOffset, int addOffset)
Expand Down
4 changes: 2 additions & 2 deletions src/SlimMessageBus.Host.Kafka/Consumer/KafkaGroupConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -245,11 +245,11 @@ protected virtual void OnStatistics(string json)
Logger.LogTrace("Group [{Group}]: Statistics: {statistics}", Group, json);
}

#region Implementation of IKafkaCoordinator
#region Implementation of IKafkaCommitController

public void Commit(TopicPartitionOffset offset)
{
Logger.LogDebug("Group [{Group}]: Commit Offset, Topic: {Topic}, Partition: {Partition}, Offset: {Offset}", Group, offset.Topic, offset.Partition, offset.Offset);
Logger.LogDebug("Group [{Group}]: Commit Offset, Topic: {Topic}, Partition: {Partition}, Offset: {Offset}", Group, offset.Topic, offset.Partition, offset.Offset);
_consumer.Commit([offset]);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,13 @@ public void Commit(TopicPartitionOffset offset)
{
if (offset != null && (_lastCheckpointOffset == null || offset.Offset > _lastCheckpointOffset.Offset))
{
_logger.LogDebug("Group [{Group}]: Commit at Offset: {Offset}, Partition: {Partition}, Topic: {Topic}", Group, offset.Offset, offset.Partition, offset.Topic);

_lastCheckpointOffset = offset;
_commitController.Commit(offset);

// See https://github.com/confluentinc/confluent-kafka-dotnet/blob/25f320a672b4324d732304cb4efa2288867b320c/src/Confluent.Kafka/Consumer.cs#L338
// See https://github.com/confluentinc/confluent-kafka-dotnet/issues/1380#issuecomment-672036089
// The commit has to have the last processed message + 1

_commitController.Commit(offset.AddOffset(1));

CheckpointTrigger?.Reset();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="2.5.3" />
<PackageReference Include="Confluent.Kafka" Version="2.6.0" />
</ItemGroup>

<ItemGroup>
Expand Down
3 changes: 1 addition & 2 deletions src/SlimMessageBus.sln
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{A6C28448-3839-490C-BE30-580C1A45E225}"
ProjectSection(SolutionItems) = preProject
.editorconfig = .editorconfig
cloudkarafka_2023-10.pem = cloudkarafka_2023-10.pem
Common.NuGet.Properties.xml = Common.NuGet.Properties.xml
Common.Properties.xml = Common.Properties.xml
..\CONTRIBUTING.md = ..\CONTRIBUTING.md
Expand Down Expand Up @@ -277,7 +276,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Nats-SingleNode", "Nats-Sin
Samples\Infrastructure\Nats-SingleNode\docker-compose.yml = Samples\Infrastructure\Nats-SingleNode\docker-compose.yml
EndProjectSection
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SlimMessageBus.Host.AspNetCore.Test", "Tests\SlimMessageBus.Host.AspNetCore.Test\SlimMessageBus.Host.AspNetCore.Test.csproj", "{9FCBF788-1F0C-43E2-909D-1F96B2685F38}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SlimMessageBus.Host.AspNetCore.Test", "Tests\SlimMessageBus.Host.AspNetCore.Test\SlimMessageBus.Host.AspNetCore.Test.csproj", "{9FCBF788-1F0C-43E2-909D-1F96B2685F38}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public async Task When_OnPartitionEndReached_Then_ShouldCommit()
_subject.Value.OnPartitionEndReached();

// assert
_commitControllerMock.Verify(x => x.Commit(message.TopicPartitionOffset), Times.Once);
_commitControllerMock.Verify(x => x.Commit(message.TopicPartitionOffset.AddOffset(1)), Times.Once);
}

[Fact]
Expand Down Expand Up @@ -98,7 +98,7 @@ public async Task When_OnMessage_Given_CheckpointTriggerFires_Then_ShouldCommit(
await _subject.Value.OnMessage(message3);

// assert
_commitControllerMock.Verify(x => x.Commit(message3.TopicPartitionOffset), Times.Once);
_commitControllerMock.Verify(x => x.Commit(message3.TopicPartitionOffset.AddOffset(1)), Times.Once);
}

private ConsumeResult GetSomeMessage(int offsetAdd = 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void When_NewInstance_Then_TopicPartitionSet()
public async Task When_OnPartitionEndReached_Then_ShouldCommit()
{
// arrange
var partition = new TopicPartitionOffset(_topicPartition, new Offset(10));
var messageOffset = new TopicPartitionOffset(_topicPartition, new Offset(10));
var message = GetSomeMessage();

_subject.OnPartitionAssigned(_topicPartition);
Expand All @@ -56,7 +56,7 @@ public async Task When_OnPartitionEndReached_Then_ShouldCommit()
_subject.OnPartitionEndReached();

// assert
_commitControllerMock.Verify(x => x.Commit(partition), Times.Once);
_commitControllerMock.Verify(x => x.Commit(messageOffset.AddOffset(1)), Times.Once);
}

[Fact]
Expand Down Expand Up @@ -98,7 +98,7 @@ public async Task When_OnMessage_Given_CheckpointReturnTrue_Then_ShouldCommit()
await _subject.OnMessage(message);

// assert
_commitControllerMock.Verify(x => x.Commit(message.TopicPartitionOffset), Times.Once);
_commitControllerMock.Verify(x => x.Commit(message.TopicPartitionOffset.AddOffset(1)), Times.Once);
}

[Fact]
Expand Down
97 changes: 51 additions & 46 deletions src/Tests/SlimMessageBus.Host.Kafka.Test/KafkaMessageBusIt.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,28 +26,12 @@ namespace SlimMessageBus.Host.Kafka.Test;
public class KafkaMessageBusIt(ITestOutputHelper testOutputHelper)
: BaseIntegrationTest<KafkaMessageBusIt>(testOutputHelper)
{
private const int NumberOfMessages = 77;
private string TopicPrefix { get; set; }
private const int NumberOfMessages = 300;
private readonly static TimeSpan DelayTimeSpan = TimeSpan.FromSeconds(5);

private static void AddSsl(string username, string password, ClientConfig c)
{
// cloudkarafka.com uses SSL with SASL authentication
c.SecurityProtocol = SecurityProtocol.SaslSsl;
c.SaslUsername = username;
c.SaslPassword = password;
c.SaslMechanism = SaslMechanism.ScramSha256;
c.SslCaLocation = "cloudkarafka_2023-10.pem";
}

protected override void SetupServices(ServiceCollection services, IConfigurationRoot configuration)
{
var kafkaBrokers = Secrets.Service.PopulateSecrets(configuration["Kafka:Brokers"]);
var kafkaUsername = Secrets.Service.PopulateSecrets(configuration["Kafka:Username"]);
var kafkaPassword = Secrets.Service.PopulateSecrets(configuration["Kafka:Password"]);
var kafkaSecure = Convert.ToBoolean(Secrets.Service.PopulateSecrets(configuration["Kafka:Secure"]));

// Topics on cloudkarafka.com are prefixed with username
TopicPrefix = $"{kafkaUsername}-";

services
.AddSlimMessageBus((mbb) =>
Expand All @@ -59,24 +43,13 @@ protected override void SetupServices(ServiceCollection services, IConfiguration
{
config.LingerMs = 5; // 5ms
config.SocketNagleDisable = true;

if (kafkaSecure)
{
AddSsl(kafkaUsername, kafkaPassword, config);
}

};
cfg.ConsumerConfig = (config) =>
{
config.FetchErrorBackoffMs = 1;
config.SocketNagleDisable = true;
// when the test containers start there is no consumer group yet, so we want to start from the beginning
config.AutoOffsetReset = AutoOffsetReset.Earliest;

if (kafkaSecure)
{
AddSsl(kafkaUsername, kafkaPassword, config);
}
};
});
mbb.AddServicesFromAssemblyContaining<PingConsumer>();
Expand All @@ -90,13 +63,15 @@ protected override void SetupServices(ServiceCollection services, IConfiguration

public IMessageBus MessageBus => ServiceProvider.GetRequiredService<IMessageBus>();

[Fact]
public async Task BasicPubSub()
[Theory]
[InlineData(300, 100, 120)]
[InlineData(300, 120, 100)]
public async Task BasicPubSub(int numberOfMessages, int delayProducerAt, int delayConsumerAt)
{
// arrange
AddBusConfiguration(mbb =>
{
var topic = $"{TopicPrefix}test-ping";
var topic = "test-ping";
mbb.Produce<PingMessage>(x =>
{
x.DefaultTopic(topic);
Expand All @@ -117,35 +92,65 @@ public async Task BasicPubSub()
// doc:fragment:ExampleCheckpointConfig
});

var consumedMessages = ServiceProvider.GetRequiredService<TestEventCollector<ConsumedMessage>>();
var consumedMessages = ServiceProvider.GetRequiredService<TestEventCollector<ConsumedMessage>>();
var consumerControl = ServiceProvider.GetRequiredService<IConsumerControl>();
var messageBus = MessageBus;

// act

// consume all messages that might be on the queue/subscription
await consumedMessages.WaitUntilArriving(newMessagesTimeout: 5);
consumedMessages.Clear();

var pauseAtOffsets = new HashSet<int> { delayConsumerAt };

consumedMessages.OnAdded += (IList<ConsumedMessage> messages, ConsumedMessage message) =>
{
// At the given index message stop the consumers, to simulate a pause in the processing to check if it resumes exactly from the next message
if (pauseAtOffsets.Contains(message.Message.Counter))
{
// Remove self to cause only delay once (in case the same message gets repeated)
pauseAtOffsets.Remove(message.Message.Counter);

consumerControl.Stop().ContinueWith(async (_) =>
{
await Task.Delay(DelayTimeSpan);
await consumerControl.Start();
});
}
};

// publish
var stopwatch = Stopwatch.StartNew();

var stopwatch = Stopwatch.StartNew();

var messages = Enumerable
.Range(0, NumberOfMessages)
.Range(0, numberOfMessages)
.Select(i => new PingMessage(DateTime.UtcNow, i))
.ToList();

await Task.WhenAll(messages.Select(m => messageBus.Publish(m)));


var index = 0;
foreach (var m in messages)
{
if (index == delayProducerAt)
{
// We want to force the Partition EOF event to be triggered by Kafka
Logger.LogInformation("Waiting some time before publish to force Partition EOF event (MessageIndex: {MessageIndex})", index);
await Task.Delay(DelayTimeSpan);
}
await messageBus.Publish(m);
index++;
}

stopwatch.Stop();
Logger.LogInformation("Published {MessageCount} messages in {PublishTime}", messages.Count, stopwatch.Elapsed);
Logger.LogInformation("Published {MessageCount} messages in {ProduceTime} including simulated delay {DelayTime}", messages.Count, stopwatch.Elapsed, DelayTimeSpan);

// consume
stopwatch.Restart();

await consumedMessages.WaitUntilArriving(newMessagesTimeout: 5);
await consumedMessages.WaitUntilArriving(newMessagesTimeout: 10);

stopwatch.Stop();
Logger.LogInformation("Consumed {MessageCount} messages in {ConsumedTime}", consumedMessages.Count, stopwatch.Elapsed);
Logger.LogInformation("Consumed {MessageCount} messages in {ConsumeTime} including simlulated delay {DelayTime}", consumedMessages.Count, stopwatch.Elapsed, DelayTimeSpan);

// assert

Expand All @@ -163,8 +168,8 @@ public async Task BasicPubSub()
.Where(x => x.Partition == 1)
.All(x => x.Message.Counter % 2 == 1)
.Should().BeTrue();
}

}

[Fact]
public async Task BasicReqResp()
{
Expand All @@ -174,7 +179,7 @@ public async Task BasicReqResp()

AddBusConfiguration(mbb =>
{
var topic = $"{TopicPrefix}test-echo";
var topic = "test-echo";
mbb
.Produce<EchoRequest>(x =>
{
Expand All @@ -191,7 +196,7 @@ public async Task BasicReqResp()
.CheckpointAfter(TimeSpan.FromSeconds(10)))
.ExpectRequestResponses(x =>
{
x.ReplyToTopic($"{TopicPrefix}test-echo-resp");
x.ReplyToTopic("test-echo-resp");
x.KafkaGroup("response-reader");
// for subsequent test runs allow enough time for kafka to reassign the partitions
x.DefaultTimeout(TimeSpan.FromSeconds(60));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@
<None Update="appsettings.json">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
<None Include="..\..\cloudkarafka_2023-10.pem" Link="cloudkarafka_2023-10.pem">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
<None Update="xunit.runner.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
Expand Down
Loading

0 comments on commit 7178332

Please sign in to comment.