
Lightweight message bus interface for .NET (pub/sub and request-response) with transport plugins for popular message brokers.


zarusz/SlimMessageBus

 
 


SlimMessageBus

SlimMessageBus is a client façade for message brokers for .NET. It comes with implementations for specific brokers (RabbitMQ, Kafka, Azure EventHub, MQTT, Redis Pub/Sub) and for in-memory message passing (in-process communication). SlimMessageBus additionally provides a request-response implementation over message queues.


The v2 release is available (see migration guide). The v3 release is under construction.

Key elements of SlimMessageBus

  • Consumers:
    • IConsumer<in TMessage> - subscriber in pub/sub (or queue consumer)
    • IRequestHandler<in TRequest, TResponse> & IRequestHandler<in TRequest> - request handler in request-response
  • Producers:
    • IPublishBus - publisher in pub/sub (or queue producer)
    • IRequestResponseBus - sender in req/resp
    • IMessageBus - extends IPublishBus and IRequestResponseBus
  • Misc:
    • IRequest<out TResponse> & IRequest - a marker for request messages
    • MessageBus - static accessor for current context IMessageBus
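As a rough illustration of how these pieces fit together, the message types used in the samples below might be declared along these lines. The property names (Text, Query, Result) are placeholders invented for this sketch and are not part of the library.

```csharp
using SlimMessageBus;

// A pub/sub message is a plain class; no marker interface is required.
public class SomeMessage
{
    public string Text { get; set; } = string.Empty; // placeholder property
}

// A request message: IRequest<TResponse> marks the response type it expects.
public class SomeRequest : IRequest<SomeResponse>
{
    public string Query { get; set; } = string.Empty; // placeholder property
}

// The response is a plain class as well.
public class SomeResponse
{
    public string Result { get; set; } = string.Empty; // placeholder property
}
```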

Docs

Packages

| Name | Description |
| --- | --- |
| SlimMessageBus | The core API for SlimMessageBus |
| Transport providers | |
| .Host.AzureEventHub | Transport provider for Azure Event Hubs |
| .Host.AzureServiceBus | Transport provider for Azure Service Bus |
| .Host.Kafka | Transport provider for Apache Kafka |
| .Host.Memory | Transport provider implementation for in-process (in memory) message passing (no messaging infrastructure required) |
| .Host.MQTT | Transport provider for MQTT |
| .Host.NATS | Transport provider for NATS |
| .Host.RabbitMQ | Transport provider for RabbitMQ |
| .Host.Redis | Transport provider for Redis |
| .Host.Sql (pending) | Transport provider implementation for SQL database message passing |
| Serialization | |
| .Host.Serialization.Json | Serialization plugin for JSON (Newtonsoft.Json library) |
| .Host.Serialization.SystemTextJson | Serialization plugin for JSON (System.Text.Json library) |
| .Host.Serialization.Avro | Serialization plugin for Avro (Apache.Avro library) |
| .Host.Serialization.Hybrid | Plugin that delegates serialization to other serializers based on message type |
| .Host.Serialization.GoogleProtobuf | Serialization plugin for Google Protobuf |
| Plugins | |
| .Host.AspNetCore | Integration for ASP.NET Core |
| .Host.Interceptor | Core interface for interceptors |
| .Host.FluentValidation | Validation for messages based on FluentValidation |
| .Host.Outbox.Sql | Transactional Outbox using SQL |
| .Host.Outbox.DbContext | Transactional Outbox using EF DbContext |
| .Host.AsyncApi | AsyncAPI specification generation via Saunter |

Typically the application layers (domain model, business logic) only need to depend on SlimMessageBus which is the facade, and ultimately the application hosting layer (ASP.NET, Console App, Windows Service) will reference and configure the other packages (SlimMessageBus.Host.*) which are the messaging transport providers and additional plugins.

Samples

Basic usage

Some service (or domain layer) publishes a message:

IMessageBus bus; // injected

await bus.Publish(new SomeMessage());

Another service (or application layer) handles the message:

public class SomeMessageConsumer : IConsumer<SomeMessage>
{
   public async Task OnHandle(SomeMessage message, CancellationToken cancellationToken)
   {
       // handle the message
   }
}

Note: It is also possible to avoid having to implement the interface IConsumer<T> (see here).

The bus also supports request-response implemented via queues, topics or in-memory - depending on the chosen transport provider. The sender side sends a request message:

var response = await bus.Send(new SomeRequest());

Note: It is possible to configure the bus to time out a request when the response does not arrive within the allotted time (see here).
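A minimal sketch of what such a timeout configuration might look like. The builder methods ExpectRequestResponses and DefaultTimeout, the namespaces in the using lines, and the topic name are written from memory of the library and should be treated as assumptions; check them against the linked documentation for your version.

```csharp
using System;
using Microsoft.Extensions.DependencyInjection;
using SlimMessageBus;
using SlimMessageBus.Host;

var services = new ServiceCollection();

services.AddSlimMessageBus(mbb =>
{
    // Assumed API: fallback timeout applied to every request sent through this bus.
    mbb.ExpectRequestResponses(x => x.DefaultTimeout(TimeSpan.FromSeconds(20)));

    // Assumed API: tighter timeout for one request type ("some-request-topic" is a made-up name).
    mbb.Produce<SomeRequest>(x => x
        .DefaultTopic("some-request-topic")
        .DefaultTimeout(TimeSpan.FromSeconds(5)));

    // Transport provider, reply topic and serializer registration are omitted here.
});

// Hypothetical message types, declared only to keep the sketch self-contained.
public class SomeRequest : IRequest<SomeResponse> { }
public class SomeResponse { }
```

When the timeout elapses, the awaited Send call is expected to fail rather than hang, so callers will usually want to wrap it in a try/catch.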

The receiving side handles the request and replies:

public class SomeRequestHandler : IRequestHandler<SomeRequest, SomeResponse>
{
   public async Task<SomeResponse> OnHandle(SomeRequest request, CancellationToken cancellationToken)
   {
      // handle the request message and return a response
      return new SomeResponse { /* ... */ };
   }
}

The bus will ask the DI container to provide the consumer instances (SomeMessageConsumer, SomeRequestHandler).

There is also support for one-way request-response.
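A one-way request could look roughly like the sketch below, based on the IRequest and IRequestHandler<in TRequest> types listed under key elements. The CleanUpRequest type and its handler are hypothetical names, and the handler signature is assumed to mirror the two-type-parameter handler shown above.

```csharp
using System.Threading;
using System.Threading.Tasks;
using SlimMessageBus;

// Hypothetical request that carries no response payload.
public class CleanUpRequest : IRequest
{
}

// Handler for the one-way request: it returns a plain Task instead of Task<TResponse>.
public class CleanUpRequestHandler : IRequestHandler<CleanUpRequest>
{
    public Task OnHandle(CleanUpRequest request, CancellationToken cancellationToken)
    {
        // Do the work here; completing the task signals the sender that handling finished.
        return Task.CompletedTask;
    }
}

// Sender side, with an injected IMessageBus named bus:
//    await bus.Send(new CleanUpRequest());
```

Unlike Publish, the sender awaits completion of the handler, so a failure in the handler can surface on the sending side.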

Configuration

Microsoft.Extensions.DependencyInjection (MSDI) is used to compose the bus:

// IServiceCollection services;

services.AddSlimMessageBus(mbb =>
{
   mbb
      // First child bus - in this example Kafka transport
      .AddChildBus("Bus1", (builder) =>
      {
         builder
            .Produce<SomeMessage>(x => x.DefaultTopic("some-topic"))
            .Consume<SomeMessage>(x => x.Topic("some-topic")
               //.WithConsumer<SomeMessageConsumer>() // Optional: can be skipped as IConsumer<SomeMessage> will be resolved from DI
               //.KafkaGroup("some-kafka-consumer-group") // Kafka: Consumer Group
               //.SubscriptionName("some-azure-sb-topic-subscription") // Azure ServiceBus: Subscription Name
            )
            // ...
            // Use Kafka transport provider (requires SlimMessageBus.Host.Kafka package)
            .WithProviderKafka(cfg => { cfg.BrokerList = "localhost:9092"; });
            // Use Azure Service Bus transport provider
            //.WithProviderServiceBus(cfg => { ... }) // requires SlimMessageBus.Host.AzureServiceBus package
            // Use Azure Event Hub transport provider
            //.WithProviderEventHub(cfg => { ... }) // requires SlimMessageBus.Host.AzureEventHub package
            // Use Redis transport provider
            //.WithProviderRedis(cfg => { ... }) // requires SlimMessageBus.Host.Redis package
            // Use RabbitMQ transport provider
            //.WithProviderRabbitMQ(cfg => { ... }) // requires SlimMessageBus.Host.RabbitMQ package
            // Use in-memory transport provider
            //.WithProviderMemory(cfg => { ... }) // requires SlimMessageBus.Host.Memory package
      })

      // Add other bus transports (as child bus), if needed
      //.AddChildBus("Bus2", (builder) => {  })

      // Scan assembly for consumers, handlers, interceptors, and register into MSDI
      .AddServicesFromAssemblyContaining<SomeMessageConsumer>()
      //.AddServicesFromAssembly(Assembly.GetExecutingAssembly());

      // Add JSON serializer
      .AddJsonSerializer(); // requires SlimMessageBus.Host.Serialization.Json or SlimMessageBus.Host.Serialization.SystemTextJson package
});

The configuration can be modularized.
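One way this might be arranged, assuming that AddSlimMessageBus can be called more than once and that the individual configurations are combined. The extension method names, the topic name, and the message types are hypothetical, and the namespaces in the using lines are assumptions about the package layout.

```csharp
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using SlimMessageBus;
using SlimMessageBus.Host;
using SlimMessageBus.Host.Memory;

// Hypothetical message and consumer owned by one application module.
public class SomeMessage { }

public class SomeMessageConsumer : IConsumer<SomeMessage>
{
    public Task OnHandle(SomeMessage message, CancellationToken cancellationToken) => Task.CompletedTask;
}

public static class BusModules
{
    // The application module declares only its own producers and consumers.
    public static IServiceCollection AddSomeModuleMessaging(this IServiceCollection services)
    {
        services.AddSlimMessageBus(mbb =>
        {
            mbb.Produce<SomeMessage>(x => x.DefaultTopic("some-topic"));
            mbb.Consume<SomeMessage>(x => x.Topic("some-topic"));
            mbb.AddServicesFromAssemblyContaining<SomeMessageConsumer>();
        });
        return services;
    }

    // The hosting layer picks the transport (in-memory here) and pulls in the modules.
    public static IServiceCollection AddMessaging(this IServiceCollection services)
    {
        services.AddSlimMessageBus(mbb => mbb.WithProviderMemory());
        return services.AddSomeModuleMessaging();
    }
}
```

Keeping the declarations next to the module that owns the messages means the hosting layer only has to decide on transport and serialization.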

Use Case: Domain Events (in-process pub/sub messaging)

This example shows how SlimMessageBus and SlimMessageBus.Host.Memory can be used to implement the Domain Events pattern. The provider passes messages in the same process (no external message broker is required).

The domain event is a simple POCO:

// domain event
public record OrderSubmittedEvent(Order Order, DateTime Timestamp);

The domain event handler implements the IConsumer<T> interface:

// domain event handler
public class OrderSubmittedHandler : IConsumer<OrderSubmittedEvent>
{
   public Task OnHandle(OrderSubmittedEvent e, CancellationToken cancellationToken)
   {
      // ...
      return Task.CompletedTask;
   }
}

The domain event handler (consumer) is resolved from MSDI at the time the event is published. The publish operation joins the ongoing scope (the web request scope, or the scope of the external message currently being processed).

In the domain model layer, the domain event gets raised:

// aggregate root
public class Order
{
   public Customer Customer { get; }
   public OrderState State { get; private set; }

   private IList<OrderLine> lines = new List<OrderLine>();
   public IEnumerable<OrderLine> Lines => lines.AsEnumerable();

   public Order(Customer customer)
   {
      Customer = customer;
      State = OrderState.New;
   }

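   public OrderLine Add(string productId, int quantity)
   {
      // Assumes a hypothetical OrderLine(productId, quantity) constructor
      var line = new OrderLine(productId, quantity);
      lines.Add(line);
      return line;
   }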

   public Task Submit()
   {
      State = OrderState.Submitted;

      // Raise domain event
      return MessageBus.Current.Publish(new OrderSubmittedEvent(this, DateTime.UtcNow));
   }
}

Sample logic executed by the client of the domain model:

var john = new Customer("John", "Whick");

var order = new Order(john);
order.Add("id_machine_gun", 2);
order.Add("id_grenade", 4);

await order.Submit(); // events fired here

Notice that the static MessageBus.Current property is configured to resolve a scoped IMessageBus instance (either the web request scope or the scope of the message currently being processed).

The SlimMessageBus configuration for the in-memory provider looks like this:

//IServiceCollection services;

// Configure the message bus
services.AddSlimMessageBus(mbb =>
{
   mbb.WithProviderMemory();
   // Find types that implement IConsumer<T> and IRequestHandler<T, R> and declare producers and consumers on the mbb
   mbb.AutoDeclareFrom(Assembly.GetExecutingAssembly());
   // Scan assembly for consumers, handlers, interceptors, and register into MSDI
   mbb.AddServicesFromAssemblyContaining<OrderSubmittedHandler>();
});

For the ASP.NET project, set up the MessageBus.Current helper (if you want to use it, and pick up the current web-request scope):

services.AddSlimMessageBus(mbb =>
{
   // ...
   mbb.AddAspNet(); // requires SlimMessageBus.Host.AspNetCore package
});
services.AddHttpContextAccessor(); // This is required by the SlimMessageBus.Host.AspNetCore plugin

See the complete sample for ASP.NET Core where the handler and bus are web-request scoped.

Use Case: MediatR replacement

The SlimMessageBus in-memory provider can replace the MediatR library:

  • It has similar semantics and has the interceptor pipeline enabling the addition of custom behavior.
  • The generic interceptors can introduce common behavior like logging, authorization or audit of messages.
  • The FluentValidation plugin can introduce request/command/query validation.
  • The external communication can be layered on top of SlimMessageBus which allows having one library for in-memory and out-of-process messaging (Hybrid Provider).

See the CQRS and FluentValidation samples.
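To make the comparison concrete, here is a minimal sketch of an in-process query handled through the in-memory provider. The query, result, and handler types are hypothetical, the configuration mirrors the in-memory setup from the Domain Events use case, and the namespaces in the using lines are assumptions.

```csharp
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using SlimMessageBus;
using SlimMessageBus.Host;
using SlimMessageBus.Host.Memory;

// Hypothetical query and result types (comparable to a MediatR IRequest<T> and its response).
public record GetOrderTotalQuery(string OrderId) : IRequest<OrderTotal>;
public record OrderTotal(decimal Amount);

// Hypothetical handler (comparable to a MediatR request handler).
public class GetOrderTotalHandler : IRequestHandler<GetOrderTotalQuery, OrderTotal>
{
    public Task<OrderTotal> OnHandle(GetOrderTotalQuery request, CancellationToken cancellationToken)
    {
        // A real handler would load the order; a fixed value keeps the sketch short.
        return Task.FromResult(new OrderTotal(42.0m));
    }
}

public static class InProcessMessaging
{
    public static IServiceCollection AddInProcessBus(this IServiceCollection services)
    {
        services.AddSlimMessageBus(mbb =>
        {
            mbb.WithProviderMemory();
            // Declare producers and consumers from the handler types found in this assembly
            mbb.AutoDeclareFrom(Assembly.GetExecutingAssembly());
            // Register the handlers in MSDI
            mbb.AddServicesFromAssemblyContaining<GetOrderTotalHandler>();
        });
        return services;
    }
}

// Caller side, with an injected IMessageBus named bus:
//    OrderTotal total = await bus.Send(new GetOrderTotalQuery("order-1"));
```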

Use Case: Request-response over Kafka topics

See sample.

Features

  • Types of messaging patterns supported:
    • Publish-subscribe
    • Request-response
    • Queues
    • A hybrid of the above (e.g. Kafka with multiple topic consumers in one group)
  • Modern async/await syntax and TPL
  • Fluent configuration
  • SourceLink support
  • Because SlimMessageBus is a facade, chosen messaging transports can be swapped without impacting the overall application architecture.

Principles

  • The core of SlimMessageBus is "slim"
    • Simple, common and friendly API to work with messaging systems
    • No external dependencies.
    • The core interface can be used in the domain model (e.g. Domain Events)
  • Plugin architecture:
    • Message serialization (JSON, Avro, Protobuf)
    • Use your favorite messaging broker as a provider by simply pulling a NuGet package
    • Add transactional outbox pattern or message validation
  • No threads created (pure TPL)
  • Async/Await support
  • Fluent configuration
  • Logging is done via Microsoft.Extensions.Logging.Abstractions so that you can connect to your favorite logger provider.

License

Apache License 2.0

Build

cd src
dotnet build
dotnet pack --output ../dist

The NuGet packages end up in the dist folder.

Testing

To run tests you need to update the secrets.txt to match your cloud infrastructure or local infrastructure. SMB has some message brokers set up on Azure for integration tests (secrets not shared).

Run all tests:

dotnet test

Run all tests except integration tests that require local/cloud infrastructure:

dotnet test --filter Category!=Integration
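The filter above relies on tests being tagged with a Category value. Assuming the test projects use xUnit, a tagged test might look like the sketch below; the class and method names are made up for illustration.

```csharp
using Xunit;

public class SampleBusTests
{
    // No trait: this test still runs with --filter Category!=Integration.
    [Fact]
    public void Runs_without_any_infrastructure()
    {
        Assert.Equal(4, 2 + 2);
    }

    // Tagged as an integration test: excluded by --filter Category!=Integration.
    [Fact]
    [Trait("Category", "Integration")]
    public void Needs_a_running_broker()
    {
        // A real test would connect to the broker configured in secrets.txt.
        Assert.True(true);
    }
}
```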

Credits

Thanks to Gravity9 for providing an Azure subscription that allows running the integration test infrastructure.


Thanks also to the cloud service providers that offer free instances for our integration tests.

If you want to help and sponsor, please write to me.