Skip to content

Commit

Permalink
Remove hooks (use interceptors instead)
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Maruszak <maruszaktomasz@gmail.com>
  • Loading branch information
zarusz committed Apr 15, 2023
1 parent b641029 commit 1df1499
Show file tree
Hide file tree
Showing 50 changed files with 618 additions and 1,182 deletions.
Binary file added docs/images/interceptors.jpg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
89 changes: 3 additions & 86 deletions docs/intro.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@
- [Configuration](#configuration)
- [Pub/Sub communication](#pubsub-communication)
- [Producer](#producer)
- [Producer hooks](#producer-hooks)
- [Set message headers](#set-message-headers)
- [Consumer](#consumer)
- [Start or Stop message consumption](#start-or-stop-message-consumption)
- [Consumer hooks](#consumer-hooks)
- [Consumer context (additional message information)](#consumer-context-additional-message-information)
- [Per-message DI container scope](#per-message-di-container-scope)
- [Hybrid bus and message scope reuse](#hybrid-bus-and-message-scope-reuse)
Expand Down Expand Up @@ -155,38 +153,6 @@ await bus.Publish(msg, "other-topic");

> The transport plugins might introduce additional configuration options. Please check the relevant provider docs. For example, Azure Service Bus, Azure Event Hub and Kafka allow setting the partitioning key for a given message type.
#### Producer hooks

> The [Interceptors](#interceptors) is a newer approach that should be used instead of the hooks.
When you need to intercept a message that is being published or sent via the bus, you can use the available producer hooks:

```cs
mbb
.Produce<SomeMessage>(x =>
{
x.DefaultTopic(someMessageTopic);
x.AttachEvents(events =>
{
// Invoke the action for the specified message type published/sent via the bus:
events.OnMessageProduced = (bus, producerSettings, message, path) => {
Console.WriteLine("The SomeMessage: {0} was sent to topic/queue {1}", message, path);
}
});
})
.AttachEvents(events =>
{
// Invoke the action for any message type published/sent via the bus:
events.OnMessageProduced = (bus, producerSettings, message, path) => {
Console.WriteLine("The message: {0} was sent to topic/queue {1}", message, path);
};
});
```

The hook can be applied at the specified producer or the whole bus.

> The user-specified `Action<>` methods need to be thread-safe.
#### Set message headers

> Since version 1.15.0
Expand Down Expand Up @@ -289,57 +255,6 @@ await consumerControl.Stop();

> Since version 1.15.5

#### Consumer hooks

> The [Interceptors](#interceptors) is a newer approach that should be used instead of the hooks.

When you need to intercept a message that is delivered to a consumer, you can use the available consumer hooks:

```cs
mbb
.Consume<SomeMessage>(x =>
{
x.Topic("some-topic");
// This events trigger only for this consumer
x.AttachEvents(events =>
{
// 1. Invoke the action for the specified message type when arrived on the bus (pre consumer OnHandle method):
events.OnMessageArrived = (bus, consumerSettings, message, path, nativeMessage) => {
Console.WriteLine("The SomeMessage: {0} arrived on the topic/queue {1}", message, path);
}

// 2. Invoke the action when the consumer caused an unhandled exception
events.OnMessageFault = (bus, consumerSettings, message, ex, nativeMessage) => {
};

// 3. Invoke the action for the specified message type after consumer processed (post consumer OnHandle method).
// This is executed also if the message handling faulted (2.)
events.OnMessageFinished = (bus, consumerSettings, message, path, nativeMessage) => {
Console.WriteLine("The SomeMessage: {0} finished on the topic/queue {1}", message, path);
}
});
})
// Any consumer events for the bus (a sum of all events across registered consumers)
.AttachEvents(events =>
{
// Invoke the action for the specified message type when sent via the bus:
events.OnMessageArrived = (bus, consumerSettings, message, path, nativeMessage) => {
Console.WriteLine("The message: {0} arrived on the topic/queue {1}", message, path);
};

events.OnMessageFault = (bus, consumerSettings, message, ex, nativeMessage) => {
};

events.OnMessageFinished = (bus, consumerSettings, message, path, nativeMessage) => {
Console.WriteLine("The SomeMessage: {0} finished on the topic/queue {1}", message, path);
}
});
```

The hook can be applied for the specified consumer, or for all consumers in the particular bus instance.

> The user specified `Action<>` methods need to be thread-safe as they will be executed concurrently as messages are being processed.

#### Consumer context (additional message information)

> Changed in version 1.15.0
Expand Down Expand Up @@ -912,6 +827,8 @@ Interceptors allow to tap into the message processing pipeline on both the produ
- prevent a message from being produced or from being consumed or handled,
- perform some additional application specific authorization checks.

![](images/interceptors.jpg)

### Producer Lifecycle

When a message is produced (via the `bus.Publish(message)` or `bus.Send(request)`) the SMB is performing a DI lookup for the interceptor interface types that are relevant given the message type (or request and response types) and execute them in order.
Expand All @@ -936,7 +853,7 @@ public interface ISendInterceptor<in TRequest, TResponse> : IInterceptor
}
```

Remember to register your interceptor types in the DI (either using auto-discovery [`addInterceptorsFromAssembly`](#MsDependencyInjection) or manually).
Remember to register your interceptor types in the DI (either using auto-discovery [`AddServicesFromAssembly()`](#autoregistration-of-consumers-and-interceptors) or manually).

> SMB has an optimization that will remember the types of messages for which the DI resolved interceptor. That allows us to avoid having to perform lookups with the DI and other internal processing.

Expand Down
2 changes: 1 addition & 1 deletion src/Host.Transport.Properties.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<Import Project="Common.Properties.xml" />

<PropertyGroup>
<Version>2.1.0-rc1</Version>
<Version>2.1.0-rc2</Version>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
</PropertyGroup>
</Project>
2 changes: 0 additions & 2 deletions src/SlimMessageBus.Host.AzureEventHub/BuilderExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
namespace SlimMessageBus.Host.AzureEventHub;

using SlimMessageBus.Host;

public static class BuilderExtensions
{
/// <summary>
Expand Down
2 changes: 0 additions & 2 deletions src/SlimMessageBus.Host.AzureEventHub/ConsumerParams.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
namespace SlimMessageBus.Host.AzureEventHub;

using Azure.Storage.Blobs;

public class ConsumerParams : GroupPath
{
public BlobContainerClient CheckpointClient { get; set; }
Expand Down
18 changes: 4 additions & 14 deletions src/SlimMessageBus.Host.AzureEventHub/EventHubMessageBus.cs
Original file line number Diff line number Diff line change
@@ -1,16 +1,9 @@
namespace SlimMessageBus.Host.AzureEventHub;

using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
using Azure.Storage.Blobs;

using SlimMessageBus.Host;
using SlimMessageBus.Host.Collections;

/// <summary>
/// MessageBus implementation for Azure Event Hub.
/// </summary>
public class EventHubMessageBus : MessageBusBase
public class EventHubMessageBus : MessageBusBase<EventHubMessageBusSettings>
{
private readonly ILogger _logger;
private BlobContainerClient _blobContainerClient;
Expand All @@ -19,13 +12,10 @@ public class EventHubMessageBus : MessageBusBase

protected internal BlobContainerClient BlobContainerClient => _blobContainerClient;

public EventHubMessageBusSettings ProviderSettings { get; }

public EventHubMessageBus(MessageBusSettings settings, EventHubMessageBusSettings eventHubSettings)
: base(settings)
public EventHubMessageBus(MessageBusSettings settings, EventHubMessageBusSettings providerSettings)
: base(settings, providerSettings)
{
_logger = LoggerFactory.CreateLogger<EventHubMessageBus>();
ProviderSettings = eventHubSettings;
_logger = LoggerFactory.CreateLogger<EventHubMessageBus>();

OnBuildProvider();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
namespace SlimMessageBus.Host.AzureEventHub;

using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
using Azure.Storage.Blobs;

public class EventHubMessageBusSettings
{
/// <summary>
Expand Down
5 changes: 5 additions & 0 deletions src/SlimMessageBus.Host.AzureEventHub/GlobalUsings.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
global using Microsoft.Extensions.Logging;

global using SlimMessageBus.Host;
global using SlimMessageBus.Host.Collections;

global using Azure.Messaging.EventHubs;
global using Azure.Messaging.EventHubs.Producer;
global using Azure.Storage.Blobs;
Original file line number Diff line number Diff line change
Expand Up @@ -190,22 +190,11 @@ protected async Task ProcessMessageAsyncInternal(ServiceBusReceivedMessage messa
return;
}

var (exception, consumerSettings, response) = await MessageProcessor.ProcessMessage(message, message.ApplicationProperties, token).ConfigureAwait(false);
var (exception, _, _) = await MessageProcessor.ProcessMessage(message, message.ApplicationProperties, token).ConfigureAwait(false);
if (exception != null)
{
_logger.LogError(exception, "Abandon message (exception occured while processing) - Path: {Path}, SubscriptionName: {SubscriptionName}, SequenceNumber: {SequenceNumber}, DeliveryCount: {DeliveryCount}, MessageId: {MessageId}", TopicSubscription.Path, TopicSubscription.SubscriptionName, message.SequenceNumber, message.DeliveryCount, message.MessageId);

try
{
// Execute the event hook
consumerSettings?.OnMessageFault?.Invoke(MessageBus, consumerSettings, null, exception, message);
MessageBus.Settings.OnMessageFault?.Invoke(MessageBus, consumerSettings, null, exception, message);
}
catch (Exception eh)
{
MessageBusBase.HookFailed(_logger, eh, nameof(IConsumerEvents.OnMessageFault));
}

var messageProperties = new Dictionary<string, object>
{
// Set the exception message
Expand All @@ -224,17 +213,7 @@ protected async Task ProcessMessageAsyncInternal(ServiceBusReceivedMessage messa

protected Task ProcessErrorAsyncInternal(Exception exception, ServiceBusErrorSource errorSource)
{
try
{
_logger.LogError(exception, "Error while processing Path: {Path}, SubscriptionName: {SubscriptionName}, Error Message: {ErrorMessage}, Error Source: {ErrorSource}", TopicSubscription.Path, TopicSubscription.SubscriptionName, exception.Message, errorSource);

// Execute the event hook
MessageBus.Settings.OnMessageFault?.Invoke(MessageBus, null, null, exception, null);
}
catch (Exception eh)
{
MessageBusBase.HookFailed(_logger, eh, nameof(IConsumerEvents.OnMessageFault));
}
_logger.LogError(exception, "Error while processing Path: {Path}, SubscriptionName: {SubscriptionName}, Error Message: {ErrorMessage}, Error Source: {ErrorSource}", TopicSubscription.Path, TopicSubscription.SubscriptionName, exception.Message, errorSource);
return Task.CompletedTask;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,20 @@
using SlimMessageBus.Host.AzureServiceBus.Consumer;
using SlimMessageBus.Host.Collections;

public class ServiceBusMessageBus : MessageBusBase
public class ServiceBusMessageBus : MessageBusBase<ServiceBusMessageBusSettings>
{
private readonly ILogger _logger;
private readonly List<AsbBaseConsumer> _consumers = new();

public ServiceBusMessageBusSettings ProviderSettings { get; }

private ServiceBusClient _client;
private SafeDictionaryWrapper<string, ServiceBusSender> _producerByPath;

private Task _provisionTopologyTask = null;

public ServiceBusMessageBus(MessageBusSettings settings, ServiceBusMessageBusSettings providerSettings)
: base(settings)
: base(settings, providerSettings)
{
_logger = LoggerFactory.CreateLogger<ServiceBusMessageBus>();
ProviderSettings = providerSettings ?? throw new ArgumentNullException(nameof(providerSettings));

OnBuildProvider();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,6 @@ protected AbstractConsumerBuilder(MessageBusSettings settings, Type messageType,
Settings.Consumers.Add(ConsumerSettings);
}

[Obsolete("Please use the interceptors https://github.com/zarusz/SlimMessageBus/blob/master/docs/intro.md#interceptors")]
public TBuilder AttachEvents<TBuilder>(Action<IConsumerEvents> eventsConfig)
where TBuilder : AbstractConsumerBuilder
{
if (eventsConfig == null) throw new ArgumentNullException(nameof(eventsConfig));

eventsConfig(ConsumerSettings);
return (TBuilder)this;
}

public T Do<T>(Action<T> builder) where T : AbstractConsumerBuilder
{
if (builder == null) throw new ArgumentNullException(nameof(builder));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,15 +154,6 @@ public ConsumerBuilder<T> Instances(int numberOfInstances)
return this;
}

/// <summary>
/// Adds custom hooks for the consumer.
/// </summary>
/// <param name="eventsConfig"></param>
/// <returns></returns>
[Obsolete("Please use the interceptors https://github.com/zarusz/SlimMessageBus/blob/master/docs/intro.md#interceptors")]
public ConsumerBuilder<T> AttachEvents(Action<IConsumerEvents> eventsConfig)
=> AttachEvents<ConsumerBuilder<T>>(eventsConfig);

/// <summary>
/// Enable (or disable) creation of DI child scope for each meesage.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,6 @@ public THandlerBuilder Instances(int numberOfInstances)
return TypedThis;
}

/// <summary>
/// Adds custom hooks for the handler.
/// </summary>
/// <param name="eventsConfig"></param>
/// <returns></returns>
[Obsolete("Please use the interceptors https://github.com/zarusz/SlimMessageBus/blob/master/docs/intro.md#interceptors")]
public THandlerBuilder AttachEvents(Action<IConsumerEvents> eventsConfig) =>
AttachEvents<THandlerBuilder>(eventsConfig);

public THandlerBuilder Do(Action<THandlerBuilder> action) =>
base.Do(action);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,30 +244,6 @@ public MessageBusBuilder Do(Action<MessageBusBuilder> builder)
return this;
}

public MessageBusBuilder AttachEvents(Action<IProducerEvents> eventsConfig)
{
if (eventsConfig == null) throw new ArgumentNullException(nameof(eventsConfig));

eventsConfig(Settings);
return this;
}

public MessageBusBuilder AttachEvents(Action<IConsumerEvents> eventsConfig)
{
if (eventsConfig == null) throw new ArgumentNullException(nameof(eventsConfig));

eventsConfig(Settings);
return this;
}

public MessageBusBuilder AttachEvents(Action<IBusEvents> eventsConfig)
{
if (eventsConfig == null) throw new ArgumentNullException(nameof(eventsConfig));

eventsConfig(Settings);
return this;
}

/// <summary>
/// Sets the default enable (or disable) creation of DI child scope for each meesage.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,6 @@ public ProducerBuilder<T> DefaultTimeout(TimeSpan timeout)
return this;
}

public ProducerBuilder<T> AttachEvents(Action<IProducerEvents> eventsConfig)
{
if (eventsConfig == null) throw new ArgumentNullException(nameof(eventsConfig));

eventsConfig(Settings);
return this;
}

/// <summary>
/// Hook called whenver message is being produced. Can be used to add (or mutate) message headers.
/// </summary>
Expand Down
Loading

0 comments on commit 1df1499

Please sign in to comment.