Skip to content

Commit

Permalink
Concurrently processed messages for RabbitMq transport #205
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Maruszak <maruszaktomasz@gmail.com>
  • Loading branch information
zarusz committed Jan 14, 2024
1 parent fdf4552 commit 6471b8a
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 133 deletions.
49 changes: 35 additions & 14 deletions docs/provider_rabbitmq.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ Please read the [Introduction](intro.md) before reading this provider documentat
- [Consumer Error Handling](#consumer-error-handling)
- [Dead Letter Exchange](#dead-letter-exchange)
- [Custom Consumer Error Handler](#custom-consumer-error-handler)
- [Consumer Concurrency Level](#consumer-concurrency-level)
- [Request-Response](#request-response)
- [Topology Provisioning](#topology-provisioning)
- [Not Supported](#not-supported)
- [Feeback](#feeback)
- [Feedback](#feedback)

## Underlying client

Expand Down Expand Up @@ -52,8 +53,8 @@ services.AddSlimMessageBus((mbb) =>
// cfg.ConnectionFactory.Password = "..."
// cfg.ConnectionFactory.Ssl.Enabled = true
// Fine tune the underlying RabbitMQ.Client:
// cfg.ConnectionFactory.ClientProvidedName = $"MyService_{Environment.MachineName}";
// Fine tune the underlying RabbitMQ.Client:
// cfg.ConnectionFactory.ClientProvidedName = $"MyService_{Environment.MachineName}";
});

mbb.AddServicesFromAssemblyContaining<PingConsumer>();
Expand All @@ -64,9 +65,9 @@ services.AddSlimMessageBus((mbb) =>
The relevant elements of the `cfg`:

- The `ConnectionString` allows to set the AMQP URI.
This property is a convenience wrapper on top of `ConnectionFactory.Uri` from the underlying client library.
The URI has the following form: `amqps://<username>:<password>@<host>/<virtual-host>`.
- The `ConnectionFactory` allows to access other client settings. It can be used to setup other connection details in case the AMQP URI cannot be used or there is a need to fine tune the client.
This property is a convenience wrapper on top of `ConnectionFactory.Uri` from the underlying client library.
The URI has the following form: `amqps://<username>:<password>@<host>/<virtual-host>`.
- The `ConnectionFactory` allows to access other client settings. It can be used to setup other connection details in case the AMQP URI cannot be used or there is a need to fine tune the client. For more options see the underlying [RabbitMQ driver docs](https://www.rabbitmq.com/dotnet-api-guide.html#connecting).

### Producers

Expand All @@ -82,7 +83,7 @@ mbb.Produce<OrderEvent>(x => x
.Exchange("orders", exchangeType: ExchangeType.Fanout)
// Will use a routing key provider that for a given message will take it's Id field
.RoutingKeyProvider((m, p) => m.Id.ToString())
// Will use
// Will use
.MessagePropertiesModifier((m, p) =>
{
p.MessageId = GetMessageId(m);
Expand All @@ -96,7 +97,7 @@ services.AddSlimMessageBus((mbb) =>
{
mbb.WithProviderRabbitMQ(cfg =>
{
// All exchanges declared on producers will be durable by default
// All exchanges declared on producers will be durable by default
cfg.UseExchangeDefaults(durable: true);

// All messages will get the ContentType message property assigned
Expand All @@ -122,7 +123,7 @@ Additionally,
mbb.Consume<PingMessage>(x => x
// Use the subscriber queue, do not auto delete
.Queue("subscriber", autoDelete: false)
//
//
.ExchangeBinding("ping")
// The queue declaration in RabbitMQ will have a reference to the dead letter exchange and the DL exchange will be created
.DeadLetterExchange("subscriber-dlq", exchangeType: ExchangeType: Direct)
Expand All @@ -146,7 +147,7 @@ We can specify defaults for all consumers on the bus level:

By default the the transport implementation performs a negative ack (nack) in the AMQP protocol for any message that failed in the consumer. As a result the message will be marked as failed and routed to an dead letter exchange or discarded by the RabbitMQ broker.

The recomendation here is to either:
The recommendation here is to either:

- configure a [dead letter exchange](#dead-letter-exchange) configured on the consumer queue,
- or provide a [custom error handler](#custom-consumer-error-handler) (retry the message couple of times, if failed send to a dead letter exchange).
Expand Down Expand Up @@ -233,6 +234,26 @@ services.AddTransient(typeof(RabbitMqConsumerErrorHandler<>), typeof(CustomRabbi

> When error handler is not found in the DI or it returns `false` then default error handling will be applied.
### Consumer Concurrency Level

By default each consumer in the service process will handle one message at the same time.
In order to increase the desired concurrency, set the [`ConsumerDispatchConcurrency`](https://www.rabbitmq.com/dotnet-api-guide.html#consumer-callbacks-and-ordering) to a value greater than 1.
This is a setting from the underlying RabbitMQ driver that SMB uses.

```cs
services.AddSlimMessageBus((mbb) =>
{
mbb.WithProviderRabbitMQ(cfg =>
{
cfg.ConnectionFactory.ConsumerDispatchConcurrency = 2; // default is 1
// ...
}
}
```

> Notice that increasing concurrency will cause more messages to be processed at the same time within one service instance, hence affecting order of consumption.
> In scenarios where order of consumption is important, you may want to keep concurrency levels set to 1.

### Request-Response

Here is an example how to set-up request-response flow over RabbitMQ. The fanout exchange types was used, but other type could be used as well (altough we might have to provide the [routing key provider](#producers) on the producer side.)
Expand All @@ -257,7 +278,7 @@ services.AddSlimMessageBus((mbb) =>
.ExpectRequestResponses(x =>
{
// Tell the handler to which exchange send the responses to
x.ReplyToExchange("test-echo-resp", ExchangeType.Fanout);
x.ReplyToExchange("test-echo-resp", ExchangeType.Fanout);
// Which queue to use to read responses from
x.Queue("test-echo-resp-queue");
// Bind to the reply to exchange
Expand All @@ -272,14 +293,14 @@ services.AddSlimMessageBus((mbb) =>

SMB automatically creates exchanges from producers, queues, dead letter exchanges and bindings from consumers.

However, if you need to layer on other topology elements (or peform cleanup) this could be achieved with `UseTopologyInitalizer()`:
However, if you need to layer on other topology elements (or peform cleanup) this could be achieved with `UseTopologyInitializer()`:

```cs
services.AddSlimMessageBus((mbb) =>
{
mbb.WithProviderRabbitMQ(cfg =>
{
cfg.UseTopologyInitalizer((channel, applyDefaultTopology) =>
cfg.UseTopologyInitializer((channel, applyDefaultTopology) =>
{
// perform some cleanup if needed
channel.QueueDelete("subscriber-0", ifUnused: true, ifEmpty: false);
Expand All @@ -302,6 +323,6 @@ This might be useful in case the SMB inferred topology is not desired or there a
- [Default type exchanges](https://www.rabbitmq.com/tutorials/amqp-concepts.html#exchange-default) are not yet supported
- Broker generated queues are not yet supported.

## Feeback
## Feedback

Open a github issue if you need a feature, have a suggestion for improvement, or want to contribute an enhancement.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ public static class RabbitMqMessageBusSettingsExtensions
/// </summary>
/// <param name="settings"></param>
/// <param name="action">Action to be executed, the first param is the RabbitMQ <see cref="IModel"/> from the underlying client, and second parameter represents the SMB exchange, queue and binding setup</param>
public static RabbitMqMessageBusSettings UseTopologyInitalizer(this RabbitMqMessageBusSettings settings, RabbitMqTopologyInitializer action)
public static RabbitMqMessageBusSettings UseTopologyInitializer(this RabbitMqMessageBusSettings settings, RabbitMqTopologyInitializer action)
{
if (action == null) throw new ArgumentNullException(nameof(action));

Expand Down
Loading

0 comments on commit 6471b8a

Please sign in to comment.