Skip to content
Snippets Groups Projects
Commit 5cb5ceda authored by Jan Tattermusch's avatar Jan Tattermusch
Browse files

refactoring client side calls

parent cc97fed6
No related branches found
No related tags found
No related merge requests found
...@@ -105,17 +105,17 @@ namespace Grpc.Core.Tests ...@@ -105,17 +105,17 @@ namespace Grpc.Core.Tests
[Test] [Test]
public void UnaryCall() public void UnaryCall()
{ {
var internalCall = new Call<string, string>(channel, EchoMethod, new CallContext()); var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallContext());
Assert.AreEqual("ABC", Calls.BlockingUnaryCall(internalCall, "ABC")); Assert.AreEqual("ABC", Calls.BlockingUnaryCall(callDetails, "ABC"));
} }
[Test] [Test]
public void UnaryCall_ServerHandlerThrows() public void UnaryCall_ServerHandlerThrows()
{ {
var internalCall = new Call<string, string>(channel, EchoMethod, new CallContext()); var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallContext());
try try
{ {
Calls.BlockingUnaryCall(internalCall, "THROW"); Calls.BlockingUnaryCall(callDetails, "THROW");
Assert.Fail(); Assert.Fail();
} }
catch (RpcException e) catch (RpcException e)
...@@ -127,10 +127,10 @@ namespace Grpc.Core.Tests ...@@ -127,10 +127,10 @@ namespace Grpc.Core.Tests
[Test] [Test]
public void UnaryCall_ServerHandlerThrowsRpcException() public void UnaryCall_ServerHandlerThrowsRpcException()
{ {
var internalCall = new Call<string, string>(channel, EchoMethod, new CallContext()); var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallContext());
try try
{ {
Calls.BlockingUnaryCall(internalCall, "THROW_UNAUTHENTICATED"); Calls.BlockingUnaryCall(callDetails, "THROW_UNAUTHENTICATED");
Assert.Fail(); Assert.Fail();
} }
catch (RpcException e) catch (RpcException e)
...@@ -142,10 +142,10 @@ namespace Grpc.Core.Tests ...@@ -142,10 +142,10 @@ namespace Grpc.Core.Tests
[Test] [Test]
public void UnaryCall_ServerHandlerSetsStatus() public void UnaryCall_ServerHandlerSetsStatus()
{ {
var internalCall = new Call<string, string>(channel, EchoMethod, new CallContext()); var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallContext());
try try
{ {
Calls.BlockingUnaryCall(internalCall, "SET_UNAUTHENTICATED"); Calls.BlockingUnaryCall(callDetails, "SET_UNAUTHENTICATED");
Assert.Fail(); Assert.Fail();
} }
catch (RpcException e) catch (RpcException e)
...@@ -157,18 +157,18 @@ namespace Grpc.Core.Tests ...@@ -157,18 +157,18 @@ namespace Grpc.Core.Tests
[Test] [Test]
public async Task AsyncUnaryCall() public async Task AsyncUnaryCall()
{ {
var internalCall = new Call<string, string>(channel, EchoMethod, new CallContext()); var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallContext());
var result = await Calls.AsyncUnaryCall(internalCall, "ABC"); var result = await Calls.AsyncUnaryCall(callDetails, "ABC");
Assert.AreEqual("ABC", result); Assert.AreEqual("ABC", result);
} }
[Test] [Test]
public async Task AsyncUnaryCall_ServerHandlerThrows() public async Task AsyncUnaryCall_ServerHandlerThrows()
{ {
var internalCall = new Call<string, string>(channel, EchoMethod, new CallContext()); var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallContext());
try try
{ {
await Calls.AsyncUnaryCall(internalCall, "THROW"); await Calls.AsyncUnaryCall(callDetails, "THROW");
Assert.Fail(); Assert.Fail();
} }
catch (RpcException e) catch (RpcException e)
...@@ -180,8 +180,8 @@ namespace Grpc.Core.Tests ...@@ -180,8 +180,8 @@ namespace Grpc.Core.Tests
[Test] [Test]
public async Task ClientStreamingCall() public async Task ClientStreamingCall()
{ {
var internalCall = new Call<string, string>(channel, ConcatAndEchoMethod, new CallContext()); var callDetails = new CallInvocationDetails<string, string>(channel, ConcatAndEchoMethod, new CallContext());
var call = Calls.AsyncClientStreamingCall(internalCall); var call = Calls.AsyncClientStreamingCall(callDetails);
await call.RequestStream.WriteAll(new string[] { "A", "B", "C" }); await call.RequestStream.WriteAll(new string[] { "A", "B", "C" });
Assert.AreEqual("ABC", await call.ResponseAsync); Assert.AreEqual("ABC", await call.ResponseAsync);
...@@ -191,8 +191,8 @@ namespace Grpc.Core.Tests ...@@ -191,8 +191,8 @@ namespace Grpc.Core.Tests
public async Task ClientStreamingCall_CancelAfterBegin() public async Task ClientStreamingCall_CancelAfterBegin()
{ {
var cts = new CancellationTokenSource(); var cts = new CancellationTokenSource();
var internalCall = new Call<string, string>(channel, ConcatAndEchoMethod, new CallContext(cancellationToken: cts.Token)); var callDetails = new CallInvocationDetails<string, string>(channel, ConcatAndEchoMethod, new CallContext(cancellationToken: cts.Token));
var call = Calls.AsyncClientStreamingCall(internalCall); var call = Calls.AsyncClientStreamingCall(callDetails);
// TODO(jtattermusch): we need this to ensure call has been initiated once we cancel it. // TODO(jtattermusch): we need this to ensure call has been initiated once we cancel it.
await Task.Delay(1000); await Task.Delay(1000);
...@@ -216,8 +216,8 @@ namespace Grpc.Core.Tests ...@@ -216,8 +216,8 @@ namespace Grpc.Core.Tests
new Metadata.Entry("ascii-header", "abcdefg"), new Metadata.Entry("ascii-header", "abcdefg"),
new Metadata.Entry("binary-header-bin", new byte[] { 1, 2, 3, 0, 0xff }), new Metadata.Entry("binary-header-bin", new byte[] { 1, 2, 3, 0, 0xff }),
}; };
var internalCall = new Call<string, string>(channel, EchoMethod, new CallContext(headers: headers)); var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallContext(headers: headers));
var call = Calls.AsyncUnaryCall(internalCall, "ABC"); var call = Calls.AsyncUnaryCall(callDetails, "ABC");
Assert.AreEqual("ABC", call.ResponseAsync.Result); Assert.AreEqual("ABC", call.ResponseAsync.Result);
...@@ -237,25 +237,25 @@ namespace Grpc.Core.Tests ...@@ -237,25 +237,25 @@ namespace Grpc.Core.Tests
{ {
channel.Dispose(); channel.Dispose();
var internalCall = new Call<string, string>(channel, EchoMethod, new CallContext()); var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallContext());
Assert.Throws(typeof(ObjectDisposedException), () => Calls.BlockingUnaryCall(internalCall, "ABC")); Assert.Throws(typeof(ObjectDisposedException), () => Calls.BlockingUnaryCall(callDetails, "ABC"));
} }
[Test] [Test]
public void UnaryCallPerformance() public void UnaryCallPerformance()
{ {
var internalCall = new Call<string, string>(channel, EchoMethod, new CallContext()); var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallContext());
BenchmarkUtil.RunBenchmark(100, 100, BenchmarkUtil.RunBenchmark(100, 100,
() => { Calls.BlockingUnaryCall(internalCall, "ABC"); }); () => { Calls.BlockingUnaryCall(callDetails, "ABC"); });
} }
[Test] [Test]
public void UnknownMethodHandler() public void UnknownMethodHandler()
{ {
var internalCall = new Call<string, string>(channel, NonexistentMethod, new CallContext()); var callDetails = new CallInvocationDetails<string, string>(channel, NonexistentMethod, new CallContext());
try try
{ {
Calls.BlockingUnaryCall(internalCall, "ABC"); Calls.BlockingUnaryCall(callDetails, "ABC");
Assert.Fail(); Assert.Fail();
} }
catch (RpcException e) catch (RpcException e)
...@@ -267,16 +267,16 @@ namespace Grpc.Core.Tests ...@@ -267,16 +267,16 @@ namespace Grpc.Core.Tests
[Test] [Test]
public void UserAgentStringPresent() public void UserAgentStringPresent()
{ {
var internalCall = new Call<string, string>(channel, EchoMethod, new CallContext()); var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallContext());
string userAgent = Calls.BlockingUnaryCall(internalCall, "RETURN-USER-AGENT"); string userAgent = Calls.BlockingUnaryCall(callDetails, "RETURN-USER-AGENT");
Assert.IsTrue(userAgent.StartsWith("grpc-csharp/")); Assert.IsTrue(userAgent.StartsWith("grpc-csharp/"));
} }
[Test] [Test]
public void PeerInfoPresent() public void PeerInfoPresent()
{ {
var internalCall = new Call<string, string>(channel, EchoMethod, new CallContext()); var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallContext());
string peer = Calls.BlockingUnaryCall(internalCall, "RETURN-PEER"); string peer = Calls.BlockingUnaryCall(callDetails, "RETURN-PEER");
Assert.IsTrue(peer.Contains(Host)); Assert.IsTrue(peer.Contains(Host));
} }
...@@ -288,8 +288,8 @@ namespace Grpc.Core.Tests ...@@ -288,8 +288,8 @@ namespace Grpc.Core.Tests
var stateChangedTask = channel.WaitForStateChangedAsync(channel.State); var stateChangedTask = channel.WaitForStateChangedAsync(channel.State);
var internalCall = new Call<string, string>(channel, EchoMethod, new CallContext()); var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallContext());
await Calls.AsyncUnaryCall(internalCall, "abc"); await Calls.AsyncUnaryCall(callDetails, "abc");
await stateChangedTask; await stateChangedTask;
Assert.AreEqual(ChannelState.Ready, channel.State); Assert.AreEqual(ChannelState.Ready, channel.State);
......
...@@ -99,12 +99,12 @@ namespace Grpc.Core.Tests ...@@ -99,12 +99,12 @@ namespace Grpc.Core.Tests
public void InfiniteDeadline() public void InfiniteDeadline()
{ {
// no deadline specified, check server sees infinite deadline // no deadline specified, check server sees infinite deadline
var internalCall = new Call<string, string>(channel, TestMethod, new CallContext()); var callDetails = new CallInvocationDetails<string, string>(channel, TestMethod, new CallContext());
Assert.AreEqual("DATETIME_MAXVALUE", Calls.BlockingUnaryCall(internalCall, "RETURN_DEADLINE")); Assert.AreEqual("DATETIME_MAXVALUE", Calls.BlockingUnaryCall(callDetails, "RETURN_DEADLINE"));
// DateTime.MaxValue deadline specified, check server sees infinite deadline // DateTime.MaxValue deadline specified, check server sees infinite deadline
var internalCall2 = new Call<string, string>(channel, TestMethod, new CallContext()); var callDetails2 = new CallInvocationDetails<string, string>(channel, TestMethod, new CallContext());
Assert.AreEqual("DATETIME_MAXVALUE", Calls.BlockingUnaryCall(internalCall2, "RETURN_DEADLINE")); Assert.AreEqual("DATETIME_MAXVALUE", Calls.BlockingUnaryCall(callDetails2, "RETURN_DEADLINE"));
} }
[Test] [Test]
...@@ -113,9 +113,9 @@ namespace Grpc.Core.Tests ...@@ -113,9 +113,9 @@ namespace Grpc.Core.Tests
var remainingTimeClient = TimeSpan.FromDays(7); var remainingTimeClient = TimeSpan.FromDays(7);
var deadline = DateTime.UtcNow + remainingTimeClient; var deadline = DateTime.UtcNow + remainingTimeClient;
Thread.Sleep(1000); Thread.Sleep(1000);
var internalCall = new Call<string, string>(channel, TestMethod, new CallContext(deadline: deadline)); var callDetails = new CallInvocationDetails<string, string>(channel, TestMethod, new CallContext(deadline: deadline));
var serverDeadlineTicksString = Calls.BlockingUnaryCall(internalCall, "RETURN_DEADLINE"); var serverDeadlineTicksString = Calls.BlockingUnaryCall(callDetails, "RETURN_DEADLINE");
var serverDeadline = new DateTime(long.Parse(serverDeadlineTicksString), DateTimeKind.Utc); var serverDeadline = new DateTime(long.Parse(serverDeadlineTicksString), DateTimeKind.Utc);
// A fairly relaxed check that the deadline set by client and deadline seen by server // A fairly relaxed check that the deadline set by client and deadline seen by server
...@@ -127,11 +127,11 @@ namespace Grpc.Core.Tests ...@@ -127,11 +127,11 @@ namespace Grpc.Core.Tests
[Test] [Test]
public void DeadlineInThePast() public void DeadlineInThePast()
{ {
var internalCall = new Call<string, string>(channel, TestMethod, new CallContext(deadline: DateTime.MinValue)); var callDetails = new CallInvocationDetails<string, string>(channel, TestMethod, new CallContext(deadline: DateTime.MinValue));
try try
{ {
Calls.BlockingUnaryCall(internalCall, "TIMEOUT"); Calls.BlockingUnaryCall(callDetails, "TIMEOUT");
Assert.Fail(); Assert.Fail();
} }
catch (RpcException e) catch (RpcException e)
...@@ -145,11 +145,11 @@ namespace Grpc.Core.Tests ...@@ -145,11 +145,11 @@ namespace Grpc.Core.Tests
public void DeadlineExceededStatusOnTimeout() public void DeadlineExceededStatusOnTimeout()
{ {
var deadline = DateTime.UtcNow.Add(TimeSpan.FromSeconds(5)); var deadline = DateTime.UtcNow.Add(TimeSpan.FromSeconds(5));
var internalCall = new Call<string, string>(channel, TestMethod, new CallContext(deadline: deadline)); var callDetails = new CallInvocationDetails<string, string>(channel, TestMethod, new CallContext(deadline: deadline));
try try
{ {
Calls.BlockingUnaryCall(internalCall, "TIMEOUT"); Calls.BlockingUnaryCall(callDetails, "TIMEOUT");
Assert.Fail(); Assert.Fail();
} }
catch (RpcException e) catch (RpcException e)
...@@ -163,11 +163,11 @@ namespace Grpc.Core.Tests ...@@ -163,11 +163,11 @@ namespace Grpc.Core.Tests
public void ServerReceivesCancellationOnTimeout() public void ServerReceivesCancellationOnTimeout()
{ {
var deadline = DateTime.UtcNow.Add(TimeSpan.FromSeconds(5)); var deadline = DateTime.UtcNow.Add(TimeSpan.FromSeconds(5));
var internalCall = new Call<string, string>(channel, TestMethod, new CallContext(deadline: deadline)); var callDetails = new CallInvocationDetails<string, string>(channel, TestMethod, new CallContext(deadline: deadline));
try try
{ {
Calls.BlockingUnaryCall(internalCall, "CHECK_CANCELLATION_RECEIVED"); Calls.BlockingUnaryCall(callDetails, "CHECK_CANCELLATION_RECEIVED");
Assert.Fail(); Assert.Fail();
} }
catch (RpcException e) catch (RpcException e)
......
...@@ -38,25 +38,30 @@ using Grpc.Core.Utils; ...@@ -38,25 +38,30 @@ using Grpc.Core.Utils;
namespace Grpc.Core namespace Grpc.Core
{ {
/// <summary> /// <summary>
/// Abstraction of a call to be invoked on a client. /// Details about a client-side call to be invoked.
/// </summary> /// </summary>
public class Call<TRequest, TResponse> public class CallInvocationDetails<TRequest, TResponse>
{ {
readonly Channel channel; readonly Channel channel;
readonly Method<TRequest, TResponse> method; readonly string method;
readonly string host; readonly string host;
readonly Marshaller<TRequest> requestMarshaller;
readonly Marshaller<TResponse> responseMarshaller;
readonly CallContext context; readonly CallContext context;
public Call(Channel channel, Method<TRequest, TResponse> method, CallContext context)
: this(channel, method, null, context) public CallInvocationDetails(Channel channel, Method<TRequest, TResponse> method, CallContext context) :
this(channel, method.FullName, null, method.RequestMarshaller, method.ResponseMarshaller, context)
{ {
} }
public Call(Channel channel, Method<TRequest, TResponse> method, string host, CallContext context) public CallInvocationDetails(Channel channel, string method, string host, Marshaller<TRequest> requestMarshaller, Marshaller<TResponse> responseMarshaller, CallContext context)
{ {
this.channel = Preconditions.CheckNotNull(channel); this.channel = Preconditions.CheckNotNull(channel);
this.method = Preconditions.CheckNotNull(method); this.method = Preconditions.CheckNotNull(method);
this.host = host; this.host = host;
this.requestMarshaller = Preconditions.CheckNotNull(requestMarshaller);
this.responseMarshaller = Preconditions.CheckNotNull(responseMarshaller);
this.context = Preconditions.CheckNotNull(context); this.context = Preconditions.CheckNotNull(context);
} }
...@@ -68,7 +73,7 @@ namespace Grpc.Core ...@@ -68,7 +73,7 @@ namespace Grpc.Core
} }
} }
public Method<TRequest, TResponse> Method public string Method
{ {
get get
{ {
...@@ -84,6 +89,22 @@ namespace Grpc.Core ...@@ -84,6 +89,22 @@ namespace Grpc.Core
} }
} }
public Marshaller<TRequest> RequestMarshaller
{
get
{
return this.requestMarshaller;
}
}
public Marshaller<TResponse> ResponseMarshaller
{
get
{
return this.responseMarshaller;
}
}
/// <summary> /// <summary>
/// Call context. /// Call context.
/// </summary> /// </summary>
......
...@@ -43,71 +43,52 @@ namespace Grpc.Core ...@@ -43,71 +43,52 @@ namespace Grpc.Core
/// </summary> /// </summary>
public static class Calls public static class Calls
{ {
public static TResponse BlockingUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req) public static TResponse BlockingUnaryCall<TRequest, TResponse>(CallInvocationDetails<TRequest, TResponse> call, TRequest req)
where TRequest : class where TRequest : class
where TResponse : class where TResponse : class
{ {
var asyncCall = new AsyncCall<TRequest, TResponse>(call.Channel, call.Method.FullName, call.Host, call.Context, var asyncCall = new AsyncCall<TRequest, TResponse>(call);
call.Method.RequestMarshaller.Serializer, call.Method.ResponseMarshaller.Deserializer);
// TODO(jtattermusch): this gives a race that cancellation can be requested before the call even starts.
RegisterCancellationCallback(asyncCall, call.Context.CancellationToken);
return asyncCall.UnaryCall(req); return asyncCall.UnaryCall(req);
} }
public static AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req) public static AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(CallInvocationDetails<TRequest, TResponse> call, TRequest req)
where TRequest : class where TRequest : class
where TResponse : class where TResponse : class
{ {
var asyncCall = new AsyncCall<TRequest, TResponse>(call.Channel, call.Method.FullName, call.Host, call.Context, var asyncCall = new AsyncCall<TRequest, TResponse>(call);
call.Method.RequestMarshaller.Serializer, call.Method.ResponseMarshaller.Deserializer);
var asyncResult = asyncCall.UnaryCallAsync(req); var asyncResult = asyncCall.UnaryCallAsync(req);
RegisterCancellationCallback(asyncCall, call.Context.CancellationToken);
return new AsyncUnaryCall<TResponse>(asyncResult, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); return new AsyncUnaryCall<TResponse>(asyncResult, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
} }
public static AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req) public static AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(CallInvocationDetails<TRequest, TResponse> call, TRequest req)
where TRequest : class where TRequest : class
where TResponse : class where TResponse : class
{ {
var asyncCall = new AsyncCall<TRequest, TResponse>(call.Channel, call.Method.FullName, call.Host, call.Context, var asyncCall = new AsyncCall<TRequest, TResponse>(call);
call.Method.RequestMarshaller.Serializer, call.Method.ResponseMarshaller.Deserializer);
asyncCall.StartServerStreamingCall(req); asyncCall.StartServerStreamingCall(req);
RegisterCancellationCallback(asyncCall, call.Context.CancellationToken);
var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall); var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall);
return new AsyncServerStreamingCall<TResponse>(responseStream, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); return new AsyncServerStreamingCall<TResponse>(responseStream, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
} }
public static AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call) public static AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(CallInvocationDetails<TRequest, TResponse> call)
where TRequest : class where TRequest : class
where TResponse : class where TResponse : class
{ {
var asyncCall = new AsyncCall<TRequest, TResponse>(call.Channel, call.Method.FullName, call.Host, call.Context, var asyncCall = new AsyncCall<TRequest, TResponse>(call);
call.Method.RequestMarshaller.Serializer, call.Method.ResponseMarshaller.Deserializer);
var resultTask = asyncCall.ClientStreamingCallAsync(); var resultTask = asyncCall.ClientStreamingCallAsync();
RegisterCancellationCallback(asyncCall, call.Context.CancellationToken);
var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall); var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall);
return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, resultTask, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, resultTask, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
} }
public static AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call) public static AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(CallInvocationDetails<TRequest, TResponse> call)
where TRequest : class where TRequest : class
where TResponse : class where TResponse : class
{ {
var asyncCall = new AsyncCall<TRequest, TResponse>(call.Channel, call.Method.FullName, call.Host, call.Context, var asyncCall = new AsyncCall<TRequest, TResponse>(call);
call.Method.RequestMarshaller.Serializer, call.Method.ResponseMarshaller.Deserializer);
asyncCall.StartDuplexStreamingCall(); asyncCall.StartDuplexStreamingCall();
RegisterCancellationCallback(asyncCall, call.Context.CancellationToken);
var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall); var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall); var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall);
return new AsyncDuplexStreamingCall<TRequest, TResponse>(requestStream, responseStream, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); return new AsyncDuplexStreamingCall<TRequest, TResponse>(requestStream, responseStream, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
} }
private static void RegisterCancellationCallback<TRequest, TResponse>(AsyncCall<TRequest, TResponse> asyncCall, CancellationToken token)
{
if (token.CanBeCanceled)
{
token.Register(() => asyncCall.Cancel());
}
}
} }
} }
...@@ -76,7 +76,7 @@ namespace Grpc.Core ...@@ -76,7 +76,7 @@ namespace Grpc.Core
/// <summary> /// <summary>
/// Creates a new call to given method. /// Creates a new call to given method.
/// </summary> /// </summary>
protected Call<TRequest, TResponse> CreateCall<TRequest, TResponse>(Method<TRequest, TResponse> method, CallContext context) protected CallInvocationDetails<TRequest, TResponse> CreateCall<TRequest, TResponse>(Method<TRequest, TResponse> method, CallContext context)
where TRequest : class where TRequest : class
where TResponse : class where TResponse : class
{ {
...@@ -86,7 +86,7 @@ namespace Grpc.Core ...@@ -86,7 +86,7 @@ namespace Grpc.Core
interceptor(context.Headers); interceptor(context.Headers);
context.Headers.Freeze(); context.Headers.Freeze();
} }
return new Call<TRequest, TResponse>(channel, method, null, context); return new CallInvocationDetails<TRequest, TResponse>(channel, method, context);
} }
} }
} }
...@@ -58,7 +58,6 @@ ...@@ -58,7 +58,6 @@
<Compile Include="Properties\AssemblyInfo.cs" /> <Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="RpcException.cs" /> <Compile Include="RpcException.cs" />
<Compile Include="Calls.cs" /> <Compile Include="Calls.cs" />
<Compile Include="Call.cs" />
<Compile Include="AsyncClientStreamingCall.cs" /> <Compile Include="AsyncClientStreamingCall.cs" />
<Compile Include="GrpcEnvironment.cs" /> <Compile Include="GrpcEnvironment.cs" />
<Compile Include="Status.cs" /> <Compile Include="Status.cs" />
...@@ -115,6 +114,7 @@ ...@@ -115,6 +114,7 @@
<Compile Include="Logging\ConsoleLogger.cs" /> <Compile Include="Logging\ConsoleLogger.cs" />
<Compile Include="Internal\NativeLogRedirector.cs" /> <Compile Include="Internal\NativeLogRedirector.cs" />
<Compile Include="ChannelState.cs" /> <Compile Include="ChannelState.cs" />
<Compile Include="CallInvocationDetails.cs" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<None Include="Grpc.Core.nuspec" /> <None Include="Grpc.Core.nuspec" />
......
...@@ -50,10 +50,7 @@ namespace Grpc.Core.Internal ...@@ -50,10 +50,7 @@ namespace Grpc.Core.Internal
{ {
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCall<TRequest, TResponse>>(); static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCall<TRequest, TResponse>>();
readonly Channel channel; readonly CallInvocationDetails<TRequest, TResponse> callDetails;
readonly string method;
readonly string host;
readonly CallContext context;
// Completion of a pending unary response if not null. // Completion of a pending unary response if not null.
TaskCompletionSource<TResponse> unaryResponseTcs; TaskCompletionSource<TResponse> unaryResponseTcs;
...@@ -63,12 +60,10 @@ namespace Grpc.Core.Internal ...@@ -63,12 +60,10 @@ namespace Grpc.Core.Internal
bool readObserverCompleted; // True if readObserver has already been completed. bool readObserverCompleted; // True if readObserver has already been completed.
public AsyncCall(Channel channel, string method, string host, CallContext context, Func<TRequest, byte[]> serializer, Func<byte[], TResponse> deserializer) : base(serializer, deserializer) public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails)
: base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer)
{ {
this.channel = channel; this.callDetails = callDetails;
this.method = Preconditions.CheckNotNull(method);
this.host = host; // null host means default host will be used by C-core.
this.context = context;
} }
// TODO: this method is not Async, so it shouldn't be in AsyncCall class, but // TODO: this method is not Async, so it shouldn't be in AsyncCall class, but
...@@ -87,13 +82,14 @@ namespace Grpc.Core.Internal ...@@ -87,13 +82,14 @@ namespace Grpc.Core.Internal
lock (myLock) lock (myLock)
{ {
Preconditions.CheckState(!started); Preconditions.CheckState(!started);
Initialize(cq);
started = true; started = true;
Initialize(cq);
halfcloseRequested = true; halfcloseRequested = true;
readingDone = true; readingDone = true;
} }
using (var metadataArray = MetadataArraySafeHandle.Create(context.Headers)) using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Context.Headers))
{ {
using (var ctx = BatchContextSafeHandle.Create()) using (var ctx = BatchContextSafeHandle.Create())
{ {
...@@ -132,16 +128,17 @@ namespace Grpc.Core.Internal ...@@ -132,16 +128,17 @@ namespace Grpc.Core.Internal
lock (myLock) lock (myLock)
{ {
Preconditions.CheckState(!started); Preconditions.CheckState(!started);
Initialize(channel.Environment.CompletionQueue);
started = true; started = true;
Initialize(callDetails.Channel.Environment.CompletionQueue);
halfcloseRequested = true; halfcloseRequested = true;
readingDone = true; readingDone = true;
byte[] payload = UnsafeSerialize(msg); byte[] payload = UnsafeSerialize(msg);
unaryResponseTcs = new TaskCompletionSource<TResponse>(); unaryResponseTcs = new TaskCompletionSource<TResponse>();
using (var metadataArray = MetadataArraySafeHandle.Create(context.Headers)) using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Context.Headers))
{ {
call.StartUnary(payload, HandleUnaryResponse, metadataArray); call.StartUnary(payload, HandleUnaryResponse, metadataArray);
} }
...@@ -158,13 +155,14 @@ namespace Grpc.Core.Internal ...@@ -158,13 +155,14 @@ namespace Grpc.Core.Internal
lock (myLock) lock (myLock)
{ {
Preconditions.CheckState(!started); Preconditions.CheckState(!started);
Initialize(channel.Environment.CompletionQueue);
started = true; started = true;
Initialize(callDetails.Channel.Environment.CompletionQueue);
readingDone = true; readingDone = true;
unaryResponseTcs = new TaskCompletionSource<TResponse>(); unaryResponseTcs = new TaskCompletionSource<TResponse>();
using (var metadataArray = MetadataArraySafeHandle.Create(context.Headers)) using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Context.Headers))
{ {
call.StartClientStreaming(HandleUnaryResponse, metadataArray); call.StartClientStreaming(HandleUnaryResponse, metadataArray);
} }
...@@ -181,15 +179,16 @@ namespace Grpc.Core.Internal ...@@ -181,15 +179,16 @@ namespace Grpc.Core.Internal
lock (myLock) lock (myLock)
{ {
Preconditions.CheckState(!started); Preconditions.CheckState(!started);
Initialize(channel.Environment.CompletionQueue);
started = true; started = true;
Initialize(callDetails.Channel.Environment.CompletionQueue);
halfcloseRequested = true; halfcloseRequested = true;
halfclosed = true; // halfclose not confirmed yet, but it will be once finishedHandler is called. halfclosed = true; // halfclose not confirmed yet, but it will be once finishedHandler is called.
byte[] payload = UnsafeSerialize(msg); byte[] payload = UnsafeSerialize(msg);
using (var metadataArray = MetadataArraySafeHandle.Create(context.Headers)) using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Context.Headers))
{ {
call.StartServerStreaming(payload, HandleFinished, metadataArray); call.StartServerStreaming(payload, HandleFinished, metadataArray);
} }
...@@ -205,11 +204,11 @@ namespace Grpc.Core.Internal ...@@ -205,11 +204,11 @@ namespace Grpc.Core.Internal
lock (myLock) lock (myLock)
{ {
Preconditions.CheckState(!started); Preconditions.CheckState(!started);
Initialize(channel.Environment.CompletionQueue);
started = true; started = true;
using (var metadataArray = MetadataArraySafeHandle.Create(context.Headers)) Initialize(callDetails.Channel.Environment.CompletionQueue);
using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Context.Headers))
{ {
call.StartDuplexStreaming(HandleFinished, metadataArray); call.StartDuplexStreaming(HandleFinished, metadataArray);
} }
...@@ -311,14 +310,26 @@ namespace Grpc.Core.Internal ...@@ -311,14 +310,26 @@ namespace Grpc.Core.Internal
protected override void OnReleaseResources() protected override void OnReleaseResources()
{ {
channel.Environment.DebugStats.ActiveClientCalls.Decrement(); callDetails.Channel.Environment.DebugStats.ActiveClientCalls.Decrement();
} }
private void Initialize(CompletionQueueSafeHandle cq) private void Initialize(CompletionQueueSafeHandle cq)
{ {
var call = channel.Handle.CreateCall(channel.Environment.CompletionRegistry, cq, method, host, Timespec.FromDateTime(context.Deadline)); var call = callDetails.Channel.Handle.CreateCall(callDetails.Channel.Environment.CompletionRegistry, cq,
channel.Environment.DebugStats.ActiveClientCalls.Increment(); callDetails.Method, callDetails.Host, Timespec.FromDateTime(callDetails.Context.Deadline));
callDetails.Channel.Environment.DebugStats.ActiveClientCalls.Increment();
InitializeInternal(call); InitializeInternal(call);
RegisterCancellationCallback();
}
// Make sure that once cancellationToken for this call is cancelled, Cancel() will be called.
private void RegisterCancellationCallback()
{
var token = callDetails.Context.CancellationToken;
if (token.CanBeCanceled)
{
token.Register(() => this.Cancel());
}
} }
/// <summary> /// <summary>
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment