Skip to content

Commit

Permalink
Add C# server-side interceptor machinery
Browse files Browse the repository at this point in the history
  • Loading branch information
mehrdada committed Feb 22, 2018
1 parent b8e3624 commit 4df68ae
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 8 deletions.
42 changes: 34 additions & 8 deletions src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core.Interceptors;
using Grpc.Core.Internal;
using Grpc.Core.Logging;
using Grpc.Core.Utils;
Expand All @@ -32,7 +33,12 @@ internal interface IServerCallHandler
Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq);
}

internal class UnaryServerCallHandler<TRequest, TResponse> : IServerCallHandler
internal interface IInterceptableCallHandler
{
IServerCallHandler Intercept(Interceptor interceptor);
}

internal class UnaryServerCallHandler<TRequest, TResponse> : IServerCallHandler, IInterceptableCallHandler
where TRequest : class
where TResponse : class
{
Expand Down Expand Up @@ -74,7 +80,7 @@ public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
{
if (!(e is RpcException))
{
Logger.Warning(e, "Exception occured in handler.");
Logger.Warning(e, "Exception occured in handler or interceptors.");
}
status = HandlerUtils.GetStatusFromExceptionAndMergeTrailers(e, context.ResponseTrailers);
}
Expand All @@ -89,9 +95,14 @@ public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
}
await finishedTask.ConfigureAwait(false);
}

IServerCallHandler IInterceptableCallHandler.Intercept(Interceptor interceptor)
{
return new UnaryServerCallHandler<TRequest, TResponse>(method, (request, context) => interceptor.UnaryServerHandler(request, context, handler));
}
}

internal class ServerStreamingServerCallHandler<TRequest, TResponse> : IServerCallHandler
internal class ServerStreamingServerCallHandler<TRequest, TResponse> : IServerCallHandler, IInterceptableCallHandler
where TRequest : class
where TResponse : class
{
Expand Down Expand Up @@ -131,7 +142,7 @@ public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
{
if (!(e is RpcException))
{
Logger.Warning(e, "Exception occured in handler.");
Logger.Warning(e, "Exception occured in handler or interceptors.");
}
status = HandlerUtils.GetStatusFromExceptionAndMergeTrailers(e, context.ResponseTrailers);
}
Expand All @@ -147,9 +158,14 @@ public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
}
await finishedTask.ConfigureAwait(false);
}

IServerCallHandler IInterceptableCallHandler.Intercept(Interceptor interceptor)
{
return new ServerStreamingServerCallHandler<TRequest, TResponse>(method, (request, responseStream, context) => interceptor.ServerStreamingServerHandler(request, responseStream, context, handler));
}
}

internal class ClientStreamingServerCallHandler<TRequest, TResponse> : IServerCallHandler
internal class ClientStreamingServerCallHandler<TRequest, TResponse> : IServerCallHandler, IInterceptableCallHandler
where TRequest : class
where TResponse : class
{
Expand Down Expand Up @@ -189,7 +205,7 @@ public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
{
if (!(e is RpcException))
{
Logger.Warning(e, "Exception occured in handler.");
Logger.Warning(e, "Exception occured in handler or interceptor.");
}
status = HandlerUtils.GetStatusFromExceptionAndMergeTrailers(e, context.ResponseTrailers);
}
Expand All @@ -205,9 +221,14 @@ public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
}
await finishedTask.ConfigureAwait(false);
}

IServerCallHandler IInterceptableCallHandler.Intercept(Interceptor interceptor)
{
return new ClientStreamingServerCallHandler<TRequest, TResponse>(method, (requestStream, context) => interceptor.ClientStreamingServerHandler(requestStream, context, handler));
}
}

internal class DuplexStreamingServerCallHandler<TRequest, TResponse> : IServerCallHandler
internal class DuplexStreamingServerCallHandler<TRequest, TResponse> : IServerCallHandler, IInterceptableCallHandler
where TRequest : class
where TResponse : class
{
Expand Down Expand Up @@ -245,7 +266,7 @@ public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
{
if (!(e is RpcException))
{
Logger.Warning(e, "Exception occured in handler.");
Logger.Warning(e, "Exception occured in handler or interceptor.");
}
status = HandlerUtils.GetStatusFromExceptionAndMergeTrailers(e, context.ResponseTrailers);
}
Expand All @@ -260,6 +281,11 @@ public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
}
await finishedTask.ConfigureAwait(false);
}

IServerCallHandler IInterceptableCallHandler.Intercept(Interceptor interceptor)
{
return new DuplexStreamingServerCallHandler<TRequest, TResponse>(method, (requestStream, responseStream, context) => interceptor.DuplexStreamingServerHandler(requestStream, responseStream, context, handler));
}
}

internal class UnimplementedMethodCallHandler : IServerCallHandler
Expand Down
25 changes: 25 additions & 0 deletions src/csharp/Grpc.Core/ServerServiceDefinition.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using Grpc.Core.Interceptors;
using Grpc.Core.Internal;
using Grpc.Core.Utils;

namespace Grpc.Core
{
Expand All @@ -45,6 +48,28 @@ internal IDictionary<string, IServerCallHandler> CallHandlers
}
}

/// <summary>
/// Returns a <see cref="Grpc.Core.ServerServiceDefinition" /> instance that
/// intercepts calls to the underlying service handler via the given interceptor.
/// This is an EXPERIMENTAL API.
/// </summary>
/// <param name="interceptor">The interceptor to register on service.</param>
public ServerServiceDefinition Intercept(Interceptor interceptor)
{
GrpcPreconditions.CheckNotNull(interceptor, "interceptor");
return new ServerServiceDefinition(CallHandlers.ToDictionary(
x => x.Key, x =>
{
var value = x.Value;
var interceptable = value as IInterceptableCallHandler;
if (interceptable == null)
{
return value;
}
return interceptable.Intercept(interceptor);
}));
}

/// <summary>
/// Creates a new builder object for <c>ServerServiceDefinition</c>.
/// </summary>
Expand Down

0 comments on commit 4df68ae

Please sign in to comment.