diff --git a/src/Host.Plugin.Properties.xml b/src/Host.Plugin.Properties.xml index a6cde8b2..854aff3c 100644 --- a/src/Host.Plugin.Properties.xml +++ b/src/Host.Plugin.Properties.xml @@ -4,7 +4,7 @@ - 2.4.0-rc7 + 2.4.0-rc8 \ No newline at end of file diff --git a/src/SlimMessageBus.Host.AzureEventHub/EventHubMessageBus.cs b/src/SlimMessageBus.Host.AzureEventHub/EventHubMessageBus.cs index 6bd0de8c..4d2eb13c 100644 --- a/src/SlimMessageBus.Host.AzureEventHub/EventHubMessageBus.cs +++ b/src/SlimMessageBus.Host.AzureEventHub/EventHubMessageBus.cs @@ -104,43 +104,42 @@ protected override async Task OnStart() } } - protected override async Task<(IReadOnlyCollection Dispatched, Exception Exception)> ProduceToTransport(IReadOnlyCollection envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken = default) + protected override async Task> ProduceToTransportBulk(IReadOnlyCollection envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken) { AssertActive(); var dispatched = new List(envelopes.Count); try { - var messages = envelopes + var messagesByPartition = envelopes .Where(x => x.Message != null) - .Select( - envelope => - { - var messageType = envelope.Message?.GetType(); - var messagePayload = Serializer.Serialize(envelope.MessageType, envelope.Message); + .Select(envelope => + { + var messageType = envelope.Message?.GetType(); + var messagePayload = Serializer.Serialize(envelope.MessageType, envelope.Message); - _logger.LogDebug("Producing message {Message} of Type {MessageType} on Path {Path} with Size {MessageSize}", envelope.Message, messageType?.Name, path, messagePayload?.Length ?? 0); + _logger.LogDebug("Producing message {Message} of Type {MessageType} on Path {Path} with Size {MessageSize}", envelope.Message, messageType?.Name, path, messagePayload?.Length ?? 0); - var ev = envelope.Message != null ? new EventData(messagePayload) : new EventData(); + var ev = envelope.Message != null ? new EventData(messagePayload) : new EventData(); - if (envelope.Headers != null) + if (envelope.Headers != null) + { + foreach (var header in envelope.Headers) { - foreach (var header in envelope.Headers) - { - ev.Properties.Add(header.Key, header.Value); - } + ev.Properties.Add(header.Key, header.Value); } + } - var partitionKey = messageType != null - ? GetPartitionKey(messageType, envelope.Message) - : null; + var partitionKey = messageType != null + ? GetPartitionKey(messageType, envelope.Message) + : null; - return (Envelope: envelope, Message: ev, PartitionKey: partitionKey); - }) - .GroupBy(x => x.PartitionKey); + return (Envelope: envelope, Message: ev, PartitionKey: partitionKey); + }) + .GroupBy(x => x.PartitionKey); var producer = _producerByPath[path]; - foreach (var partition in messages) + foreach (var partition in messagesByPartition) { EventDataBatch batch = null; try @@ -184,7 +183,7 @@ protected override async Task OnStart() batch = null; } - return (dispatched, null); + return new(dispatched, null); } finally { @@ -194,10 +193,10 @@ protected override async Task OnStart() } catch (Exception ex) { - return (dispatched, ex); + return new(dispatched, ex); } - return (dispatched, null); + return new(dispatched, null); } #endregion diff --git a/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBus.cs b/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBus.cs index 2c86e5f8..b1143129 100644 --- a/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBus.cs +++ b/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBus.cs @@ -116,15 +116,14 @@ void AddConsumerFrom(TopicSubscriptionParams topicSubscription, IMessageProcesso } } - protected override async Task<(IReadOnlyCollection Dispatched, Exception Exception)> ProduceToTransport(IReadOnlyCollection envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken = default) + protected override async Task> ProduceToTransportBulk(IReadOnlyCollection envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken) { - async Task SendBatchAsync(ServiceBusSender senderClient, ServiceBusMessageBatch batch, CancellationToken cancellationToken) - { - await Retry.WithDelay( + Task SendBatchAsync(ServiceBusSender senderClient, ServiceBusMessageBatch batch, CancellationToken cancellationToken) => + Retry.WithDelay( async cancellationToken => { await senderClient.SendMessagesAsync(batch, cancellationToken).ConfigureAwait(false); - _logger.LogDebug("Batch of {BatchSize} message(s) dispatched to {path} ({SizeInBytes} bytes)", batch.Count, path, batch.SizeInBytes); + _logger.LogDebug("Batch of {BatchSize} message(s) dispatched to {Path} ({SizeInBytes} bytes)", batch.Count, path, batch.SizeInBytes); }, (exception, attempt) => { @@ -135,18 +134,16 @@ await Retry.WithDelay( _logger.LogWarning("Service bus throttled. Backing off (Attempt: {Attempt}).", attempt); return true; } - return false; }, - TimeSpan.FromSeconds(2), - TimeSpan.FromSeconds(1), + delay: TimeSpan.FromSeconds(2), + jitter: TimeSpan.FromSeconds(1), cancellationToken); - } AssertActive(); - var messages = envelopes.Select( - envelope => + var messages = envelopes + .Select(envelope => { var messageType = envelope.Message?.GetType(); var messagePayload = Serializer.Serialize(envelope.MessageType, envelope.Message); @@ -190,12 +187,12 @@ await Retry.WithDelay( await senderClient.SendMessageAsync(item.ServiceBusMessage, cancellationToken: cancellationToken).ConfigureAwait(false); _logger.LogDebug("Delivered item {Message} of type {MessageType} to {Path}", item.Envelope.Message, item.Envelope.MessageType?.Name, path); - return ([item.Envelope], null); + return new([item.Envelope], null); } catch (Exception ex) { _logger.LogDebug(ex, "Producing message {Message} of type {MessageType} to path {Path} resulted in error {Error}", item.Envelope.Message, item.Envelope.MessageType?.Name, path, ex.Message); - return ([], ex); + return new([], ex); } } @@ -233,12 +230,12 @@ await Retry.WithDelay( batch = null; } - return (dispatched, null); + return new(dispatched, null); } catch (Exception ex) { _logger.LogError(ex, "Producing message batch to path {Path} resulted in error {Error}", path, ex.Message); - return (dispatched, ex); + return new(dispatched, ex); } finally { diff --git a/src/SlimMessageBus.Host.Kafka/KafkaMessageBus.cs b/src/SlimMessageBus.Host.Kafka/KafkaMessageBus.cs index 48d73149..f9d4db7f 100644 --- a/src/SlimMessageBus.Host.Kafka/KafkaMessageBus.cs +++ b/src/SlimMessageBus.Host.Kafka/KafkaMessageBus.cs @@ -114,7 +114,7 @@ protected override async ValueTask DisposeAsyncCore() } } - protected override async Task<(IReadOnlyCollection Dispatched, Exception Exception)> ProduceToTransport(IReadOnlyCollection envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken = default) + protected override async Task> ProduceToTransportBulk(IReadOnlyCollection envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken) { AssertActive(); @@ -172,10 +172,10 @@ protected override async ValueTask DisposeAsyncCore() } catch (Exception ex) { - return (dispatched, ex); + return new(dispatched, ex); } - return (dispatched, null); + return new(dispatched, null); } protected byte[] GetMessageKey(ProducerSettings producerSettings, Type messageType, object message, string topic) diff --git a/src/SlimMessageBus.Host.Memory/MemoryMessageBus.cs b/src/SlimMessageBus.Host.Memory/MemoryMessageBus.cs index fa7cfac4..ee351a37 100644 --- a/src/SlimMessageBus.Host.Memory/MemoryMessageBus.cs +++ b/src/SlimMessageBus.Host.Memory/MemoryMessageBus.cs @@ -123,8 +123,8 @@ private IMessageProcessorQueue CreateMessageProcessorQueue(IMessageProcessor(), CancellationToken); } - protected override Task<(IReadOnlyCollection Dispatched, Exception Exception)> ProduceToTransport(IReadOnlyCollection envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken = default) - => Task.FromResult<(IReadOnlyCollection Dispatched, Exception Exception)>(([], null)); // Not used + protected override Task> ProduceToTransportBulk(IReadOnlyCollection envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken) + => Task.FromResult>(new([], null)); // Not used public override Task ProduceResponse(string requestId, object request, IReadOnlyDictionary requestHeaders, object response, Exception responseException, IMessageTypeConsumerInvokerSettings consumerInvoker) => Task.CompletedTask; // Not used to responses diff --git a/src/SlimMessageBus.Host.Mqtt/MqttMessageBus.cs b/src/SlimMessageBus.Host.Mqtt/MqttMessageBus.cs index ab86b701..db25d6e8 100644 --- a/src/SlimMessageBus.Host.Mqtt/MqttMessageBus.cs +++ b/src/SlimMessageBus.Host.Mqtt/MqttMessageBus.cs @@ -1,7 +1,5 @@ namespace SlimMessageBus.Host.Mqtt; -using System.Security.Cryptography; - using MQTTnet.Extensions.ManagedClient; public class MqttMessageBus : MessageBusBase @@ -103,10 +101,10 @@ private Task OnMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg) return Task.CompletedTask; } - protected override async Task<(IReadOnlyCollection Dispatched, Exception Exception)> ProduceToTransport(IReadOnlyCollection envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken = default) + protected override async Task> ProduceToTransportBulk(IReadOnlyCollection envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken) { - var messages = envelopes.Select( - envelope => + var messages = envelopes + .Select(envelope => { var messagePayload = Serializer.Serialize(envelope.MessageType, envelope.Message); @@ -156,10 +154,10 @@ private Task OnMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg) } catch (Exception ex) { - return (dispatched, ex); + return new(dispatched, ex); } } - return (dispatched, null); + return new(dispatched, null); } } \ No newline at end of file diff --git a/src/SlimMessageBus.Host.Outbox/Interceptors/OutboxSendingTask.cs b/src/SlimMessageBus.Host.Outbox/Interceptors/OutboxSendingTask.cs index ba20f691..39c506fa 100644 --- a/src/SlimMessageBus.Host.Outbox/Interceptors/OutboxSendingTask.cs +++ b/src/SlimMessageBus.Host.Outbox/Interceptors/OutboxSendingTask.cs @@ -252,7 +252,7 @@ internal async Task SendMessages(IServiceProvider serviceProvider, IOutboxR outboxMessage => { var message = bus.Serializer.Deserialize(outboxMessage.MessageType, outboxMessage.MessagePayload); - return new EnvelopeWithId(outboxMessage.Id, message, outboxMessage.MessageType, outboxMessage.Headers ?? new Dictionary()); + return new OutboxBulkMessage(outboxMessage.Id, message, outboxMessage.MessageType, outboxMessage.Headers ?? new Dictionary()); }) .Batch(bulkProducer.MaxMessagesPerTransaction ?? defaultBatchSize); @@ -271,12 +271,12 @@ internal async Task SendMessages(IServiceProvider serviceProvider, IOutboxR return count; } - internal async Task<(bool Success, int Published)> DispatchBatchAsync(IOutboxRepository outboxRepository, IMessageBusBulkProducer producer, IMessageBusTarget messageBusTarget, IReadOnlyCollection batch, string busName, string path, CancellationToken cancellationToken) + internal async Task<(bool Success, int Published)> DispatchBatchAsync(IOutboxRepository outboxRepository, IMessageBusBulkProducer producer, IMessageBusTarget messageBusTarget, IReadOnlyCollection batch, string busName, string path, CancellationToken cancellationToken) { _logger.LogDebug("Publishing batch of {MessageCount} messages to pathGroup {Path} on {BusName} bus", batch.Count, path, busName); // TOOD: Enclose in a transaction - var results = await producer.ProduceToTransport(batch, path, messageBusTarget, cancellationToken).ConfigureAwait(false); + var results = await producer.ProduceToTransportBulk(batch, path, messageBusTarget, cancellationToken).ConfigureAwait(false); if (cancellationToken.IsCancellationRequested && results.Dispatched.Count == 0) { // if cancellation has been requested, only return if no messages were published @@ -311,14 +311,14 @@ private static IMasterMessageBus GetBus(ICompositeMessageBus compositeMessageBus return null; } - public record EnvelopeWithId : Envelope + public record OutboxBulkMessage : BulkMessageEnvelope { - public EnvelopeWithId(Guid id, object Message, Type MessageType, IDictionary Headers) - : base(Message, MessageType, Headers) + public Guid Id { get; } + + public OutboxBulkMessage(Guid id, object message, Type messageType, IDictionary headers) + : base(message, messageType, headers) { Id = id; } - - public Guid Id { get; } } } diff --git a/src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBus.cs b/src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBus.cs index 87f50be7..528cd38f 100644 --- a/src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBus.cs +++ b/src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBus.cs @@ -128,7 +128,7 @@ protected override async ValueTask DisposeAsyncCore() } } - protected override async Task<(IReadOnlyCollection Dispatched, Exception Exception)> ProduceToTransport(IReadOnlyCollection envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken = default) + protected override async Task> ProduceToTransportBulk(IReadOnlyCollection envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken) { await EnsureInitFinished(); @@ -178,9 +178,9 @@ protected override async ValueTask DisposeAsyncCore() } catch (Exception ex) { - return (dispatched, ex); + return new(dispatched, ex); } - return (dispatched, null); + return new(dispatched, null); } } diff --git a/src/SlimMessageBus.Host.Redis/RedisMessageBus.cs b/src/SlimMessageBus.Host.Redis/RedisMessageBus.cs index e6c01fca..7e6d2c2c 100644 --- a/src/SlimMessageBus.Host.Redis/RedisMessageBus.cs +++ b/src/SlimMessageBus.Host.Redis/RedisMessageBus.cs @@ -147,9 +147,13 @@ void AddTopicConsumer(string topic, ISubscriber subscriber, IMessageProcessor Dispatched, Exception Exception)> ProduceToTransport(IReadOnlyCollection envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken = default) + protected override async Task> ProduceToTransportBulk(IReadOnlyCollection envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken) { +#if NETSTANDARD2_0 if (envelopes is null) throw new ArgumentNullException(nameof(envelopes)); +#else + ArgumentNullException.ThrowIfNull(envelopes); +#endif AssertActive(); @@ -184,10 +188,10 @@ void AddTopicConsumer(string topic, ISubscriber subscriber, IMessageProcessor { public SqlMessageBus(MessageBusSettings settings, SqlMessageBusSettings providerSettings) @@ -27,12 +25,12 @@ public override async Task ProvisionTopology() await provisioningService.Migrate(CancellationToken); // provisining happens asynchronously } - protected override async Task<(IReadOnlyCollection Dispatched, Exception Exception)> ProduceToTransport(IReadOnlyCollection envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken = default) + protected override Task> ProduceToTransportBulk(IReadOnlyCollection envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken) { var sqlRepository = targetBus.ServiceProvider.GetService(); // ToDo: Save to table - return ([], new NotImplementedException()); + return Task.FromResult(new ProduceToTransportBulkResult([], new NotImplementedException())); } } \ No newline at end of file diff --git a/src/SlimMessageBus.Host/IMessageBusBulkProducer.cs b/src/SlimMessageBus.Host/IMessageBusBulkProducer.cs index d37a192b..a941a3a7 100644 --- a/src/SlimMessageBus.Host/IMessageBusBulkProducer.cs +++ b/src/SlimMessageBus.Host/IMessageBusBulkProducer.cs @@ -11,8 +11,11 @@ public interface IMessageBusBulkProducer /// /// The maximum number of messages that can take part in a . Null if transaction scopes are not supported by the message bus. /// - int? MaxMessagesPerTransaction { get; } - Task<(IReadOnlyCollection Dispatched, Exception Exception)> ProduceToTransport(IReadOnlyCollection envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken = default) where T: Envelope; + int? MaxMessagesPerTransaction { get; } + + Task> ProduceToTransportBulk(IReadOnlyCollection envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken = default) where T : BulkMessageEnvelope; } -public record Envelope(object Message, Type MessageType, IDictionary Headers); +public record BulkMessageEnvelope(object Message, Type MessageType, IDictionary Headers); + +public record ProduceToTransportBulkResult(IReadOnlyCollection Dispatched, Exception Exception); \ No newline at end of file diff --git a/src/SlimMessageBus.Host/MessageBusBase.cs b/src/SlimMessageBus.Host/MessageBusBase.cs index bafb2714..c3f0f98c 100644 --- a/src/SlimMessageBus.Host/MessageBusBase.cs +++ b/src/SlimMessageBus.Host/MessageBusBase.cs @@ -4,7 +4,7 @@ namespace SlimMessageBus.Host; using SlimMessageBus.Host.Consumer; using SlimMessageBus.Host.Services; - + public abstract class MessageBusBase : MessageBusBase where TProviderSettings : class { public TProviderSettings ProviderSettings { get; } @@ -420,8 +420,8 @@ protected virtual string GetDefaultPath(Type messageType, ProducerSettings produ protected async Task ProduceToTransport(object message, Type messageType, string path, IDictionary messageHeaders, IMessageBusTarget targetBus, CancellationToken cancellationToken = default) { - var envelope = new Envelope(message, messageType, messageHeaders); - var result = await ProduceToTransport([envelope], path, targetBus, cancellationToken); + var envelope = new BulkMessageEnvelope(message, messageType, messageHeaders); + var result = await ProduceToTransportBulk([envelope], path, targetBus, cancellationToken); if (result.Exception != null) { if (result.Exception is ProducerMessageBusException) @@ -433,7 +433,7 @@ protected async Task ProduceToTransport(object message, Type messageType, string } } - protected abstract Task<(IReadOnlyCollection Dispatched, Exception Exception)> ProduceToTransport(IReadOnlyCollection envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken = default) where T : Envelope; + protected abstract Task> ProduceToTransportBulk(IReadOnlyCollection envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken) where T : BulkMessageEnvelope; public virtual Task ProducePublish(object message, string path = null, IDictionary headers = null, IMessageBusTarget targetBus = null, CancellationToken cancellationToken = default) { @@ -718,7 +718,8 @@ public virtual Task OnResponseArrived(byte[] responsePayload, string /// protected virtual string GenerateRequestId() => Guid.NewGuid().ToString("N", CultureInfo.InvariantCulture); - public virtual bool IsMessageScopeEnabled(ConsumerSettings consumerSettings, IDictionary consumerContextProperties) => consumerSettings.IsMessageScopeEnabled ?? Settings.IsMessageScopeEnabled ?? true; + public virtual bool IsMessageScopeEnabled(ConsumerSettings consumerSettings, IDictionary consumerContextProperties) + => consumerSettings.IsMessageScopeEnabled ?? Settings.IsMessageScopeEnabled ?? true; public virtual IMessageScope CreateMessageScope(ConsumerSettings consumerSettings, object message, IDictionary consumerContextProperties, IServiceProvider currentServiceProvider = null) { @@ -726,14 +727,8 @@ public virtual IMessageScope CreateMessageScope(ConsumerSettings consumerSetting return new MessageScopeWrapper(_logger, currentServiceProvider ?? Settings.ServiceProvider, createMessageScope, message); } - public virtual Task ProvisionTopology() => Task.CompletedTask; - - Task<(IReadOnlyCollection Dispatched, Exception Exception)> IMessageBusBulkProducer.ProduceToTransport( - IReadOnlyCollection envelopes, - string path, - IMessageBusTarget targetBus, - CancellationToken cancellationToken) - { - return ProduceToTransport(envelopes, path, targetBus, cancellationToken); - } -} \ No newline at end of file + public virtual Task ProvisionTopology() => Task.CompletedTask; + + Task> IMessageBusBulkProducer.ProduceToTransportBulk(IReadOnlyCollection envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken) + => ProduceToTransportBulk(envelopes, path, targetBus, cancellationToken); +} diff --git a/src/SlimMessageBus.Host/Retry.cs b/src/SlimMessageBus.Host/Retry.cs index 3812b7c4..5c58579d 100644 --- a/src/SlimMessageBus.Host/Retry.cs +++ b/src/SlimMessageBus.Host/Retry.cs @@ -2,21 +2,23 @@ public static class Retry { - private static Random _random = new(); // NOSONAR + private static readonly Random _random = new(); // NOSONAR public static async Task WithDelay(Func operation, Func shouldRetry, TimeSpan? delay, TimeSpan? jitter = default, CancellationToken cancellationToken = default) - { - if (operation is null) - { - throw new ArgumentNullException(nameof(operation)); - } - - if (shouldRetry is null) - { - throw new ArgumentNullException(nameof(shouldRetry)); - } - - var pass = 0; + { +#if NETSTANDARD2_0 + if (operation is null) throw new ArgumentNullException(nameof(operation)); +#else + ArgumentNullException.ThrowIfNull(operation); +#endif + +#if NETSTANDARD2_0 + if (shouldRetry is null) throw new ArgumentNullException(nameof(shouldRetry)); +#else + ArgumentNullException.ThrowIfNull(shouldRetry); +#endif + + var attempt = 0; do { cancellationToken.ThrowIfCancellationRequested(); @@ -32,7 +34,7 @@ public static async Task WithDelay(Func operation, Func } catch (Exception e) { - if (!shouldRetry(e, pass++)) + if (!shouldRetry(e, attempt++)) { throw; } diff --git a/src/Tests/SlimMessageBus.Host.Outbox.Test/Interceptors/OutboxSendingTaskTests.cs b/src/Tests/SlimMessageBus.Host.Outbox.Test/Interceptors/OutboxSendingTaskTests.cs index 21e537bd..65e12900 100644 --- a/src/Tests/SlimMessageBus.Host.Outbox.Test/Interceptors/OutboxSendingTaskTests.cs +++ b/src/Tests/SlimMessageBus.Host.Outbox.Test/Interceptors/OutboxSendingTaskTests.cs @@ -29,15 +29,15 @@ public OutboxSendingTaskTests() [Fact] public async Task DispatchBatchAsync_ShouldReturnSuccess_WhenAllMessagesArePublished() { - var batch = new List + var batch = new List { - new EnvelopeWithId(Guid.NewGuid(), "Message1", typeof(string), new Dictionary()), - new EnvelopeWithId(Guid.NewGuid(), "Message2", typeof(string), new Dictionary()) + new(Guid.NewGuid(), "Message1", typeof(string), new Dictionary()), + new(Guid.NewGuid(), "Message2", typeof(string), new Dictionary()) }.AsReadOnly(); - var results = (Dispatched: batch, Exception: (Exception)null); + var results = new ProduceToTransportBulkResult(batch, null); - _producerMock.Setup(x => x.ProduceToTransport(batch, It.IsAny(), It.IsAny(), It.IsAny())) + _producerMock.Setup(x => x.ProduceToTransportBulk(batch, It.IsAny(), It.IsAny(), It.IsAny())) .ReturnsAsync(results); var (success, published) = await _sut.DispatchBatchAsync(_outboxRepositoryMock.Object, _producerMock.Object, _messageBusTargetMock.Object, batch, "busName", "path", CancellationToken.None); @@ -49,15 +49,15 @@ public async Task DispatchBatchAsync_ShouldReturnSuccess_WhenAllMessagesArePubli [Fact] public async Task DispatchBatchAsync_ShouldReturnFailure_WhenNotAllMessagesArePublished() { - var batch = new List + var batch = new List { - new EnvelopeWithId(Guid.NewGuid(), "Message1", typeof(string), new Dictionary()), - new EnvelopeWithId(Guid.NewGuid(), "Message2", typeof(string), new Dictionary()) + new(Guid.NewGuid(), "Message1", typeof(string), new Dictionary()), + new(Guid.NewGuid(), "Message2", typeof(string), new Dictionary()) }.AsReadOnly(); - var results = (Dispatched: new List { batch.First() }, Exception: (Exception)null); + var results = new ProduceToTransportBulkResult([batch.First()], null); - _producerMock.Setup(x => x.ProduceToTransport(batch, It.IsAny(), It.IsAny(), It.IsAny())) + _producerMock.Setup(x => x.ProduceToTransportBulk(batch, It.IsAny(), It.IsAny(), It.IsAny())) .ReturnsAsync(results); var (success, published) = await _sut.DispatchBatchAsync(_outboxRepositoryMock.Object, _producerMock.Object, _messageBusTargetMock.Object, batch, "busName", "path", CancellationToken.None); @@ -69,15 +69,15 @@ public async Task DispatchBatchAsync_ShouldReturnFailure_WhenNotAllMessagesArePu [Fact] public async Task DispatchBatchAsync_ShouldIncrementDeliveryAttempts_WhenNotAllMessagesArePublished() { - var batch = new List + var batch = new List { - new EnvelopeWithId(Guid.NewGuid(), "Message1", typeof(string), new Dictionary()), - new EnvelopeWithId(Guid.NewGuid(), "Message2", typeof(string), new Dictionary()) + new(Guid.NewGuid(), "Message1", typeof(string), new Dictionary()), + new(Guid.NewGuid(), "Message2", typeof(string), new Dictionary()) }.AsReadOnly(); - var results = (Dispatched: new List { batch.First() }, Exception: (Exception)null); + var results = new ProduceToTransportBulkResult([batch.First()], null); - _producerMock.Setup(x => x.ProduceToTransport(batch, It.IsAny(), It.IsAny(), It.IsAny())) + _producerMock.Setup(x => x.ProduceToTransportBulk(batch, It.IsAny(), It.IsAny(), It.IsAny())) .ReturnsAsync(results); await _sut.DispatchBatchAsync(_outboxRepositoryMock.Object, _producerMock.Object, _messageBusTargetMock.Object, batch, "busName", "path", CancellationToken.None); diff --git a/src/Tests/SlimMessageBus.Host.Test/MessageBusTested.cs b/src/Tests/SlimMessageBus.Host.Test/MessageBusTested.cs index 44a2ab68..0e3e427c 100644 --- a/src/Tests/SlimMessageBus.Host.Test/MessageBusTested.cs +++ b/src/Tests/SlimMessageBus.Host.Test/MessageBusTested.cs @@ -35,7 +35,7 @@ protected internal override Task OnStop() return base.OnStop(); } - protected override async Task<(IReadOnlyCollection Dispatched, Exception Exception)> ProduceToTransport(IReadOnlyCollection envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken = default) + protected override async Task> ProduceToTransportBulk(IReadOnlyCollection envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken = default) { await EnsureInitFinished(); @@ -72,10 +72,10 @@ protected internal override Task OnStop() } catch (Exception ex) { - return (dispatched, ex); + return new(dispatched, ex); } - return (dispatched, null); + return new(dispatched, null); } public override DateTimeOffset CurrentTime => CurrentTimeProvider();