Skip to content
This repository has been archived by the owner on Aug 29, 2019. It is now read-only.

Commit

Permalink
All users to use NSB error queue for failures
Browse files Browse the repository at this point in the history
  • Loading branch information
andreasohlund committed May 20, 2019
1 parent 9a2c9b0 commit aa157a6
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 11 deletions.
3 changes: 3 additions & 0 deletions Demo.ASB/AsbConnectedFunction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ static AsbConnectedFunction()
endpoint = new FunctionsAwareServiceBusEndpoint(endpointName);

endpoint.Routing.RouteToEndpoint(typeof(SomeRoutedMessage), endpointName); //route to our self just to demo

//use NSB for poison message handling to not have failed messages go into the DLQ
endpoint.UseNServiceBusPoisonMessageHandling("error");
}

[FunctionName(endpointName)] // this is the "one function to all handlers for different messages" - A junction function
Expand Down
56 changes: 45 additions & 11 deletions NServiceBus.AzureFuntions/FunctionsAwareServiceBusEndpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,31 @@ public async Task Invoke(Message message, ILogger logger, IAsyncCollector<string

var instance = await GetEndpoint(logger, executionContext);

//TODO: right now the native retries are used, should we have an option to move to "our" error?
await instance.PushMessage(messageContext);

try
{
await instance.PushMessage(messageContext);
}
catch (Exception ex)
{
// TODO: is 4 the right value?
// TODO: Should we provide delayed retries as well?
if (moveFailedMessagesToError && message.SystemProperties.DeliveryCount > 4)
{
var errorContext = new ErrorContext(ex, headers, messageId, body, new TransportTransaction(), 0);

var result = await instance.PushError(errorContext);

if (result == ErrorHandleResult.RetryRequired)
{
throw;
}

return;
}

throw;
}
}

public async Task Invoke(HttpRequest request, string messageType, ILogger logger, IAsyncCollector<string> collector, ExecutionContext executionContext)
Expand Down Expand Up @@ -75,28 +98,38 @@ public async Task Invoke(HttpRequest request, string messageType, ILogger logger
await instance.PushMessage(messageContext);
}

public async Task Send<T>(T message, ILogger logger, ExecutionContext executionContext)
{
var instance = await GetEndpoint(logger, executionContext);

await instance.Send(message);
}

public void UseNServiceBusPoisonMessageHandling(string errorQueue)
{
endpointConfiguration.Recoverability().CustomPolicy((c, e) =>
{
return RecoverabilityAction.MoveToError(errorQueue);
});

moveFailedMessagesToError = true;
}

public void EnablePassThroughRoutingForUnknownMessages(Func<string, string> routingRule)
{
endpointConfiguration.Pipeline.Register(b=>
endpointConfiguration.Pipeline.Register(b =>
{
var registry = endpointConfiguration.GetSettings().Get<MessageMetadataRegistry>();

return new PassThroughBehavior(registry, routingRule);
}, "Forwards unknown messages to the configured destination");
}, "Forwards unknown messages to the configured destination");
}

void WarnAgainstMultipleHandlersForSameMessageType()
{
endpointConfiguration.Pipeline.Register(builder => new WarnAgainstMultipleHandlersForSameMessageTypeBehavior(builder.Build<MessageHandlerRegistry>()), "Warns against multiple handlers for same message type");
}

public async Task Send<T>(T message, ILogger logger, ExecutionContext executionContext)
{
var instance = await GetEndpoint(logger, executionContext);

await instance.Send(message);
}

async Task<IEndpointInstance> GetEndpoint(ILogger logger, ExecutionContext executionContext)
{
//TODO: locking or lazy
Expand Down Expand Up @@ -132,5 +165,6 @@ Task<IEndpointInstance> InitializeEndpoint(ILogger logger, ExecutionContext exec
EndpointConfiguration endpointConfiguration;
IEndpointInstance endpointInstance;
TransportExtensions<AzureServiceBusTransport> transport;
bool moveFailedMessagesToError;
}
}

0 comments on commit aa157a6

Please sign in to comment.