Skip to content

Commit

Permalink
Merge pull request #8084 from jtattermusch/throw_rpcexception_on_failure
Browse files Browse the repository at this point in the history
Fix wrong exceptions being thrown on send failure.
  • Loading branch information
jtattermusch authored Sep 19, 2016
2 parents 7fbe5c5 + 6eb9877 commit c4f9c9a
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 11 deletions.
4 changes: 2 additions & 2 deletions src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

using System;
using System.Collections.Generic;
using System.IO;
using System.Runtime.InteropServices;
using System.Threading.Tasks;

Expand Down Expand Up @@ -149,8 +150,7 @@ public void WriteCompletionFailureThrows()

var writeTask = responseStream.WriteAsync("request1");
fakeCall.SendCompletionHandler(false);
// TODO(jtattermusch): should we throw a different exception type instead?
Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await writeTask);
Assert.ThrowsAsync(typeof(IOException), async () => await writeTask);

fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
AssertFinished(asyncCallServer, fakeCall, finishedTask);
Expand Down
102 changes: 99 additions & 3 deletions src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -180,21 +180,74 @@ public void ClientStreaming_MoreRequests_Success()
}

[Test]
public void ClientStreaming_WriteCompletionFailure()
public void ClientStreaming_WriteFailureThrowsRpcException()
{
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);

var writeTask = requestStream.WriteAsync("request1");
fakeCall.SendCompletionHandler(false);
// TODO: maybe IOException or waiting for RPCException is more appropriate here.
Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await writeTask);

// The write will wait for call to finish to receive the status code.
Assert.IsFalse(writeTask.IsCompleted);

fakeCall.UnaryResponseClientHandler(true,
CreateClientSideStatus(StatusCode.Internal),
null,
new Metadata());

var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
Assert.AreEqual(StatusCode.Internal, ex.Status.StatusCode);

AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Internal);
}

[Test]
public void ClientStreaming_WriteFailureThrowsRpcException2()
{
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);

var writeTask = requestStream.WriteAsync("request1");

fakeCall.UnaryResponseClientHandler(true,
CreateClientSideStatus(StatusCode.Internal),
null,
new Metadata());

fakeCall.SendCompletionHandler(false);

var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
Assert.AreEqual(StatusCode.Internal, ex.Status.StatusCode);

AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Internal);
}

[Test]
public void ClientStreaming_WriteFailureThrowsRpcException3()
{
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);

var writeTask = requestStream.WriteAsync("request1");
fakeCall.SendCompletionHandler(false);

// Until the delayed write completion has been triggered,
// we still act as if there was an active write.
Assert.Throws(typeof(InvalidOperationException), () => requestStream.WriteAsync("request2"));

fakeCall.UnaryResponseClientHandler(true,
CreateClientSideStatus(StatusCode.Internal),
null,
new Metadata());

var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
Assert.AreEqual(StatusCode.Internal, ex.Status.StatusCode);

// Following attempts to write keep delivering the same status
var ex2 = Assert.ThrowsAsync<RpcException>(async () => await requestStream.WriteAsync("after call has finished"));
Assert.AreEqual(StatusCode.Internal, ex2.Status.StatusCode);

AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Internal);
}

Expand Down Expand Up @@ -415,6 +468,49 @@ public void DuplexStreaming_CompleteAfterReceivingStatusSuceeds()
Assert.DoesNotThrowAsync(async () => await requestStream.CompleteAsync());
}

[Test]
public void DuplexStreaming_WriteFailureThrowsRpcException()
{
asyncCall.StartDuplexStreamingCall();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
var responseStream = new ClientResponseStream<string, string>(asyncCall);

var writeTask = requestStream.WriteAsync("request1");
fakeCall.SendCompletionHandler(false);

// The write will wait for call to finish to receive the status code.
Assert.IsFalse(writeTask.IsCompleted);

var readTask = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);
fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.PermissionDenied));

var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
Assert.AreEqual(StatusCode.PermissionDenied, ex.Status.StatusCode);

AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.PermissionDenied);
}

[Test]
public void DuplexStreaming_WriteFailureThrowsRpcException2()
{
asyncCall.StartDuplexStreamingCall();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
var responseStream = new ClientResponseStream<string, string>(asyncCall);

var writeTask = requestStream.WriteAsync("request1");

var readTask = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);
fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.PermissionDenied));
fakeCall.SendCompletionHandler(false);

var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
Assert.AreEqual(StatusCode.PermissionDenied, ex.Status.StatusCode);

AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.PermissionDenied);
}

[Test]
public void DuplexStreaming_WriteAfterCancellationRequestThrowsTaskCanceledException()
{
Expand Down
31 changes: 29 additions & 2 deletions src/csharp/Grpc.Core/Internal/AsyncCall.cs
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,11 @@ protected override bool IsClient
get { return true; }
}

protected override Exception GetRpcExceptionClientOnly()
{
return new RpcException(finishedStatus.Value.Status);
}

protected override Task CheckSendAllowedOrEarlyResult()
{
var earlyResult = CheckSendPreconditionsClientSide();
Expand Down Expand Up @@ -452,6 +457,7 @@ private void HandleUnaryResponse(bool success, ClientSideStatus receivedStatus,

using (Profilers.ForCurrentThread().NewScope("AsyncCall.HandleUnaryResponse"))
{
TaskCompletionSource<object> delayedStreamingWriteTcs = null;
TResponse msg = default(TResponse);
var deserializeException = TryDeserialize(receivedMessage, out msg);

Expand All @@ -465,13 +471,23 @@ private void HandleUnaryResponse(bool success, ClientSideStatus receivedStatus,
}
finishedStatus = receivedStatus;

if (isStreamingWriteCompletionDelayed)
{
delayedStreamingWriteTcs = streamingWriteTcs;
streamingWriteTcs = null;
}

ReleaseResourcesIfPossible();
}

responseHeadersTcs.SetResult(responseHeaders);

var status = receivedStatus.Status;
if (delayedStreamingWriteTcs != null)
{
delayedStreamingWriteTcs.SetException(GetRpcExceptionClientOnly());
}

var status = receivedStatus.Status;
if (status.StatusCode != StatusCode.OK)
{
unaryResponseTcs.SetException(new RpcException(status));
Expand All @@ -490,16 +506,27 @@ private void HandleFinished(bool success, ClientSideStatus receivedStatus)
// NOTE: because this event is a result of batch containing GRPC_OP_RECV_STATUS_ON_CLIENT,
// success will be always set to true.

TaskCompletionSource<object> delayedStreamingWriteTcs = null;

lock (myLock)
{
finished = true;
finishedStatus = receivedStatus;
if (isStreamingWriteCompletionDelayed)
{
delayedStreamingWriteTcs = streamingWriteTcs;
streamingWriteTcs = null;
}

ReleaseResourcesIfPossible();
}

var status = receivedStatus.Status;
if (delayedStreamingWriteTcs != null)
{
delayedStreamingWriteTcs.SetException(GetRpcExceptionClientOnly());
}

var status = receivedStatus.Status;
if (status.StatusCode != StatusCode.OK)
{
streamingCallFinishedTcs.SetException(new RpcException(status));
Expand Down
40 changes: 36 additions & 4 deletions src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ internal abstract class AsyncCallBase<TWrite, TRead>
protected TaskCompletionSource<TRead> streamingReadTcs; // Completion of a pending streaming read if not null.
protected TaskCompletionSource<object> streamingWriteTcs; // Completion of a pending streaming write or send close from client if not null.
protected TaskCompletionSource<object> sendStatusFromServerTcs;
protected bool isStreamingWriteCompletionDelayed; // Only used for the client side.

protected bool readingDone; // True if last read (i.e. read with null payload) was already received.
protected bool halfcloseRequested; // True if send close have been initiated.
Expand Down Expand Up @@ -200,6 +201,12 @@ protected abstract bool IsClient
get;
}

/// <summary>
/// Returns an exception to throw for a failed send operation.
/// It is only allowed to call this method for a call that has already finished.
/// </summary>
protected abstract Exception GetRpcExceptionClientOnly();

private void ReleaseResources()
{
if (call != null)
Expand Down Expand Up @@ -252,18 +259,43 @@ protected Exception TryDeserialize(byte[] payload, out TRead msg)
/// </summary>
protected void HandleSendFinished(bool success)
{
bool delayCompletion = false;
TaskCompletionSource<object> origTcs = null;
lock (myLock)
{
origTcs = streamingWriteTcs;
streamingWriteTcs = null;
if (!success && !finished && IsClient) {
// We should be setting this only once per call, following writes will be short circuited
// because they cannot start until the entire call finishes.
GrpcPreconditions.CheckState(!isStreamingWriteCompletionDelayed);

// leave streamingWriteTcs set, it will be completed once call finished.
isStreamingWriteCompletionDelayed = true;
delayCompletion = true;
}
else
{
origTcs = streamingWriteTcs;
streamingWriteTcs = null;
}

ReleaseResourcesIfPossible();
}

if (!success)
{
origTcs.SetException(new InvalidOperationException("Send failed"));
if (!delayCompletion)
{
if (IsClient)
{
GrpcPreconditions.CheckState(finished); // implied by !success && !delayCompletion && IsClient
origTcs.SetException(GetRpcExceptionClientOnly());
}
else
{
origTcs.SetException (new IOException("Error sending from server."));
}
}
// if delayCompletion == true, postpone SetException until call finishes.
}
else
{
Expand All @@ -283,7 +315,7 @@ protected void HandleSendStatusFromServerFinished(bool success)

if (!success)
{
sendStatusFromServerTcs.SetException(new InvalidOperationException("Error sending status from server."));
sendStatusFromServerTcs.SetException(new IOException("Error sending status from server."));
}
else
{
Expand Down
6 changes: 6 additions & 0 deletions src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

using System;
using System.Diagnostics;
using System.IO;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
Expand Down Expand Up @@ -193,6 +194,11 @@ protected override bool IsClient
get { return false; }
}

protected override Exception GetRpcExceptionClientOnly()
{
throw new InvalidOperationException("Call be only called for client calls");
}

protected override void OnAfterReleaseResources()
{
server.RemoveCallReference(this);
Expand Down

0 comments on commit c4f9c9a

Please sign in to comment.