Skip to content

Commit

Permalink
move core of Async*Call into a new internal AsyncCallState struct; ab…
Browse files Browse the repository at this point in the history
…stracts over Foo() vs Foo(object), avoiding delegate allocations
  • Loading branch information
mgravell authored and JamesNK committed Dec 4, 2019
1 parent 1d5a5af commit 2030a4e
Show file tree
Hide file tree
Showing 7 changed files with 309 additions and 52 deletions.
92 changes: 92 additions & 0 deletions src/csharp/Grpc.Core.Api/AsyncCallState.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
#region Copyright notice and license

// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#endregion


using System;
using System.Threading.Tasks;

namespace Grpc.Core
{
/// <summary>
/// Provides an abstraction over the callback providers
/// used by AsyncUnaryCall, AsyncDuplexStreamingCall, etc
/// </summary>
internal /* readonly */ struct AsyncCallState // can be made readonly in C# 7.2
{
readonly object responseHeadersAsync; // Task<Metadata> or Func<object, Task<Metadata>>
readonly object getStatusFunc; // Func<Status> or Func<object, Status>
readonly object getTrailersFunc; // Func<Metadata> or Func<object, Metadata>
readonly object disposeAction; // Action or Action<object>
readonly object callbackState; // arg0 for the callbacks above, if needed

internal AsyncCallState(
Func<object, Task<Metadata>> responseHeadersAsync,
Func<object, Status> getStatusFunc,
Func<object, Metadata> getTrailersFunc,
Action<object> disposeAction,
object callbackState)
{
this.responseHeadersAsync = responseHeadersAsync;
this.getStatusFunc = getStatusFunc;
this.getTrailersFunc = getTrailersFunc;
this.disposeAction = disposeAction;
this.callbackState = callbackState;
}

internal AsyncCallState(
Task<Metadata> responseHeadersAsync,
Func<Status> getStatusFunc,
Func<Metadata> getTrailersFunc,
Action disposeAction)
{
this.responseHeadersAsync = responseHeadersAsync;
this.getStatusFunc = getStatusFunc;
this.getTrailersFunc = getTrailersFunc;
this.disposeAction = disposeAction;
this.callbackState = null;
}

internal Task<Metadata> ResponseHeadersAsync()
{
var withState = responseHeadersAsync as Func<object, Task<Metadata>>;
return withState != null ? withState(callbackState)
: (Task<Metadata>)responseHeadersAsync;
}

internal Status GetStatus()
{
var withState = getStatusFunc as Func<object, Status>;
return withState != null ? withState(callbackState)
: ((Func<Status>)getStatusFunc)();
}

internal Metadata GetTrailers()
{
var withState = getTrailersFunc as Func<object, Metadata>;
return withState != null ? withState(callbackState)
: ((Func<Metadata>)getTrailersFunc)();
}

internal void Dispose()
{
var withState = disposeAction as Action<object>;
if (withState != null) withState(callbackState);
else ((Action)disposeAction)();
}
}
}
41 changes: 29 additions & 12 deletions src/csharp/Grpc.Core.Api/AsyncClientStreamingCall.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,7 @@ public sealed class AsyncClientStreamingCall<TRequest, TResponse> : IDisposable
{
readonly IClientStreamWriter<TRequest> requestStream;
readonly Task<TResponse> responseAsync;
readonly Task<Metadata> responseHeadersAsync;
readonly Func<Status> getStatusFunc;
readonly Func<Metadata> getTrailersFunc;
readonly Action disposeAction;
readonly AsyncCallState callState;

/// <summary>
/// Creates a new AsyncClientStreamingCall object with the specified properties.
Expand All @@ -54,10 +51,30 @@ public AsyncClientStreamingCall(IClientStreamWriter<TRequest> requestStream,
{
this.requestStream = requestStream;
this.responseAsync = responseAsync;
this.responseHeadersAsync = responseHeadersAsync;
this.getStatusFunc = getStatusFunc;
this.getTrailersFunc = getTrailersFunc;
this.disposeAction = disposeAction;
this.callState = new AsyncCallState(responseHeadersAsync, getStatusFunc, getTrailersFunc, disposeAction);
}

/// <summary>
/// Creates a new AsyncClientStreamingCall object with the specified properties.
/// </summary>
/// <param name="requestStream">Stream of request values.</param>
/// <param name="responseAsync">The response of the asynchronous call.</param>
/// <param name="responseHeadersAsync">Response headers of the asynchronous call.</param>
/// <param name="getStatusFunc">Delegate returning the status of the call.</param>
/// <param name="getTrailersFunc">Delegate returning the trailing metadata of the call.</param>
/// <param name="disposeAction">Delegate to invoke when Dispose is called on the call object.</param>
/// <param name="state">State object for use with the callback parameters.</param>
public AsyncClientStreamingCall(IClientStreamWriter<TRequest> requestStream,
Task<TResponse> responseAsync,
Func<object, Task<Metadata>> responseHeadersAsync,
Func<object, Status> getStatusFunc,
Func<object, Metadata> getTrailersFunc,
Action<object> disposeAction,
object state)
{
this.requestStream = requestStream;
this.responseAsync = responseAsync;
this.callState = new AsyncCallState(responseHeadersAsync, getStatusFunc, getTrailersFunc, disposeAction, state);
}

/// <summary>
Expand All @@ -78,7 +95,7 @@ public Task<Metadata> ResponseHeadersAsync
{
get
{
return this.responseHeadersAsync;
return callState.ResponseHeadersAsync();
}
}

Expand Down Expand Up @@ -108,7 +125,7 @@ public TaskAwaiter<TResponse> GetAwaiter()
/// </summary>
public Status GetStatus()
{
return getStatusFunc();
return callState.GetStatus();
}

/// <summary>
Expand All @@ -117,7 +134,7 @@ public Status GetStatus()
/// </summary>
public Metadata GetTrailers()
{
return getTrailersFunc();
return callState.GetTrailers();
}

/// <summary>
Expand All @@ -132,7 +149,7 @@ public Metadata GetTrailers()
/// </remarks>
public void Dispose()
{
disposeAction.Invoke();
callState.Dispose();
}
}
}
41 changes: 29 additions & 12 deletions src/csharp/Grpc.Core.Api/AsyncDuplexStreamingCall.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,7 @@ public sealed class AsyncDuplexStreamingCall<TRequest, TResponse> : IDisposable
{
readonly IClientStreamWriter<TRequest> requestStream;
readonly IAsyncStreamReader<TResponse> responseStream;
readonly Task<Metadata> responseHeadersAsync;
readonly Func<Status> getStatusFunc;
readonly Func<Metadata> getTrailersFunc;
readonly Action disposeAction;
readonly AsyncCallState callState;

/// <summary>
/// Creates a new AsyncDuplexStreamingCall object with the specified properties.
Expand All @@ -53,10 +50,30 @@ public AsyncDuplexStreamingCall(IClientStreamWriter<TRequest> requestStream,
{
this.requestStream = requestStream;
this.responseStream = responseStream;
this.responseHeadersAsync = responseHeadersAsync;
this.getStatusFunc = getStatusFunc;
this.getTrailersFunc = getTrailersFunc;
this.disposeAction = disposeAction;
this.callState = new AsyncCallState(responseHeadersAsync, getStatusFunc, getTrailersFunc, disposeAction);
}

/// <summary>
/// Creates a new AsyncDuplexStreamingCall object with the specified properties.
/// </summary>
/// <param name="requestStream">Stream of request values.</param>
/// <param name="responseStream">Stream of response values.</param>
/// <param name="responseHeadersAsync">Response headers of the asynchronous call.</param>
/// <param name="getStatusFunc">Delegate returning the status of the call.</param>
/// <param name="getTrailersFunc">Delegate returning the trailing metadata of the call.</param>
/// <param name="disposeAction">Delegate to invoke when Dispose is called on the call object.</param>
/// <param name="state">State object for use with the callback parameters.</param>
public AsyncDuplexStreamingCall(IClientStreamWriter<TRequest> requestStream,
IAsyncStreamReader<TResponse> responseStream,
Func<object, Task<Metadata>> responseHeadersAsync,
Func<object, Status> getStatusFunc,
Func<object, Metadata> getTrailersFunc,
Action<object> disposeAction,
object state)
{
this.requestStream = requestStream;
this.responseStream = responseStream;
this.callState = new AsyncCallState(responseHeadersAsync, getStatusFunc, getTrailersFunc, disposeAction, state);
}

/// <summary>
Expand Down Expand Up @@ -88,7 +105,7 @@ public Task<Metadata> ResponseHeadersAsync
{
get
{
return this.responseHeadersAsync;
return callState.ResponseHeadersAsync();
}
}

Expand All @@ -98,7 +115,7 @@ public Task<Metadata> ResponseHeadersAsync
/// </summary>
public Status GetStatus()
{
return getStatusFunc();
return callState.GetStatus();
}

/// <summary>
Expand All @@ -107,7 +124,7 @@ public Status GetStatus()
/// </summary>
public Metadata GetTrailers()
{
return getTrailersFunc();
return callState.GetTrailers();
}

/// <summary>
Expand All @@ -122,7 +139,7 @@ public Metadata GetTrailers()
/// </remarks>
public void Dispose()
{
disposeAction.Invoke();
callState.Dispose();
}
}
}
38 changes: 26 additions & 12 deletions src/csharp/Grpc.Core.Api/AsyncServerStreamingCall.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,7 @@ namespace Grpc.Core
public sealed class AsyncServerStreamingCall<TResponse> : IDisposable
{
readonly IAsyncStreamReader<TResponse> responseStream;
readonly Task<Metadata> responseHeadersAsync;
readonly Func<Status> getStatusFunc;
readonly Func<Metadata> getTrailersFunc;
readonly Action disposeAction;
readonly AsyncCallState callState;

/// <summary>
/// Creates a new AsyncDuplexStreamingCall object with the specified properties.
Expand All @@ -48,10 +45,27 @@ public AsyncServerStreamingCall(IAsyncStreamReader<TResponse> responseStream,
Action disposeAction)
{
this.responseStream = responseStream;
this.responseHeadersAsync = responseHeadersAsync;
this.getStatusFunc = getStatusFunc;
this.getTrailersFunc = getTrailersFunc;
this.disposeAction = disposeAction;
this.callState = new AsyncCallState(responseHeadersAsync, getStatusFunc, getTrailersFunc, disposeAction);
}

/// <summary>
/// Creates a new AsyncDuplexStreamingCall object with the specified properties.
/// </summary>
/// <param name="responseStream">Stream of response values.</param>
/// <param name="responseHeadersAsync">Response headers of the asynchronous call.</param>
/// <param name="getStatusFunc">Delegate returning the status of the call.</param>
/// <param name="getTrailersFunc">Delegate returning the trailing metadata of the call.</param>
/// <param name="disposeAction">Delegate to invoke when Dispose is called on the call object.</param>
/// <param name="state">State object for use with the callback parameters.</param>
public AsyncServerStreamingCall(IAsyncStreamReader<TResponse> responseStream,
Func<object, Task<Metadata>> responseHeadersAsync,
Func<object, Status> getStatusFunc,
Func<object, Metadata> getTrailersFunc,
Action<object> disposeAction,
object state)
{
this.responseStream = responseStream;
this.callState = new AsyncCallState(responseHeadersAsync, getStatusFunc, getTrailersFunc, disposeAction, state);
}

/// <summary>
Expand All @@ -72,7 +86,7 @@ public Task<Metadata> ResponseHeadersAsync
{
get
{
return this.responseHeadersAsync;
return callState.ResponseHeadersAsync();
}
}

Expand All @@ -82,7 +96,7 @@ public Task<Metadata> ResponseHeadersAsync
/// </summary>
public Status GetStatus()
{
return getStatusFunc();
return callState.GetStatus();
}

/// <summary>
Expand All @@ -91,7 +105,7 @@ public Status GetStatus()
/// </summary>
public Metadata GetTrailers()
{
return getTrailersFunc();
return callState.GetTrailers();
}

/// <summary>
Expand All @@ -106,7 +120,7 @@ public Metadata GetTrailers()
/// </remarks>
public void Dispose()
{
disposeAction.Invoke();
callState.Dispose();
}
}
}
Loading

0 comments on commit 2030a4e

Please sign in to comment.