Skip to content

Commit

Permalink
Refactor
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Maruszak <maruszaktomasz@gmail.com>
  • Loading branch information
zarusz committed Jun 23, 2024
1 parent 3eefe52 commit 11c7c48
Show file tree
Hide file tree
Showing 16 changed files with 119 additions and 124 deletions.
2 changes: 1 addition & 1 deletion src/Host.Plugin.Properties.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<Import Project="Common.NuGet.Properties.xml" />

<PropertyGroup>
<Version>2.4.0-rc7</Version>
<Version>2.4.0-rc8</Version>
</PropertyGroup>

</Project>
47 changes: 23 additions & 24 deletions src/SlimMessageBus.Host.AzureEventHub/EventHubMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,43 +104,42 @@ protected override async Task OnStart()
}
}

protected override async Task<(IReadOnlyCollection<T> Dispatched, Exception Exception)> ProduceToTransport<T>(IReadOnlyCollection<T> envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken = default)
protected override async Task<ProduceToTransportBulkResult<T>> ProduceToTransportBulk<T>(IReadOnlyCollection<T> envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken)
{
AssertActive();

var dispatched = new List<T>(envelopes.Count);
try
{
var messages = envelopes
var messagesByPartition = envelopes
.Where(x => x.Message != null)
.Select(
envelope =>
{
var messageType = envelope.Message?.GetType();
var messagePayload = Serializer.Serialize(envelope.MessageType, envelope.Message);
.Select(envelope =>
{
var messageType = envelope.Message?.GetType();
var messagePayload = Serializer.Serialize(envelope.MessageType, envelope.Message);

_logger.LogDebug("Producing message {Message} of Type {MessageType} on Path {Path} with Size {MessageSize}", envelope.Message, messageType?.Name, path, messagePayload?.Length ?? 0);
_logger.LogDebug("Producing message {Message} of Type {MessageType} on Path {Path} with Size {MessageSize}", envelope.Message, messageType?.Name, path, messagePayload?.Length ?? 0);

var ev = envelope.Message != null ? new EventData(messagePayload) : new EventData();
var ev = envelope.Message != null ? new EventData(messagePayload) : new EventData();

if (envelope.Headers != null)
if (envelope.Headers != null)
{
foreach (var header in envelope.Headers)
{
foreach (var header in envelope.Headers)
{
ev.Properties.Add(header.Key, header.Value);
}
ev.Properties.Add(header.Key, header.Value);
}
}

var partitionKey = messageType != null
? GetPartitionKey(messageType, envelope.Message)
: null;
var partitionKey = messageType != null
? GetPartitionKey(messageType, envelope.Message)
: null;

return (Envelope: envelope, Message: ev, PartitionKey: partitionKey);
})
.GroupBy(x => x.PartitionKey);
return (Envelope: envelope, Message: ev, PartitionKey: partitionKey);
})
.GroupBy(x => x.PartitionKey);

var producer = _producerByPath[path];
foreach (var partition in messages)
foreach (var partition in messagesByPartition)
{
EventDataBatch batch = null;
try
Expand Down Expand Up @@ -184,7 +183,7 @@ protected override async Task OnStart()
batch = null;
}

return (dispatched, null);
return new(dispatched, null);
}
finally
{
Expand All @@ -194,10 +193,10 @@ protected override async Task OnStart()
}
catch (Exception ex)
{
return (dispatched, ex);
return new(dispatched, ex);
}

return (dispatched, null);
return new(dispatched, null);
}

#endregion
Expand Down
27 changes: 12 additions & 15 deletions src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -116,15 +116,14 @@ void AddConsumerFrom(TopicSubscriptionParams topicSubscription, IMessageProcesso
}
}

protected override async Task<(IReadOnlyCollection<T> Dispatched, Exception Exception)> ProduceToTransport<T>(IReadOnlyCollection<T> envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken = default)
protected override async Task<ProduceToTransportBulkResult<T>> ProduceToTransportBulk<T>(IReadOnlyCollection<T> envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken)
{
async Task SendBatchAsync(ServiceBusSender senderClient, ServiceBusMessageBatch batch, CancellationToken cancellationToken)
{
await Retry.WithDelay(
Task SendBatchAsync(ServiceBusSender senderClient, ServiceBusMessageBatch batch, CancellationToken cancellationToken) =>
Retry.WithDelay(
async cancellationToken =>
{
await senderClient.SendMessagesAsync(batch, cancellationToken).ConfigureAwait(false);
_logger.LogDebug("Batch of {BatchSize} message(s) dispatched to {path} ({SizeInBytes} bytes)", batch.Count, path, batch.SizeInBytes);
_logger.LogDebug("Batch of {BatchSize} message(s) dispatched to {Path} ({SizeInBytes} bytes)", batch.Count, path, batch.SizeInBytes);
},
(exception, attempt) =>
{
Expand All @@ -135,18 +134,16 @@ await Retry.WithDelay(
_logger.LogWarning("Service bus throttled. Backing off (Attempt: {Attempt}).", attempt);
return true;
}

return false;
},
TimeSpan.FromSeconds(2),
TimeSpan.FromSeconds(1),
delay: TimeSpan.FromSeconds(2),
jitter: TimeSpan.FromSeconds(1),
cancellationToken);
}

AssertActive();

var messages = envelopes.Select(
envelope =>
var messages = envelopes
.Select(envelope =>
{
var messageType = envelope.Message?.GetType();
var messagePayload = Serializer.Serialize(envelope.MessageType, envelope.Message);
Expand Down Expand Up @@ -190,12 +187,12 @@ await Retry.WithDelay(
await senderClient.SendMessageAsync(item.ServiceBusMessage, cancellationToken: cancellationToken).ConfigureAwait(false);
_logger.LogDebug("Delivered item {Message} of type {MessageType} to {Path}", item.Envelope.Message, item.Envelope.MessageType?.Name, path);

return ([item.Envelope], null);
return new([item.Envelope], null);
}
catch (Exception ex)
{
_logger.LogDebug(ex, "Producing message {Message} of type {MessageType} to path {Path} resulted in error {Error}", item.Envelope.Message, item.Envelope.MessageType?.Name, path, ex.Message);
return ([], ex);
return new([], ex);
}
}

Expand Down Expand Up @@ -233,12 +230,12 @@ await Retry.WithDelay(
batch = null;
}

return (dispatched, null);
return new(dispatched, null);
}
catch (Exception ex)
{
_logger.LogError(ex, "Producing message batch to path {Path} resulted in error {Error}", path, ex.Message);
return (dispatched, ex);
return new(dispatched, ex);
}
finally
{
Expand Down
6 changes: 3 additions & 3 deletions src/SlimMessageBus.Host.Kafka/KafkaMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ protected override async ValueTask DisposeAsyncCore()
}
}

protected override async Task<(IReadOnlyCollection<T> Dispatched, Exception Exception)> ProduceToTransport<T>(IReadOnlyCollection<T> envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken = default)
protected override async Task<ProduceToTransportBulkResult<T>> ProduceToTransportBulk<T>(IReadOnlyCollection<T> envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken)
{
AssertActive();

Expand Down Expand Up @@ -172,10 +172,10 @@ protected override async ValueTask DisposeAsyncCore()
}
catch (Exception ex)
{
return (dispatched, ex);
return new(dispatched, ex);
}

return (dispatched, null);
return new(dispatched, null);
}

protected byte[] GetMessageKey(ProducerSettings producerSettings, Type messageType, object message, string topic)
Expand Down
4 changes: 2 additions & 2 deletions src/SlimMessageBus.Host.Memory/MemoryMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ private IMessageProcessorQueue CreateMessageProcessorQueue(IMessageProcessor<obj
: new MessageProcessorQueue(messageProcessor, LoggerFactory.CreateLogger<MessageProcessorQueue>(), CancellationToken);
}

protected override Task<(IReadOnlyCollection<T> Dispatched, Exception Exception)> ProduceToTransport<T>(IReadOnlyCollection<T> envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken = default)
=> Task.FromResult<(IReadOnlyCollection<T> Dispatched, Exception Exception)>(([], null)); // Not used
protected override Task<ProduceToTransportBulkResult<T>> ProduceToTransportBulk<T>(IReadOnlyCollection<T> envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken)
=> Task.FromResult<ProduceToTransportBulkResult<T>>(new([], null)); // Not used

public override Task ProduceResponse(string requestId, object request, IReadOnlyDictionary<string, object> requestHeaders, object response, Exception responseException, IMessageTypeConsumerInvokerSettings consumerInvoker)
=> Task.CompletedTask; // Not used to responses
Expand Down
12 changes: 5 additions & 7 deletions src/SlimMessageBus.Host.Mqtt/MqttMessageBus.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
namespace SlimMessageBus.Host.Mqtt;

using System.Security.Cryptography;

using MQTTnet.Extensions.ManagedClient;

public class MqttMessageBus : MessageBusBase<MqttMessageBusSettings>
Expand Down Expand Up @@ -103,10 +101,10 @@ private Task OnMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
return Task.CompletedTask;
}

protected override async Task<(IReadOnlyCollection<T> Dispatched, Exception Exception)> ProduceToTransport<T>(IReadOnlyCollection<T> envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken = default)
protected override async Task<ProduceToTransportBulkResult<T>> ProduceToTransportBulk<T>(IReadOnlyCollection<T> envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken)
{
var messages = envelopes.Select(
envelope =>
var messages = envelopes
.Select(envelope =>
{
var messagePayload = Serializer.Serialize(envelope.MessageType, envelope.Message);

Expand Down Expand Up @@ -156,10 +154,10 @@ private Task OnMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
}
catch (Exception ex)
{
return (dispatched, ex);
return new(dispatched, ex);
}
}

return (dispatched, null);
return new(dispatched, null);
}
}
16 changes: 8 additions & 8 deletions src/SlimMessageBus.Host.Outbox/Interceptors/OutboxSendingTask.cs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ internal async Task<int> SendMessages(IServiceProvider serviceProvider, IOutboxR
outboxMessage =>
{
var message = bus.Serializer.Deserialize(outboxMessage.MessageType, outboxMessage.MessagePayload);
return new EnvelopeWithId(outboxMessage.Id, message, outboxMessage.MessageType, outboxMessage.Headers ?? new Dictionary<string, object>());
return new OutboxBulkMessage(outboxMessage.Id, message, outboxMessage.MessageType, outboxMessage.Headers ?? new Dictionary<string, object>());
})
.Batch(bulkProducer.MaxMessagesPerTransaction ?? defaultBatchSize);

Expand All @@ -271,12 +271,12 @@ internal async Task<int> SendMessages(IServiceProvider serviceProvider, IOutboxR
return count;
}

internal async Task<(bool Success, int Published)> DispatchBatchAsync(IOutboxRepository outboxRepository, IMessageBusBulkProducer producer, IMessageBusTarget messageBusTarget, IReadOnlyCollection<EnvelopeWithId> batch, string busName, string path, CancellationToken cancellationToken)
internal async Task<(bool Success, int Published)> DispatchBatchAsync(IOutboxRepository outboxRepository, IMessageBusBulkProducer producer, IMessageBusTarget messageBusTarget, IReadOnlyCollection<OutboxBulkMessage> batch, string busName, string path, CancellationToken cancellationToken)
{
_logger.LogDebug("Publishing batch of {MessageCount} messages to pathGroup {Path} on {BusName} bus", batch.Count, path, busName);

// TOOD: Enclose in a transaction
var results = await producer.ProduceToTransport(batch, path, messageBusTarget, cancellationToken).ConfigureAwait(false);
var results = await producer.ProduceToTransportBulk(batch, path, messageBusTarget, cancellationToken).ConfigureAwait(false);
if (cancellationToken.IsCancellationRequested && results.Dispatched.Count == 0)
{
// if cancellation has been requested, only return if no messages were published
Expand Down Expand Up @@ -311,14 +311,14 @@ private static IMasterMessageBus GetBus(ICompositeMessageBus compositeMessageBus
return null;
}

public record EnvelopeWithId : Envelope
public record OutboxBulkMessage : BulkMessageEnvelope
{
public EnvelopeWithId(Guid id, object Message, Type MessageType, IDictionary<string, object> Headers)
: base(Message, MessageType, Headers)
public Guid Id { get; }

public OutboxBulkMessage(Guid id, object message, Type messageType, IDictionary<string, object> headers)
: base(message, messageType, headers)
{
Id = id;
}

public Guid Id { get; }
}
}
6 changes: 3 additions & 3 deletions src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ protected override async ValueTask DisposeAsyncCore()
}
}

protected override async Task<(IReadOnlyCollection<T> Dispatched, Exception Exception)> ProduceToTransport<T>(IReadOnlyCollection<T> envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken = default)
protected override async Task<ProduceToTransportBulkResult<T>> ProduceToTransportBulk<T>(IReadOnlyCollection<T> envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken)
{
await EnsureInitFinished();

Expand Down Expand Up @@ -178,9 +178,9 @@ protected override async ValueTask DisposeAsyncCore()
}
catch (Exception ex)
{
return (dispatched, ex);
return new(dispatched, ex);
}

return (dispatched, null);
return new(dispatched, null);
}
}
10 changes: 7 additions & 3 deletions src/SlimMessageBus.Host.Redis/RedisMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,13 @@ void AddTopicConsumer(string topic, ISubscriber subscriber, IMessageProcessor<Me

#region Overrides of MessageBusBase

protected override async Task<(IReadOnlyCollection<T> Dispatched, Exception Exception)> ProduceToTransport<T>(IReadOnlyCollection<T> envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken = default)
protected override async Task<ProduceToTransportBulkResult<T>> ProduceToTransportBulk<T>(IReadOnlyCollection<T> envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken)
{
#if NETSTANDARD2_0
if (envelopes is null) throw new ArgumentNullException(nameof(envelopes));
#else
ArgumentNullException.ThrowIfNull(envelopes);
#endif

AssertActive();

Expand Down Expand Up @@ -184,10 +188,10 @@ void AddTopicConsumer(string topic, ISubscriber subscriber, IMessageProcessor<Me
}
catch (Exception ex)
{
return (dispatched, ex);
return new(dispatched, ex);
}

return (dispatched, null);
return new(dispatched, null);
}

#endregion
Expand Down
5 changes: 2 additions & 3 deletions src/SlimMessageBus.Host.Sql/GlobalUsings.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
global using System.Data;

global using Microsoft.Data.SqlClient;
global using Microsoft.Data.SqlClient;
global using Microsoft.Extensions.DependencyInjection;
global using Microsoft.Extensions.DependencyInjection.Extensions;
global using Microsoft.Extensions.Logging;

Expand Down
6 changes: 2 additions & 4 deletions src/SlimMessageBus.Host.Sql/SqlMessageBus.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
namespace SlimMessageBus.Host.Sql;

using Microsoft.Extensions.DependencyInjection;

public class SqlMessageBus : MessageBusBase<SqlMessageBusSettings>
{
public SqlMessageBus(MessageBusSettings settings, SqlMessageBusSettings providerSettings)
Expand All @@ -27,12 +25,12 @@ public override async Task ProvisionTopology()
await provisioningService.Migrate(CancellationToken); // provisining happens asynchronously
}

protected override async Task<(IReadOnlyCollection<T> Dispatched, Exception Exception)> ProduceToTransport<T>(IReadOnlyCollection<T> envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken = default)
protected override Task<ProduceToTransportBulkResult<T>> ProduceToTransportBulk<T>(IReadOnlyCollection<T> envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken)
{
var sqlRepository = targetBus.ServiceProvider.GetService<ISqlRepository>();

// ToDo: Save to table

return ([], new NotImplementedException());
return Task.FromResult(new ProduceToTransportBulkResult<T>([], new NotImplementedException()));
}
}
9 changes: 6 additions & 3 deletions src/SlimMessageBus.Host/IMessageBusBulkProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@ public interface IMessageBusBulkProducer
/// <summary>
/// The maximum number of messages that can take part in a <see cref="TransactionScope"/>. Null if transaction scopes are not supported by the message bus.
/// </summary>
int? MaxMessagesPerTransaction { get; }
Task<(IReadOnlyCollection<T> Dispatched, Exception Exception)> ProduceToTransport<T>(IReadOnlyCollection<T> envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken = default) where T: Envelope;
int? MaxMessagesPerTransaction { get; }

Task<ProduceToTransportBulkResult<T>> ProduceToTransportBulk<T>(IReadOnlyCollection<T> envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken = default) where T : BulkMessageEnvelope;
}

public record Envelope(object Message, Type MessageType, IDictionary<string, object> Headers);
public record BulkMessageEnvelope(object Message, Type MessageType, IDictionary<string, object> Headers);

public record ProduceToTransportBulkResult<T>(IReadOnlyCollection<T> Dispatched, Exception Exception);
Loading

0 comments on commit 11c7c48

Please sign in to comment.