Skip to content

Commit

Permalink
[Host.Memory] Non-blocking Publish option
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Maruszak <maruszaktomasz@gmail.com>
  • Loading branch information
zarusz committed Mar 23, 2024
1 parent 3bb56eb commit aa57a32
Show file tree
Hide file tree
Showing 59 changed files with 2,326 additions and 333 deletions.
106 changes: 73 additions & 33 deletions docs/intro.md

Large diffs are not rendered by default.

1,040 changes: 1,040 additions & 0 deletions docs/intro.t.md

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions docs/plugin_asyncapi.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ services.AddSlimMessageBus(mbb =>
Then register the Saunter services (in that order):

```cs
// Add Saunter to the application services.
// Add Saunter to the application services.
builder.Services.AddAsyncApiSchemaGeneration(options =>
{
options.AsyncApi = new AsyncApiDocument
Expand Down Expand Up @@ -90,4 +90,4 @@ Ensure that your project has the `GenerateDocumentationFile` enabled (more [here
<PropertyGroup>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
</PropertyGroup>
```
```
118 changes: 92 additions & 26 deletions docs/provider_memory.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ Please read the [Introduction](intro.md) before reading this provider documentat
- [Auto Declaration](#auto-declaration)
- [Polymorphic message support](#polymorphic-message-support)
- [Lifecycle](#lifecycle)
- [Concurrency and Ordering](#concurrency-and-ordering)
- [Blocking Publish](#blocking-publish)
- [Asynchronous Publish](#asynchronous-publish)
- [Error Handling](#error-handling)
- [Per-Message DI scope](#per-message-di-scope)
- [Benchmarks](#benchmarks)

## Introduction

The Memory transport provider can be used for internal communication within the same process. It is the simplest transport provider and does not require any external messaging infrastructure.
Expand All @@ -24,13 +26,16 @@ Good use case for in memory communication is:

- to integrate the domain layer with other application layers via domain events pattern,
- to implement mediator pattern (when combined with [interceptors](intro.md#interceptors)),
- to run unit tests against application code that normally runs with an out of process transport provider (Kafka, Azure Service Bus, etc).
- to run unit tests against application code that normally runs with an out of process transport provider (Kafka, Azure Service Bus, etc),
- to start simple messaging without having to provision messaging infrastructure, but when time comes reconfigure SMB to leverage messaging infrastructure.

## Configuration

The memory transport is configured using the `.WithProviderMemory()`:

```cs
using SlimMessageBus.Host.Memory;

services.AddSlimMessageBus(mbb =>
{
// Bus configuration happens here (...)
Expand All @@ -40,7 +45,7 @@ services.AddSlimMessageBus(mbb =>

### Serialization

Since messages are passed within the same process, serializing and deserializing them is redundant. Also disabling serialization gives a performance optimization.
Since messages are passed within the same process, serializing and deserializing them is redundant. Also, disabling serialization gives a performance improvement.

> Serialization is disabled by default for memory bus.
Expand All @@ -50,23 +55,24 @@ Serialization can be disabled or enabled:
services.AddSlimMessageBus(mbb =>
{
// Bus configuration happens here (...)
mbb.WithProviderMemory(new MemoryMessageBusSettings
mbb.WithProviderMemory(cfg =>
{
// Do not serialize the domain events and rather pass the same instance across handlers
EnableMessageSerialization = false
// Serialize the domain events instead of passing the same instance across to handlers/consumers
cfg.EnableMessageSerialization = true
});
//mbb.AddJsonSerializer((); // serializer not needed if EnableMessageSerialization = false
// Serializer not needed if EnableMessageSerialization = false
mbb.AddJsonSerializer();
});
```

> When serialization is disabled for in memory passed messages, the exact same object instance send by the producer will be recieved by the consumer. Therefore state changes on the consumer end will be visible by the producer. Consider making the messages immutable (read only).
> When serialization is disabled for in memory passed messages, the exact same object instance send by the producer will be received by the consumer. Therefore state changes on the consumer end will be visible by the producer.
> Consider making the messages immutable (read only) in that case.
### Virtual Topics

Unlike other transport providers, memory transport does not have true notion of topics (or queues). However, it is still required to use topic names. This is required, so that the bus knows on which virtual topic to deliver the message to, and from what virtual top
erSubmittedEvent"));
Unlike other transport providers, memory transport does not have true notion of topics (or queues). However, it is still required to use topic names. This is required, so that the bus knows on which virtual topic to deliver the message to, and from what virtual topic to consume from.

and the consumer side:
The consumer configuration side should use `.Topic()` to set the virtual topic name:

```cs
// declare that OrderSubmittedEvent will be consumed
Expand All @@ -76,13 +82,19 @@ mbb.Consume<OrderSubmittedEvent>(x => x.Topic(x.MessageType.Name).WithConsumer<O
mbb.Consume<OrderSubmittedEvent>(x => x.Topic("OrderSubmittedEvent").WithConsumer<OrderSubmittedHandler>());
```

The producer configuration side should use `.DefaultTopic()` to set the virtual topic name:

```cs
mbb.Produce<OrderSubmittedEvent>(x => x.DefaultTopic("OrderSubmittedEvent"));
```

> The virtual topic name can be any string. It helps to connect the relevant producers and consumers together.
### Auto Declaration

> Since 1.19.1
For bus configuration, we can leverage `AutoDeclareFrom()` method to discover all the consumers (`IConsumer<T>`) and handlers (`IRequestHandler<T,R>`) types and auto declare the respective producers and consumers/handlers in the bus.
For bus configuration, we can leverage `.AutoDeclareFrom()` method to discover all the consumers (`IConsumer<T>`) and handlers (`IRequestHandler<T,R>`) types in an assembly and auto declare the respective producers and consumers/handlers in the bus.
This can be useful to auto declare all of the domain event handlers in an application layer.

```cs
Expand All @@ -93,7 +105,7 @@ mbb
//.AutoDeclareFrom(Assembly.GetExecutingAssembly(), consumerTypeFilter: (consumerType) => consumerType.Name.EndsWith("Handler"));
```

For example, assuming this is the discovered type:
For example, assuming this is the discovered handler type:

```cs
public class EchoRequestHandler : IRequestHandler<EchoRequest, EchoResponse>
Expand All @@ -102,16 +114,16 @@ public class EchoRequestHandler : IRequestHandler<EchoRequest, EchoResponse>
}
```

The bus registrations will end up:
The bus auto registrations will set-up the producer and handler to the equivalent:

```cs
mbb.Produce<EchoRequest>(x => x.DefaultTopic(x.MessageType.Name));
mbb.Handle<EchoRequest, EchoResponse>(x => x.Topic(x.MessageType.Name).WithConsumer<EchoRequestHandler>());
```

The vitual topic name will be derived from the message type name by default. This can be customized by passing an additional parameter to the `AutoDeclareFrom()` method.
The virtual topic name will be derived from the message type name by default. This can be customized by passing an additional parameter to the `AutoDeclareFrom()` method.

Using `AutoDeclareFrom()` to configure the memory bus is recommended, as it provides a good developer experience.
Using `AutoDeclareFrom()` to configure the memory bus is recommended, as it will declare the producers and consumers automatically as consumer types are added over time.

> Note that it is still required to register (or auto register) the consumer/handler types in the underlying DI (see [here](intro.md#autoregistration-of-consumers-interceptors-and-configurators)).
Expand All @@ -128,25 +140,79 @@ The polymorphic message types (message that share a common ancestry) are support

## Lifecycle

### Concurrency and Ordering

The order of message delivery to consumer will match the order of producer.

By default, each consumer processes one message at a time to ensure ordering.
To [increase the concurrency level](intro.md#concurrently-processed-messages) use the `.Instances(n)` setting:

```cs
mbb.Consume<PingMessage>(x => x.Instances(2));
```

When number of concurrent consumer instances > 0 (`.Instances(N)`) then up to N messages will be processed concurrently (having impact on ordering).

### Blocking Publish

The `Send<T>()` is blocking and `Publsh<T>()` is blocking by default.
It might be expected that the `Publish<T>()` to be fire-and-forget and non-blocking, however this is to ensure the consumer/handler have been processed by the time the method call returns. That behavior is optimized for domain-events where you expect the side effects to be executed synchronously.
Similar to `Send<T>()`, the `Publish<T>()` is blocking by default.

It might be expected that the `Publish<T>()` to be non-blocking (asynchronous) and that the consumer processes the message in the background.
However, blocking mode is enabled by default to ensure the consumer has finished processing by the time the method call returns.
Often we want all the side effect to finish within the unit of work (ongoing web-request, or external message being handled).
That behavior is optimized for domain-events where the side effects are to be executed synchronously within the unit of work.

### Asynchronous Publish

ToDo: In the future we will want to expose a setting to make the `Publish<T>()` non-blocking.
> Since 2.3.0
> In contrast to MediatR, having the `Publish<T>` blocking, allows us to avoid having to use `Send<T, R>` and have the developer to use `Unit` or `VoidResult`.
To use non-blocking publish use the `EnableBlockingPublish` property setting:

```cs
services.AddSlimMessageBus(mbb =>
{
mbb
.WithProviderMemory(cfg =>
{
cfg.EnableMessageSerialization = _enableSerialization;
cfg.EnableBlockingPublish = _enableBlockingPublish;
})
.AddServicesFromAssemblyContaining<PingConsumer>()
.AddJsonSerializer();
});
```

When the `.Publish<T>()` is invoked in the non-blocking mode:

- the consumers will be executed in another async task (in the background),
- that task cancellation token will be bound to the message bus lifecycle (consumers are stopped, the bus is disposed or application shuts down),
- the order of message delivery to consumer will match the order of publish,
- however, when number of concurrent consumer instances > 0 (`.Instances(N)`) then up to N messages will be processed concurrently (having impact on ordering)
- the unit of work where the message is published is decoupled from the consumer unit of work, there will be an independent per message scope created in the DI on the consumer side - this allows to scope consumer dependencies to the message unit of work.

### Error Handling

The exceptions raised in an handler (`IRequestHandler<T, R>`) are bubbled up to the sender (`await bus.Send(request)`).
Likewise, the exceptions raised in an consumer (`IConsumer<T>`) are also bubbled up to the publisher (`await bus.Publish<T>(message)`).
The exceptions raised in a handler (`IRequestHandler<T, R>`) are bubbled up to the sender (`await bus.Send(request)`).
Likewise, the exceptions raised in a consumer (`IConsumer<T>`) are also bubbled up to the publisher (`await bus.Publish<T>(message)`) when [blocking publish mode](#blocking-publish) is enabled.

In the case of [non-blocking publish mode](#asynchronous-publish) when an exception is raised by a message consumer, the memory transport will log the exception and move on to the next message. For more elaborate behavior a custom [error handler](intro.md#error-handling) should be setup.

There is a memory transport specific error handler interface [IMemoryConsumerErrorHandler<T>](../src/SlimMessageBus.Host.Memory/Consumers/IMemoryConsumerErrorHandler.cs) that will be preferred by the memory transport.

The error handler has to be registered in MSDI for all (or specified) message types:

```cs
// Register error handler in MSDI for any message type
services.AddTransient(typeof(IMemoryConsumerErrorHandler<>), typeof(CustomMemoryConsumerErrorHandler<>));
```

The [interceptors](intro.md#interceptors) are able to handle the error before it reaches back to the publisher or sender.
See also the common [error handling](intro.md#error-handling).

### Per-Message DI scope

Unlike stated for the [Introduction](intro.md) the memory bus has per-message scoped disabled by default. However, the memory bus consumer/handler would join the already ongoing DI scope. This is desired in scenarios where an external message is being handled as part of the unit of work (Kafka/ASB, etc) or an API HTTP request is being handled - the memory bus consumer/handler will be looked up in the ongoing/current DI scope.
Unlike stated for the [Introduction](intro.md) the memory bus has per-message scoped disabled by default.
However, the memory bus consumer/handler would join (enlist in) the already ongoing DI scope.
This is desired in scenarios where an external message is being handled as part of the unit of work (Kafka/ASB, etc) or an API HTTP request is being handled - the memory bus consumer/handler will be looked up in the ongoing/current DI scope.

## Benchmarks

Expand All @@ -155,7 +221,7 @@ The project [`SlimMessageBus.Host.Memory.Benchmark`](/src/Tests/SlimMessageBus.H
- The test includes a 1M of simple messages being produced.
- The consumers do not do any logic - we want to test how fast can the messages flow through the bus.
- The benchmark application uses a real life setup including dependency injection container.
- There is a viaration of the test that captures the overhead for the interceptor pipeline.
- There is a variation of the test that captures the overhead for the interceptor pipeline.

Pub/Sub scenario results:

Expand Down Expand Up @@ -190,4 +256,4 @@ Intel Core i7-8550U CPU 1.80GHz (Kaby Lake R), 1 CPU, 8 logical and 4 physical c
Job-XKUBHP : .NET 6.0.11 (6.0.1122.52304), X64 RyuJIT AVX2
```

See the benchmark source [here](../src/Tests/SlimMessageBus.Host.Memory.Benchmark).
See the benchmark source [here](../src/Tests/SlimMessageBus.Host.Memory.Benchmark).
Loading

0 comments on commit aa57a32

Please sign in to comment.