diff --git a/src/compiler/csharp_generator.cc b/src/compiler/csharp_generator.cc index e0c1bcda19c9ee4c050a576e53e08deeed4d431c..efd39e8ac502a6c469ba0df5234166e411d001cb 100644 --- a/src/compiler/csharp_generator.cc +++ b/src/compiler/csharp_generator.cc @@ -273,6 +273,13 @@ void GenerateClientInterface(Printer* out, const ServiceDescriptor *service) { "methodname", method->name(), "request", GetClassName(method->input_type()), "response", GetClassName(method->output_type())); + + // overload taking CallContext as a param + out->Print( + "$response$ $methodname$($request$ request, CallContext context);\n", + "methodname", method->name(), "request", + GetClassName(method->input_type()), "response", + GetClassName(method->output_type())); } std::string method_name = method->name(); @@ -284,6 +291,13 @@ void GenerateClientInterface(Printer* out, const ServiceDescriptor *service) { "methodname", method_name, "request_maybe", GetMethodRequestParamMaybe(method), "returntype", GetMethodReturnTypeClient(method)); + + // overload taking CallContext as a param + out->Print( + "$returntype$ $methodname$($request_maybe$CallContext context);\n", + "methodname", method_name, "request_maybe", + GetMethodRequestParamMaybe(method), "returntype", + GetMethodReturnTypeClient(method)); } out->Outdent(); out->Print("}\n"); @@ -340,10 +354,25 @@ void GenerateClientStub(Printer* out, const ServiceDescriptor *service) { GetClassName(method->output_type())); out->Print("{\n"); out->Indent(); - out->Print("var call = CreateCall($servicenamefield$, $methodfield$, headers, deadline);\n", + out->Print("var call = CreateCall($servicenamefield$, $methodfield$, new CallContext(headers, deadline, cancellationToken));\n", + "servicenamefield", GetServiceNameFieldName(), "methodfield", + GetMethodFieldName(method)); + out->Print("return Calls.BlockingUnaryCall(call, request);\n"); + out->Outdent(); + out->Print("}\n"); + + // overload taking CallContext as a param + out->Print( + "public $response$ $methodname$($request$ request, CallContext context)\n", + "methodname", method->name(), "request", + GetClassName(method->input_type()), "response", + GetClassName(method->output_type())); + out->Print("{\n"); + out->Indent(); + out->Print("var call = CreateCall($servicenamefield$, $methodfield$, context);\n", "servicenamefield", GetServiceNameFieldName(), "methodfield", GetMethodFieldName(method)); - out->Print("return Calls.BlockingUnaryCall(call, request, cancellationToken);\n"); + out->Print("return Calls.BlockingUnaryCall(call, request);\n"); out->Outdent(); out->Print("}\n"); } @@ -359,26 +388,57 @@ void GenerateClientStub(Printer* out, const ServiceDescriptor *service) { GetMethodReturnTypeClient(method)); out->Print("{\n"); out->Indent(); - out->Print("var call = CreateCall($servicenamefield$, $methodfield$, headers, deadline);\n", + out->Print("var call = CreateCall($servicenamefield$, $methodfield$, new CallContext(headers, deadline, cancellationToken));\n", "servicenamefield", GetServiceNameFieldName(), "methodfield", GetMethodFieldName(method)); switch (GetMethodType(method)) { case METHODTYPE_NO_STREAMING: - out->Print("return Calls.AsyncUnaryCall(call, request, cancellationToken);\n"); + out->Print("return Calls.AsyncUnaryCall(call, request);\n"); break; case METHODTYPE_CLIENT_STREAMING: - out->Print("return Calls.AsyncClientStreamingCall(call, cancellationToken);\n"); + out->Print("return Calls.AsyncClientStreamingCall(call);\n"); break; case METHODTYPE_SERVER_STREAMING: out->Print( - "return Calls.AsyncServerStreamingCall(call, request, cancellationToken);\n"); + "return Calls.AsyncServerStreamingCall(call, request);\n"); break; case METHODTYPE_BIDI_STREAMING: - out->Print("return Calls.AsyncDuplexStreamingCall(call, cancellationToken);\n"); + out->Print("return Calls.AsyncDuplexStreamingCall(call);\n"); break; default: GOOGLE_LOG(FATAL)<< "Can't get here."; - } + } + out->Outdent(); + out->Print("}\n"); + + // overload taking CallContext as a param + out->Print( + "public $returntype$ $methodname$($request_maybe$CallContext context)\n", + "methodname", method_name, "request_maybe", + GetMethodRequestParamMaybe(method), "returntype", + GetMethodReturnTypeClient(method)); + out->Print("{\n"); + out->Indent(); + out->Print("var call = CreateCall($servicenamefield$, $methodfield$, context);\n", + "servicenamefield", GetServiceNameFieldName(), "methodfield", + GetMethodFieldName(method)); + switch (GetMethodType(method)) { + case METHODTYPE_NO_STREAMING: + out->Print("return Calls.AsyncUnaryCall(call, request);\n"); + break; + case METHODTYPE_CLIENT_STREAMING: + out->Print("return Calls.AsyncClientStreamingCall(call);\n"); + break; + case METHODTYPE_SERVER_STREAMING: + out->Print( + "return Calls.AsyncServerStreamingCall(call, request);\n"); + break; + case METHODTYPE_BIDI_STREAMING: + out->Print("return Calls.AsyncDuplexStreamingCall(call);\n"); + break; + default: + GOOGLE_LOG(FATAL)<< "Can't get here."; + } out->Outdent(); out->Print("}\n"); } diff --git a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs index bf7cc3fbf31c86c32466112fe2f7d9a88d07f821..d289ded6bd1254e2f7bc0d008e1aee4689de071d 100644 --- a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs +++ b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs @@ -102,17 +102,17 @@ namespace Grpc.Core.Tests [Test] public void UnaryCall() { - var internalCall = new Call<string, string>(ServiceName, EchoMethod, channel, Metadata.Empty); - Assert.AreEqual("ABC", Calls.BlockingUnaryCall(internalCall, "ABC", CancellationToken.None)); + var internalCall = new Call<string, string>(ServiceName, EchoMethod, channel, new CallContext()); + Assert.AreEqual("ABC", Calls.BlockingUnaryCall(internalCall, "ABC")); } [Test] public void UnaryCall_ServerHandlerThrows() { - var internalCall = new Call<string, string>(ServiceName, EchoMethod, channel, Metadata.Empty); + var internalCall = new Call<string, string>(ServiceName, EchoMethod, channel, new CallContext()); try { - Calls.BlockingUnaryCall(internalCall, "THROW", CancellationToken.None); + Calls.BlockingUnaryCall(internalCall, "THROW"); Assert.Fail(); } catch (RpcException e) @@ -124,10 +124,10 @@ namespace Grpc.Core.Tests [Test] public void UnaryCall_ServerHandlerThrowsRpcException() { - var internalCall = new Call<string, string>(ServiceName, EchoMethod, channel, Metadata.Empty); + var internalCall = new Call<string, string>(ServiceName, EchoMethod, channel, new CallContext()); try { - Calls.BlockingUnaryCall(internalCall, "THROW_UNAUTHENTICATED", CancellationToken.None); + Calls.BlockingUnaryCall(internalCall, "THROW_UNAUTHENTICATED"); Assert.Fail(); } catch (RpcException e) @@ -139,10 +139,10 @@ namespace Grpc.Core.Tests [Test] public void UnaryCall_ServerHandlerSetsStatus() { - var internalCall = new Call<string, string>(ServiceName, EchoMethod, channel, Metadata.Empty); + var internalCall = new Call<string, string>(ServiceName, EchoMethod, channel, new CallContext()); try { - Calls.BlockingUnaryCall(internalCall, "SET_UNAUTHENTICATED", CancellationToken.None); + Calls.BlockingUnaryCall(internalCall, "SET_UNAUTHENTICATED"); Assert.Fail(); } catch (RpcException e) @@ -152,20 +152,20 @@ namespace Grpc.Core.Tests } [Test] - public void AsyncUnaryCall() + public async Task AsyncUnaryCall() { - var internalCall = new Call<string, string>(ServiceName, EchoMethod, channel, Metadata.Empty); - var result = Calls.AsyncUnaryCall(internalCall, "ABC", CancellationToken.None).ResponseAsync.Result; + var internalCall = new Call<string, string>(ServiceName, EchoMethod, channel, new CallContext()); + var result = await Calls.AsyncUnaryCall(internalCall, "ABC"); Assert.AreEqual("ABC", result); } [Test] public async Task AsyncUnaryCall_ServerHandlerThrows() { - var internalCall = new Call<string, string>(ServiceName, EchoMethod, channel, Metadata.Empty); + var internalCall = new Call<string, string>(ServiceName, EchoMethod, channel, new CallContext()); try { - await Calls.AsyncUnaryCall(internalCall, "THROW", CancellationToken.None); + await Calls.AsyncUnaryCall(internalCall, "THROW"); Assert.Fail(); } catch (RpcException e) @@ -177,8 +177,8 @@ namespace Grpc.Core.Tests [Test] public async Task ClientStreamingCall() { - var internalCall = new Call<string, string>(ServiceName, ConcatAndEchoMethod, channel, Metadata.Empty); - var call = Calls.AsyncClientStreamingCall(internalCall, CancellationToken.None); + var internalCall = new Call<string, string>(ServiceName, ConcatAndEchoMethod, channel, new CallContext()); + var call = Calls.AsyncClientStreamingCall(internalCall); await call.RequestStream.WriteAll(new string[] { "A", "B", "C" }); Assert.AreEqual("ABC", await call.ResponseAsync); @@ -187,10 +187,9 @@ namespace Grpc.Core.Tests [Test] public async Task ClientStreamingCall_CancelAfterBegin() { - var internalCall = new Call<string, string>(ServiceName, ConcatAndEchoMethod, channel, Metadata.Empty); - var cts = new CancellationTokenSource(); - var call = Calls.AsyncClientStreamingCall(internalCall, cts.Token); + var internalCall = new Call<string, string>(ServiceName, ConcatAndEchoMethod, channel, new CallContext(cancellationToken: cts.Token)); + var call = Calls.AsyncClientStreamingCall(internalCall); // TODO(jtattermusch): we need this to ensure call has been initiated once we cancel it. await Task.Delay(1000); @@ -214,8 +213,8 @@ namespace Grpc.Core.Tests new Metadata.Entry("ascii-header", "abcdefg"), new Metadata.Entry("binary-header-bin", new byte[] { 1, 2, 3, 0, 0xff }), }; - var internalCall = new Call<string, string>(ServiceName, EchoMethod, channel, headers); - var call = Calls.AsyncUnaryCall(internalCall, "ABC", CancellationToken.None); + var internalCall = new Call<string, string>(ServiceName, EchoMethod, channel, new CallContext(headers: headers)); + var call = Calls.AsyncUnaryCall(internalCall, "ABC"); Assert.AreEqual("ABC", call.ResponseAsync.Result); @@ -235,25 +234,25 @@ namespace Grpc.Core.Tests { channel.Dispose(); - var internalCall = new Call<string, string>(ServiceName, EchoMethod, channel, Metadata.Empty); - Assert.Throws(typeof(ObjectDisposedException), () => Calls.BlockingUnaryCall(internalCall, "ABC", CancellationToken.None)); + var internalCall = new Call<string, string>(ServiceName, EchoMethod, channel, new CallContext()); + Assert.Throws(typeof(ObjectDisposedException), () => Calls.BlockingUnaryCall(internalCall, "ABC")); } [Test] public void UnaryCallPerformance() { - var internalCall = new Call<string, string>(ServiceName, EchoMethod, channel, Metadata.Empty); + var internalCall = new Call<string, string>(ServiceName, EchoMethod, channel, new CallContext()); BenchmarkUtil.RunBenchmark(100, 100, - () => { Calls.BlockingUnaryCall(internalCall, "ABC", default(CancellationToken)); }); + () => { Calls.BlockingUnaryCall(internalCall, "ABC"); }); } [Test] public void UnknownMethodHandler() { - var internalCall = new Call<string, string>(ServiceName, NonexistentMethod, channel, Metadata.Empty); + var internalCall = new Call<string, string>(ServiceName, NonexistentMethod, channel, new CallContext()); try { - Calls.BlockingUnaryCall(internalCall, "ABC", default(CancellationToken)); + Calls.BlockingUnaryCall(internalCall, "ABC"); Assert.Fail(); } catch (RpcException e) @@ -265,16 +264,16 @@ namespace Grpc.Core.Tests [Test] public void UserAgentStringPresent() { - var internalCall = new Call<string, string>(ServiceName, EchoMethod, channel, Metadata.Empty); - string userAgent = Calls.BlockingUnaryCall(internalCall, "RETURN-USER-AGENT", CancellationToken.None); + var internalCall = new Call<string, string>(ServiceName, EchoMethod, channel, new CallContext()); + string userAgent = Calls.BlockingUnaryCall(internalCall, "RETURN-USER-AGENT"); Assert.IsTrue(userAgent.StartsWith("grpc-csharp/")); } [Test] public void PeerInfoPresent() { - var internalCall = new Call<string, string>(ServiceName, EchoMethod, channel, Metadata.Empty); - string peer = Calls.BlockingUnaryCall(internalCall, "RETURN-PEER", CancellationToken.None); + var internalCall = new Call<string, string>(ServiceName, EchoMethod, channel, new CallContext()); + string peer = Calls.BlockingUnaryCall(internalCall, "RETURN-PEER"); Assert.IsTrue(peer.Contains(Host)); } @@ -286,8 +285,8 @@ namespace Grpc.Core.Tests var stateChangedTask = channel.WaitForStateChangedAsync(channel.State); - var internalCall = new Call<string, string>(ServiceName, EchoMethod, channel, Metadata.Empty); - await Calls.AsyncUnaryCall(internalCall, "abc", CancellationToken.None); + var internalCall = new Call<string, string>(ServiceName, EchoMethod, channel, new CallContext()); + await Calls.AsyncUnaryCall(internalCall, "abc"); await stateChangedTask; Assert.AreEqual(ChannelState.Ready, channel.State); diff --git a/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs b/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs index d84801fbacea93fdeaacd9d1c1e37b6e110e962e..2dea8d06e1c4f382d05981135a6331183a0d60fe 100644 --- a/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs +++ b/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs @@ -98,12 +98,12 @@ namespace Grpc.Core.Tests public void InfiniteDeadline() { // no deadline specified, check server sees infinite deadline - var internalCall = new Call<string, string>(ServiceName, TestMethod, channel, Metadata.Empty); - Assert.AreEqual("DATETIME_MAXVALUE", Calls.BlockingUnaryCall(internalCall, "RETURN_DEADLINE", CancellationToken.None)); + var internalCall = new Call<string, string>(ServiceName, TestMethod, channel, new CallContext()); + Assert.AreEqual("DATETIME_MAXVALUE", Calls.BlockingUnaryCall(internalCall, "RETURN_DEADLINE")); // DateTime.MaxValue deadline specified, check server sees infinite deadline - var internalCall2 = new Call<string, string>(ServiceName, TestMethod, channel, Metadata.Empty, DateTime.MaxValue); - Assert.AreEqual("DATETIME_MAXVALUE", Calls.BlockingUnaryCall(internalCall2, "RETURN_DEADLINE", CancellationToken.None)); + var internalCall2 = new Call<string, string>(ServiceName, TestMethod, channel, new CallContext()); + Assert.AreEqual("DATETIME_MAXVALUE", Calls.BlockingUnaryCall(internalCall2, "RETURN_DEADLINE")); } [Test] @@ -112,9 +112,9 @@ namespace Grpc.Core.Tests var remainingTimeClient = TimeSpan.FromDays(7); var deadline = DateTime.UtcNow + remainingTimeClient; Thread.Sleep(1000); - var internalCall = new Call<string, string>(ServiceName, TestMethod, channel, Metadata.Empty, deadline); + var internalCall = new Call<string, string>(ServiceName, TestMethod, channel, new CallContext(deadline: deadline)); - var serverDeadlineTicksString = Calls.BlockingUnaryCall(internalCall, "RETURN_DEADLINE", CancellationToken.None); + var serverDeadlineTicksString = Calls.BlockingUnaryCall(internalCall, "RETURN_DEADLINE"); var serverDeadline = new DateTime(long.Parse(serverDeadlineTicksString), DateTimeKind.Utc); // A fairly relaxed check that the deadline set by client and deadline seen by server @@ -127,11 +127,11 @@ namespace Grpc.Core.Tests public void DeadlineInThePast() { var deadline = DateTime.MinValue; - var internalCall = new Call<string, string>(ServiceName, TestMethod, channel, Metadata.Empty, deadline); + var internalCall = new Call<string, string>(ServiceName, TestMethod, channel, new CallContext()); try { - Calls.BlockingUnaryCall(internalCall, "TIMEOUT", CancellationToken.None); + Calls.BlockingUnaryCall(internalCall, "TIMEOUT"); Assert.Fail(); } catch (RpcException e) @@ -145,11 +145,11 @@ namespace Grpc.Core.Tests public void DeadlineExceededStatusOnTimeout() { var deadline = DateTime.UtcNow.Add(TimeSpan.FromSeconds(5)); - var internalCall = new Call<string, string>(ServiceName, TestMethod, channel, Metadata.Empty, deadline); + var internalCall = new Call<string, string>(ServiceName, TestMethod, channel, new CallContext(deadline: deadline)); try { - Calls.BlockingUnaryCall(internalCall, "TIMEOUT", CancellationToken.None); + Calls.BlockingUnaryCall(internalCall, "TIMEOUT"); Assert.Fail(); } catch (RpcException e) @@ -163,11 +163,11 @@ namespace Grpc.Core.Tests public void ServerReceivesCancellationOnTimeout() { var deadline = DateTime.UtcNow.Add(TimeSpan.FromSeconds(5)); - var internalCall = new Call<string, string>(ServiceName, TestMethod, channel, Metadata.Empty, deadline); + var internalCall = new Call<string, string>(ServiceName, TestMethod, channel, new CallContext(deadline: deadline)); try { - Calls.BlockingUnaryCall(internalCall, "CHECK_CANCELLATION_RECEIVED", CancellationToken.None); + Calls.BlockingUnaryCall(internalCall, "CHECK_CANCELLATION_RECEIVED"); Assert.Fail(); } catch (RpcException e) diff --git a/src/csharp/Grpc.Core/Call.cs b/src/csharp/Grpc.Core/Call.cs index 94c5e26082264f720a367d1263ad715416bae536..f9d1fde5489da80ba6639e20f8da513685fb608d 100644 --- a/src/csharp/Grpc.Core/Call.cs +++ b/src/csharp/Grpc.Core/Call.cs @@ -46,22 +46,14 @@ namespace Grpc.Core readonly Marshaller<TRequest> requestMarshaller; readonly Marshaller<TResponse> responseMarshaller; readonly Channel channel; - readonly Metadata headers; - readonly DateTime deadline; + readonly CallContext context; - public Call(string serviceName, Method<TRequest, TResponse> method, Channel channel, Metadata headers) - : this(serviceName, method, channel, headers, DateTime.MaxValue) - { - } - - public Call(string serviceName, Method<TRequest, TResponse> method, Channel channel, Metadata headers, DateTime deadline) + public Call(string serviceName, Method<TRequest, TResponse> method, Channel channel, CallContext context) { this.name = method.GetFullName(serviceName); this.requestMarshaller = method.RequestMarshaller; this.responseMarshaller = method.ResponseMarshaller; - this.channel = Preconditions.CheckNotNull(channel); - this.headers = Preconditions.CheckNotNull(headers); - this.deadline = deadline; + this.context = context; } public Channel Channel @@ -84,21 +76,13 @@ namespace Grpc.Core } /// <summary> - /// Headers to send at the beginning of the call. + /// Call context. /// </summary> - public Metadata Headers - { - get - { - return headers; - } - } - - public DateTime Deadline + public CallContext Context { get { - return this.deadline; + return context; } } diff --git a/src/csharp/Grpc.Core/CallContext.cs b/src/csharp/Grpc.Core/CallContext.cs new file mode 100644 index 0000000000000000000000000000000000000000..2787d3f5b3a01e80a17c5d6f18b65f3346c78377 --- /dev/null +++ b/src/csharp/Grpc.Core/CallContext.cs @@ -0,0 +1,89 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Threading; + +using Grpc.Core.Internal; +using Grpc.Core.Utils; + +namespace Grpc.Core +{ + /// <summary> + /// Context for calls made by client. + /// </summary> + public class CallContext + { + readonly Metadata headers; + readonly DateTime deadline; + readonly CancellationToken cancellationToken; + + /// <summary> + /// Creates a new call context. + /// </summary> + /// <param name="headers">Headers to be sent with the call.</param> + /// <param name="deadline">Deadline for the call to finish. null means no deadline.</param> + /// <param name="cancellationToken">Can be used to request cancellation of the call.</param> + public CallContext(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)) + { + // TODO(jtattermusch): consider only creating metadata object once it's really needed. + this.headers = headers != null ? headers : new Metadata(); + this.deadline = deadline.HasValue ? deadline.Value : DateTime.MaxValue; + this.cancellationToken = cancellationToken; + } + + /// <summary> + /// Headers to send at the beginning of the call. + /// </summary> + public Metadata Headers + { + get { return headers; } + } + + /// <summary> + /// Call deadline. + /// </summary> + public DateTime Deadline + { + get { return deadline; } + } + + /// <summary> + /// Token that can be used for cancelling the call. + /// </summary> + public CancellationToken CancellationToken + { + get { return cancellationToken; } + } + } +} diff --git a/src/csharp/Grpc.Core/Calls.cs b/src/csharp/Grpc.Core/Calls.cs index 054fc2749170516f4c4ead69ea0439a4e18815c0..f3c363bda2fbed1e3af7ade09ac1b4e39e499861 100644 --- a/src/csharp/Grpc.Core/Calls.cs +++ b/src/csharp/Grpc.Core/Calls.cs @@ -43,59 +43,59 @@ namespace Grpc.Core /// </summary> public static class Calls { - public static TResponse BlockingUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token) + public static TResponse BlockingUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req) where TRequest : class where TResponse : class { var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer); // TODO(jtattermusch): this gives a race that cancellation can be requested before the call even starts. - RegisterCancellationCallback(asyncCall, token); - return asyncCall.UnaryCall(call.Channel, call.Name, req, call.Headers, call.Deadline); + RegisterCancellationCallback(asyncCall, call.Context.CancellationToken); + return asyncCall.UnaryCall(call.Channel, call.Name, req, call.Context.Headers, call.Context.Deadline); } - public static AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token) + public static AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req) where TRequest : class where TResponse : class { var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer); - asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name, Timespec.FromDateTime(call.Deadline)); - var asyncResult = asyncCall.UnaryCallAsync(req, call.Headers, call.Deadline); - RegisterCancellationCallback(asyncCall, token); + asyncCall.Initialize(call.Channel, call.Channel.Environment.CompletionQueue, call.Name, Timespec.FromDateTime(call.Context.Deadline)); + var asyncResult = asyncCall.UnaryCallAsync(req, call.Context.Headers, call.Context.Deadline); + RegisterCancellationCallback(asyncCall, call.Context.CancellationToken); return new AsyncUnaryCall<TResponse>(asyncResult, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); } - public static AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token) + public static AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req) where TRequest : class where TResponse : class { var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer); - asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name, Timespec.FromDateTime(call.Deadline)); - asyncCall.StartServerStreamingCall(req, call.Headers, call.Deadline); - RegisterCancellationCallback(asyncCall, token); + asyncCall.Initialize(call.Channel, call.Channel.Environment.CompletionQueue, call.Name, Timespec.FromDateTime(call.Context.Deadline)); + asyncCall.StartServerStreamingCall(req, call.Context.Headers, call.Context.Deadline); + RegisterCancellationCallback(asyncCall, call.Context.CancellationToken); var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall); return new AsyncServerStreamingCall<TResponse>(responseStream, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); } - public static AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, CancellationToken token) + public static AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call) where TRequest : class where TResponse : class { var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer); - asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name, Timespec.FromDateTime(call.Deadline)); - var resultTask = asyncCall.ClientStreamingCallAsync(call.Headers, call.Deadline); - RegisterCancellationCallback(asyncCall, token); + asyncCall.Initialize(call.Channel, call.Channel.Environment.CompletionQueue, call.Name, Timespec.FromDateTime(call.Context.Deadline)); + var resultTask = asyncCall.ClientStreamingCallAsync(call.Context.Headers, call.Context.Deadline); + RegisterCancellationCallback(asyncCall, call.Context.CancellationToken); var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall); return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, resultTask, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); } - public static AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, CancellationToken token) + public static AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call) where TRequest : class where TResponse : class { var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer); - asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name, Timespec.FromDateTime(call.Deadline)); - asyncCall.StartDuplexStreamingCall(call.Headers, call.Deadline); - RegisterCancellationCallback(asyncCall, token); + asyncCall.Initialize(call.Channel, call.Channel.Environment.CompletionQueue, call.Name, Timespec.FromDateTime(call.Context.Deadline)); + asyncCall.StartDuplexStreamingCall(call.Context.Headers, call.Context.Deadline); + RegisterCancellationCallback(asyncCall, call.Context.CancellationToken); var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall); var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall); return new AsyncDuplexStreamingCall<TRequest, TResponse>(requestStream, responseStream, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); diff --git a/src/csharp/Grpc.Core/Channel.cs b/src/csharp/Grpc.Core/Channel.cs index 0b696104438978eff78e35f3f9d98414355b573b..9273ea4582c2853c738b8c62798ac42ce46f56a3 100644 --- a/src/csharp/Grpc.Core/Channel.cs +++ b/src/csharp/Grpc.Core/Channel.cs @@ -178,22 +178,6 @@ namespace Grpc.Core } } - internal CompletionQueueSafeHandle CompletionQueue - { - get - { - return this.environment.CompletionQueue; - } - } - - internal CompletionRegistry CompletionRegistry - { - get - { - return this.environment.CompletionRegistry; - } - } - internal GrpcEnvironment Environment { get diff --git a/src/csharp/Grpc.Core/ClientBase.cs b/src/csharp/Grpc.Core/ClientBase.cs index fd3473128a975faebfc99fc59cdd75f5cb3a4dd6..55e3f33b3e21a45f4c421cabe89abd8d58c627e3 100644 --- a/src/csharp/Grpc.Core/ClientBase.cs +++ b/src/csharp/Grpc.Core/ClientBase.cs @@ -76,19 +76,17 @@ namespace Grpc.Core /// <summary> /// Creates a new call to given method. /// </summary> - protected Call<TRequest, TResponse> CreateCall<TRequest, TResponse>(string serviceName, Method<TRequest, TResponse> method, Metadata metadata, DateTime? deadline) + protected Call<TRequest, TResponse> CreateCall<TRequest, TResponse>(string serviceName, Method<TRequest, TResponse> method, CallContext context) where TRequest : class where TResponse : class { var interceptor = HeaderInterceptor; if (interceptor != null) { - metadata = metadata ?? new Metadata(); - interceptor(metadata); - metadata.Freeze(); + interceptor(context.Headers); + context.Headers.Freeze(); } - return new Call<TRequest, TResponse>(serviceName, method, channel, - metadata ?? Metadata.Empty, deadline ?? DateTime.MaxValue); + return new Call<TRequest, TResponse>(serviceName, method, channel, context); } } } diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj index 17add7716498d7a19bb7e5006a40bc20f20b7a04..a282d57d998f2f137b27a80fbf35236beb6ef6e8 100644 --- a/src/csharp/Grpc.Core/Grpc.Core.csproj +++ b/src/csharp/Grpc.Core/Grpc.Core.csproj @@ -48,6 +48,7 @@ <ItemGroup> <Compile Include="AsyncDuplexStreamingCall.cs" /> <Compile Include="AsyncServerStreamingCall.cs" /> + <Compile Include="CallContext.cs" /> <Compile Include="IClientStreamWriter.cs" /> <Compile Include="IServerStreamWriter.cs" /> <Compile Include="IAsyncStreamWriter.cs" /> diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index 48f466460f5a22cce6061443fd51b44b5e5329f7..f84c4b46331e4843df4015e176636b35bdfb035d 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -67,7 +67,7 @@ namespace Grpc.Core.Internal public void Initialize(Channel channel, CompletionQueueSafeHandle cq, string methodName, Timespec deadline) { this.channel = channel; - var call = channel.Handle.CreateCall(channel.CompletionRegistry, cq, methodName, null, deadline); + var call = channel.Handle.CreateCall(channel.Environment.CompletionRegistry, cq, methodName, null, deadline); channel.Environment.DebugStats.ActiveClientCalls.Increment(); InitializeInternal(call); } diff --git a/src/csharp/Grpc.Core/ServerCallContext.cs b/src/csharp/Grpc.Core/ServerCallContext.cs index 0c48adaea52b1a569a4a31de068a8ee06c82de47..032b1390db349318075646bdc5d245b4162df853 100644 --- a/src/csharp/Grpc.Core/ServerCallContext.cs +++ b/src/csharp/Grpc.Core/ServerCallContext.cs @@ -65,7 +65,7 @@ namespace Grpc.Core this.cancellationToken = cancellationToken; } - /// <summary> Name of method called in this RPC. </summary> + /// <summary>Name of method called in this RPC.</summary> public string Method { get @@ -74,7 +74,7 @@ namespace Grpc.Core } } - /// <summary> Name of host called in this RPC. </summary> + /// <summary>Name of host called in this RPC.</summary> public string Host { get @@ -83,7 +83,7 @@ namespace Grpc.Core } } - /// <summary> Address of the remote endpoint in URI format. </summary> + /// <summary>Address of the remote endpoint in URI format.</summary> public string Peer { get @@ -92,7 +92,7 @@ namespace Grpc.Core } } - /// <summary> Deadline for this RPC. </summary> + /// <summary>Deadline for this RPC.</summary> public DateTime Deadline { get @@ -101,7 +101,7 @@ namespace Grpc.Core } } - /// <summary> Initial metadata sent by client. </summary> + /// <summary>Initial metadata sent by client.</summary> public Metadata RequestHeaders { get @@ -110,8 +110,7 @@ namespace Grpc.Core } } - // TODO(jtattermusch): support signalling cancellation. - /// <summary> Cancellation token signals when call is cancelled. </summary> + ///<summary>Cancellation token signals when call is cancelled.</summary> public CancellationToken CancellationToken { get @@ -120,7 +119,7 @@ namespace Grpc.Core } } - /// <summary> Trailers to send back to client after RPC finishes.</summary> + /// <summary>Trailers to send back to client after RPC finishes.</summary> public Metadata ResponseTrailers { get