Skip to content
Snippets Groups Projects
Commit 6eb98778 authored by Jan Tattermusch's avatar Jan Tattermusch
Browse files

simplify delayed streaming write logic

parent 7a73bec0
No related branches found
No related tags found
No related merge requests found
...@@ -372,8 +372,7 @@ namespace Grpc.Core.Internal ...@@ -372,8 +372,7 @@ namespace Grpc.Core.Internal
private Task CheckSendPreconditionsClientSide() private Task CheckSendPreconditionsClientSide()
{ {
GrpcPreconditions.CheckState(!halfcloseRequested, "Request stream has already been completed."); GrpcPreconditions.CheckState(!halfcloseRequested, "Request stream has already been completed.");
// if there is a delayed streaming write, we will treat that as if the write was still in progress until the call finishes. GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time.");
GrpcPreconditions.CheckState(streamingWriteTcs == null && (finished || delayedStreamingWriteTcs == null), "Only one write can be pending at a time.");
if (cancelRequested) if (cancelRequested)
{ {
...@@ -458,7 +457,7 @@ namespace Grpc.Core.Internal ...@@ -458,7 +457,7 @@ namespace Grpc.Core.Internal
using (Profilers.ForCurrentThread().NewScope("AsyncCall.HandleUnaryResponse")) using (Profilers.ForCurrentThread().NewScope("AsyncCall.HandleUnaryResponse"))
{ {
TaskCompletionSource<object> delayedTcs; TaskCompletionSource<object> delayedStreamingWriteTcs = null;
TResponse msg = default(TResponse); TResponse msg = default(TResponse);
var deserializeException = TryDeserialize(receivedMessage, out msg); var deserializeException = TryDeserialize(receivedMessage, out msg);
...@@ -471,16 +470,21 @@ namespace Grpc.Core.Internal ...@@ -471,16 +470,21 @@ namespace Grpc.Core.Internal
receivedStatus = new ClientSideStatus(DeserializeResponseFailureStatus, receivedStatus.Trailers); receivedStatus = new ClientSideStatus(DeserializeResponseFailureStatus, receivedStatus.Trailers);
} }
finishedStatus = receivedStatus; finishedStatus = receivedStatus;
delayedTcs = delayedStreamingWriteTcs;
if (isStreamingWriteCompletionDelayed)
{
delayedStreamingWriteTcs = streamingWriteTcs;
streamingWriteTcs = null;
}
ReleaseResourcesIfPossible(); ReleaseResourcesIfPossible();
} }
responseHeadersTcs.SetResult(responseHeaders); responseHeadersTcs.SetResult(responseHeaders);
if (delayedTcs != null) if (delayedStreamingWriteTcs != null)
{ {
delayedTcs.SetException(GetRpcExceptionClientOnly()); delayedStreamingWriteTcs.SetException(GetRpcExceptionClientOnly());
} }
var status = receivedStatus.Status; var status = receivedStatus.Status;
...@@ -502,20 +506,24 @@ namespace Grpc.Core.Internal ...@@ -502,20 +506,24 @@ namespace Grpc.Core.Internal
// NOTE: because this event is a result of batch containing GRPC_OP_RECV_STATUS_ON_CLIENT, // NOTE: because this event is a result of batch containing GRPC_OP_RECV_STATUS_ON_CLIENT,
// success will be always set to true. // success will be always set to true.
TaskCompletionSource<object> delayedTcs; TaskCompletionSource<object> delayedStreamingWriteTcs = null;
lock (myLock) lock (myLock)
{ {
finished = true; finished = true;
finishedStatus = receivedStatus; finishedStatus = receivedStatus;
delayedTcs = delayedStreamingWriteTcs; if (isStreamingWriteCompletionDelayed)
{
delayedStreamingWriteTcs = streamingWriteTcs;
streamingWriteTcs = null;
}
ReleaseResourcesIfPossible(); ReleaseResourcesIfPossible();
} }
if (delayedTcs != null) if (delayedStreamingWriteTcs != null)
{ {
delayedTcs.SetException(GetRpcExceptionClientOnly()); delayedStreamingWriteTcs.SetException(GetRpcExceptionClientOnly());
} }
var status = receivedStatus.Status; var status = receivedStatus.Status;
......
...@@ -68,8 +68,8 @@ namespace Grpc.Core.Internal ...@@ -68,8 +68,8 @@ namespace Grpc.Core.Internal
protected TaskCompletionSource<TRead> streamingReadTcs; // Completion of a pending streaming read if not null. protected TaskCompletionSource<TRead> streamingReadTcs; // Completion of a pending streaming read if not null.
protected TaskCompletionSource<object> streamingWriteTcs; // Completion of a pending streaming write or send close from client if not null. protected TaskCompletionSource<object> streamingWriteTcs; // Completion of a pending streaming write or send close from client if not null.
protected TaskCompletionSource<object> delayedStreamingWriteTcs;
protected TaskCompletionSource<object> sendStatusFromServerTcs; protected TaskCompletionSource<object> sendStatusFromServerTcs;
protected bool isStreamingWriteCompletionDelayed; // Only used for the client side.
protected bool readingDone; // True if last read (i.e. read with null payload) was already received. protected bool readingDone; // True if last read (i.e. read with null payload) was already received.
protected bool halfcloseRequested; // True if send close have been initiated. protected bool halfcloseRequested; // True if send close have been initiated.
...@@ -263,16 +263,20 @@ namespace Grpc.Core.Internal ...@@ -263,16 +263,20 @@ namespace Grpc.Core.Internal
TaskCompletionSource<object> origTcs = null; TaskCompletionSource<object> origTcs = null;
lock (myLock) lock (myLock)
{ {
origTcs = streamingWriteTcs; if (!success && !finished && IsClient) {
streamingWriteTcs = null; // We should be setting this only once per call, following writes will be short circuited
// because they cannot start until the entire call finishes.
GrpcPreconditions.CheckState(!isStreamingWriteCompletionDelayed);
if (!success && !finished && IsClient) // leave streamingWriteTcs set, it will be completed once call finished.
{ isStreamingWriteCompletionDelayed = true;
// We should be setting this only once per call, following writes will be short circuited.
GrpcPreconditions.CheckState (delayedStreamingWriteTcs == null);
delayedStreamingWriteTcs = origTcs;
delayCompletion = true; delayCompletion = true;
} }
else
{
origTcs = streamingWriteTcs;
streamingWriteTcs = null;
}
ReleaseResourcesIfPossible(); ReleaseResourcesIfPossible();
} }
......
...@@ -208,7 +208,7 @@ namespace Grpc.Core.Internal ...@@ -208,7 +208,7 @@ namespace Grpc.Core.Internal
{ {
GrpcPreconditions.CheckState(!halfcloseRequested, "Response stream has already been completed."); GrpcPreconditions.CheckState(!halfcloseRequested, "Response stream has already been completed.");
GrpcPreconditions.CheckState(!finished, "Already finished."); GrpcPreconditions.CheckState(!finished, "Already finished.");
GrpcPreconditions.CheckState(streamingWriteTcs == null && delayedStreamingWriteTcs == null, "Only one write can be pending at a time"); GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time");
GrpcPreconditions.CheckState(!disposed); GrpcPreconditions.CheckState(!disposed);
return null; return null;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment