diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index d5a8161a..89c8cd48 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -22,27 +22,52 @@ jobs: steps: - uses: actions/checkout@v3 + - name: Setup .NET uses: actions/setup-dotnet@v3 with: dotnet-version: 7.0.x + - name: Restore dependencies run: dotnet restore $SOLUTION_NAME working-directory: ./src + + - name: Install Coverlet + run: find . -name "*.Test.csproj" | xargs -t -I {} dotnet add {} package coverlet.collector + working-directory: ./src + + - name: SonarCloud - Setup Java17 + uses: actions/setup-java@v3 + with: + distribution: "zulu" + java-version: "17" + + - name: SonarCloud - Install SonarCloud scanner + run: dotnet tool update dotnet-sonarscanner --tool-path ./.sonar/scanner + + - name: SonarCloud - SonarScanner Begin + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} # Needed to get PR information, if any + SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }} + run: ../.sonar/scanner/dotnet-sonarscanner begin /k:"zarusz_SlimMessageBus" /o:"zarusz" /d:sonar.token="${{ secrets.SONAR_TOKEN }}" /d:sonar.host.url="https://sonarcloud.io" /d:sonar.cs.opencover.reportsPaths="**/coverage.opencover.xml" /d:sonar.exclusions="Samples/**/*,Tests/**/*" + working-directory: ./src + - name: Build run: dotnet build $SOLUTION_NAME --configuration $SOLUTION_CONFIGURATION --no-restore working-directory: ./src - name: Unit Tests - run: dotnet test $SOLUTION_NAME --configuration $SOLUTION_CONFIGURATION --no-build --verbosity normal --logger html --results-directory TestResults --filter Category!=Integration + run: dotnet test $SOLUTION_NAME --configuration $SOLUTION_CONFIGURATION --no-build --verbosity normal --logger html --results-directory TestResults --collect:"XPlat Code Coverage;Format=opencover" --filter Category!=Integration working-directory: ./src + # - name: Dump GitHub context # env: # GITHUB_CONTEXT: ${{ toJson(github) }} # run: echo "$GITHUB_CONTEXT" # Run integration tests against the test infrastructure if secrets are provided + - name: Integrations Tests if: "${{ env.azure_servicebus_key != '' }}" - run: dotnet test $SOLUTION_NAME --configuration $SOLUTION_CONFIGURATION --no-build --verbosity normal --logger html --results-directory TestResults --filter Category=Integration + run: dotnet test $SOLUTION_NAME --configuration $SOLUTION_CONFIGURATION --no-build --verbosity normal --logger html --results-directory TestResults --collect:"XPlat Code Coverage;Format=opencover" --filter Category=Integration working-directory: ./src env: azure_servicebus_key: ${{ secrets.azure_servicebus_key }} @@ -54,6 +79,14 @@ jobs: redis_password: ${{ secrets.redis_password }} mqtt_password: ${{ secrets.mqtt_password }} rabbitmq_password: ${{ secrets.rabbitmq_password }} + + - name: SonarCloud - SonarScanner End + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} # Needed to get PR information, if any + SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }} + run: ../.sonar/scanner/dotnet-sonarscanner end /d:sonar.token="${{ secrets.SONAR_TOKEN }}" + working-directory: ./src + - name: Upload test results uses: actions/upload-artifact@v3 with: diff --git a/README.md b/README.md index b668c785..cdbd91f2 100644 --- a/README.md +++ b/README.md @@ -5,9 +5,14 @@ SlimMessageBus is a client façade for message brokers for .NET. It comes with i [![Gitter](https://badges.gitter.im/SlimMessageBus/community.svg)](https://gitter.im/SlimMessageBus/community?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge) [![GitHub license](https://img.shields.io/github/license/zarusz/SlimMessageBus)](https://github.com/zarusz/SlimMessageBus/blob/master/LICENSE) [![Build](https://github.com/zarusz/SlimMessageBus/actions/workflows/build.yml/badge.svg?branch=master)](https://github.com/zarusz/SlimMessageBus/actions/workflows/build.yml) +[![Maintainability Rating](https://sonarcloud.io/api/project_badges/measure?project=zarusz_SlimMessageBus&metric=sqale_rating)](https://sonarcloud.io/summary/new_code?id=zarusz_SlimMessageBus) +[![Coverage](https://sonarcloud.io/api/project_badges/measure?project=zarusz_SlimMessageBus&metric=coverage)](https://sonarcloud.io/summary/new_code?id=zarusz_SlimMessageBus) +[![Duplicated Lines (%)](https://sonarcloud.io/api/project_badges/measure?project=zarusz_SlimMessageBus&metric=duplicated_lines_density)](https://sonarcloud.io/summary/new_code?id=zarusz_SlimMessageBus) +[![Vulnerabilities](https://sonarcloud.io/api/project_badges/measure?project=zarusz_SlimMessageBus&metric=vulnerabilities)](https://sonarcloud.io/summary/new_code?id=zarusz_SlimMessageBus) +[![Quality Gate Status](https://sonarcloud.io/api/project_badges/measure?project=zarusz_SlimMessageBus&metric=alert_status)](https://sonarcloud.io/summary/new_code?id=zarusz_SlimMessageBus) -> The v2.0.0 major release is available. -> Please review the [release notes](https://github.com/zarusz/SlimMessageBus/releases/tag/Host.Transport-2.0.0). +> The 2.x.x major release is available. +> Please review the [migration guide](https://github.com/zarusz/SlimMessageBus/releases/tag/Host.Transport-2.0.0). - [Key elements of SlimMessageBus](#key-elements-of-slimmessagebus) - [Docs](#docs) @@ -148,7 +153,7 @@ services.AddSlimMessageBus(mbb => { mbb // First child bus - in this example Kafka transport - .AddChildBus("Bus1", (builder) => + .AddChildBus("Bus1", (builder) => { builder .Produce(x => x.DefaultTopic("some-topic")) @@ -171,7 +176,7 @@ services.AddSlimMessageBus(mbb => // Use in-memory transport provider //.WithProviderMemory(cfg => { ... }) // requires SlimMessageBus.Host.Memory package }) - + // Add other bus transports (as child bus), if needed //.AddChildBus("Bus2", (builder) => { }) @@ -264,7 +269,7 @@ The `SlimMessageBus` configuration for the in-memory provider looks like this: //IServiceCollection services; // Cofigure the message bus -services.AddSlimMessageBus(mbb => +services.AddSlimMessageBus(mbb => { mbb.WithProviderMemory(); // Find types that implement IConsumer and IRequestHandler and declare producers and consumers on the mbb @@ -277,7 +282,7 @@ services.AddSlimMessageBus(mbb => For the ASP.NET project, set up the `MessageBus.Current` helper (if you want to use it, and pick up the current web-request scope): ```cs -services.AddSlimMessageBus(mbb => +services.AddSlimMessageBus(mbb => { // ... mbb.AddAspNet(); // requires SlimMessageBus.Host.AspNetCore package @@ -356,7 +361,7 @@ Run all tests: dotnet test ``` -Run all tests except integration tests that require local/cloud infrastructure: +Run all tests except integration tests that require local/cloud infrastructure: ```cmd dotnet test --filter Category!=Integration diff --git a/src/SlimMessageBus.Host.AzureEventHub/Consumer/EhGroupConsumer.cs b/src/SlimMessageBus.Host.AzureEventHub/Consumer/EhGroupConsumer.cs index 33ad56fc..c3c1b0ab 100644 --- a/src/SlimMessageBus.Host.AzureEventHub/Consumer/EhGroupConsumer.cs +++ b/src/SlimMessageBus.Host.AzureEventHub/Consumer/EhGroupConsumer.cs @@ -17,7 +17,7 @@ public EhGroupConsumer(EventHubMessageBus messageBus, GroupPath groupPath, Func< _groupPath = groupPath ?? throw new ArgumentNullException(nameof(groupPath)); if (partitionConsumerFactory == null) throw new ArgumentNullException(nameof(partitionConsumerFactory)); - MessageBus = messageBus ?? throw new ArgumentNullException(nameof(messageBus)); + MessageBus = messageBus; _partitionConsumerByPartitionId = new SafeDictionaryWrapper(partitionId => { diff --git a/src/SlimMessageBus.Host.AzureEventHub/Consumer/EhPartitionConsumer.cs b/src/SlimMessageBus.Host.AzureEventHub/Consumer/EhPartitionConsumer.cs index 6ca40cd5..76a42f39 100644 --- a/src/SlimMessageBus.Host.AzureEventHub/Consumer/EhPartitionConsumer.cs +++ b/src/SlimMessageBus.Host.AzureEventHub/Consumer/EhPartitionConsumer.cs @@ -18,9 +18,9 @@ public abstract class EhPartitionConsumer protected EhPartitionConsumer(EventHubMessageBus messageBus, GroupPathPartitionId groupPathPartition) { - _logger = messageBus.LoggerFactory.CreateLogger(); MessageBus = messageBus ?? throw new ArgumentNullException(nameof(messageBus)); GroupPathPartition = groupPathPartition ?? throw new ArgumentNullException(nameof(groupPathPartition)); + _logger = messageBus.LoggerFactory.CreateLogger(); } public Task OpenAsync() diff --git a/src/SlimMessageBus.Host.AzureEventHub/EventHubMessageBus.cs b/src/SlimMessageBus.Host.AzureEventHub/EventHubMessageBus.cs index 53915dc0..4a5345bb 100644 --- a/src/SlimMessageBus.Host.AzureEventHub/EventHubMessageBus.cs +++ b/src/SlimMessageBus.Host.AzureEventHub/EventHubMessageBus.cs @@ -46,8 +46,8 @@ protected override void Build() _logger.LogDebug(e, "Error creating EventHubClient for path {Path}", path); throw; } - }); - } + }); + } protected override async Task CreateConsumers() { @@ -102,7 +102,7 @@ protected override async Task OnStart() } } - protected override async Task ProduceToTransport(object message, string path, byte[] messagePayload, IDictionary messageHeaders, CancellationToken cancellationToken) + protected override async Task ProduceToTransport(object message, string path, byte[] messagePayload, IDictionary messageHeaders = null, CancellationToken cancellationToken = default) { AssertActive(); diff --git a/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBus.cs b/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBus.cs index 57849b32..796bd7e0 100644 --- a/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBus.cs +++ b/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBus.cs @@ -114,7 +114,7 @@ protected override async ValueTask DisposeAsyncCore() } } - protected override async Task ProduceToTransport(object message, string path, byte[] messagePayload, IDictionary messageHeaders, CancellationToken cancellationToken) + protected override async Task ProduceToTransport(object message, string path, byte[] messagePayload, IDictionary messageHeaders = null, CancellationToken cancellationToken = default) { var messageType = message?.GetType(); @@ -164,11 +164,11 @@ protected override async Task ProduceToTransport(object message, string path, by } } - public override Task ProduceRequest(object request, IDictionary headers, string path, ProducerSettings producerSettings) + public override Task ProduceRequest(object request, IDictionary requestHeaders, string path, ProducerSettings producerSettings) { - if (headers is null) throw new ArgumentNullException(nameof(headers)); + if (requestHeaders is null) throw new ArgumentNullException(nameof(requestHeaders)); - return base.ProduceRequest(request, headers, path, producerSettings); + return base.ProduceRequest(request, requestHeaders, path, producerSettings); } #endregion diff --git a/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusTopologyService.cs b/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusTopologyService.cs index 9073371c..1fde5746 100644 --- a/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusTopologyService.cs +++ b/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusTopologyService.cs @@ -20,9 +20,10 @@ public ServiceBusTopologyService(ILogger logger, Mess [Flags] private enum TopologyCreationStatus { - NotExists = 0, - Exists = 1, - Created = 2 + None = 0, + NotExists = 1, + Exists = 2, + Created = 4 } private async Task SwallowExceptionIfEntityExists(Func> task) @@ -206,7 +207,7 @@ await TryCreateQueue(path, topologyProvisioning.CanConsumerCreateQueue, options removeRuleTasks .AddRange(rulesPage.Values .Where(rule => !filters.Any(filter => filter.Name == rule.Name)) - .Select(rule => SwallowExceptionIfMessagingEntityNotFound(() => + .Select(rule => SwallowExceptionIfMessagingEntityNotFound(() => adminClient.DeleteRuleAsync(path, subscriptionName, rule.Name))) ); } diff --git a/src/SlimMessageBus.Host.FluentValidation/HandlerValidationInterceptor.cs b/src/SlimMessageBus.Host.FluentValidation/HandlerValidationInterceptor.cs index 9cee837d..0decc80b 100644 --- a/src/SlimMessageBus.Host.FluentValidation/HandlerValidationInterceptor.cs +++ b/src/SlimMessageBus.Host.FluentValidation/HandlerValidationInterceptor.cs @@ -12,9 +12,9 @@ public HandlerValidationInterceptor(IEnumerable> validators, IVali { } - public async Task OnHandle(T message, Func> next, IConsumerContext context) + public async Task OnHandle(T request, Func> next, IConsumerContext context) { - await OnValidate(message, context.CancellationToken).ConfigureAwait(false); + await OnValidate(request, context.CancellationToken).ConfigureAwait(false); return await next().ConfigureAwait(false); } } \ No newline at end of file diff --git a/src/SlimMessageBus.Host.Kafka/Consumer/KafkaGroupConsumer.cs b/src/SlimMessageBus.Host.Kafka/Consumer/KafkaGroupConsumer.cs index 9549a81a..19f6dba9 100644 --- a/src/SlimMessageBus.Host.Kafka/Consumer/KafkaGroupConsumer.cs +++ b/src/SlimMessageBus.Host.Kafka/Consumer/KafkaGroupConsumer.cs @@ -20,7 +20,7 @@ public class KafkaGroupConsumer : AbstractConsumer, IKafkaCommitController public KafkaGroupConsumer(KafkaMessageBus messageBus, string group, IReadOnlyCollection topics, Func processorFactory) : base(messageBus.LoggerFactory.CreateLogger()) { - MessageBus = messageBus ?? throw new ArgumentNullException(nameof(messageBus)); + MessageBus = messageBus; Group = group ?? throw new ArgumentNullException(nameof(group)); Topics = topics ?? throw new ArgumentNullException(nameof(topics)); diff --git a/src/SlimMessageBus.Host.Kafka/KafkaMessageBus.cs b/src/SlimMessageBus.Host.Kafka/KafkaMessageBus.cs index 95b8a853..12665e01 100644 --- a/src/SlimMessageBus.Host.Kafka/KafkaMessageBus.cs +++ b/src/SlimMessageBus.Host.Kafka/KafkaMessageBus.cs @@ -116,7 +116,7 @@ protected override async ValueTask DisposeAsyncCore() } } - protected override async Task ProduceToTransport(object message, string path, byte[] messagePayload, IDictionary messageHeaders, CancellationToken cancellationToken) + protected override async Task ProduceToTransport(object message, string path, byte[] messagePayload, IDictionary messageHeaders = null, CancellationToken cancellationToken = default) { AssertActive(); diff --git a/src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxRepository.cs b/src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxRepository.cs index 9d9323ef..a10ffb8e 100644 --- a/src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxRepository.cs +++ b/src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxRepository.cs @@ -342,13 +342,13 @@ protected async virtual ValueTask DisposeAsyncCore() } } - public async Task DeleteSent(DateTime timestampBefore, CancellationToken token) + public async Task DeleteSent(DateTime olderThan, CancellationToken token) { await EnsureConnection(); var affected = await ExecuteNonQuery(token, Settings.OperationRetry, _sqlTemplate.SqlOutboxMessageDeleteSent, cmd => { - cmd.Parameters.Add("@Timestamp", SqlDbType.DateTime2).Value = timestampBefore; + cmd.Parameters.Add("@Timestamp", SqlDbType.DateTime2).Value = olderThan; }); _logger.Log(affected > 0 ? LogLevel.Information : LogLevel.Debug, "Removed {MessageCount} sent messages from outbox table", affected); diff --git a/src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBus.cs b/src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBus.cs index 90c90277..0bc899f5 100644 --- a/src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBus.cs +++ b/src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBus.cs @@ -5,8 +5,8 @@ public class RabbitMqMessageBus : MessageBusBase, IR private readonly ILogger _logger; private IConnection _connection; - private IModel _channel; private readonly object _channelLock = new(); + private IModel _channel; #region IRabbitMqChannel @@ -38,22 +38,22 @@ protected override async Task CreateConsumers() foreach (var (queueName, consumers) in Settings.Consumers.GroupBy(x => x.GetQueueName()).ToDictionary(x => x.Key, x => x.ToList())) { - AddConsumer(new RabbitMqConsumer(LoggerFactory, - channel: this, - queueName: queueName, - consumers, - Serializer, - messageBus: this, + AddConsumer(new RabbitMqConsumer(LoggerFactory, + channel: this, + queueName: queueName, + consumers, + Serializer, + messageBus: this, ProviderSettings.HeaderValueConverter)); } if (Settings.RequestResponse != null) { - AddConsumer(new RabbitMqResponseConsumer(LoggerFactory, - channel: this, - queueName: Settings.RequestResponse.GetQueueName(), - Settings.RequestResponse, - this, + AddConsumer(new RabbitMqResponseConsumer(LoggerFactory, + channel: this, + queueName: Settings.RequestResponse.GetQueueName(), + Settings.RequestResponse, + this, ProviderSettings.HeaderValueConverter)); } } @@ -83,28 +83,32 @@ private async Task CreateConnection() lock (_channelLock) { - _channel.CloseAndDispose(); - _channel = _connection.CreateModel(); - - var topologyService = new RabbitMqTopologyService(LoggerFactory, _channel, Settings, ProviderSettings); + _channel?.CloseAndDispose(); - var customAction = ProviderSettings.GetOrDefault(RabbitMqProperties.TopologyInitializer); - if (customAction != null) + if (_connection != null) { - // Allow the user to specify its own initializer - customAction(_channel, () => topologyService.ProvisionTopology()); - } - else - { - // Perform default topology setup - topologyService.ProvisionTopology(); + _channel = _connection.CreateModel(); + + var topologyService = new RabbitMqTopologyService(LoggerFactory, _channel, Settings, ProviderSettings); + + var customAction = ProviderSettings.GetOrDefault(RabbitMqProperties.TopologyInitializer); + if (customAction != null) + { + // Allow the user to specify its own initializer + customAction(_channel, () => topologyService.ProvisionTopology()); + } + else + { + // Perform default topology setup + topologyService.ProvisionTopology(); + } } } } catch (Exception e) { _logger.LogError(e, "Could not initialize RabbitMQ connection: {ErrorMessage}", e.Message); - } + } } protected override async ValueTask DisposeAsyncCore() diff --git a/src/SlimMessageBus.Host.Redis/RedisMessageBus.cs b/src/SlimMessageBus.Host.Redis/RedisMessageBus.cs index 7d5b574b..507155a2 100644 --- a/src/SlimMessageBus.Host.Redis/RedisMessageBus.cs +++ b/src/SlimMessageBus.Host.Redis/RedisMessageBus.cs @@ -154,7 +154,7 @@ void AddTopicConsumer(string topic, ISubscriber subscriber, IMessageProcessor messageHeaders, CancellationToken cancellationToken) + protected override Task ProduceToTransport(object message, string path, byte[] messagePayload, IDictionary messageHeaders = null, CancellationToken cancellationToken = default) { var messageType = message.GetType(); diff --git a/src/SlimMessageBus.Host/Collections/SafeDictionaryWrapper.cs b/src/SlimMessageBus.Host/Collections/SafeDictionaryWrapper.cs index 385d4e83..cacd03a6 100644 --- a/src/SlimMessageBus.Host/Collections/SafeDictionaryWrapper.cs +++ b/src/SlimMessageBus.Host/Collections/SafeDictionaryWrapper.cs @@ -9,9 +9,11 @@ /// /// public class SafeDictionaryWrapper : IReadOnlyCache -{ +{ + private readonly object _lock = new(); private readonly IDictionary _mutableDict; private ReadOnlyDictionary _readonlyDict; + private readonly Func _valueFactory; /// /// Provides read only snapshot of the mutable internal dictionary @@ -22,7 +24,7 @@ public IReadOnlyDictionary Dictionary { if (_readonlyDict == null) { - lock (this) + lock (_lock) { if (_readonlyDict == null) { @@ -35,7 +37,6 @@ public IReadOnlyDictionary Dictionary } } - private readonly Func _valueFactory; public SafeDictionaryWrapper() : this(null) @@ -59,7 +60,7 @@ public TValue GetOrAdd(TKey key, Func factory) // check if we have the value already for the key if (!_mutableDict.TryGetValue(key, out var value)) { - lock (this) + lock (_lock) { // double check if another thread did create it in meantime (before lock) if (!_mutableDict.TryGetValue(key, out value)) @@ -83,7 +84,7 @@ public TValue GetOrAdd(TKey key) public void Set(TKey key, TValue value) { - lock (this) + lock (_lock) { // allocate a new dictonary to avoid mutation while reading in another thread _mutableDict[key] = value; @@ -93,7 +94,7 @@ public void Set(TKey key, TValue value) public void Mutate(Action> action) { - lock (this) + lock (_lock) { action(_mutableDict); OnChanged(); @@ -102,7 +103,7 @@ public void Mutate(Action> action) public void Clear(Action action = null) { - lock (this) + lock (_lock) { if (action != null) { @@ -115,7 +116,7 @@ public void Clear(Action action = null) public IReadOnlyCollection ClearAndSnapshot() { - lock (this) + lock (_lock) { var snapshot = Dictionary.Values.ToList(); _mutableDict.Clear(); @@ -128,7 +129,7 @@ public IReadOnlyCollection ClearAndSnapshot() public void ForEach(Action action) { - lock (this) + lock (_lock) { foreach (var entry in _mutableDict) { @@ -139,7 +140,7 @@ public void ForEach(Action action) public void ForEach(Action action) { - lock (this) + lock (_lock) { foreach (var value in _mutableDict.Values) { diff --git a/src/SlimMessageBus.Host/Consumer/InterceptorPipelines/ConsumerInterceptorPipeline.cs b/src/SlimMessageBus.Host/Consumer/InterceptorPipelines/ConsumerInterceptorPipeline.cs index ff2e62f4..bf322b79 100644 --- a/src/SlimMessageBus.Host/Consumer/InterceptorPipelines/ConsumerInterceptorPipeline.cs +++ b/src/SlimMessageBus.Host/Consumer/InterceptorPipelines/ConsumerInterceptorPipeline.cs @@ -12,12 +12,10 @@ internal class ConsumerInterceptorPipeline private readonly IMessageTypeConsumerInvokerSettings _consumerInvoker; private readonly Type _responseType; - private readonly IEnumerable _consumerInterceptors; private readonly Func>, IConsumerContext, Task> _consumerInterceptorFunc; private IEnumerator _consumerInterceptorsEnumerator; private bool _consumerInterceptorsVisited; - private readonly IEnumerable _handlerInterceptors; private readonly Func _handlerInterceptorFunc; private IEnumerator _handlerInterceptorsEnumerator; private bool _handlerInterceptorsVisited; @@ -34,12 +32,10 @@ public ConsumerInterceptorPipeline(RuntimeTypeCache runtimeTypeCache, IMessageHa _consumerInvoker = consumerInvoker; _responseType = responseType; - _consumerInterceptors = consumerInterceptors; _consumerInterceptorFunc = runtimeTypeCache.ConsumerInterceptorType[message.GetType()]; _consumerInterceptorsVisited = consumerInterceptors is null; _consumerInterceptorsEnumerator = consumerInterceptors?.GetEnumerator(); - _handlerInterceptors = handlerInterceptors; _handlerInterceptorFunc = responseType != null ? runtimeTypeCache.HandlerInterceptorType[(message.GetType(), responseType)] : null; _handlerInterceptorsVisited = handlerInterceptors is null; _handlerInterceptorsEnumerator = handlerInterceptors?.GetEnumerator(); diff --git a/src/SlimMessageBus.Host/Consumer/MessageProcessors/ConcurrencyIncreasingMessageProcessorDecorator.cs b/src/SlimMessageBus.Host/Consumer/MessageProcessors/ConcurrencyIncreasingMessageProcessorDecorator.cs index bfc73ec3..1f1e8e6e 100644 --- a/src/SlimMessageBus.Host/Consumer/MessageProcessors/ConcurrencyIncreasingMessageProcessorDecorator.cs +++ b/src/SlimMessageBus.Host/Consumer/MessageProcessors/ConcurrencyIncreasingMessageProcessorDecorator.cs @@ -16,7 +16,7 @@ public sealed class ConcurrencyIncreasingMessageProcessorDecorator : I private readonly object _lastExceptionLock = new(); private int _pendingCount; - + public int PendingCount => _pendingCount; public IReadOnlyCollection ConsumerSettings => _target.ConsumerSettings; @@ -43,7 +43,7 @@ public void Dispose() #endregion - public async Task<(Exception Exception, AbstractConsumerSettings ConsumerSettings, object Response, object Message)> ProcessMessage(TMessage message, IReadOnlyDictionary messageHeaders, CancellationToken cancellationToken, IServiceProvider currentServiceProvider = null) + public async Task<(Exception Exception, AbstractConsumerSettings ConsumerSettings, object Response, object Message)> ProcessMessage(TMessage transportMessage, IReadOnlyDictionary messageHeaders, CancellationToken cancellationToken, IServiceProvider currentServiceProvider = null) { // Ensure only desired number of messages are being processed concurrently await _concurrentSemaphore.WaitAsync().ConfigureAwait(false); @@ -60,7 +60,7 @@ public void Dispose() Interlocked.Increment(ref _pendingCount); // Fire and forget - _ = ProcessInBackground(message, messageHeaders, currentServiceProvider, cancellationToken); + _ = ProcessInBackground(transportMessage, messageHeaders, currentServiceProvider, cancellationToken); // Not exception - we don't know yet return (null, null, null, null); diff --git a/src/SlimMessageBus.Host/Consumer/MessageProcessors/ResponseMessageProcessor.cs b/src/SlimMessageBus.Host/Consumer/MessageProcessors/ResponseMessageProcessor.cs index 2864b6fc..9c2157f5 100644 --- a/src/SlimMessageBus.Host/Consumer/MessageProcessors/ResponseMessageProcessor.cs +++ b/src/SlimMessageBus.Host/Consumer/MessageProcessors/ResponseMessageProcessor.cs @@ -15,13 +15,10 @@ public class ResponseMessageProcessor : IMessageProcessor public ResponseMessageProcessor(ILoggerFactory loggerFactory, RequestResponseSettings requestResponseSettings, IResponseConsumer responseConsumer, MessagePayloadProvider messagePayloadProvider) { if (loggerFactory is null) throw new ArgumentNullException(nameof(loggerFactory)); - if (requestResponseSettings is null) throw new ArgumentNullException(nameof(requestResponseSettings)); - if (responseConsumer is null) throw new ArgumentNullException(nameof(responseConsumer)); - if (messagePayloadProvider is null) throw new ArgumentNullException(nameof(messagePayloadProvider)); _logger = loggerFactory.CreateLogger>(); _requestResponseSettings = requestResponseSettings ?? throw new ArgumentNullException(nameof(requestResponseSettings)); - _responseConsumer = responseConsumer; + _responseConsumer = responseConsumer ?? throw new ArgumentNullException(nameof(responseConsumer)); _consumerSettings = new List { _requestResponseSettings }; _messagePayloadProvider = messagePayloadProvider ?? throw new ArgumentNullException(nameof(messagePayloadProvider)); } diff --git a/src/SlimMessageBus.Host/MessageTypeResolver/AssemblyQualifiedNameMessageTypeResolver.cs b/src/SlimMessageBus.Host/MessageTypeResolver/AssemblyQualifiedNameMessageTypeResolver.cs index d1710581..8e24ec83 100644 --- a/src/SlimMessageBus.Host/MessageTypeResolver/AssemblyQualifiedNameMessageTypeResolver.cs +++ b/src/SlimMessageBus.Host/MessageTypeResolver/AssemblyQualifiedNameMessageTypeResolver.cs @@ -1,14 +1,15 @@ namespace SlimMessageBus.Host; -using SlimMessageBus.Host.Collections; using System.Text.RegularExpressions; +using SlimMessageBus.Host.Collections; + /// /// that uses the for the message type string passed in the message header. /// public class AssemblyQualifiedNameMessageTypeResolver : IMessageTypeResolver { - private static readonly Regex RedundantAssemblyTokens = new(@"\, (Version|Culture|PublicKeyToken)\=([\w\d.]+)"); + private static readonly Regex RedundantAssemblyTokens = new(@"\, (Version|Culture|PublicKeyToken)\=([\w\d.]+)", RegexOptions.None, TimeSpan.FromSeconds(2)); /// /// Determines wheather to emit the Version, Culture and PublicKeyToken along with the Assembly name (for strong assembly naming). diff --git a/src/Tools/SecretStore/SecretService.cs b/src/Tools/SecretStore/SecretService.cs index 2887264b..b7887fb6 100644 --- a/src/Tools/SecretStore/SecretService.cs +++ b/src/Tools/SecretStore/SecretService.cs @@ -4,7 +4,7 @@ public class SecretService { - private readonly Regex _placeholder = new(@"\{\{(\w+)\}\}"); + private readonly Regex _placeholder = new(@"\{\{(\w+)\}\}", RegexOptions.None, TimeSpan.FromSeconds(2)); private readonly ISecretStore _secretStore; public SecretService(ISecretStore secretStore) => _secretStore = secretStore;