Skip to content
Snippets Groups Projects
Commit 542e21cb authored by Jan Tattermusch's avatar Jan Tattermusch
Browse files

refactoring AsyncCall

parent 5e10f183
No related branches found
No related tags found
No related merge requests found
......@@ -126,8 +126,7 @@ namespace Grpc.Core.Tests
[Test]
public void DeadlineInThePast()
{
var deadline = DateTime.MinValue;
var internalCall = new Call<string, string>(ServiceName, TestMethod, channel, new CallContext());
var internalCall = new Call<string, string>(ServiceName, TestMethod, channel, new CallContext(deadline: DateTime.MinValue));
try
{
......
......@@ -53,6 +53,7 @@ namespace Grpc.Core
this.name = method.GetFullName(serviceName);
this.requestMarshaller = method.RequestMarshaller;
this.responseMarshaller = method.ResponseMarshaller;
this.channel = channel;
this.context = context;
}
......
......@@ -47,19 +47,20 @@ namespace Grpc.Core
where TRequest : class
where TResponse : class
{
var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
var asyncCall = new AsyncCall<TRequest, TResponse>(call.Channel, call.Name, null, call.Context,
call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
// TODO(jtattermusch): this gives a race that cancellation can be requested before the call even starts.
RegisterCancellationCallback(asyncCall, call.Context.CancellationToken);
return asyncCall.UnaryCall(call.Channel, call.Name, req, call.Context.Headers, call.Context.Deadline);
return asyncCall.UnaryCall(req);
}
public static AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req)
where TRequest : class
where TResponse : class
{
var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
asyncCall.Initialize(call.Channel, call.Channel.Environment.CompletionQueue, call.Name, Timespec.FromDateTime(call.Context.Deadline));
var asyncResult = asyncCall.UnaryCallAsync(req, call.Context.Headers, call.Context.Deadline);
var asyncCall = new AsyncCall<TRequest, TResponse>(call.Channel, call.Name, null, call.Context,
call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
var asyncResult = asyncCall.UnaryCallAsync(req);
RegisterCancellationCallback(asyncCall, call.Context.CancellationToken);
return new AsyncUnaryCall<TResponse>(asyncResult, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
}
......@@ -68,9 +69,9 @@ namespace Grpc.Core
where TRequest : class
where TResponse : class
{
var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
asyncCall.Initialize(call.Channel, call.Channel.Environment.CompletionQueue, call.Name, Timespec.FromDateTime(call.Context.Deadline));
asyncCall.StartServerStreamingCall(req, call.Context.Headers, call.Context.Deadline);
var asyncCall = new AsyncCall<TRequest, TResponse>(call.Channel, call.Name, null, call.Context,
call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
asyncCall.StartServerStreamingCall(req);
RegisterCancellationCallback(asyncCall, call.Context.CancellationToken);
var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall);
return new AsyncServerStreamingCall<TResponse>(responseStream, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
......@@ -80,9 +81,9 @@ namespace Grpc.Core
where TRequest : class
where TResponse : class
{
var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
asyncCall.Initialize(call.Channel, call.Channel.Environment.CompletionQueue, call.Name, Timespec.FromDateTime(call.Context.Deadline));
var resultTask = asyncCall.ClientStreamingCallAsync(call.Context.Headers, call.Context.Deadline);
var asyncCall = new AsyncCall<TRequest, TResponse>(call.Channel, call.Name, null, call.Context,
call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
var resultTask = asyncCall.ClientStreamingCallAsync();
RegisterCancellationCallback(asyncCall, call.Context.CancellationToken);
var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall);
return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, resultTask, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
......@@ -92,9 +93,9 @@ namespace Grpc.Core
where TRequest : class
where TResponse : class
{
var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
asyncCall.Initialize(call.Channel, call.Channel.Environment.CompletionQueue, call.Name, Timespec.FromDateTime(call.Context.Deadline));
asyncCall.StartDuplexStreamingCall(call.Context.Headers, call.Context.Deadline);
var asyncCall = new AsyncCall<TRequest, TResponse>(call.Channel, call.Name, null, call.Context,
call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
asyncCall.StartDuplexStreamingCall();
RegisterCancellationCallback(asyncCall, call.Context.CancellationToken);
var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall);
......
......@@ -50,7 +50,10 @@ namespace Grpc.Core.Internal
{
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCall<TRequest, TResponse>>();
Channel channel;
readonly Channel channel;
readonly string method;
readonly string host;
readonly CallContext context;
// Completion of a pending unary response if not null.
TaskCompletionSource<TResponse> unaryResponseTcs;
......@@ -60,26 +63,20 @@ namespace Grpc.Core.Internal
bool readObserverCompleted; // True if readObserver has already been completed.
public AsyncCall(Func<TRequest, byte[]> serializer, Func<byte[], TResponse> deserializer) : base(serializer, deserializer)
{
}
public void Initialize(Channel channel, CompletionQueueSafeHandle cq, string methodName, Timespec deadline)
public AsyncCall(Channel channel, string method, string host, CallContext context, Func<TRequest, byte[]> serializer, Func<byte[], TResponse> deserializer) : base(serializer, deserializer)
{
this.channel = channel;
var call = channel.Handle.CreateCall(channel.Environment.CompletionRegistry, cq, methodName, null, deadline);
channel.Environment.DebugStats.ActiveClientCalls.Increment();
InitializeInternal(call);
this.method = Preconditions.CheckNotNull(method);
this.host = host; // null host means default host will be used by C-core.
this.context = context;
}
// TODO: this method is not Async, so it shouldn't be in AsyncCall class, but
// it is reusing fair amount of code in this class, so we are leaving it here.
// TODO: for other calls, you need to call Initialize, this methods calls initialize
// on its own, so there's a usage inconsistency.
/// <summary>
/// Blocking unary request - unary response call.
/// </summary>
public TResponse UnaryCall(Channel channel, string methodName, TRequest msg, Metadata headers, DateTime deadline)
public TResponse UnaryCall(TRequest msg)
{
using (CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.Create())
{
......@@ -89,13 +86,14 @@ namespace Grpc.Core.Internal
lock (myLock)
{
Initialize(channel, cq, methodName, Timespec.FromDateTime(deadline));
Preconditions.CheckState(!started);
Initialize(cq);
started = true;
halfcloseRequested = true;
readingDone = true;
}
using (var metadataArray = MetadataArraySafeHandle.Create(headers))
using (var metadataArray = MetadataArraySafeHandle.Create(context.Headers))
{
using (var ctx = BatchContextSafeHandle.Create())
{
......@@ -129,11 +127,12 @@ namespace Grpc.Core.Internal
/// <summary>
/// Starts a unary request - unary response call.
/// </summary>
public Task<TResponse> UnaryCallAsync(TRequest msg, Metadata headers, DateTime deadline)
public Task<TResponse> UnaryCallAsync(TRequest msg)
{
lock (myLock)
{
Preconditions.CheckNotNull(call);
Preconditions.CheckState(!started);
Initialize(channel.Environment.CompletionQueue);
started = true;
halfcloseRequested = true;
......@@ -142,7 +141,7 @@ namespace Grpc.Core.Internal
byte[] payload = UnsafeSerialize(msg);
unaryResponseTcs = new TaskCompletionSource<TResponse>();
using (var metadataArray = MetadataArraySafeHandle.Create(headers))
using (var metadataArray = MetadataArraySafeHandle.Create(context.Headers))
{
call.StartUnary(payload, HandleUnaryResponse, metadataArray);
}
......@@ -154,17 +153,18 @@ namespace Grpc.Core.Internal
/// Starts a streamed request - unary response call.
/// Use StartSendMessage and StartSendCloseFromClient to stream requests.
/// </summary>
public Task<TResponse> ClientStreamingCallAsync(Metadata headers, DateTime deadline)
public Task<TResponse> ClientStreamingCallAsync()
{
lock (myLock)
{
Preconditions.CheckNotNull(call);
Preconditions.CheckState(!started);
Initialize(channel.Environment.CompletionQueue);
started = true;
readingDone = true;
unaryResponseTcs = new TaskCompletionSource<TResponse>();
using (var metadataArray = MetadataArraySafeHandle.Create(headers))
using (var metadataArray = MetadataArraySafeHandle.Create(context.Headers))
{
call.StartClientStreaming(HandleUnaryResponse, metadataArray);
}
......@@ -176,11 +176,12 @@ namespace Grpc.Core.Internal
/// <summary>
/// Starts a unary request - streamed response call.
/// </summary>
public void StartServerStreamingCall(TRequest msg, Metadata headers, DateTime deadline)
public void StartServerStreamingCall(TRequest msg)
{
lock (myLock)
{
Preconditions.CheckNotNull(call);
Preconditions.CheckState(!started);
Initialize(channel.Environment.CompletionQueue);
started = true;
halfcloseRequested = true;
......@@ -188,7 +189,7 @@ namespace Grpc.Core.Internal
byte[] payload = UnsafeSerialize(msg);
using (var metadataArray = MetadataArraySafeHandle.Create(headers))
using (var metadataArray = MetadataArraySafeHandle.Create(context.Headers))
{
call.StartServerStreaming(payload, HandleFinished, metadataArray);
}
......@@ -199,15 +200,16 @@ namespace Grpc.Core.Internal
/// Starts a streaming request - streaming response call.
/// Use StartSendMessage and StartSendCloseFromClient to stream requests.
/// </summary>
public void StartDuplexStreamingCall(Metadata headers, DateTime deadline)
public void StartDuplexStreamingCall()
{
lock (myLock)
{
Preconditions.CheckNotNull(call);
Preconditions.CheckState(!started);
Initialize(channel.Environment.CompletionQueue);
started = true;
using (var metadataArray = MetadataArraySafeHandle.Create(headers))
using (var metadataArray = MetadataArraySafeHandle.Create(context.Headers))
{
call.StartDuplexStreaming(HandleFinished, metadataArray);
}
......@@ -312,6 +314,13 @@ namespace Grpc.Core.Internal
channel.Environment.DebugStats.ActiveClientCalls.Decrement();
}
private void Initialize(CompletionQueueSafeHandle cq)
{
var call = channel.Handle.CreateCall(channel.Environment.CompletionRegistry, cq, method, host, Timespec.FromDateTime(context.Deadline));
channel.Environment.DebugStats.ActiveClientCalls.Increment();
InitializeInternal(call);
}
/// <summary>
/// Handler for unary response completion.
/// </summary>
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment