From 6eb987780a80d2ba83feafb3b0a98a9c60e0153a Mon Sep 17 00:00:00 2001 From: Jan Tattermusch <jtattermusch@google.com> Date: Fri, 16 Sep 2016 17:19:11 +0200 Subject: [PATCH] simplify delayed streaming write logic --- src/csharp/Grpc.Core/Internal/AsyncCall.cs | 28 ++++++++++++------- .../Grpc.Core/Internal/AsyncCallBase.cs | 20 +++++++------ .../Grpc.Core/Internal/AsyncCallServer.cs | 2 +- 3 files changed, 31 insertions(+), 19 deletions(-) diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index 264c28ae7a..9abaf1120f 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -372,8 +372,7 @@ namespace Grpc.Core.Internal private Task CheckSendPreconditionsClientSide() { GrpcPreconditions.CheckState(!halfcloseRequested, "Request stream has already been completed."); - // if there is a delayed streaming write, we will treat that as if the write was still in progress until the call finishes. - GrpcPreconditions.CheckState(streamingWriteTcs == null && (finished || delayedStreamingWriteTcs == null), "Only one write can be pending at a time."); + GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time."); if (cancelRequested) { @@ -458,7 +457,7 @@ namespace Grpc.Core.Internal using (Profilers.ForCurrentThread().NewScope("AsyncCall.HandleUnaryResponse")) { - TaskCompletionSource<object> delayedTcs; + TaskCompletionSource<object> delayedStreamingWriteTcs = null; TResponse msg = default(TResponse); var deserializeException = TryDeserialize(receivedMessage, out msg); @@ -471,16 +470,21 @@ namespace Grpc.Core.Internal receivedStatus = new ClientSideStatus(DeserializeResponseFailureStatus, receivedStatus.Trailers); } finishedStatus = receivedStatus; - delayedTcs = delayedStreamingWriteTcs; + + if (isStreamingWriteCompletionDelayed) + { + delayedStreamingWriteTcs = streamingWriteTcs; + streamingWriteTcs = null; + } ReleaseResourcesIfPossible(); } responseHeadersTcs.SetResult(responseHeaders); - if (delayedTcs != null) + if (delayedStreamingWriteTcs != null) { - delayedTcs.SetException(GetRpcExceptionClientOnly()); + delayedStreamingWriteTcs.SetException(GetRpcExceptionClientOnly()); } var status = receivedStatus.Status; @@ -502,20 +506,24 @@ namespace Grpc.Core.Internal // NOTE: because this event is a result of batch containing GRPC_OP_RECV_STATUS_ON_CLIENT, // success will be always set to true. - TaskCompletionSource<object> delayedTcs; + TaskCompletionSource<object> delayedStreamingWriteTcs = null; lock (myLock) { finished = true; finishedStatus = receivedStatus; - delayedTcs = delayedStreamingWriteTcs; + if (isStreamingWriteCompletionDelayed) + { + delayedStreamingWriteTcs = streamingWriteTcs; + streamingWriteTcs = null; + } ReleaseResourcesIfPossible(); } - if (delayedTcs != null) + if (delayedStreamingWriteTcs != null) { - delayedTcs.SetException(GetRpcExceptionClientOnly()); + delayedStreamingWriteTcs.SetException(GetRpcExceptionClientOnly()); } var status = receivedStatus.Status; diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs index b27baba942..9f9d260e7e 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs @@ -68,8 +68,8 @@ namespace Grpc.Core.Internal protected TaskCompletionSource<TRead> streamingReadTcs; // Completion of a pending streaming read if not null. protected TaskCompletionSource<object> streamingWriteTcs; // Completion of a pending streaming write or send close from client if not null. - protected TaskCompletionSource<object> delayedStreamingWriteTcs; protected TaskCompletionSource<object> sendStatusFromServerTcs; + protected bool isStreamingWriteCompletionDelayed; // Only used for the client side. protected bool readingDone; // True if last read (i.e. read with null payload) was already received. protected bool halfcloseRequested; // True if send close have been initiated. @@ -263,16 +263,20 @@ namespace Grpc.Core.Internal TaskCompletionSource<object> origTcs = null; lock (myLock) { - origTcs = streamingWriteTcs; - streamingWriteTcs = null; + if (!success && !finished && IsClient) { + // We should be setting this only once per call, following writes will be short circuited + // because they cannot start until the entire call finishes. + GrpcPreconditions.CheckState(!isStreamingWriteCompletionDelayed); - if (!success && !finished && IsClient) - { - // We should be setting this only once per call, following writes will be short circuited. - GrpcPreconditions.CheckState (delayedStreamingWriteTcs == null); - delayedStreamingWriteTcs = origTcs; + // leave streamingWriteTcs set, it will be completed once call finished. + isStreamingWriteCompletionDelayed = true; delayCompletion = true; } + else + { + origTcs = streamingWriteTcs; + streamingWriteTcs = null; + } ReleaseResourcesIfPossible(); } diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs index 8d9f548d62..50fdfa9006 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -208,7 +208,7 @@ namespace Grpc.Core.Internal { GrpcPreconditions.CheckState(!halfcloseRequested, "Response stream has already been completed."); GrpcPreconditions.CheckState(!finished, "Already finished."); - GrpcPreconditions.CheckState(streamingWriteTcs == null && delayedStreamingWriteTcs == null, "Only one write can be pending at a time"); + GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time"); GrpcPreconditions.CheckState(!disposed); return null; -- GitLab