Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Host.HybridMessageSerialzier] Resolve HybridMessageSerializer dependencies from IServiceProvider #239

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 21 additions & 15 deletions docs/serialization.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,21 +175,27 @@ The Hybrid plugin allows to have multiple serialization formats on one message b
To use it install the nuget package `SlimMessageBus.Host.Serialization.Hybrid` and then configure the bus:

```cs
services.AddSlimMessageBus(mbb =>
{
// serializer 1
var avroSerializer = new AvroMessageSerializer();

// serializer 2
var jsonSerializer = new JsonMessageSerializer();

// Note: Certain messages will be serialized by the Avro serializer, other using the Json serializer
mbb.AddHybridSerializer(new Dictionary<IMessageSerializer, Type[]>
{
[jsonSerializer] = new[] { typeof(SubtractCommand) }, // the first one will be the default serializer, no need to declare types here
[avroSerializer] = new[] { typeof(AddCommand), typeof(MultiplyRequest), typeof(MultiplyResponse) },
}, defaultMessageSerializer: jsonSerializer);
});
services
.AddSlimMessageBus(mbb =>
{
mbb
.AddHybridSerializer(
builder => {
builder
.AsDefault()
.AddJsonSerializer();

builder
.For(typeof(Message1), typeof(Message2))
.AddAvroSerializer();

builder
.For(typeof(Message3))
.AddGoogleProtobufSerializer();
})
...
}
```

The routing to the proper serializer happens based on message type. When a type cannot be matched the default serializer will be used.
The routing to the proper serializer happens based on message type. When a type cannot be matched the default serializer will be used.
42 changes: 12 additions & 30 deletions src/Samples/Sample.Serialization.ConsoleApp/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,44 +39,26 @@ static async Task Main(string[] args) => await Host.CreateDefaultBuilder(args)
Secrets.Load(@"..\..\..\..\..\secrets.txt");

services.AddHostedService<MainProgram>();

// alternatively a simpler approach, but using the slower ReflectionMessageCreationStategy and ReflectionSchemaLookupStrategy
var avroSerializer = new AvroMessageSerializer();

// Avro serialized using the AvroConvert library - no schema generation neeeded upfront.
var jsonSerializer = new JsonMessageSerializer();

services
.AddSlimMessageBus(mbb =>
{
// Note: remember that Memory provider does not support req-resp yet.
var provider = Provider.Redis;

/*
var sl = new DictionarySchemaLookupStrategy();
/// register all your types
sl.Add(typeof(AddCommand), AddCommand._SCHEMA);
sl.Add(typeof(MultiplyRequest), MultiplyRequest._SCHEMA);
sl.Add(typeof(MultiplyResponse), MultiplyResponse._SCHEMA);
var mf = new DictionaryMessageCreationStategy();
/// register all your types
mf.Add(typeof(AddCommand), () => new AddCommand());
mf.Add(typeof(MultiplyRequest), () => new MultiplyRequest());
mf.Add(typeof(MultiplyResponse), () => new MultiplyResponse());
// longer approach, but should be faster as it's not using reflection
var avroSerializer = new AvroMessageSerializer(mf, sl);
*/
var provider = Provider.Memory;

mbb
.AddServicesFromAssemblyContaining<AddCommandConsumer>()
// Note: Certain messages will be serialized by one Avro serializer, other using the Json serializer
.AddHybridSerializer(new Dictionary<IMessageSerializer, Type[]>

// Note: Certain messages will be serialized by the Avro serializer, others will fall back to the Json serializer (the default)
.AddHybridSerializer(builder =>
{
[jsonSerializer] = new[] { typeof(SubtractCommand) }, // the first one will be the default serializer, no need to declare types here
[avroSerializer] = new[] { typeof(AddCommand), typeof(MultiplyRequest), typeof(MultiplyResponse) },
}, defaultMessageSerializer: jsonSerializer)
builder
.AsDefault()
.AddJsonSerializer();

builder
.For(typeof(AddCommand), typeof(MultiplyRequest), typeof(MultiplyResponse))
.AddAvroSerializer();
})

.Produce<AddCommand>(x => x.DefaultTopic("AddCommand"))
.Consume<AddCommand>(x => x.Topic("AddCommand").WithConsumer<AddCommandConsumer>())
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
namespace SlimMessageBus.Host;

public interface ISerializationBuilder : IHasPostConfigurationActions
public interface ISerializationBuilder
{
void RegisterSerializer<TMessageSerializer>(Action<IServiceCollection> services) where TMessageSerializer : class, IMessageSerializer;
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
namespace SlimMessageBus.Host;

public class MessageBusBuilder : ISerializationBuilder
using Microsoft.Extensions.DependencyInjection.Extensions;

public class MessageBusBuilder : IHasPostConfigurationActions, ISerializationBuilder
{
/// <summary>
/// Parent bus builder.
Expand Down Expand Up @@ -224,6 +226,13 @@
return this;
}

public void RegisterSerializer<TMessageSerializer>(Action<IServiceCollection> services)
where TMessageSerializer : class, IMessageSerializer
{
PostConfigurationActions.Add(services);
PostConfigurationActions.Add(services => services.TryAddSingleton<IMessageSerializer>(sp => sp.GetRequiredService<TMessageSerializer>()));
}

public MessageBusBuilder WithDependencyResolver(IServiceProvider serviceProvider)
{
Settings.ServiceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
Expand Down Expand Up @@ -264,7 +273,7 @@
public MessageBusBuilder WithMessageTypeResolver<T>() => WithMessageTypeResolver(typeof(T));

/// <summary>
/// Hook called whenver message is being produced. Can be used to change message headers.
/// Hook called whenever message is being produced. Can be used to change message headers.
/// </summary>
/// <param name="executePrevious">Should the previously set modifier be executed as well?</param>
public MessageBusBuilder WithHeaderModifier(MessageHeaderModifier<object> headerModifier, bool executePrevious = true)
Expand Down Expand Up @@ -325,7 +334,7 @@
child.MergeFrom(Settings);
}

builderAction?.Invoke(child);

Check warning on line 337 in src/SlimMessageBus.Host.Configuration/Builders/MessageBusBuilder.cs

View workflow job for this annotation

GitHub Actions / build

Remove this unnecessary check for null. (https://rules.sonarsource.com/csharp/RSPEC-2589)

Check warning on line 337 in src/SlimMessageBus.Host.Configuration/Builders/MessageBusBuilder.cs

View workflow job for this annotation

GitHub Actions / build

Remove this unnecessary check for null. (https://rules.sonarsource.com/csharp/RSPEC-2589)

Check warning on line 337 in src/SlimMessageBus.Host.Configuration/Builders/MessageBusBuilder.cs

View workflow job for this annotation

GitHub Actions / build

Remove this unnecessary check for null. (https://rules.sonarsource.com/csharp/RSPEC-2589)

return this;
}
Expand All @@ -338,5 +347,5 @@
throw new ConfigurationMessageBusException($"{busName}The bus provider was not configured. Check the MessageBus configuration and ensure the has the '.WithProviderXxx()' setting for one of the available transports.");
}
return BusFactory(Settings);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ public static class SerializationBuilderExtensions
public static TBuilder AddAvroSerializer<TBuilder>(this TBuilder builder, IMessageCreationStrategy messageCreationStrategy, ISchemaLookupStrategy schemaLookupStrategy)
where TBuilder : ISerializationBuilder
{
builder.PostConfigurationActions.Add(services =>
builder.RegisterSerializer<AvroMessageSerializer>(services =>
{
services.TryAddSingleton(svp => new AvroMessageSerializer(svp.GetRequiredService<ILoggerFactory>(), messageCreationStrategy, schemaLookupStrategy));
services.TryAddSingleton<IMessageSerializer>(svp => svp.GetRequiredService<AvroMessageSerializer>());
});
return builder;
}
Expand All @@ -35,10 +34,9 @@ public static TBuilder AddAvroSerializer<TBuilder>(this TBuilder builder, IMessa
public static TBuilder AddAvroSerializer<TBuilder>(this TBuilder builder)
where TBuilder : ISerializationBuilder
{
builder.PostConfigurationActions.Add(services =>
builder.RegisterSerializer<AvroMessageSerializer>(services =>
{
services.TryAddSingleton(svp => new AvroMessageSerializer(svp.GetRequiredService<ILoggerFactory>()));
services.TryAddSingleton<IMessageSerializer>(svp => svp.GetRequiredService<AvroMessageSerializer>());
});
return builder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@ public static class SerializationBuilderExtensions
public static TBuilder AddGoogleProtobufSerializer<TBuilder>(this TBuilder builder, IMessageParserFactory messageParserFactory = null)
where TBuilder : ISerializationBuilder
{
builder.PostConfigurationActions.Add(services =>
builder.RegisterSerializer<GoogleProtobufMessageSerializer>(services =>
{
services.TryAddSingleton(svp => new GoogleProtobufMessageSerializer(svp.GetRequiredService<ILoggerFactory>(), messageParserFactory));
services.TryAddSingleton<IMessageSerializer>(svp => svp.GetRequiredService<GoogleProtobufMessageSerializer>());
});
return builder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@
public class HybridMessageSerializer : IMessageSerializer
{
private readonly ILogger _logger;
private readonly IList<IMessageSerializer> _serializers = new List<IMessageSerializer>();
private readonly IDictionary<Type, IMessageSerializer> _serializerByType = new Dictionary<Type, IMessageSerializer>();
private readonly Dictionary<Type, IMessageSerializer> _serializerByType = [];

public IMessageSerializer DefaultSerializer { get; set; }

internal IReadOnlyDictionary<Type, IMessageSerializer> SerializerByType => _serializerByType;

public HybridMessageSerializer(ILogger<HybridMessageSerializer> logger, IDictionary<IMessageSerializer, Type[]> registration, IMessageSerializer defaultMessageSerializer = null)
{
_logger = logger;
Expand All @@ -24,12 +26,14 @@ public HybridMessageSerializer(ILogger<HybridMessageSerializer> logger, IDiction

public void Add(IMessageSerializer serializer, params Type[] supportedTypes)
{
if (_serializers.Count == 0 && DefaultSerializer == null)
{
DefaultSerializer = serializer;
}
#if NETSTANDARD2_0
if (serializer is null) throw new ArgumentNullException(nameof(serializer));
#else
ArgumentNullException.ThrowIfNull(serializer);
#endif

DefaultSerializer ??= serializer;

_serializers.Add(serializer);
foreach (var type in supportedTypes)
{
_serializerByType.Add(type, serializer);
Expand All @@ -38,19 +42,19 @@ public void Add(IMessageSerializer serializer, params Type[] supportedTypes)

protected virtual IMessageSerializer MatchSerializer(Type t)
{
if (_serializers.Count == 0)
{
throw new InvalidOperationException("No serializers registered.");
}

if (!_serializerByType.TryGetValue(t, out var serializer))
{
// use first as default
_logger.LogTrace("Serializer for type {0} not registered, will use default serializer", t);
_logger.LogTrace("Serializer for type {MessageType} not registered, will use default serializer", t);

if (DefaultSerializer == null)
{
throw new InvalidOperationException("No serializers registered.");
}

serializer = DefaultSerializer;
}

_logger.LogDebug("Serializer for type {0} will be {1}", t, serializer);
_logger.LogDebug("Serializer for type {MessageType} will be {Serializer}", t, serializer);
return serializer;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,120 @@
public static TBuilder AddHybridSerializer<TBuilder>(this TBuilder builder, IDictionary<IMessageSerializer, Type[]> registration, IMessageSerializer defaultMessageSerializer)
where TBuilder : ISerializationBuilder
{
builder.PostConfigurationActions.Add(services =>
builder.RegisterSerializer<HybridMessageSerializer>(services =>
{
services.TryAddSingleton(svp => new HybridMessageSerializer(svp.GetRequiredService<ILogger<HybridMessageSerializer>>(), registration, defaultMessageSerializer));
services.TryAddSingleton<IMessageSerializer>(svp => svp.GetRequiredService<HybridMessageSerializer>());
});
return builder;
}

/// <summary>
/// Registers the <see cref="IMessageSerializer"/> with implementation as <see cref="HybridMessageSerializer"/> using serializers as registered in the <see cref="IServiceCollection"/>.
/// </summary>
/// <param name="mbb"><see cref="MessageBusBuilder"/></param>
/// <param name="registration">Action to register serializers for dependency injection resolution.</param>
/// <returns><see cref="MessageBusBuilder"/></returns>
public static MessageBusBuilder AddHybridSerializer(this MessageBusBuilder mbb, Action<HybridSerializerOptionsBuilder> registration)
{
var builder = new HybridSerializerOptionsBuilder();
registration(builder);

foreach (var action in builder.ServiceRegistrations)
{
mbb.PostConfigurationActions.Add(action);
}

mbb.PostConfigurationActions.Add(services =>
{
services.TryAddSingleton(svp =>
{
if (services.Count(x => x.ServiceType == typeof(IMessageSerializer)) > 1)
{
throw new NotSupportedException($"Registering instances of {nameof(IMessageSerializer)} outside of {nameof(AddHybridSerializer)} is not supported.");
}

var defaultMessageSerializer = builder.DefaultSerializer != null ? (IMessageSerializer)svp.GetRequiredService(builder.DefaultSerializer) : null;
var typeRegistrations = builder.TypeRegistrations.ToDictionary(x => (IMessageSerializer)svp.GetRequiredService(x.Key), x => x.Value);
return new HybridMessageSerializer(svp.GetRequiredService<ILogger<HybridMessageSerializer>>(), typeRegistrations, defaultMessageSerializer);
});

services.AddSingleton<IMessageSerializer>(svp => svp.GetRequiredService<HybridMessageSerializer>());
});
return mbb;
}

public sealed class HybridSerializerOptionsBuilder
{
private readonly List<SerializerConfiguration> _configurations = [];

public Type DefaultSerializer
{
get
{
return _configurations

Check warning on line 71 in src/SlimMessageBus.Host.Serialization.Hybrid/SerializationBuilderExtensions.cs

View workflow job for this annotation

GitHub Actions / build

'_configurations
.OfType<DefaultSerializerConfiguration>()
.LastOrDefault(x => x.IsValid)
.Type;
}
}

public IReadOnlyList<Action<IServiceCollection>> ServiceRegistrations
{
get
{
return _configurations

Check warning on line 82 in src/SlimMessageBus.Host.Serialization.Hybrid/SerializationBuilderExtensions.cs

View workflow job for this annotation

GitHub Actions / build

Refactor 'ServiceRegistrations' into a method, properties should not copy collections. (https://rules.sonarsource.com/csharp/RSPEC-2365)

Check warning on line 82 in src/SlimMessageBus.Host.Serialization.Hybrid/SerializationBuilderExtensions.cs

View workflow job for this annotation

GitHub Actions / build

Refactor 'ServiceRegistrations' into a method, properties should not copy collections. (https://rules.sonarsource.com/csharp/RSPEC-2365)
.Where(x => x.IsValid)
.Select(x => x.Action)
.ToList();
}
}

public IReadOnlyDictionary<Type, Type[]> TypeRegistrations
{
get
{
return _configurations
.OfType<ForSerializerConfiguration>()
.Where(x => x.IsValid)
.ToDictionary(x => x.Type, x => x.Types);
}
}

public ISerializationBuilder AsDefault()
{
var configuration = new DefaultSerializerConfiguration();
this._configurations.Add(configuration);
return configuration;
}

public ISerializationBuilder For(params Type[] types)
{
var configuration = new ForSerializerConfiguration(types);
this._configurations.Add(configuration);
return configuration;
}

public abstract class SerializerConfiguration : ISerializationBuilder
{
public Action<IServiceCollection> Action { get; private set; }
public bool IsValid => Type != null;
public Type Type { get; private set; } = null;

public void RegisterSerializer<TMessageSerializer>(Action<IServiceCollection> services)
where TMessageSerializer : class, IMessageSerializer
{
Type = typeof(TMessageSerializer);
Action = services;
}
}

public class ForSerializerConfiguration(Type[] types) : SerializerConfiguration, ISerializationBuilder

Check warning on line 128 in src/SlimMessageBus.Host.Serialization.Hybrid/SerializationBuilderExtensions.cs

View workflow job for this annotation

GitHub Actions / build

'SerializerConfiguration' implements 'ISerializationBuilder' so 'ISerializationBuilder' can be removed from the inheritance list. (https://rules.sonarsource.com/csharp/RSPEC-1939)

Check warning on line 128 in src/SlimMessageBus.Host.Serialization.Hybrid/SerializationBuilderExtensions.cs

View workflow job for this annotation

GitHub Actions / build

'SerializerConfiguration' implements 'ISerializationBuilder' so 'ISerializationBuilder' can be removed from the inheritance list. (https://rules.sonarsource.com/csharp/RSPEC-1939)
{
public Type[] Types { get; } = types;
}

public class DefaultSerializerConfiguration : SerializerConfiguration, ISerializationBuilder

Check warning on line 133 in src/SlimMessageBus.Host.Serialization.Hybrid/SerializationBuilderExtensions.cs

View workflow job for this annotation

GitHub Actions / build

'SerializerConfiguration' implements 'ISerializationBuilder' so 'ISerializationBuilder' can be removed from the inheritance list. (https://rules.sonarsource.com/csharp/RSPEC-1939)

Check warning on line 133 in src/SlimMessageBus.Host.Serialization.Hybrid/SerializationBuilderExtensions.cs

View workflow job for this annotation

GitHub Actions / build

'SerializerConfiguration' implements 'ISerializationBuilder' so 'ISerializationBuilder' can be removed from the inheritance list. (https://rules.sonarsource.com/csharp/RSPEC-1939)
{
}
}
}
Loading
Loading