Skip to content

Commit

Permalink
zarusz#271 Allow for publishing of messages to a request/response que…
Browse files Browse the repository at this point in the history
…ue. Drop response as there is no listener.
  • Loading branch information
EtherZa committed Jun 25, 2024
1 parent 89b36cd commit f68c490
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 6 deletions.
13 changes: 7 additions & 6 deletions src/SlimMessageBus.Host/MessageBusBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -610,20 +610,21 @@ public virtual Task ProduceResponse(string requestId, object request, IReadOnlyD
if (consumerInvoker == null) throw new ArgumentNullException(nameof(consumerInvoker));

var responseType = consumerInvoker.ParentSettings.ResponseType;
if (!requestHeaders.TryGetHeader(ReqRespMessageHeaders.ReplyTo, out object replyTo))
{
_logger.LogDebug($$"""Skipping sending response {Response} of type {MessageType} as the header {{ReqRespMessageHeaders.ReplyTo}} is missing for RequestId: {RequestId}""", response, responseType, requestId);
return Task.CompletedTask;
}

_logger.LogDebug("Sending the response {Response} of type {MessageType} for RequestId: {RequestId}...", response, responseType, requestId);


var responseHeaders = CreateHeaders();
responseHeaders.SetHeader(ReqRespMessageHeaders.RequestId, requestId);
if (responseException != null)
{
responseHeaders.SetHeader(ReqRespMessageHeaders.Error, responseException.Message);
}

if (!requestHeaders.TryGetHeader(ReqRespMessageHeaders.ReplyTo, out object replyTo))
{
throw new MessageBusException($"The header {ReqRespMessageHeaders.ReplyTo} was missing on the message");
}

_headerService.AddMessageTypeHeader(response, responseHeaders);

return ProduceToTransport(response, responseType, (string)replyTo, responseHeaders, null);
Expand Down
51 changes: 51 additions & 0 deletions src/Tests/SlimMessageBus.Host.Test/MessageBusBaseTests.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
namespace SlimMessageBus.Host.Test;

using System.Threading;

using Moq.Protected;

using SlimMessageBus.Host.Test.Common;

public class MessageBusBaseTests : IDisposable
Expand Down Expand Up @@ -655,4 +660,50 @@ public async Task When_Stop_Given_ConcurrentCalls_Then_ItOnlyStopsConsumersOnce(
bus._startedCount.Should().Be(1);
bus._stoppedCount.Should().Be(1);
}

public class ProduceResponseTests
{
[Fact]
public async Task WhenCalled_Given_NoReplyToHeader_DoNothing()
{
// arrange
var requestId = "req-123";
var request = new object();
var response = new object();

object value;
var mockRequestHeaders = new Mock<IReadOnlyDictionary<string, object>>();
mockRequestHeaders.Setup(x => x.TryGetValue(ReqRespMessageHeaders.ReplyTo, out value)).Returns(false).Verifiable(Times.Once);

var mockMessageTypeResolver = new Mock<IMessageTypeResolver>();

var mockServiceProvider = new Mock<IServiceProvider>();
mockServiceProvider.Setup(x => x.GetService(typeof(IMessageTypeResolver))).Returns(mockMessageTypeResolver.Object);

var mockMessageTypeConsumerInvokerSettings = new Mock<IMessageTypeConsumerInvokerSettings>();
mockMessageTypeConsumerInvokerSettings.SetupGet(x => x.ParentSettings).Returns(() => new ConsumerSettings() { ResponseType = response.GetType() });

var settings = new MessageBusSettings { ServiceProvider = mockServiceProvider.Object };

var mockMessageBus = new Mock<MessageBusBase>(settings) { CallBase = true };
mockMessageBus.Protected().Setup<Task<(IReadOnlyCollection<Envelope> Dispatched, Exception Exception)>>(
"ProduceToTransport",
[typeof(Envelope)],
false,
ItExpr.IsAny<IReadOnlyCollection<Envelope>>(),
ItExpr.IsAny<string>(),
ItExpr.IsAny<IMessageBusTarget>(),
ItExpr.IsAny<CancellationToken>())
.Verifiable(Times.Never);

var target = mockMessageBus.Object;

// act
await target.ProduceResponse(requestId, request, mockRequestHeaders.Object, response, null, mockMessageTypeConsumerInvokerSettings.Object);

// assert
mockRequestHeaders.VerifyAll();
mockMessageBus.VerifyAll();
}
}
}

0 comments on commit f68c490

Please sign in to comment.