diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs index 0b6981f871dd0abc7d8231af863efae45561857f..058371521d63ff19d7b6b69b2ea0c4bcb1cdcab1 100644 --- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs +++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs @@ -80,16 +80,24 @@ namespace Grpc.Core.Internal.Tests [Test] public void CancelNotificationAfterStartDisposes() + { + var finishedTask = asyncCallServer.ServerSideCallAsync(); + fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); + AssertFinished(asyncCallServer, fakeCall, finishedTask); + } + + [Test] + public void CancelNotificationAfterStartDisposesAfterPendingReadFinishes() { var finishedTask = asyncCallServer.ServerSideCallAsync(); var requestStream = new ServerRequestStream<string, string>(asyncCallServer); - // Finishing requestStream is needed for dispose to happen. var moveNextTask = requestStream.MoveNext(); + + fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); fakeCall.ReceivedMessageHandler(true, null); Assert.IsFalse(moveNextTask.Result); - fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); AssertFinished(asyncCallServer, fakeCall, finishedTask); } @@ -101,9 +109,8 @@ namespace Grpc.Core.Internal.Tests fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); - // Check that startin a read after cancel notification has been processed is legal. + // Check that starting a read after cancel notification has been processed is legal. var moveNextTask = requestStream.MoveNext(); - fakeCall.ReceivedMessageHandler(true, null); Assert.IsFalse(moveNextTask.Result); AssertFinished(asyncCallServer, fakeCall, finishedTask); @@ -136,12 +143,6 @@ namespace Grpc.Core.Internal.Tests // TODO(jtattermusch): should we throw a different exception type instead? Assert.Throws(typeof(InvalidOperationException), () => responseStream.WriteAsync("request1")); - - // Finishing requestStream is needed for dispose to happen. - var moveNextTask = requestStream.MoveNext(); - fakeCall.ReceivedMessageHandler(true, null); - Assert.IsFalse(moveNextTask.Result); - AssertFinished(asyncCallServer, fakeCall, finishedTask); } @@ -149,7 +150,6 @@ namespace Grpc.Core.Internal.Tests public void WriteCompletionFailureThrows() { var finishedTask = asyncCallServer.ServerSideCallAsync(); - var requestStream = new ServerRequestStream<string, string>(asyncCallServer); var responseStream = new ServerResponseStream<string, string>(asyncCallServer); var writeTask = responseStream.WriteAsync("request1"); @@ -157,13 +157,7 @@ namespace Grpc.Core.Internal.Tests // TODO(jtattermusch): should we throw a different exception type instead? Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await writeTask); - // Finishing requestStream is needed for dispose to happen. - var moveNextTask = requestStream.MoveNext(); - fakeCall.ReceivedMessageHandler(true, null); - Assert.IsFalse(moveNextTask.Result); - fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); - AssertFinished(asyncCallServer, fakeCall, finishedTask); } @@ -171,7 +165,6 @@ namespace Grpc.Core.Internal.Tests public void WriteAndWriteStatusCanRunConcurrently() { var finishedTask = asyncCallServer.ServerSideCallAsync(); - var requestStream = new ServerRequestStream<string, string>(asyncCallServer); var responseStream = new ServerResponseStream<string, string>(asyncCallServer); var writeTask = responseStream.WriteAsync("request1"); @@ -183,11 +176,6 @@ namespace Grpc.Core.Internal.Tests Assert.DoesNotThrowAsync(async () => await writeTask); Assert.DoesNotThrowAsync(async () => await writeStatusTask); - // Finishing requestStream is needed for dispose to happen. - var moveNextTask = requestStream.MoveNext(); - fakeCall.ReceivedMessageHandler(true, null); - Assert.IsFalse(moveNextTask.Result); - fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); AssertFinished(asyncCallServer, fakeCall, finishedTask); diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs index 18dbe87734b0347fa384b88198efa4bf7e08b6b6..42234dcac217d819a68f4f0b57570b76b6e1f660 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs @@ -155,7 +155,7 @@ namespace Grpc.Core.Internal { lock (myLock) { - CheckReadingAllowed(); + GrpcPreconditions.CheckState(started); if (readingDone) { // the last read that returns null or throws an exception is idempotent @@ -224,11 +224,6 @@ namespace Grpc.Core.Internal GrpcPreconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time"); } - protected virtual void CheckReadingAllowed() - { - GrpcPreconditions.CheckState(started); - } - protected void CheckNotCancelled() { if (cancelRequested) diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs index 44f2988e21e76a04005ab15a86014c893de28918..eafe2ccab8713bc47cf890d24b9f22bb32142d70 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -183,12 +183,6 @@ namespace Grpc.Core.Internal get { return false; } } - protected override void CheckReadingAllowed() - { - base.CheckReadingAllowed(); - GrpcPreconditions.CheckArgument(!cancelRequested); - } - protected override void OnAfterReleaseResources() { server.RemoveCallReference(this); @@ -204,6 +198,14 @@ namespace Grpc.Core.Internal lock (myLock) { finished = true; + if (streamingReadTcs == null) + { + // if there's no pending read, readingDone=true will dispose now. + // if there is a pending read, we will dispose once that read finishes. + readingDone = true; + streamingReadTcs = new TaskCompletionSource<TRequest>(); + streamingReadTcs.SetResult(default(TRequest)); + } ReleaseResourcesIfPossible(); }