Skip to content

Commit

Permalink
Resolve HybridMessageSerializer dependencies from IServiceProvider
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Pringle <richard.pringle@gmail.com>
  • Loading branch information
Richard Pringle committed Apr 4, 2024
1 parent f061530 commit f29f09e
Show file tree
Hide file tree
Showing 12 changed files with 351 additions and 36 deletions.
27 changes: 11 additions & 16 deletions src/Samples/Sample.Serialization.ConsoleApp/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
using SlimMessageBus.Host;
using SlimMessageBus.Host.Memory;
using SlimMessageBus.Host.Redis;
using SlimMessageBus.Host.Serialization;
using SlimMessageBus.Host.Serialization.Avro;
using SlimMessageBus.Host.Serialization.Hybrid;
using SlimMessageBus.Host.Serialization.Json;
Expand All @@ -27,7 +26,7 @@ enum Provider

/// <summary>
/// This sample shows:
/// 1. How tu use the Avro serializer (for contract Avro IDL first apprach to generate C# code)
/// 1. How tu use the Avro serializer (for contract Avro IDL first approach to generate C# code)
/// 2. How to combine two serializer approaches in one app (using the Hybrid serializer).
/// </summary>
class Program
Expand All @@ -40,17 +39,11 @@ static async Task Main(string[] args) => await Host.CreateDefaultBuilder(args)

services.AddHostedService<MainProgram>();

// alternatively a simpler approach, but using the slower ReflectionMessageCreationStategy and ReflectionSchemaLookupStrategy
var avroSerializer = new AvroMessageSerializer();

// Avro serialized using the AvroConvert library - no schema generation neeeded upfront.
var jsonSerializer = new JsonMessageSerializer();

services
.AddSlimMessageBus(mbb =>
{
// Note: remember that Memory provider does not support req-resp yet.
var provider = Provider.Redis;
var provider = Provider.Memory;

/*
var sl = new DictionarySchemaLookupStrategy();
Expand All @@ -59,7 +52,7 @@ static async Task Main(string[] args) => await Host.CreateDefaultBuilder(args)
sl.Add(typeof(MultiplyRequest), MultiplyRequest._SCHEMA);
sl.Add(typeof(MultiplyResponse), MultiplyResponse._SCHEMA);
var mf = new DictionaryMessageCreationStategy();
var mf = new DictionaryMessageCreationStrategy();
/// register all your types
mf.Add(typeof(AddCommand), () => new AddCommand());
mf.Add(typeof(MultiplyRequest), () => new MultiplyRequest());
Expand All @@ -72,12 +65,14 @@ static async Task Main(string[] args) => await Host.CreateDefaultBuilder(args)
mbb
.AddServicesFromAssemblyContaining<AddCommandConsumer>()
// Note: Certain messages will be serialized by one Avro serializer, other using the Json serializer
.AddHybridSerializer(new Dictionary<IMessageSerializer, Type[]>
.AddAvroSerializer()
.AddJsonSerializer()
// Include AddHybridSerializer after other serializers so that the DI container can be updated
.AddHybridSerializer<JsonMessageSerializer>(o =>
{
[jsonSerializer] = new[] { typeof(SubtractCommand) }, // the first one will be the default serializer, no need to declare types here
[avroSerializer] = new[] { typeof(AddCommand), typeof(MultiplyRequest), typeof(MultiplyResponse) },
}, defaultMessageSerializer: jsonSerializer)

//o.Add<JsonMessageSerializer>(typeof(SubtractCommand)); // can also be omitted as JsonMessageSerializer is the default
o.Add<AvroMessageSerializer>(typeof(AddCommand), typeof(MultiplyRequest), typeof(MultiplyResponse));
})
.Produce<AddCommand>(x => x.DefaultTopic("AddCommand"))
.Consume<AddCommand>(x => x.Topic("AddCommand").WithConsumer<AddCommandConsumer>())

Expand Down Expand Up @@ -221,7 +216,7 @@ public class SubtractCommandConsumer : IConsumer<SubtractCommand>
{
public async Task OnHandle(SubtractCommand message)
{
Console.WriteLine("Consumer: Subracting {0} and {1} gives {2}", message.Left, message.Right, message.Left - message.Right);
Console.WriteLine("Consumer: Subtracting {0} and {1} gives {2}", message.Left, message.Right, message.Left - message.Right);
await Task.Delay(50); // Simulate some work
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public object Deserialize(Type t, byte[] payload)
var writerSchema = WriteSchemaLookup(t);
AssertSchemaNotNull(t, writerSchema, true);

_logger.LogDebug("Type {0} writer schema: {1}, reader schema: {2}", t, writerSchema, readerSchema);
_logger.LogDebug("Type {MessageType} writer schema: {WriterSchema}, reader schema: {ReaderSchema}", t, writerSchema, readerSchema);

var reader = new SpecificDefaultReader(writerSchema, readerSchema);
reader.Read(message, dec);
Expand All @@ -108,7 +108,7 @@ public byte[] Serialize(Type t, object message)
var writerSchema = WriteSchemaLookup(t);
AssertSchemaNotNull(t, writerSchema, true);

_logger.LogDebug("Type {0} writer schema: {1}", t, writerSchema);
_logger.LogDebug("Type {MessageType} writer schema: {WriterSchema}", t, writerSchema);

var writer = new SpecificDefaultWriter(writerSchema); // Schema comes from pre-compiled, code-gen phase
writer.Write(message, enc);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@
public class HybridMessageSerializer : IMessageSerializer
{
private readonly ILogger _logger;
private readonly IList<IMessageSerializer> _serializers = new List<IMessageSerializer>();
private readonly IDictionary<Type, IMessageSerializer> _serializerByType = new Dictionary<Type, IMessageSerializer>();
private readonly Dictionary<Type, IMessageSerializer> _serializerByType = [];

public IMessageSerializer DefaultSerializer { get; set; }

internal IReadOnlyDictionary<Type, IMessageSerializer> SerializerByType => _serializerByType;

public HybridMessageSerializer(ILogger<HybridMessageSerializer> logger, IDictionary<IMessageSerializer, Type[]> registration, IMessageSerializer defaultMessageSerializer = null)
{
_logger = logger;
Expand All @@ -24,12 +26,14 @@ public HybridMessageSerializer(ILogger<HybridMessageSerializer> logger, IDiction

public void Add(IMessageSerializer serializer, params Type[] supportedTypes)
{
if (_serializers.Count == 0 && DefaultSerializer == null)
{
DefaultSerializer = serializer;
}
#if NETSTANDARD2_0
if (serializer is null) throw new ArgumentNullException(nameof(serializer));
#else
ArgumentNullException.ThrowIfNull(serializer);
#endif

DefaultSerializer ??= serializer;

_serializers.Add(serializer);
foreach (var type in supportedTypes)
{
_serializerByType.Add(type, serializer);
Expand All @@ -38,19 +42,19 @@ public void Add(IMessageSerializer serializer, params Type[] supportedTypes)

protected virtual IMessageSerializer MatchSerializer(Type t)
{
if (_serializers.Count == 0)
{
throw new InvalidOperationException("No serializers registered.");
}

if (!_serializerByType.TryGetValue(t, out var serializer))
{
// use first as default
_logger.LogTrace("Serializer for type {0} not registered, will use default serializer", t);
_logger.LogTrace("Serializer for type {MessageType} not registered, will use default serializer", t);

if (DefaultSerializer == null)
{
throw new InvalidOperationException("No serializers registered.");
}

serializer = DefaultSerializer;
}

_logger.LogDebug("Serializer for type {0} will be {1}", t, serializer);
_logger.LogDebug("Serializer for type {MessageType} will be {Serializer}", t, serializer);
return serializer;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
namespace SlimMessageBus.Host.Serialization.Hybrid;

using System.Collections.Concurrent;

using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Logging;
Expand All @@ -24,4 +26,50 @@ public static MessageBusBuilder AddHybridSerializer(this MessageBusBuilder mbb,
});
return mbb;
}

/// <summary>
/// Registers the <see cref="IMessageSerializer"/> with implementation as <see cref="HybridMessageSerializer"/> using serializers as registered in the <see cref="IServiceCollection"/>.
/// </summary>
/// <param name="mbb"><see cref="MessageBusBuilder"/></param>
/// <param name="registration">Action to register serializers for dependency injection resolution.</param>
/// <param name="defaultMessageSerializer">The default serializer to be used when the message type cannot be matched</param>
/// <returns><see cref="MessageBusBuilder"/></returns>
public static MessageBusBuilder AddHybridSerializer<TDefaultSerializer>(this MessageBusBuilder mbb, Action<HybridSerializerOptionsBuilder> registration)
where TDefaultSerializer : class, IMessageSerializer
{
mbb.PostConfigurationActions.Add(services =>
{
services.RemoveAll(typeof(IMessageSerializer));
services.TryAddSingleton(svp =>
{
var builder = new HybridSerializerOptionsBuilder();
registration(builder);

var registrations = builder.Registrations.ToDictionary(x => (IMessageSerializer)svp.GetRequiredService(x.Key), x => x.Value.ToArray());
var defaultMessageSerializer = svp.GetRequiredService<TDefaultSerializer>();
return new HybridMessageSerializer(svp.GetRequiredService<ILogger<HybridMessageSerializer>>(), registrations, defaultMessageSerializer);
});

services.TryAddSingleton<IMessageSerializer>(svp => svp.GetRequiredService<HybridMessageSerializer>());
});
return mbb;
}

public sealed class HybridSerializerOptionsBuilder
{
internal ConcurrentDictionary<Type, List<Type>> Registrations { get; } = new();

public HybridSerializerOptionsBuilder Add<TMessageSerializer>(params Type[] types)
where TMessageSerializer : IMessageSerializer
{
if (types.Length == 0)
{
return this;
}

Registrations.GetOrAdd(typeof(TMessageSerializer), _ => []).AddRange(types);

return this;
}
}
}
28 changes: 27 additions & 1 deletion src/SlimMessageBus.Host.Serialization.Hybrid/README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,30 @@
# What

Message serialization that based on message type delegates to the respective serializer.
Message serialization that is based on message type, delegating serialization to the respective serializer.

```c#

services
.AddSlimMessageBus(mbb =>
{
mbb
// add required serializers to the DI container first
.AddAvroSerializer()
.AddGoogleProtobufSerializer()
.AddJsonSerializer()

// Add hybrid serializer last so that it can update DI container and set itself as the default
.AddHybridSerializer<JsonMessageSerializer>(
o => {

// Message1, Message2 => AvroSerializer
// Message3 => GoogleProtobufMessageSerializer
// all other messages by JsonMessageSerializer
o.Add<AvroSerializer>(typeof(Message1), typeof(Message2));
o.Add<GoogleProtobufMessageSerializer>(typeof(Message3));
})
...
}

```
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,8 @@
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="3.1.0" />
</ItemGroup>

<ItemGroup>
<InternalsVisibleTo Include="SlimMessageBus.Host.Serialization.Hybrid.Test" />
</ItemGroup>

</Project>
15 changes: 13 additions & 2 deletions src/SlimMessageBus.sln
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,11 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SlimMessageBus.Host.RabbitM
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SlimMessageBus.Host.RabbitMQ.Test", "Tests\SlimMessageBus.Host.RabbitMQ.Test\SlimMessageBus.Host.RabbitMQ.Test.csproj", "{F5373E1D-A2B4-46CC-9B07-94F6655C8E29}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SlimMessageBus.Host.Sql", "SlimMessageBus.Host.Sql\SlimMessageBus.Host.Sql.csproj", "{5EED0E89-2475-40E0-81EF-0F05C9326612}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SlimMessageBus.Host.Sql", "SlimMessageBus.Host.Sql\SlimMessageBus.Host.Sql.csproj", "{5EED0E89-2475-40E0-81EF-0F05C9326612}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SlimMessageBus.Host.Sql.Common", "SlimMessageBus.Host.Sql.Common\SlimMessageBus.Host.Sql.Common.csproj", "{F19B7A21-7749-465A-8810-4C274A9E8956}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SlimMessageBus.Host.Sql.Common", "SlimMessageBus.Host.Sql.Common\SlimMessageBus.Host.Sql.Common.csproj", "{F19B7A21-7749-465A-8810-4C274A9E8956}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SlimMessageBus.Host.Serialization.Hybrid.Test", "Tests\SlimMessageBus.Host.Serialization.Hybrid.Test\SlimMessageBus.Host.Serialization.Hybrid.Test.csproj", "{DB624D5F-CB7C-4E16-B1E2-3B368FCB5A46}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand Down Expand Up @@ -730,6 +732,14 @@ Global
{F19B7A21-7749-465A-8810-4C274A9E8956}.Release|Any CPU.Build.0 = Release|Any CPU
{F19B7A21-7749-465A-8810-4C274A9E8956}.Release|x86.ActiveCfg = Release|Any CPU
{F19B7A21-7749-465A-8810-4C274A9E8956}.Release|x86.Build.0 = Release|Any CPU
{DB624D5F-CB7C-4E16-B1E2-3B368FCB5A46}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{DB624D5F-CB7C-4E16-B1E2-3B368FCB5A46}.Debug|Any CPU.Build.0 = Debug|Any CPU
{DB624D5F-CB7C-4E16-B1E2-3B368FCB5A46}.Debug|x86.ActiveCfg = Debug|Any CPU
{DB624D5F-CB7C-4E16-B1E2-3B368FCB5A46}.Debug|x86.Build.0 = Debug|Any CPU
{DB624D5F-CB7C-4E16-B1E2-3B368FCB5A46}.Release|Any CPU.ActiveCfg = Release|Any CPU
{DB624D5F-CB7C-4E16-B1E2-3B368FCB5A46}.Release|Any CPU.Build.0 = Release|Any CPU
{DB624D5F-CB7C-4E16-B1E2-3B368FCB5A46}.Release|x86.ActiveCfg = Release|Any CPU
{DB624D5F-CB7C-4E16-B1E2-3B368FCB5A46}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -803,6 +813,7 @@ Global
{F5373E1D-A2B4-46CC-9B07-94F6655C8E29} = {9F005B5C-A856-4351-8C0C-47A8B785C637}
{5EED0E89-2475-40E0-81EF-0F05C9326612} = {9291D340-B4FA-44A3-8060-C14743FB1712}
{F19B7A21-7749-465A-8810-4C274A9E8956} = {9291D340-B4FA-44A3-8060-C14743FB1712}
{DB624D5F-CB7C-4E16-B1E2-3B368FCB5A46} = {9F005B5C-A856-4351-8C0C-47A8B785C637}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {435A0D65-610C-4B84-B1AA-2C7FBE72DB80}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace SlimMessageBus.Host.Serialization.Hybrid.Test.Helpers
{
public record SampleOne;
public record SampleTwo;
public record SampleThree;
}
Loading

0 comments on commit f29f09e

Please sign in to comment.