Skip to content

Commit

Permalink
finishing serverside request stream should not be required for disposal
Browse files Browse the repository at this point in the history
jtattermusch committed May 4, 2016
1 parent f21f465 commit 739e86c
Showing 3 changed files with 20 additions and 35 deletions.
34 changes: 11 additions & 23 deletions src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs
Original file line number Diff line number Diff line change
@@ -80,16 +80,24 @@ public void Cleanup()

[Test]
public void CancelNotificationAfterStartDisposes()
{
var finishedTask = asyncCallServer.ServerSideCallAsync();
fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
AssertFinished(asyncCallServer, fakeCall, finishedTask);
}

[Test]
public void CancelNotificationAfterStartDisposesAfterPendingReadFinishes()
{
var finishedTask = asyncCallServer.ServerSideCallAsync();
var requestStream = new ServerRequestStream<string, string>(asyncCallServer);

// Finishing requestStream is needed for dispose to happen.
var moveNextTask = requestStream.MoveNext();

fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
fakeCall.ReceivedMessageHandler(true, null);
Assert.IsFalse(moveNextTask.Result);

fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
AssertFinished(asyncCallServer, fakeCall, finishedTask);
}

@@ -101,9 +109,8 @@ public void ReadAfterCancelNotificationCanSucceed()

fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);

// Check that startin a read after cancel notification has been processed is legal.
// Check that starting a read after cancel notification has been processed is legal.
var moveNextTask = requestStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);
Assert.IsFalse(moveNextTask.Result);

AssertFinished(asyncCallServer, fakeCall, finishedTask);
@@ -136,42 +143,28 @@ public void WriteAfterCancelNotificationFails()

// TODO(jtattermusch): should we throw a different exception type instead?
Assert.Throws(typeof(InvalidOperationException), () => responseStream.WriteAsync("request1"));

// Finishing requestStream is needed for dispose to happen.
var moveNextTask = requestStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);
Assert.IsFalse(moveNextTask.Result);

AssertFinished(asyncCallServer, fakeCall, finishedTask);
}

[Test]
public void WriteCompletionFailureThrows()
{
var finishedTask = asyncCallServer.ServerSideCallAsync();
var requestStream = new ServerRequestStream<string, string>(asyncCallServer);
var responseStream = new ServerResponseStream<string, string>(asyncCallServer);

var writeTask = responseStream.WriteAsync("request1");
fakeCall.SendCompletionHandler(false);
// TODO(jtattermusch): should we throw a different exception type instead?
Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await writeTask);

// Finishing requestStream is needed for dispose to happen.
var moveNextTask = requestStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);
Assert.IsFalse(moveNextTask.Result);

fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);

AssertFinished(asyncCallServer, fakeCall, finishedTask);
}

[Test]
public void WriteAndWriteStatusCanRunConcurrently()
{
var finishedTask = asyncCallServer.ServerSideCallAsync();
var requestStream = new ServerRequestStream<string, string>(asyncCallServer);
var responseStream = new ServerResponseStream<string, string>(asyncCallServer);

var writeTask = responseStream.WriteAsync("request1");
@@ -183,11 +176,6 @@ public void WriteAndWriteStatusCanRunConcurrently()
Assert.DoesNotThrowAsync(async () => await writeTask);
Assert.DoesNotThrowAsync(async () => await writeStatusTask);

// Finishing requestStream is needed for dispose to happen.
var moveNextTask = requestStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);
Assert.IsFalse(moveNextTask.Result);

fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);

AssertFinished(asyncCallServer, fakeCall, finishedTask);
7 changes: 1 addition & 6 deletions src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
Original file line number Diff line number Diff line change
@@ -155,7 +155,7 @@ protected Task<TRead> ReadMessageInternalAsync()
{
lock (myLock)
{
CheckReadingAllowed();
GrpcPreconditions.CheckState(started);
if (readingDone)
{
// the last read that returns null or throws an exception is idempotent
@@ -224,11 +224,6 @@ protected void CheckSendingAllowed(bool allowFinished)
GrpcPreconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time");
}

protected virtual void CheckReadingAllowed()
{
GrpcPreconditions.CheckState(started);
}

protected void CheckNotCancelled()
{
if (cancelRequested)
14 changes: 8 additions & 6 deletions src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
Original file line number Diff line number Diff line change
@@ -183,12 +183,6 @@ protected override bool IsClient
get { return false; }
}

protected override void CheckReadingAllowed()
{
base.CheckReadingAllowed();
GrpcPreconditions.CheckArgument(!cancelRequested);
}

protected override void OnAfterReleaseResources()
{
server.RemoveCallReference(this);
@@ -204,6 +198,14 @@ private void HandleFinishedServerside(bool success, bool cancelled)
lock (myLock)
{
finished = true;
if (streamingReadTcs == null)
{
// if there's no pending read, readingDone=true will dispose now.
// if there is a pending read, we will dispose once that read finishes.
readingDone = true;
streamingReadTcs = new TaskCompletionSource<TRequest>();
streamingReadTcs.SetResult(default(TRequest));
}
ReleaseResourcesIfPossible();
}

0 comments on commit 739e86c

Please sign in to comment.