diff --git a/Demo.ASB/AsbConnectedFunction.cs b/Demo.ASB/AsbConnectedFunction.cs index 1b4b520..ab3cfc3 100644 --- a/Demo.ASB/AsbConnectedFunction.cs +++ b/Demo.ASB/AsbConnectedFunction.cs @@ -13,6 +13,9 @@ static AsbConnectedFunction() endpoint = new FunctionsAwareServiceBusEndpoint(endpointName); endpoint.Routing.RouteToEndpoint(typeof(SomeRoutedMessage), endpointName); //route to our self just to demo + + //use NSB for poison message handling to not have failed messages go into the DLQ + endpoint.UseNServiceBusPoisonMessageHandling("error"); } [FunctionName(endpointName)] // this is the "one function to all handlers for different messages" - A junction function diff --git a/NServiceBus.AzureFuntions/FunctionsAwareServiceBusEndpoint.cs b/NServiceBus.AzureFuntions/FunctionsAwareServiceBusEndpoint.cs index 2366024..4f92727 100644 --- a/NServiceBus.AzureFuntions/FunctionsAwareServiceBusEndpoint.cs +++ b/NServiceBus.AzureFuntions/FunctionsAwareServiceBusEndpoint.cs @@ -46,8 +46,31 @@ public async Task Invoke(Message message, ILogger logger, IAsyncCollector 4) + { + var errorContext = new ErrorContext(ex, headers, messageId, body, new TransportTransaction(), 0); + + var result = await instance.PushError(errorContext); + + if (result == ErrorHandleResult.RetryRequired) + { + throw; + } + + return; + } + + throw; + } } public async Task Invoke(HttpRequest request, string messageType, ILogger logger, IAsyncCollector collector, ExecutionContext executionContext) @@ -75,14 +98,31 @@ public async Task Invoke(HttpRequest request, string messageType, ILogger logger await instance.PushMessage(messageContext); } + public async Task Send(T message, ILogger logger, ExecutionContext executionContext) + { + var instance = await GetEndpoint(logger, executionContext); + + await instance.Send(message); + } + + public void UseNServiceBusPoisonMessageHandling(string errorQueue) + { + endpointConfiguration.Recoverability().CustomPolicy((c, e) => + { + return RecoverabilityAction.MoveToError(errorQueue); + }); + + moveFailedMessagesToError = true; + } + public void EnablePassThroughRoutingForUnknownMessages(Func routingRule) { - endpointConfiguration.Pipeline.Register(b=> + endpointConfiguration.Pipeline.Register(b => { var registry = endpointConfiguration.GetSettings().Get(); return new PassThroughBehavior(registry, routingRule); - }, "Forwards unknown messages to the configured destination"); + }, "Forwards unknown messages to the configured destination"); } void WarnAgainstMultipleHandlersForSameMessageType() @@ -90,13 +130,6 @@ void WarnAgainstMultipleHandlersForSameMessageType() endpointConfiguration.Pipeline.Register(builder => new WarnAgainstMultipleHandlersForSameMessageTypeBehavior(builder.Build()), "Warns against multiple handlers for same message type"); } - public async Task Send(T message, ILogger logger, ExecutionContext executionContext) - { - var instance = await GetEndpoint(logger, executionContext); - - await instance.Send(message); - } - async Task GetEndpoint(ILogger logger, ExecutionContext executionContext) { //TODO: locking or lazy @@ -132,5 +165,6 @@ Task InitializeEndpoint(ILogger logger, ExecutionContext exec EndpointConfiguration endpointConfiguration; IEndpointInstance endpointInstance; TransportExtensions transport; + bool moveFailedMessagesToError; } } \ No newline at end of file