Skip to content
This repository has been archived by the owner on Aug 29, 2019. It is now read-only.

Commit

Permalink
merge conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
yvesgoeleven committed May 17, 2019
2 parents 42ad4cf + 6869638 commit 4983a3a
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 6 deletions.
4 changes: 2 additions & 2 deletions AsqFunctionApp/DemoFunctions/AsqConnectedFunction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ static AsqConnectedFunction()
}

[FunctionName(endpointName)] //this is the "one function to all many handler for different messages"
public static Task Run([QueueTrigger(endpointName, Connection = "AzureWebJobsStorage")]CloudQueueMessage message,
[Queue("some-queue", Connection = "AzureWebJobsStorage")]IAsyncCollector<string> collector,
public static Task Run([QueueTrigger(endpointName, Connection = FunctionsConstants.StorageConnectionString)]CloudQueueMessage message,
[Queue("some-queue", Connection = FunctionsConstants.StorageConnectionString)]IAsyncCollector<string> collector,
ILogger logger,
ExecutionContext context)
{
Expand Down
8 changes: 7 additions & 1 deletion Demo.HttpApi/MyApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,21 @@
using Microsoft.Extensions.Logging;
using NServiceBus;
using NServiceBus.Logging;
using NServiceBus.Unicast.Subscriptions;

namespace Demo.HttpApi
{
public static class MyApi
{

static MyApi()
{
endpoint = new FunctionsAwareServiceBusEndpoint(endpointName);

endpoint.EnablePassThroughRoutingForUnknownMessages(messageType =>
{
//route everything to a backend function
return "my-api-backend";
});
}

[FunctionName(endpointName)] //this is the "one function to all many handler for different messages"
Expand Down
42 changes: 41 additions & 1 deletion NServiceBus.AzureFuntions/FunctionsAwareServiceBusEndpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
using NServiceBus.Transport;
using NServiceBus.Transport.AzureServiceBus;
using NServiceBus.Configuration.AdvancedExtensibility;
using NServiceBus.Pipeline;
using NServiceBus.Unicast.Messages;
using ExecutionContext = Microsoft.Azure.WebJobs.ExecutionContext;

namespace NServiceBus
Expand Down Expand Up @@ -70,6 +72,16 @@ public async Task Invoke(HttpRequest request, string messageType, ILogger logger
await instance.PushMessage(messageContext);
}

public void EnablePassThroughRoutingForUnknownMessages(Func<string, string> routingRule)
{
endpointConfiguration.Pipeline.Register(b=>
{
var registry = endpointConfiguration.GetSettings().Get<MessageMetadataRegistry>();

return new PassThroughBehavior(registry, routingRule);
}, "Forwards unknown messages to the configured destination");
}

public async Task Send<T>(T message, ILogger logger, ExecutionContext executionContext)
{
var instance = await GetEndpoint(logger, executionContext);
Expand Down Expand Up @@ -101,7 +113,7 @@ Task<IEndpointInstance> InitializeEndpoint(ILogger logger, ExecutionContext exec
.AddEnvironmentVariables()
.Build();

transport.ConnectionString(configuration["my-sb-connstring"]);
transport.ConnectionString(configuration[FunctionsConstants.ConnectionString]);


return Endpoint.Start(endpointConfiguration);
Expand All @@ -112,5 +124,33 @@ Task<IEndpointInstance> InitializeEndpoint(ILogger logger, ExecutionContext exec
EndpointConfiguration endpointConfiguration;
IEndpointInstance endpointInstance;
TransportExtensions<AzureServiceBusTransport> transport;

private Func<string, string> passThroughRoutingRule;
}

public class PassThroughBehavior : Behavior<IIncomingPhysicalMessageContext>
{
private readonly MessageMetadataRegistry messageMetadataRegistry;
private readonly Func<string, string> passThroughRoutingRule;

public PassThroughBehavior(MessageMetadataRegistry messageMetadataRegistry, Func<string,string> passThroughRoutingRule)
{
this.messageMetadataRegistry = messageMetadataRegistry;
this.passThroughRoutingRule = passThroughRoutingRule;
}

public override Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
{
var messageType = context.MessageHeaders[Headers.EnclosedMessageTypes];
var messageMetadata = messageMetadataRegistry.GetMessageMetadata(messageType);

if (messageMetadata == null)
{
var destination = passThroughRoutingRule(messageType);
return context.ForwardCurrentMessageTo(destination);
}

return next();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ Task<IEndpointInstance> InitializeEndpoint(ILogger logger, ExecutionContext exec
.AddEnvironmentVariables()
.Build();

transport.ConnectionString(configuration["AzureWebJobsStorage"]);
transport.ConnectionString(configuration[FunctionsConstants.StorageConnectionString]);

var instance = Endpoint.Start(endpointConfiguration);

Expand Down
3 changes: 2 additions & 1 deletion NServiceBus.AzureFuntions/FunctionsConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
{
public static class FunctionsConstants
{
public const string ConnectionString = "my-sb-connstring";
public const string ConnectionString = "NServiceBus.ConnectionString";
public const string StorageConnectionString = "NServiceBus.Storage.ConnectionString";
}
}

0 comments on commit 4983a3a

Please sign in to comment.