Skip to content

Commit

Permalink
#238 Resolve HybridMessageSerializer dependencies with IServiceProvider
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Pringle <richardpringle@gmail.com>
  • Loading branch information
EtherZa authored and zarusz committed Apr 8, 2024
1 parent 1c1f84d commit 30d9177
Show file tree
Hide file tree
Showing 19 changed files with 436 additions and 100 deletions.
36 changes: 21 additions & 15 deletions docs/serialization.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,21 +175,27 @@ The Hybrid plugin allows to have multiple serialization formats on one message b
To use it install the nuget package `SlimMessageBus.Host.Serialization.Hybrid` and then configure the bus:

```cs
services.AddSlimMessageBus(mbb =>
{
// serializer 1
var avroSerializer = new AvroMessageSerializer();

// serializer 2
var jsonSerializer = new JsonMessageSerializer();

// Note: Certain messages will be serialized by the Avro serializer, other using the Json serializer
mbb.AddHybridSerializer(new Dictionary<IMessageSerializer, Type[]>
{
[jsonSerializer] = new[] { typeof(SubtractCommand) }, // the first one will be the default serializer, no need to declare types here
[avroSerializer] = new[] { typeof(AddCommand), typeof(MultiplyRequest), typeof(MultiplyResponse) },
}, defaultMessageSerializer: jsonSerializer);
});
services
.AddSlimMessageBus(mbb =>
{
mbb
.AddHybridSerializer(
builder => {
builder
.AsDefault()
.AddJsonSerializer();

builder
.For(typeof(Message1), typeof(Message2))
.AddAvroSerializer();

builder
.For(typeof(Message3))
.AddGoogleProtobufSerializer();
})
...
}
```

The routing to the proper serializer happens based on message type. When a type cannot be matched the default serializer will be used.
The routing to the proper serializer happens based on message type. When a type cannot be matched the default serializer will be used.
42 changes: 12 additions & 30 deletions src/Samples/Sample.Serialization.ConsoleApp/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,44 +39,26 @@ static async Task Main(string[] args) => await Host.CreateDefaultBuilder(args)
Secrets.Load(@"..\..\..\..\..\secrets.txt");

services.AddHostedService<MainProgram>();

// alternatively a simpler approach, but using the slower ReflectionMessageCreationStategy and ReflectionSchemaLookupStrategy
var avroSerializer = new AvroMessageSerializer();

// Avro serialized using the AvroConvert library - no schema generation neeeded upfront.
var jsonSerializer = new JsonMessageSerializer();

services
.AddSlimMessageBus(mbb =>
{
// Note: remember that Memory provider does not support req-resp yet.
var provider = Provider.Redis;

/*
var sl = new DictionarySchemaLookupStrategy();
/// register all your types
sl.Add(typeof(AddCommand), AddCommand._SCHEMA);
sl.Add(typeof(MultiplyRequest), MultiplyRequest._SCHEMA);
sl.Add(typeof(MultiplyResponse), MultiplyResponse._SCHEMA);
var mf = new DictionaryMessageCreationStategy();
/// register all your types
mf.Add(typeof(AddCommand), () => new AddCommand());
mf.Add(typeof(MultiplyRequest), () => new MultiplyRequest());
mf.Add(typeof(MultiplyResponse), () => new MultiplyResponse());
// longer approach, but should be faster as it's not using reflection
var avroSerializer = new AvroMessageSerializer(mf, sl);
*/
var provider = Provider.Memory;

mbb
.AddServicesFromAssemblyContaining<AddCommandConsumer>()
// Note: Certain messages will be serialized by one Avro serializer, other using the Json serializer
.AddHybridSerializer(new Dictionary<IMessageSerializer, Type[]>

// Note: Certain messages will be serialized by the Avro serializer, others will fall back to the Json serializer (the default)
.AddHybridSerializer(builder =>
{
[jsonSerializer] = new[] { typeof(SubtractCommand) }, // the first one will be the default serializer, no need to declare types here
[avroSerializer] = new[] { typeof(AddCommand), typeof(MultiplyRequest), typeof(MultiplyResponse) },
}, defaultMessageSerializer: jsonSerializer)
builder
.AsDefault()
.AddJsonSerializer();

builder
.For(typeof(AddCommand), typeof(MultiplyRequest), typeof(MultiplyResponse))
.AddAvroSerializer();
})

.Produce<AddCommand>(x => x.DefaultTopic("AddCommand"))
.Consume<AddCommand>(x => x.Topic("AddCommand").WithConsumer<AddCommandConsumer>())
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
namespace SlimMessageBus.Host;

public interface ISerializationBuilder : IHasPostConfigurationActions
public interface ISerializationBuilder
{
void RegisterSerializer<TMessageSerializer>(Action<IServiceCollection> services) where TMessageSerializer : class, IMessageSerializer;
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
namespace SlimMessageBus.Host;

public class MessageBusBuilder : ISerializationBuilder
using Microsoft.Extensions.DependencyInjection.Extensions;

public class MessageBusBuilder : IHasPostConfigurationActions, ISerializationBuilder
{
/// <summary>
/// Parent bus builder.
Expand Down Expand Up @@ -224,6 +226,13 @@ public MessageBusBuilder WithSerializer(Type serializerType)
return this;
}

public void RegisterSerializer<TMessageSerializer>(Action<IServiceCollection> services)
where TMessageSerializer : class, IMessageSerializer
{
PostConfigurationActions.Add(services);
PostConfigurationActions.Add(services => services.TryAddSingleton<IMessageSerializer>(sp => sp.GetRequiredService<TMessageSerializer>()));
}

public MessageBusBuilder WithDependencyResolver(IServiceProvider serviceProvider)
{
Settings.ServiceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
Expand Down Expand Up @@ -264,7 +273,7 @@ public MessageBusBuilder WithMessageTypeResolver(Type messageTypeResolverType)
public MessageBusBuilder WithMessageTypeResolver<T>() => WithMessageTypeResolver(typeof(T));

/// <summary>
/// Hook called whenver message is being produced. Can be used to change message headers.
/// Hook called whenever message is being produced. Can be used to change message headers.
/// </summary>
/// <param name="executePrevious">Should the previously set modifier be executed as well?</param>
public MessageBusBuilder WithHeaderModifier(MessageHeaderModifier<object> headerModifier, bool executePrevious = true)
Expand Down Expand Up @@ -338,5 +347,5 @@ public IMessageBusProvider Build()
throw new ConfigurationMessageBusException($"{busName}The bus provider was not configured. Check the MessageBus configuration and ensure the has the '.WithProviderXxx()' setting for one of the available transports.");
}
return BusFactory(Settings);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ public static class SerializationBuilderExtensions
public static TBuilder AddAvroSerializer<TBuilder>(this TBuilder builder, IMessageCreationStrategy messageCreationStrategy, ISchemaLookupStrategy schemaLookupStrategy)
where TBuilder : ISerializationBuilder
{
builder.PostConfigurationActions.Add(services =>
builder.RegisterSerializer<AvroMessageSerializer>(services =>
{
services.TryAddSingleton(svp => new AvroMessageSerializer(svp.GetRequiredService<ILoggerFactory>(), messageCreationStrategy, schemaLookupStrategy));
services.TryAddSingleton<IMessageSerializer>(svp => svp.GetRequiredService<AvroMessageSerializer>());
});
return builder;
}
Expand All @@ -35,10 +34,9 @@ public static TBuilder AddAvroSerializer<TBuilder>(this TBuilder builder, IMessa
public static TBuilder AddAvroSerializer<TBuilder>(this TBuilder builder)
where TBuilder : ISerializationBuilder
{
builder.PostConfigurationActions.Add(services =>
builder.RegisterSerializer<AvroMessageSerializer>(services =>
{
services.TryAddSingleton(svp => new AvroMessageSerializer(svp.GetRequiredService<ILoggerFactory>()));
services.TryAddSingleton<IMessageSerializer>(svp => svp.GetRequiredService<AvroMessageSerializer>());
});
return builder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@ public static class SerializationBuilderExtensions
public static TBuilder AddGoogleProtobufSerializer<TBuilder>(this TBuilder builder, IMessageParserFactory messageParserFactory = null)
where TBuilder : ISerializationBuilder
{
builder.PostConfigurationActions.Add(services =>
builder.RegisterSerializer<GoogleProtobufMessageSerializer>(services =>
{
services.TryAddSingleton(svp => new GoogleProtobufMessageSerializer(svp.GetRequiredService<ILoggerFactory>(), messageParserFactory));
services.TryAddSingleton<IMessageSerializer>(svp => svp.GetRequiredService<GoogleProtobufMessageSerializer>());
});
return builder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@
public class HybridMessageSerializer : IMessageSerializer
{
private readonly ILogger _logger;
private readonly IList<IMessageSerializer> _serializers = new List<IMessageSerializer>();
private readonly IDictionary<Type, IMessageSerializer> _serializerByType = new Dictionary<Type, IMessageSerializer>();
private readonly Dictionary<Type, IMessageSerializer> _serializerByType = [];

public IMessageSerializer DefaultSerializer { get; set; }

internal IReadOnlyDictionary<Type, IMessageSerializer> SerializerByType => _serializerByType;

public HybridMessageSerializer(ILogger<HybridMessageSerializer> logger, IDictionary<IMessageSerializer, Type[]> registration, IMessageSerializer defaultMessageSerializer = null)
{
_logger = logger;
Expand All @@ -24,12 +26,14 @@ public HybridMessageSerializer(ILogger<HybridMessageSerializer> logger, IDiction

public void Add(IMessageSerializer serializer, params Type[] supportedTypes)
{
if (_serializers.Count == 0 && DefaultSerializer == null)
{
DefaultSerializer = serializer;
}
#if NETSTANDARD2_0
if (serializer is null) throw new ArgumentNullException(nameof(serializer));
#else
ArgumentNullException.ThrowIfNull(serializer);
#endif

DefaultSerializer ??= serializer;

_serializers.Add(serializer);
foreach (var type in supportedTypes)
{
_serializerByType.Add(type, serializer);
Expand All @@ -38,19 +42,19 @@ public void Add(IMessageSerializer serializer, params Type[] supportedTypes)

protected virtual IMessageSerializer MatchSerializer(Type t)
{
if (_serializers.Count == 0)
{
throw new InvalidOperationException("No serializers registered.");
}

if (!_serializerByType.TryGetValue(t, out var serializer))
{
// use first as default
_logger.LogTrace("Serializer for type {0} not registered, will use default serializer", t);
_logger.LogTrace("Serializer for type {MessageType} not registered, will use default serializer", t);

if (DefaultSerializer == null)
{
throw new InvalidOperationException("No serializers registered.");
}

serializer = DefaultSerializer;
}

_logger.LogDebug("Serializer for type {0} will be {1}", t, serializer);
_logger.LogDebug("Serializer for type {MessageType} will be {Serializer}", t, serializer);
return serializer;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,120 @@ public static class SerializationBuilderExtensions
public static TBuilder AddHybridSerializer<TBuilder>(this TBuilder builder, IDictionary<IMessageSerializer, Type[]> registration, IMessageSerializer defaultMessageSerializer)
where TBuilder : ISerializationBuilder
{
builder.PostConfigurationActions.Add(services =>
builder.RegisterSerializer<HybridMessageSerializer>(services =>
{
services.TryAddSingleton(svp => new HybridMessageSerializer(svp.GetRequiredService<ILogger<HybridMessageSerializer>>(), registration, defaultMessageSerializer));
services.TryAddSingleton<IMessageSerializer>(svp => svp.GetRequiredService<HybridMessageSerializer>());
});
return builder;
}

/// <summary>
/// Registers the <see cref="IMessageSerializer"/> with implementation as <see cref="HybridMessageSerializer"/> using serializers as registered in the <see cref="IServiceCollection"/>.
/// </summary>
/// <param name="mbb"><see cref="MessageBusBuilder"/></param>
/// <param name="registration">Action to register serializers for dependency injection resolution.</param>
/// <returns><see cref="MessageBusBuilder"/></returns>
public static MessageBusBuilder AddHybridSerializer(this MessageBusBuilder mbb, Action<HybridSerializerOptionsBuilder> registration)
{
var builder = new HybridSerializerOptionsBuilder();
registration(builder);

foreach (var action in builder.ServiceRegistrations)
{
mbb.PostConfigurationActions.Add(action);
}

mbb.PostConfigurationActions.Add(services =>
{
services.TryAddSingleton(svp =>
{
if (services.Count(x => x.ServiceType == typeof(IMessageSerializer)) > 1)
{
throw new NotSupportedException($"Registering instances of {nameof(IMessageSerializer)} outside of {nameof(AddHybridSerializer)} is not supported.");
}

var defaultMessageSerializer = builder.DefaultSerializer != null ? (IMessageSerializer)svp.GetRequiredService(builder.DefaultSerializer) : null;
var typeRegistrations = builder.TypeRegistrations.ToDictionary(x => (IMessageSerializer)svp.GetRequiredService(x.Key), x => x.Value);
return new HybridMessageSerializer(svp.GetRequiredService<ILogger<HybridMessageSerializer>>(), typeRegistrations, defaultMessageSerializer);
});

services.AddSingleton<IMessageSerializer>(svp => svp.GetRequiredService<HybridMessageSerializer>());
});
return mbb;
}

public sealed class HybridSerializerOptionsBuilder
{
private readonly List<SerializerConfiguration> _configurations = [];

public Type DefaultSerializer
{
get
{
return _configurations
.OfType<DefaultSerializerConfiguration>()
.LastOrDefault(x => x.IsValid)
.Type;
}
}

public IReadOnlyList<Action<IServiceCollection>> ServiceRegistrations
{
get
{
return _configurations
.Where(x => x.IsValid)
.Select(x => x.Action)
.ToList();
}
}

public IReadOnlyDictionary<Type, Type[]> TypeRegistrations
{
get
{
return _configurations
.OfType<ForSerializerConfiguration>()
.Where(x => x.IsValid)
.ToDictionary(x => x.Type, x => x.Types);
}
}

public ISerializationBuilder AsDefault()
{
var configuration = new DefaultSerializerConfiguration();
this._configurations.Add(configuration);
return configuration;
}

public ISerializationBuilder For(params Type[] types)
{
var configuration = new ForSerializerConfiguration(types);
this._configurations.Add(configuration);
return configuration;
}

public abstract class SerializerConfiguration : ISerializationBuilder
{
public Action<IServiceCollection> Action { get; private set; }
public bool IsValid => Type != null;
public Type Type { get; private set; } = null;

public void RegisterSerializer<TMessageSerializer>(Action<IServiceCollection> services)
where TMessageSerializer : class, IMessageSerializer
{
Type = typeof(TMessageSerializer);
Action = services;
}
}

public class ForSerializerConfiguration(Type[] types) : SerializerConfiguration, ISerializationBuilder
{
public Type[] Types { get; } = types;
}

public class DefaultSerializerConfiguration : SerializerConfiguration, ISerializationBuilder
{
}
}
}
Loading

0 comments on commit 30d9177

Please sign in to comment.