diff --git a/src/Host.Plugin.Properties.xml b/src/Host.Plugin.Properties.xml
index a6cde8b2..854aff3c 100644
--- a/src/Host.Plugin.Properties.xml
+++ b/src/Host.Plugin.Properties.xml
@@ -4,7 +4,7 @@
- 2.4.0-rc7
+ 2.4.0-rc8
\ No newline at end of file
diff --git a/src/SlimMessageBus.Host.AzureEventHub/EventHubMessageBus.cs b/src/SlimMessageBus.Host.AzureEventHub/EventHubMessageBus.cs
index 6bd0de8c..4d2eb13c 100644
--- a/src/SlimMessageBus.Host.AzureEventHub/EventHubMessageBus.cs
+++ b/src/SlimMessageBus.Host.AzureEventHub/EventHubMessageBus.cs
@@ -104,43 +104,42 @@ protected override async Task OnStart()
}
}
- protected override async Task<(IReadOnlyCollection Dispatched, Exception Exception)> ProduceToTransport(IReadOnlyCollection envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken = default)
+ protected override async Task> ProduceToTransportBulk(IReadOnlyCollection envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken)
{
AssertActive();
var dispatched = new List(envelopes.Count);
try
{
- var messages = envelopes
+ var messagesByPartition = envelopes
.Where(x => x.Message != null)
- .Select(
- envelope =>
- {
- var messageType = envelope.Message?.GetType();
- var messagePayload = Serializer.Serialize(envelope.MessageType, envelope.Message);
+ .Select(envelope =>
+ {
+ var messageType = envelope.Message?.GetType();
+ var messagePayload = Serializer.Serialize(envelope.MessageType, envelope.Message);
- _logger.LogDebug("Producing message {Message} of Type {MessageType} on Path {Path} with Size {MessageSize}", envelope.Message, messageType?.Name, path, messagePayload?.Length ?? 0);
+ _logger.LogDebug("Producing message {Message} of Type {MessageType} on Path {Path} with Size {MessageSize}", envelope.Message, messageType?.Name, path, messagePayload?.Length ?? 0);
- var ev = envelope.Message != null ? new EventData(messagePayload) : new EventData();
+ var ev = envelope.Message != null ? new EventData(messagePayload) : new EventData();
- if (envelope.Headers != null)
+ if (envelope.Headers != null)
+ {
+ foreach (var header in envelope.Headers)
{
- foreach (var header in envelope.Headers)
- {
- ev.Properties.Add(header.Key, header.Value);
- }
+ ev.Properties.Add(header.Key, header.Value);
}
+ }
- var partitionKey = messageType != null
- ? GetPartitionKey(messageType, envelope.Message)
- : null;
+ var partitionKey = messageType != null
+ ? GetPartitionKey(messageType, envelope.Message)
+ : null;
- return (Envelope: envelope, Message: ev, PartitionKey: partitionKey);
- })
- .GroupBy(x => x.PartitionKey);
+ return (Envelope: envelope, Message: ev, PartitionKey: partitionKey);
+ })
+ .GroupBy(x => x.PartitionKey);
var producer = _producerByPath[path];
- foreach (var partition in messages)
+ foreach (var partition in messagesByPartition)
{
EventDataBatch batch = null;
try
@@ -184,7 +183,7 @@ protected override async Task OnStart()
batch = null;
}
- return (dispatched, null);
+ return new(dispatched, null);
}
finally
{
@@ -194,10 +193,10 @@ protected override async Task OnStart()
}
catch (Exception ex)
{
- return (dispatched, ex);
+ return new(dispatched, ex);
}
- return (dispatched, null);
+ return new(dispatched, null);
}
#endregion
diff --git a/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBus.cs b/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBus.cs
index 2c86e5f8..b1143129 100644
--- a/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBus.cs
+++ b/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBus.cs
@@ -116,15 +116,14 @@ void AddConsumerFrom(TopicSubscriptionParams topicSubscription, IMessageProcesso
}
}
- protected override async Task<(IReadOnlyCollection Dispatched, Exception Exception)> ProduceToTransport(IReadOnlyCollection envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken = default)
+ protected override async Task> ProduceToTransportBulk(IReadOnlyCollection envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken)
{
- async Task SendBatchAsync(ServiceBusSender senderClient, ServiceBusMessageBatch batch, CancellationToken cancellationToken)
- {
- await Retry.WithDelay(
+ Task SendBatchAsync(ServiceBusSender senderClient, ServiceBusMessageBatch batch, CancellationToken cancellationToken) =>
+ Retry.WithDelay(
async cancellationToken =>
{
await senderClient.SendMessagesAsync(batch, cancellationToken).ConfigureAwait(false);
- _logger.LogDebug("Batch of {BatchSize} message(s) dispatched to {path} ({SizeInBytes} bytes)", batch.Count, path, batch.SizeInBytes);
+ _logger.LogDebug("Batch of {BatchSize} message(s) dispatched to {Path} ({SizeInBytes} bytes)", batch.Count, path, batch.SizeInBytes);
},
(exception, attempt) =>
{
@@ -135,18 +134,16 @@ await Retry.WithDelay(
_logger.LogWarning("Service bus throttled. Backing off (Attempt: {Attempt}).", attempt);
return true;
}
-
return false;
},
- TimeSpan.FromSeconds(2),
- TimeSpan.FromSeconds(1),
+ delay: TimeSpan.FromSeconds(2),
+ jitter: TimeSpan.FromSeconds(1),
cancellationToken);
- }
AssertActive();
- var messages = envelopes.Select(
- envelope =>
+ var messages = envelopes
+ .Select(envelope =>
{
var messageType = envelope.Message?.GetType();
var messagePayload = Serializer.Serialize(envelope.MessageType, envelope.Message);
@@ -190,12 +187,12 @@ await Retry.WithDelay(
await senderClient.SendMessageAsync(item.ServiceBusMessage, cancellationToken: cancellationToken).ConfigureAwait(false);
_logger.LogDebug("Delivered item {Message} of type {MessageType} to {Path}", item.Envelope.Message, item.Envelope.MessageType?.Name, path);
- return ([item.Envelope], null);
+ return new([item.Envelope], null);
}
catch (Exception ex)
{
_logger.LogDebug(ex, "Producing message {Message} of type {MessageType} to path {Path} resulted in error {Error}", item.Envelope.Message, item.Envelope.MessageType?.Name, path, ex.Message);
- return ([], ex);
+ return new([], ex);
}
}
@@ -233,12 +230,12 @@ await Retry.WithDelay(
batch = null;
}
- return (dispatched, null);
+ return new(dispatched, null);
}
catch (Exception ex)
{
_logger.LogError(ex, "Producing message batch to path {Path} resulted in error {Error}", path, ex.Message);
- return (dispatched, ex);
+ return new(dispatched, ex);
}
finally
{
diff --git a/src/SlimMessageBus.Host.Kafka/KafkaMessageBus.cs b/src/SlimMessageBus.Host.Kafka/KafkaMessageBus.cs
index 48d73149..f9d4db7f 100644
--- a/src/SlimMessageBus.Host.Kafka/KafkaMessageBus.cs
+++ b/src/SlimMessageBus.Host.Kafka/KafkaMessageBus.cs
@@ -114,7 +114,7 @@ protected override async ValueTask DisposeAsyncCore()
}
}
- protected override async Task<(IReadOnlyCollection Dispatched, Exception Exception)> ProduceToTransport(IReadOnlyCollection envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken = default)
+ protected override async Task> ProduceToTransportBulk(IReadOnlyCollection envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken)
{
AssertActive();
@@ -172,10 +172,10 @@ protected override async ValueTask DisposeAsyncCore()
}
catch (Exception ex)
{
- return (dispatched, ex);
+ return new(dispatched, ex);
}
- return (dispatched, null);
+ return new(dispatched, null);
}
protected byte[] GetMessageKey(ProducerSettings producerSettings, Type messageType, object message, string topic)
diff --git a/src/SlimMessageBus.Host.Memory/MemoryMessageBus.cs b/src/SlimMessageBus.Host.Memory/MemoryMessageBus.cs
index fa7cfac4..ee351a37 100644
--- a/src/SlimMessageBus.Host.Memory/MemoryMessageBus.cs
+++ b/src/SlimMessageBus.Host.Memory/MemoryMessageBus.cs
@@ -123,8 +123,8 @@ private IMessageProcessorQueue CreateMessageProcessorQueue(IMessageProcessor(), CancellationToken);
}
- protected override Task<(IReadOnlyCollection Dispatched, Exception Exception)> ProduceToTransport(IReadOnlyCollection envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken = default)
- => Task.FromResult<(IReadOnlyCollection Dispatched, Exception Exception)>(([], null)); // Not used
+ protected override Task> ProduceToTransportBulk(IReadOnlyCollection envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken)
+ => Task.FromResult>(new([], null)); // Not used
public override Task ProduceResponse(string requestId, object request, IReadOnlyDictionary requestHeaders, object response, Exception responseException, IMessageTypeConsumerInvokerSettings consumerInvoker)
=> Task.CompletedTask; // Not used to responses
diff --git a/src/SlimMessageBus.Host.Mqtt/MqttMessageBus.cs b/src/SlimMessageBus.Host.Mqtt/MqttMessageBus.cs
index ab86b701..db25d6e8 100644
--- a/src/SlimMessageBus.Host.Mqtt/MqttMessageBus.cs
+++ b/src/SlimMessageBus.Host.Mqtt/MqttMessageBus.cs
@@ -1,7 +1,5 @@
namespace SlimMessageBus.Host.Mqtt;
-using System.Security.Cryptography;
-
using MQTTnet.Extensions.ManagedClient;
public class MqttMessageBus : MessageBusBase
@@ -103,10 +101,10 @@ private Task OnMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
return Task.CompletedTask;
}
- protected override async Task<(IReadOnlyCollection Dispatched, Exception Exception)> ProduceToTransport(IReadOnlyCollection envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken = default)
+ protected override async Task> ProduceToTransportBulk(IReadOnlyCollection envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken)
{
- var messages = envelopes.Select(
- envelope =>
+ var messages = envelopes
+ .Select(envelope =>
{
var messagePayload = Serializer.Serialize(envelope.MessageType, envelope.Message);
@@ -156,10 +154,10 @@ private Task OnMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
}
catch (Exception ex)
{
- return (dispatched, ex);
+ return new(dispatched, ex);
}
}
- return (dispatched, null);
+ return new(dispatched, null);
}
}
\ No newline at end of file
diff --git a/src/SlimMessageBus.Host.Outbox/Interceptors/OutboxSendingTask.cs b/src/SlimMessageBus.Host.Outbox/Interceptors/OutboxSendingTask.cs
index ba20f691..39c506fa 100644
--- a/src/SlimMessageBus.Host.Outbox/Interceptors/OutboxSendingTask.cs
+++ b/src/SlimMessageBus.Host.Outbox/Interceptors/OutboxSendingTask.cs
@@ -252,7 +252,7 @@ internal async Task SendMessages(IServiceProvider serviceProvider, IOutboxR
outboxMessage =>
{
var message = bus.Serializer.Deserialize(outboxMessage.MessageType, outboxMessage.MessagePayload);
- return new EnvelopeWithId(outboxMessage.Id, message, outboxMessage.MessageType, outboxMessage.Headers ?? new Dictionary());
+ return new OutboxBulkMessage(outboxMessage.Id, message, outboxMessage.MessageType, outboxMessage.Headers ?? new Dictionary());
})
.Batch(bulkProducer.MaxMessagesPerTransaction ?? defaultBatchSize);
@@ -271,12 +271,12 @@ internal async Task SendMessages(IServiceProvider serviceProvider, IOutboxR
return count;
}
- internal async Task<(bool Success, int Published)> DispatchBatchAsync(IOutboxRepository outboxRepository, IMessageBusBulkProducer producer, IMessageBusTarget messageBusTarget, IReadOnlyCollection batch, string busName, string path, CancellationToken cancellationToken)
+ internal async Task<(bool Success, int Published)> DispatchBatchAsync(IOutboxRepository outboxRepository, IMessageBusBulkProducer producer, IMessageBusTarget messageBusTarget, IReadOnlyCollection batch, string busName, string path, CancellationToken cancellationToken)
{
_logger.LogDebug("Publishing batch of {MessageCount} messages to pathGroup {Path} on {BusName} bus", batch.Count, path, busName);
// TOOD: Enclose in a transaction
- var results = await producer.ProduceToTransport(batch, path, messageBusTarget, cancellationToken).ConfigureAwait(false);
+ var results = await producer.ProduceToTransportBulk(batch, path, messageBusTarget, cancellationToken).ConfigureAwait(false);
if (cancellationToken.IsCancellationRequested && results.Dispatched.Count == 0)
{
// if cancellation has been requested, only return if no messages were published
@@ -311,14 +311,14 @@ private static IMasterMessageBus GetBus(ICompositeMessageBus compositeMessageBus
return null;
}
- public record EnvelopeWithId : Envelope
+ public record OutboxBulkMessage : BulkMessageEnvelope
{
- public EnvelopeWithId(Guid id, object Message, Type MessageType, IDictionary Headers)
- : base(Message, MessageType, Headers)
+ public Guid Id { get; }
+
+ public OutboxBulkMessage(Guid id, object message, Type messageType, IDictionary headers)
+ : base(message, messageType, headers)
{
Id = id;
}
-
- public Guid Id { get; }
}
}
diff --git a/src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBus.cs b/src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBus.cs
index 87f50be7..528cd38f 100644
--- a/src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBus.cs
+++ b/src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBus.cs
@@ -128,7 +128,7 @@ protected override async ValueTask DisposeAsyncCore()
}
}
- protected override async Task<(IReadOnlyCollection Dispatched, Exception Exception)> ProduceToTransport(IReadOnlyCollection envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken = default)
+ protected override async Task> ProduceToTransportBulk(IReadOnlyCollection envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken)
{
await EnsureInitFinished();
@@ -178,9 +178,9 @@ protected override async ValueTask DisposeAsyncCore()
}
catch (Exception ex)
{
- return (dispatched, ex);
+ return new(dispatched, ex);
}
- return (dispatched, null);
+ return new(dispatched, null);
}
}
diff --git a/src/SlimMessageBus.Host.Redis/RedisMessageBus.cs b/src/SlimMessageBus.Host.Redis/RedisMessageBus.cs
index e6c01fca..7e6d2c2c 100644
--- a/src/SlimMessageBus.Host.Redis/RedisMessageBus.cs
+++ b/src/SlimMessageBus.Host.Redis/RedisMessageBus.cs
@@ -147,9 +147,13 @@ void AddTopicConsumer(string topic, ISubscriber subscriber, IMessageProcessor Dispatched, Exception Exception)> ProduceToTransport(IReadOnlyCollection envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken = default)
+ protected override async Task> ProduceToTransportBulk(IReadOnlyCollection envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken)
{
+#if NETSTANDARD2_0
if (envelopes is null) throw new ArgumentNullException(nameof(envelopes));
+#else
+ ArgumentNullException.ThrowIfNull(envelopes);
+#endif
AssertActive();
@@ -184,10 +188,10 @@ void AddTopicConsumer(string topic, ISubscriber subscriber, IMessageProcessor
{
public SqlMessageBus(MessageBusSettings settings, SqlMessageBusSettings providerSettings)
@@ -27,12 +25,12 @@ public override async Task ProvisionTopology()
await provisioningService.Migrate(CancellationToken); // provisining happens asynchronously
}
- protected override async Task<(IReadOnlyCollection Dispatched, Exception Exception)> ProduceToTransport(IReadOnlyCollection envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken = default)
+ protected override Task> ProduceToTransportBulk(IReadOnlyCollection envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken)
{
var sqlRepository = targetBus.ServiceProvider.GetService();
// ToDo: Save to table
- return ([], new NotImplementedException());
+ return Task.FromResult(new ProduceToTransportBulkResult([], new NotImplementedException()));
}
}
\ No newline at end of file
diff --git a/src/SlimMessageBus.Host/IMessageBusBulkProducer.cs b/src/SlimMessageBus.Host/IMessageBusBulkProducer.cs
index d37a192b..a941a3a7 100644
--- a/src/SlimMessageBus.Host/IMessageBusBulkProducer.cs
+++ b/src/SlimMessageBus.Host/IMessageBusBulkProducer.cs
@@ -11,8 +11,11 @@ public interface IMessageBusBulkProducer
///
/// The maximum number of messages that can take part in a . Null if transaction scopes are not supported by the message bus.
///
- int? MaxMessagesPerTransaction { get; }
- Task<(IReadOnlyCollection Dispatched, Exception Exception)> ProduceToTransport(IReadOnlyCollection envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken = default) where T: Envelope;
+ int? MaxMessagesPerTransaction { get; }
+
+ Task> ProduceToTransportBulk(IReadOnlyCollection envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken = default) where T : BulkMessageEnvelope;
}
-public record Envelope(object Message, Type MessageType, IDictionary Headers);
+public record BulkMessageEnvelope(object Message, Type MessageType, IDictionary Headers);
+
+public record ProduceToTransportBulkResult(IReadOnlyCollection Dispatched, Exception Exception);
\ No newline at end of file
diff --git a/src/SlimMessageBus.Host/MessageBusBase.cs b/src/SlimMessageBus.Host/MessageBusBase.cs
index bafb2714..c3f0f98c 100644
--- a/src/SlimMessageBus.Host/MessageBusBase.cs
+++ b/src/SlimMessageBus.Host/MessageBusBase.cs
@@ -4,7 +4,7 @@ namespace SlimMessageBus.Host;
using SlimMessageBus.Host.Consumer;
using SlimMessageBus.Host.Services;
-
+
public abstract class MessageBusBase : MessageBusBase where TProviderSettings : class
{
public TProviderSettings ProviderSettings { get; }
@@ -420,8 +420,8 @@ protected virtual string GetDefaultPath(Type messageType, ProducerSettings produ
protected async Task ProduceToTransport(object message, Type messageType, string path, IDictionary messageHeaders, IMessageBusTarget targetBus, CancellationToken cancellationToken = default)
{
- var envelope = new Envelope(message, messageType, messageHeaders);
- var result = await ProduceToTransport([envelope], path, targetBus, cancellationToken);
+ var envelope = new BulkMessageEnvelope(message, messageType, messageHeaders);
+ var result = await ProduceToTransportBulk([envelope], path, targetBus, cancellationToken);
if (result.Exception != null)
{
if (result.Exception is ProducerMessageBusException)
@@ -433,7 +433,7 @@ protected async Task ProduceToTransport(object message, Type messageType, string
}
}
- protected abstract Task<(IReadOnlyCollection Dispatched, Exception Exception)> ProduceToTransport(IReadOnlyCollection envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken = default) where T : Envelope;
+ protected abstract Task> ProduceToTransportBulk(IReadOnlyCollection envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken) where T : BulkMessageEnvelope;
public virtual Task ProducePublish(object message, string path = null, IDictionary headers = null, IMessageBusTarget targetBus = null, CancellationToken cancellationToken = default)
{
@@ -718,7 +718,8 @@ public virtual Task OnResponseArrived(byte[] responsePayload, string
///
protected virtual string GenerateRequestId() => Guid.NewGuid().ToString("N", CultureInfo.InvariantCulture);
- public virtual bool IsMessageScopeEnabled(ConsumerSettings consumerSettings, IDictionary consumerContextProperties) => consumerSettings.IsMessageScopeEnabled ?? Settings.IsMessageScopeEnabled ?? true;
+ public virtual bool IsMessageScopeEnabled(ConsumerSettings consumerSettings, IDictionary consumerContextProperties)
+ => consumerSettings.IsMessageScopeEnabled ?? Settings.IsMessageScopeEnabled ?? true;
public virtual IMessageScope CreateMessageScope(ConsumerSettings consumerSettings, object message, IDictionary consumerContextProperties, IServiceProvider currentServiceProvider = null)
{
@@ -726,14 +727,8 @@ public virtual IMessageScope CreateMessageScope(ConsumerSettings consumerSetting
return new MessageScopeWrapper(_logger, currentServiceProvider ?? Settings.ServiceProvider, createMessageScope, message);
}
- public virtual Task ProvisionTopology() => Task.CompletedTask;
-
- Task<(IReadOnlyCollection Dispatched, Exception Exception)> IMessageBusBulkProducer.ProduceToTransport(
- IReadOnlyCollection envelopes,
- string path,
- IMessageBusTarget targetBus,
- CancellationToken cancellationToken)
- {
- return ProduceToTransport(envelopes, path, targetBus, cancellationToken);
- }
-}
\ No newline at end of file
+ public virtual Task ProvisionTopology() => Task.CompletedTask;
+
+ Task> IMessageBusBulkProducer.ProduceToTransportBulk(IReadOnlyCollection envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken)
+ => ProduceToTransportBulk(envelopes, path, targetBus, cancellationToken);
+}
diff --git a/src/SlimMessageBus.Host/Retry.cs b/src/SlimMessageBus.Host/Retry.cs
index 3812b7c4..5c58579d 100644
--- a/src/SlimMessageBus.Host/Retry.cs
+++ b/src/SlimMessageBus.Host/Retry.cs
@@ -2,21 +2,23 @@
public static class Retry
{
- private static Random _random = new(); // NOSONAR
+ private static readonly Random _random = new(); // NOSONAR
public static async Task WithDelay(Func operation, Func shouldRetry, TimeSpan? delay, TimeSpan? jitter = default, CancellationToken cancellationToken = default)
- {
- if (operation is null)
- {
- throw new ArgumentNullException(nameof(operation));
- }
-
- if (shouldRetry is null)
- {
- throw new ArgumentNullException(nameof(shouldRetry));
- }
-
- var pass = 0;
+ {
+#if NETSTANDARD2_0
+ if (operation is null) throw new ArgumentNullException(nameof(operation));
+#else
+ ArgumentNullException.ThrowIfNull(operation);
+#endif
+
+#if NETSTANDARD2_0
+ if (shouldRetry is null) throw new ArgumentNullException(nameof(shouldRetry));
+#else
+ ArgumentNullException.ThrowIfNull(shouldRetry);
+#endif
+
+ var attempt = 0;
do
{
cancellationToken.ThrowIfCancellationRequested();
@@ -32,7 +34,7 @@ public static async Task WithDelay(Func operation, Func
}
catch (Exception e)
{
- if (!shouldRetry(e, pass++))
+ if (!shouldRetry(e, attempt++))
{
throw;
}
diff --git a/src/Tests/SlimMessageBus.Host.Outbox.Test/Interceptors/OutboxSendingTaskTests.cs b/src/Tests/SlimMessageBus.Host.Outbox.Test/Interceptors/OutboxSendingTaskTests.cs
index 21e537bd..65e12900 100644
--- a/src/Tests/SlimMessageBus.Host.Outbox.Test/Interceptors/OutboxSendingTaskTests.cs
+++ b/src/Tests/SlimMessageBus.Host.Outbox.Test/Interceptors/OutboxSendingTaskTests.cs
@@ -29,15 +29,15 @@ public OutboxSendingTaskTests()
[Fact]
public async Task DispatchBatchAsync_ShouldReturnSuccess_WhenAllMessagesArePublished()
{
- var batch = new List
+ var batch = new List
{
- new EnvelopeWithId(Guid.NewGuid(), "Message1", typeof(string), new Dictionary()),
- new EnvelopeWithId(Guid.NewGuid(), "Message2", typeof(string), new Dictionary())
+ new(Guid.NewGuid(), "Message1", typeof(string), new Dictionary()),
+ new(Guid.NewGuid(), "Message2", typeof(string), new Dictionary())
}.AsReadOnly();
- var results = (Dispatched: batch, Exception: (Exception)null);
+ var results = new ProduceToTransportBulkResult(batch, null);
- _producerMock.Setup(x => x.ProduceToTransport(batch, It.IsAny(), It.IsAny(), It.IsAny()))
+ _producerMock.Setup(x => x.ProduceToTransportBulk(batch, It.IsAny(), It.IsAny(), It.IsAny()))
.ReturnsAsync(results);
var (success, published) = await _sut.DispatchBatchAsync(_outboxRepositoryMock.Object, _producerMock.Object, _messageBusTargetMock.Object, batch, "busName", "path", CancellationToken.None);
@@ -49,15 +49,15 @@ public async Task DispatchBatchAsync_ShouldReturnSuccess_WhenAllMessagesArePubli
[Fact]
public async Task DispatchBatchAsync_ShouldReturnFailure_WhenNotAllMessagesArePublished()
{
- var batch = new List
+ var batch = new List
{
- new EnvelopeWithId(Guid.NewGuid(), "Message1", typeof(string), new Dictionary()),
- new EnvelopeWithId(Guid.NewGuid(), "Message2", typeof(string), new Dictionary())
+ new(Guid.NewGuid(), "Message1", typeof(string), new Dictionary()),
+ new(Guid.NewGuid(), "Message2", typeof(string), new Dictionary())
}.AsReadOnly();
- var results = (Dispatched: new List { batch.First() }, Exception: (Exception)null);
+ var results = new ProduceToTransportBulkResult([batch.First()], null);
- _producerMock.Setup(x => x.ProduceToTransport(batch, It.IsAny(), It.IsAny(), It.IsAny()))
+ _producerMock.Setup(x => x.ProduceToTransportBulk(batch, It.IsAny(), It.IsAny(), It.IsAny()))
.ReturnsAsync(results);
var (success, published) = await _sut.DispatchBatchAsync(_outboxRepositoryMock.Object, _producerMock.Object, _messageBusTargetMock.Object, batch, "busName", "path", CancellationToken.None);
@@ -69,15 +69,15 @@ public async Task DispatchBatchAsync_ShouldReturnFailure_WhenNotAllMessagesArePu
[Fact]
public async Task DispatchBatchAsync_ShouldIncrementDeliveryAttempts_WhenNotAllMessagesArePublished()
{
- var batch = new List
+ var batch = new List
{
- new EnvelopeWithId(Guid.NewGuid(), "Message1", typeof(string), new Dictionary()),
- new EnvelopeWithId(Guid.NewGuid(), "Message2", typeof(string), new Dictionary())
+ new(Guid.NewGuid(), "Message1", typeof(string), new Dictionary()),
+ new(Guid.NewGuid(), "Message2", typeof(string), new Dictionary())
}.AsReadOnly();
- var results = (Dispatched: new List { batch.First() }, Exception: (Exception)null);
+ var results = new ProduceToTransportBulkResult([batch.First()], null);
- _producerMock.Setup(x => x.ProduceToTransport(batch, It.IsAny(), It.IsAny(), It.IsAny()))
+ _producerMock.Setup(x => x.ProduceToTransportBulk(batch, It.IsAny(), It.IsAny(), It.IsAny()))
.ReturnsAsync(results);
await _sut.DispatchBatchAsync(_outboxRepositoryMock.Object, _producerMock.Object, _messageBusTargetMock.Object, batch, "busName", "path", CancellationToken.None);
diff --git a/src/Tests/SlimMessageBus.Host.Test/MessageBusTested.cs b/src/Tests/SlimMessageBus.Host.Test/MessageBusTested.cs
index 44a2ab68..0e3e427c 100644
--- a/src/Tests/SlimMessageBus.Host.Test/MessageBusTested.cs
+++ b/src/Tests/SlimMessageBus.Host.Test/MessageBusTested.cs
@@ -35,7 +35,7 @@ protected internal override Task OnStop()
return base.OnStop();
}
- protected override async Task<(IReadOnlyCollection Dispatched, Exception Exception)> ProduceToTransport(IReadOnlyCollection envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken = default)
+ protected override async Task> ProduceToTransportBulk(IReadOnlyCollection envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken = default)
{
await EnsureInitFinished();
@@ -72,10 +72,10 @@ protected internal override Task OnStop()
}
catch (Exception ex)
{
- return (dispatched, ex);
+ return new(dispatched, ex);
}
- return (dispatched, null);
+ return new(dispatched, null);
}
public override DateTimeOffset CurrentTime => CurrentTimeProvider();