From 7a73bec0e5cb11cf57b2f3ee77cee1249988ca4f Mon Sep 17 00:00:00 2001 From: Jan Tattermusch <jtattermusch@google.com> Date: Fri, 16 Sep 2016 16:55:17 +0200 Subject: [PATCH] dont allow new writes if theres a write with delayed completion --- .../Grpc.Core.Tests/Internal/AsyncCallTest.cs | 28 +++++++++++++++++++ src/csharp/Grpc.Core/Internal/AsyncCall.cs | 3 +- .../Grpc.Core/Internal/AsyncCallServer.cs | 2 +- 3 files changed, 31 insertions(+), 2 deletions(-) diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs index 8ee4d184ab..616bc06d76 100644 --- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs +++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs @@ -223,6 +223,34 @@ namespace Grpc.Core.Internal.Tests AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Internal); } + [Test] + public void ClientStreaming_WriteFailureThrowsRpcException3() + { + var resultTask = asyncCall.ClientStreamingCallAsync(); + var requestStream = new ClientRequestStream<string, string>(asyncCall); + + var writeTask = requestStream.WriteAsync("request1"); + fakeCall.SendCompletionHandler(false); + + // Until the delayed write completion has been triggered, + // we still act as if there was an active write. + Assert.Throws(typeof(InvalidOperationException), () => requestStream.WriteAsync("request2")); + + fakeCall.UnaryResponseClientHandler(true, + CreateClientSideStatus(StatusCode.Internal), + null, + new Metadata()); + + var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask); + Assert.AreEqual(StatusCode.Internal, ex.Status.StatusCode); + + // Following attempts to write keep delivering the same status + var ex2 = Assert.ThrowsAsync<RpcException>(async () => await requestStream.WriteAsync("after call has finished")); + Assert.AreEqual(StatusCode.Internal, ex2.Status.StatusCode); + + AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Internal); + } + [Test] public void ClientStreaming_WriteAfterReceivingStatusThrowsRpcException() { diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index db40b553ce..264c28ae7a 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -372,7 +372,8 @@ namespace Grpc.Core.Internal private Task CheckSendPreconditionsClientSide() { GrpcPreconditions.CheckState(!halfcloseRequested, "Request stream has already been completed."); - GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time."); + // if there is a delayed streaming write, we will treat that as if the write was still in progress until the call finishes. + GrpcPreconditions.CheckState(streamingWriteTcs == null && (finished || delayedStreamingWriteTcs == null), "Only one write can be pending at a time."); if (cancelRequested) { diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs index 50fdfa9006..8d9f548d62 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -208,7 +208,7 @@ namespace Grpc.Core.Internal { GrpcPreconditions.CheckState(!halfcloseRequested, "Response stream has already been completed."); GrpcPreconditions.CheckState(!finished, "Already finished."); - GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time"); + GrpcPreconditions.CheckState(streamingWriteTcs == null && delayedStreamingWriteTcs == null, "Only one write can be pending at a time"); GrpcPreconditions.CheckState(!disposed); return null; -- GitLab