Skip to content

Commit

Permalink
simplify delayed streaming write logic
Browse files Browse the repository at this point in the history
  • Loading branch information
jtattermusch committed Sep 16, 2016
1 parent 7a73bec commit 6eb9877
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 19 deletions.
28 changes: 18 additions & 10 deletions src/csharp/Grpc.Core/Internal/AsyncCall.cs
Original file line number Diff line number Diff line change
Expand Up @@ -372,8 +372,7 @@ protected override Task CheckSendAllowedOrEarlyResult()
private Task CheckSendPreconditionsClientSide()
{
GrpcPreconditions.CheckState(!halfcloseRequested, "Request stream has already been completed.");
// if there is a delayed streaming write, we will treat that as if the write was still in progress until the call finishes.
GrpcPreconditions.CheckState(streamingWriteTcs == null && (finished || delayedStreamingWriteTcs == null), "Only one write can be pending at a time.");
GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time.");

if (cancelRequested)
{
Expand Down Expand Up @@ -458,7 +457,7 @@ private void HandleUnaryResponse(bool success, ClientSideStatus receivedStatus,

using (Profilers.ForCurrentThread().NewScope("AsyncCall.HandleUnaryResponse"))
{
TaskCompletionSource<object> delayedTcs;
TaskCompletionSource<object> delayedStreamingWriteTcs = null;
TResponse msg = default(TResponse);
var deserializeException = TryDeserialize(receivedMessage, out msg);

Expand All @@ -471,16 +470,21 @@ private void HandleUnaryResponse(bool success, ClientSideStatus receivedStatus,
receivedStatus = new ClientSideStatus(DeserializeResponseFailureStatus, receivedStatus.Trailers);
}
finishedStatus = receivedStatus;
delayedTcs = delayedStreamingWriteTcs;

if (isStreamingWriteCompletionDelayed)
{
delayedStreamingWriteTcs = streamingWriteTcs;
streamingWriteTcs = null;
}

ReleaseResourcesIfPossible();
}

responseHeadersTcs.SetResult(responseHeaders);

if (delayedTcs != null)
if (delayedStreamingWriteTcs != null)
{
delayedTcs.SetException(GetRpcExceptionClientOnly());
delayedStreamingWriteTcs.SetException(GetRpcExceptionClientOnly());
}

var status = receivedStatus.Status;
Expand All @@ -502,20 +506,24 @@ private void HandleFinished(bool success, ClientSideStatus receivedStatus)
// NOTE: because this event is a result of batch containing GRPC_OP_RECV_STATUS_ON_CLIENT,
// success will be always set to true.

TaskCompletionSource<object> delayedTcs;
TaskCompletionSource<object> delayedStreamingWriteTcs = null;

lock (myLock)
{
finished = true;
finishedStatus = receivedStatus;
delayedTcs = delayedStreamingWriteTcs;
if (isStreamingWriteCompletionDelayed)
{
delayedStreamingWriteTcs = streamingWriteTcs;
streamingWriteTcs = null;
}

ReleaseResourcesIfPossible();
}

if (delayedTcs != null)
if (delayedStreamingWriteTcs != null)
{
delayedTcs.SetException(GetRpcExceptionClientOnly());
delayedStreamingWriteTcs.SetException(GetRpcExceptionClientOnly());
}

var status = receivedStatus.Status;
Expand Down
20 changes: 12 additions & 8 deletions src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ internal abstract class AsyncCallBase<TWrite, TRead>

protected TaskCompletionSource<TRead> streamingReadTcs; // Completion of a pending streaming read if not null.
protected TaskCompletionSource<object> streamingWriteTcs; // Completion of a pending streaming write or send close from client if not null.
protected TaskCompletionSource<object> delayedStreamingWriteTcs;
protected TaskCompletionSource<object> sendStatusFromServerTcs;
protected bool isStreamingWriteCompletionDelayed; // Only used for the client side.

protected bool readingDone; // True if last read (i.e. read with null payload) was already received.
protected bool halfcloseRequested; // True if send close have been initiated.
Expand Down Expand Up @@ -263,16 +263,20 @@ protected void HandleSendFinished(bool success)
TaskCompletionSource<object> origTcs = null;
lock (myLock)
{
origTcs = streamingWriteTcs;
streamingWriteTcs = null;
if (!success && !finished && IsClient) {
// We should be setting this only once per call, following writes will be short circuited
// because they cannot start until the entire call finishes.
GrpcPreconditions.CheckState(!isStreamingWriteCompletionDelayed);

if (!success && !finished && IsClient)
{
// We should be setting this only once per call, following writes will be short circuited.
GrpcPreconditions.CheckState (delayedStreamingWriteTcs == null);
delayedStreamingWriteTcs = origTcs;
// leave streamingWriteTcs set, it will be completed once call finished.
isStreamingWriteCompletionDelayed = true;
delayCompletion = true;
}
else
{
origTcs = streamingWriteTcs;
streamingWriteTcs = null;
}

ReleaseResourcesIfPossible();
}
Expand Down
2 changes: 1 addition & 1 deletion src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ protected override Task CheckSendAllowedOrEarlyResult()
{
GrpcPreconditions.CheckState(!halfcloseRequested, "Response stream has already been completed.");
GrpcPreconditions.CheckState(!finished, "Already finished.");
GrpcPreconditions.CheckState(streamingWriteTcs == null && delayedStreamingWriteTcs == null, "Only one write can be pending at a time");
GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time");
GrpcPreconditions.CheckState(!disposed);

return null;
Expand Down

0 comments on commit 6eb9877

Please sign in to comment.