diff --git a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs index 9e7acab7edf19ecd04409b8632cda956f669d0e3..ab9dae6dd66a4ae3ddc90b0df0cb87c2d2ca8fa8 100644 --- a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs +++ b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs @@ -105,17 +105,17 @@ namespace Grpc.Core.Tests [Test] public void UnaryCall() { - var internalCall = new Call<string, string>(channel, EchoMethod, new CallContext()); - Assert.AreEqual("ABC", Calls.BlockingUnaryCall(internalCall, "ABC")); + var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallContext()); + Assert.AreEqual("ABC", Calls.BlockingUnaryCall(callDetails, "ABC")); } [Test] public void UnaryCall_ServerHandlerThrows() { - var internalCall = new Call<string, string>(channel, EchoMethod, new CallContext()); + var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallContext()); try { - Calls.BlockingUnaryCall(internalCall, "THROW"); + Calls.BlockingUnaryCall(callDetails, "THROW"); Assert.Fail(); } catch (RpcException e) @@ -127,10 +127,10 @@ namespace Grpc.Core.Tests [Test] public void UnaryCall_ServerHandlerThrowsRpcException() { - var internalCall = new Call<string, string>(channel, EchoMethod, new CallContext()); + var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallContext()); try { - Calls.BlockingUnaryCall(internalCall, "THROW_UNAUTHENTICATED"); + Calls.BlockingUnaryCall(callDetails, "THROW_UNAUTHENTICATED"); Assert.Fail(); } catch (RpcException e) @@ -142,10 +142,10 @@ namespace Grpc.Core.Tests [Test] public void UnaryCall_ServerHandlerSetsStatus() { - var internalCall = new Call<string, string>(channel, EchoMethod, new CallContext()); + var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallContext()); try { - Calls.BlockingUnaryCall(internalCall, "SET_UNAUTHENTICATED"); + Calls.BlockingUnaryCall(callDetails, "SET_UNAUTHENTICATED"); Assert.Fail(); } catch (RpcException e) @@ -157,18 +157,18 @@ namespace Grpc.Core.Tests [Test] public async Task AsyncUnaryCall() { - var internalCall = new Call<string, string>(channel, EchoMethod, new CallContext()); - var result = await Calls.AsyncUnaryCall(internalCall, "ABC"); + var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallContext()); + var result = await Calls.AsyncUnaryCall(callDetails, "ABC"); Assert.AreEqual("ABC", result); } [Test] public async Task AsyncUnaryCall_ServerHandlerThrows() { - var internalCall = new Call<string, string>(channel, EchoMethod, new CallContext()); + var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallContext()); try { - await Calls.AsyncUnaryCall(internalCall, "THROW"); + await Calls.AsyncUnaryCall(callDetails, "THROW"); Assert.Fail(); } catch (RpcException e) @@ -180,8 +180,8 @@ namespace Grpc.Core.Tests [Test] public async Task ClientStreamingCall() { - var internalCall = new Call<string, string>(channel, ConcatAndEchoMethod, new CallContext()); - var call = Calls.AsyncClientStreamingCall(internalCall); + var callDetails = new CallInvocationDetails<string, string>(channel, ConcatAndEchoMethod, new CallContext()); + var call = Calls.AsyncClientStreamingCall(callDetails); await call.RequestStream.WriteAll(new string[] { "A", "B", "C" }); Assert.AreEqual("ABC", await call.ResponseAsync); @@ -191,8 +191,8 @@ namespace Grpc.Core.Tests public async Task ClientStreamingCall_CancelAfterBegin() { var cts = new CancellationTokenSource(); - var internalCall = new Call<string, string>(channel, ConcatAndEchoMethod, new CallContext(cancellationToken: cts.Token)); - var call = Calls.AsyncClientStreamingCall(internalCall); + var callDetails = new CallInvocationDetails<string, string>(channel, ConcatAndEchoMethod, new CallContext(cancellationToken: cts.Token)); + var call = Calls.AsyncClientStreamingCall(callDetails); // TODO(jtattermusch): we need this to ensure call has been initiated once we cancel it. await Task.Delay(1000); @@ -216,8 +216,8 @@ namespace Grpc.Core.Tests new Metadata.Entry("ascii-header", "abcdefg"), new Metadata.Entry("binary-header-bin", new byte[] { 1, 2, 3, 0, 0xff }), }; - var internalCall = new Call<string, string>(channel, EchoMethod, new CallContext(headers: headers)); - var call = Calls.AsyncUnaryCall(internalCall, "ABC"); + var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallContext(headers: headers)); + var call = Calls.AsyncUnaryCall(callDetails, "ABC"); Assert.AreEqual("ABC", call.ResponseAsync.Result); @@ -237,25 +237,25 @@ namespace Grpc.Core.Tests { channel.Dispose(); - var internalCall = new Call<string, string>(channel, EchoMethod, new CallContext()); - Assert.Throws(typeof(ObjectDisposedException), () => Calls.BlockingUnaryCall(internalCall, "ABC")); + var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallContext()); + Assert.Throws(typeof(ObjectDisposedException), () => Calls.BlockingUnaryCall(callDetails, "ABC")); } [Test] public void UnaryCallPerformance() { - var internalCall = new Call<string, string>(channel, EchoMethod, new CallContext()); + var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallContext()); BenchmarkUtil.RunBenchmark(100, 100, - () => { Calls.BlockingUnaryCall(internalCall, "ABC"); }); + () => { Calls.BlockingUnaryCall(callDetails, "ABC"); }); } [Test] public void UnknownMethodHandler() { - var internalCall = new Call<string, string>(channel, NonexistentMethod, new CallContext()); + var callDetails = new CallInvocationDetails<string, string>(channel, NonexistentMethod, new CallContext()); try { - Calls.BlockingUnaryCall(internalCall, "ABC"); + Calls.BlockingUnaryCall(callDetails, "ABC"); Assert.Fail(); } catch (RpcException e) @@ -267,16 +267,16 @@ namespace Grpc.Core.Tests [Test] public void UserAgentStringPresent() { - var internalCall = new Call<string, string>(channel, EchoMethod, new CallContext()); - string userAgent = Calls.BlockingUnaryCall(internalCall, "RETURN-USER-AGENT"); + var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallContext()); + string userAgent = Calls.BlockingUnaryCall(callDetails, "RETURN-USER-AGENT"); Assert.IsTrue(userAgent.StartsWith("grpc-csharp/")); } [Test] public void PeerInfoPresent() { - var internalCall = new Call<string, string>(channel, EchoMethod, new CallContext()); - string peer = Calls.BlockingUnaryCall(internalCall, "RETURN-PEER"); + var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallContext()); + string peer = Calls.BlockingUnaryCall(callDetails, "RETURN-PEER"); Assert.IsTrue(peer.Contains(Host)); } @@ -288,8 +288,8 @@ namespace Grpc.Core.Tests var stateChangedTask = channel.WaitForStateChangedAsync(channel.State); - var internalCall = new Call<string, string>(channel, EchoMethod, new CallContext()); - await Calls.AsyncUnaryCall(internalCall, "abc"); + var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallContext()); + await Calls.AsyncUnaryCall(callDetails, "abc"); await stateChangedTask; Assert.AreEqual(ChannelState.Ready, channel.State); diff --git a/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs b/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs index e5fb2e54042c8a1172b6d83058de71747e9f66c5..54cd024670b98f6e321bcf40902b415079a0597c 100644 --- a/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs +++ b/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs @@ -99,12 +99,12 @@ namespace Grpc.Core.Tests public void InfiniteDeadline() { // no deadline specified, check server sees infinite deadline - var internalCall = new Call<string, string>(channel, TestMethod, new CallContext()); - Assert.AreEqual("DATETIME_MAXVALUE", Calls.BlockingUnaryCall(internalCall, "RETURN_DEADLINE")); + var callDetails = new CallInvocationDetails<string, string>(channel, TestMethod, new CallContext()); + Assert.AreEqual("DATETIME_MAXVALUE", Calls.BlockingUnaryCall(callDetails, "RETURN_DEADLINE")); // DateTime.MaxValue deadline specified, check server sees infinite deadline - var internalCall2 = new Call<string, string>(channel, TestMethod, new CallContext()); - Assert.AreEqual("DATETIME_MAXVALUE", Calls.BlockingUnaryCall(internalCall2, "RETURN_DEADLINE")); + var callDetails2 = new CallInvocationDetails<string, string>(channel, TestMethod, new CallContext()); + Assert.AreEqual("DATETIME_MAXVALUE", Calls.BlockingUnaryCall(callDetails2, "RETURN_DEADLINE")); } [Test] @@ -113,9 +113,9 @@ namespace Grpc.Core.Tests var remainingTimeClient = TimeSpan.FromDays(7); var deadline = DateTime.UtcNow + remainingTimeClient; Thread.Sleep(1000); - var internalCall = new Call<string, string>(channel, TestMethod, new CallContext(deadline: deadline)); + var callDetails = new CallInvocationDetails<string, string>(channel, TestMethod, new CallContext(deadline: deadline)); - var serverDeadlineTicksString = Calls.BlockingUnaryCall(internalCall, "RETURN_DEADLINE"); + var serverDeadlineTicksString = Calls.BlockingUnaryCall(callDetails, "RETURN_DEADLINE"); var serverDeadline = new DateTime(long.Parse(serverDeadlineTicksString), DateTimeKind.Utc); // A fairly relaxed check that the deadline set by client and deadline seen by server @@ -127,11 +127,11 @@ namespace Grpc.Core.Tests [Test] public void DeadlineInThePast() { - var internalCall = new Call<string, string>(channel, TestMethod, new CallContext(deadline: DateTime.MinValue)); + var callDetails = new CallInvocationDetails<string, string>(channel, TestMethod, new CallContext(deadline: DateTime.MinValue)); try { - Calls.BlockingUnaryCall(internalCall, "TIMEOUT"); + Calls.BlockingUnaryCall(callDetails, "TIMEOUT"); Assert.Fail(); } catch (RpcException e) @@ -145,11 +145,11 @@ namespace Grpc.Core.Tests public void DeadlineExceededStatusOnTimeout() { var deadline = DateTime.UtcNow.Add(TimeSpan.FromSeconds(5)); - var internalCall = new Call<string, string>(channel, TestMethod, new CallContext(deadline: deadline)); + var callDetails = new CallInvocationDetails<string, string>(channel, TestMethod, new CallContext(deadline: deadline)); try { - Calls.BlockingUnaryCall(internalCall, "TIMEOUT"); + Calls.BlockingUnaryCall(callDetails, "TIMEOUT"); Assert.Fail(); } catch (RpcException e) @@ -163,11 +163,11 @@ namespace Grpc.Core.Tests public void ServerReceivesCancellationOnTimeout() { var deadline = DateTime.UtcNow.Add(TimeSpan.FromSeconds(5)); - var internalCall = new Call<string, string>(channel, TestMethod, new CallContext(deadline: deadline)); + var callDetails = new CallInvocationDetails<string, string>(channel, TestMethod, new CallContext(deadline: deadline)); try { - Calls.BlockingUnaryCall(internalCall, "CHECK_CANCELLATION_RECEIVED"); + Calls.BlockingUnaryCall(callDetails, "CHECK_CANCELLATION_RECEIVED"); Assert.Fail(); } catch (RpcException e) diff --git a/src/csharp/Grpc.Core/Call.cs b/src/csharp/Grpc.Core/CallInvocationDetails.cs similarity index 68% rename from src/csharp/Grpc.Core/Call.cs rename to src/csharp/Grpc.Core/CallInvocationDetails.cs index 00ccb9d1b5612f2668d71de951828c7ebda0e635..6678b7f430a45a46b244c3b96351680ffd0b44a5 100644 --- a/src/csharp/Grpc.Core/Call.cs +++ b/src/csharp/Grpc.Core/CallInvocationDetails.cs @@ -38,25 +38,30 @@ using Grpc.Core.Utils; namespace Grpc.Core { /// <summary> - /// Abstraction of a call to be invoked on a client. + /// Details about a client-side call to be invoked. /// </summary> - public class Call<TRequest, TResponse> + public class CallInvocationDetails<TRequest, TResponse> { readonly Channel channel; - readonly Method<TRequest, TResponse> method; + readonly string method; readonly string host; + readonly Marshaller<TRequest> requestMarshaller; + readonly Marshaller<TResponse> responseMarshaller; readonly CallContext context; - public Call(Channel channel, Method<TRequest, TResponse> method, CallContext context) - : this(channel, method, null, context) + + public CallInvocationDetails(Channel channel, Method<TRequest, TResponse> method, CallContext context) : + this(channel, method.FullName, null, method.RequestMarshaller, method.ResponseMarshaller, context) { } - public Call(Channel channel, Method<TRequest, TResponse> method, string host, CallContext context) + public CallInvocationDetails(Channel channel, string method, string host, Marshaller<TRequest> requestMarshaller, Marshaller<TResponse> responseMarshaller, CallContext context) { this.channel = Preconditions.CheckNotNull(channel); this.method = Preconditions.CheckNotNull(method); this.host = host; + this.requestMarshaller = Preconditions.CheckNotNull(requestMarshaller); + this.responseMarshaller = Preconditions.CheckNotNull(responseMarshaller); this.context = Preconditions.CheckNotNull(context); } @@ -68,7 +73,7 @@ namespace Grpc.Core } } - public Method<TRequest, TResponse> Method + public string Method { get { @@ -84,6 +89,22 @@ namespace Grpc.Core } } + public Marshaller<TRequest> RequestMarshaller + { + get + { + return this.requestMarshaller; + } + } + + public Marshaller<TResponse> ResponseMarshaller + { + get + { + return this.responseMarshaller; + } + } + /// <summary> /// Call context. /// </summary> diff --git a/src/csharp/Grpc.Core/Calls.cs b/src/csharp/Grpc.Core/Calls.cs index f975bcde2247583098b7fa4de984243097899bcb..00a8cabf82a364958d125f0d697b97333f934ecd 100644 --- a/src/csharp/Grpc.Core/Calls.cs +++ b/src/csharp/Grpc.Core/Calls.cs @@ -43,71 +43,52 @@ namespace Grpc.Core /// </summary> public static class Calls { - public static TResponse BlockingUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req) + public static TResponse BlockingUnaryCall<TRequest, TResponse>(CallInvocationDetails<TRequest, TResponse> call, TRequest req) where TRequest : class where TResponse : class { - var asyncCall = new AsyncCall<TRequest, TResponse>(call.Channel, call.Method.FullName, call.Host, call.Context, - call.Method.RequestMarshaller.Serializer, call.Method.ResponseMarshaller.Deserializer); - // TODO(jtattermusch): this gives a race that cancellation can be requested before the call even starts. - RegisterCancellationCallback(asyncCall, call.Context.CancellationToken); + var asyncCall = new AsyncCall<TRequest, TResponse>(call); return asyncCall.UnaryCall(req); } - public static AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req) + public static AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(CallInvocationDetails<TRequest, TResponse> call, TRequest req) where TRequest : class where TResponse : class { - var asyncCall = new AsyncCall<TRequest, TResponse>(call.Channel, call.Method.FullName, call.Host, call.Context, - call.Method.RequestMarshaller.Serializer, call.Method.ResponseMarshaller.Deserializer); + var asyncCall = new AsyncCall<TRequest, TResponse>(call); var asyncResult = asyncCall.UnaryCallAsync(req); - RegisterCancellationCallback(asyncCall, call.Context.CancellationToken); return new AsyncUnaryCall<TResponse>(asyncResult, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); } - public static AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req) + public static AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(CallInvocationDetails<TRequest, TResponse> call, TRequest req) where TRequest : class where TResponse : class { - var asyncCall = new AsyncCall<TRequest, TResponse>(call.Channel, call.Method.FullName, call.Host, call.Context, - call.Method.RequestMarshaller.Serializer, call.Method.ResponseMarshaller.Deserializer); + var asyncCall = new AsyncCall<TRequest, TResponse>(call); asyncCall.StartServerStreamingCall(req); - RegisterCancellationCallback(asyncCall, call.Context.CancellationToken); var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall); return new AsyncServerStreamingCall<TResponse>(responseStream, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); } - public static AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call) + public static AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(CallInvocationDetails<TRequest, TResponse> call) where TRequest : class where TResponse : class { - var asyncCall = new AsyncCall<TRequest, TResponse>(call.Channel, call.Method.FullName, call.Host, call.Context, - call.Method.RequestMarshaller.Serializer, call.Method.ResponseMarshaller.Deserializer); + var asyncCall = new AsyncCall<TRequest, TResponse>(call); var resultTask = asyncCall.ClientStreamingCallAsync(); - RegisterCancellationCallback(asyncCall, call.Context.CancellationToken); var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall); return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, resultTask, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); } - public static AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call) + public static AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(CallInvocationDetails<TRequest, TResponse> call) where TRequest : class where TResponse : class { - var asyncCall = new AsyncCall<TRequest, TResponse>(call.Channel, call.Method.FullName, call.Host, call.Context, - call.Method.RequestMarshaller.Serializer, call.Method.ResponseMarshaller.Deserializer); + var asyncCall = new AsyncCall<TRequest, TResponse>(call); asyncCall.StartDuplexStreamingCall(); - RegisterCancellationCallback(asyncCall, call.Context.CancellationToken); var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall); var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall); return new AsyncDuplexStreamingCall<TRequest, TResponse>(requestStream, responseStream, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); } - - private static void RegisterCancellationCallback<TRequest, TResponse>(AsyncCall<TRequest, TResponse> asyncCall, CancellationToken token) - { - if (token.CanBeCanceled) - { - token.Register(() => asyncCall.Cancel()); - } - } } } diff --git a/src/csharp/Grpc.Core/ClientBase.cs b/src/csharp/Grpc.Core/ClientBase.cs index e9a968669454ca7378dd41936fc4bfe28ebb4e41..5cebe4e7b93b4d6c17cd59b6c7c46f1931f56e72 100644 --- a/src/csharp/Grpc.Core/ClientBase.cs +++ b/src/csharp/Grpc.Core/ClientBase.cs @@ -76,7 +76,7 @@ namespace Grpc.Core /// <summary> /// Creates a new call to given method. /// </summary> - protected Call<TRequest, TResponse> CreateCall<TRequest, TResponse>(Method<TRequest, TResponse> method, CallContext context) + protected CallInvocationDetails<TRequest, TResponse> CreateCall<TRequest, TResponse>(Method<TRequest, TResponse> method, CallContext context) where TRequest : class where TResponse : class { @@ -86,7 +86,7 @@ namespace Grpc.Core interceptor(context.Headers); context.Headers.Freeze(); } - return new Call<TRequest, TResponse>(channel, method, null, context); + return new CallInvocationDetails<TRequest, TResponse>(channel, method, context); } } } diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj index a282d57d998f2f137b27a80fbf35236beb6ef6e8..9370a0b2f5e0eb150dd3c4b5bd2647275b943aef 100644 --- a/src/csharp/Grpc.Core/Grpc.Core.csproj +++ b/src/csharp/Grpc.Core/Grpc.Core.csproj @@ -58,7 +58,6 @@ <Compile Include="Properties\AssemblyInfo.cs" /> <Compile Include="RpcException.cs" /> <Compile Include="Calls.cs" /> - <Compile Include="Call.cs" /> <Compile Include="AsyncClientStreamingCall.cs" /> <Compile Include="GrpcEnvironment.cs" /> <Compile Include="Status.cs" /> @@ -115,6 +114,7 @@ <Compile Include="Logging\ConsoleLogger.cs" /> <Compile Include="Internal\NativeLogRedirector.cs" /> <Compile Include="ChannelState.cs" /> + <Compile Include="CallInvocationDetails.cs" /> </ItemGroup> <ItemGroup> <None Include="Grpc.Core.nuspec" /> diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index ff3e99d30d0f4c1a9af133094526c5e78800f6b3..939c24acafa35f80b46e8f00b673b30b223ea959 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -50,10 +50,7 @@ namespace Grpc.Core.Internal { static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCall<TRequest, TResponse>>(); - readonly Channel channel; - readonly string method; - readonly string host; - readonly CallContext context; + readonly CallInvocationDetails<TRequest, TResponse> callDetails; // Completion of a pending unary response if not null. TaskCompletionSource<TResponse> unaryResponseTcs; @@ -63,12 +60,10 @@ namespace Grpc.Core.Internal bool readObserverCompleted; // True if readObserver has already been completed. - public AsyncCall(Channel channel, string method, string host, CallContext context, Func<TRequest, byte[]> serializer, Func<byte[], TResponse> deserializer) : base(serializer, deserializer) + public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails) + : base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer) { - this.channel = channel; - this.method = Preconditions.CheckNotNull(method); - this.host = host; // null host means default host will be used by C-core. - this.context = context; + this.callDetails = callDetails; } // TODO: this method is not Async, so it shouldn't be in AsyncCall class, but @@ -87,13 +82,14 @@ namespace Grpc.Core.Internal lock (myLock) { Preconditions.CheckState(!started); - Initialize(cq); started = true; + Initialize(cq); + halfcloseRequested = true; readingDone = true; } - using (var metadataArray = MetadataArraySafeHandle.Create(context.Headers)) + using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Context.Headers)) { using (var ctx = BatchContextSafeHandle.Create()) { @@ -132,16 +128,17 @@ namespace Grpc.Core.Internal lock (myLock) { Preconditions.CheckState(!started); - Initialize(channel.Environment.CompletionQueue); - started = true; + + Initialize(callDetails.Channel.Environment.CompletionQueue); + halfcloseRequested = true; readingDone = true; byte[] payload = UnsafeSerialize(msg); unaryResponseTcs = new TaskCompletionSource<TResponse>(); - using (var metadataArray = MetadataArraySafeHandle.Create(context.Headers)) + using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Context.Headers)) { call.StartUnary(payload, HandleUnaryResponse, metadataArray); } @@ -158,13 +155,14 @@ namespace Grpc.Core.Internal lock (myLock) { Preconditions.CheckState(!started); - Initialize(channel.Environment.CompletionQueue); - started = true; + + Initialize(callDetails.Channel.Environment.CompletionQueue); + readingDone = true; unaryResponseTcs = new TaskCompletionSource<TResponse>(); - using (var metadataArray = MetadataArraySafeHandle.Create(context.Headers)) + using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Context.Headers)) { call.StartClientStreaming(HandleUnaryResponse, metadataArray); } @@ -181,15 +179,16 @@ namespace Grpc.Core.Internal lock (myLock) { Preconditions.CheckState(!started); - Initialize(channel.Environment.CompletionQueue); - started = true; + + Initialize(callDetails.Channel.Environment.CompletionQueue); + halfcloseRequested = true; halfclosed = true; // halfclose not confirmed yet, but it will be once finishedHandler is called. byte[] payload = UnsafeSerialize(msg); - using (var metadataArray = MetadataArraySafeHandle.Create(context.Headers)) + using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Context.Headers)) { call.StartServerStreaming(payload, HandleFinished, metadataArray); } @@ -205,11 +204,11 @@ namespace Grpc.Core.Internal lock (myLock) { Preconditions.CheckState(!started); - Initialize(channel.Environment.CompletionQueue); - started = true; - using (var metadataArray = MetadataArraySafeHandle.Create(context.Headers)) + Initialize(callDetails.Channel.Environment.CompletionQueue); + + using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Context.Headers)) { call.StartDuplexStreaming(HandleFinished, metadataArray); } @@ -311,14 +310,26 @@ namespace Grpc.Core.Internal protected override void OnReleaseResources() { - channel.Environment.DebugStats.ActiveClientCalls.Decrement(); + callDetails.Channel.Environment.DebugStats.ActiveClientCalls.Decrement(); } private void Initialize(CompletionQueueSafeHandle cq) { - var call = channel.Handle.CreateCall(channel.Environment.CompletionRegistry, cq, method, host, Timespec.FromDateTime(context.Deadline)); - channel.Environment.DebugStats.ActiveClientCalls.Increment(); + var call = callDetails.Channel.Handle.CreateCall(callDetails.Channel.Environment.CompletionRegistry, cq, + callDetails.Method, callDetails.Host, Timespec.FromDateTime(callDetails.Context.Deadline)); + callDetails.Channel.Environment.DebugStats.ActiveClientCalls.Increment(); InitializeInternal(call); + RegisterCancellationCallback(); + } + + // Make sure that once cancellationToken for this call is cancelled, Cancel() will be called. + private void RegisterCancellationCallback() + { + var token = callDetails.Context.CancellationToken; + if (token.CanBeCanceled) + { + token.Register(() => this.Cancel()); + } } /// <summary>