Skip to content
Snippets Groups Projects
Commit 5bbd8186 authored by Jan Tattermusch's avatar Jan Tattermusch
Browse files

refactor server call handlers

parent 7f23a754
No related branches found
No related tags found
No related merge requests found
......@@ -42,7 +42,7 @@ namespace Grpc.Core.Internal
{
internal interface IServerCallHandler
{
Task HandleCall(string methodName, CallSafeHandle call, GrpcEnvironment environment);
Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment);
}
internal class UnaryServerCallHandler<TRequest, TResponse> : IServerCallHandler
......@@ -58,14 +58,14 @@ namespace Grpc.Core.Internal
this.handler = handler;
}
public async Task HandleCall(string methodName, CallSafeHandle call, GrpcEnvironment environment)
public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
{
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
environment);
asyncCall.Initialize(call);
asyncCall.Initialize(newRpc.Call);
var finishedTask = asyncCall.ServerSideCallAsync();
var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
......@@ -111,14 +111,14 @@ namespace Grpc.Core.Internal
this.handler = handler;
}
public async Task HandleCall(string methodName, CallSafeHandle call, GrpcEnvironment environment)
public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
{
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
environment);
asyncCall.Initialize(call);
asyncCall.Initialize(newRpc.Call);
var finishedTask = asyncCall.ServerSideCallAsync();
var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
......@@ -165,14 +165,14 @@ namespace Grpc.Core.Internal
this.handler = handler;
}
public async Task HandleCall(string methodName, CallSafeHandle call, GrpcEnvironment environment)
public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
{
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
environment);
asyncCall.Initialize(call);
asyncCall.Initialize(newRpc.Call);
var finishedTask = asyncCall.ServerSideCallAsync();
var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
......@@ -222,14 +222,14 @@ namespace Grpc.Core.Internal
this.handler = handler;
}
public async Task HandleCall(string methodName, CallSafeHandle call, GrpcEnvironment environment)
public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
{
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
environment);
asyncCall.Initialize(call);
asyncCall.Initialize(newRpc.Call);
var finishedTask = asyncCall.ServerSideCallAsync();
var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
......@@ -259,13 +259,15 @@ namespace Grpc.Core.Internal
internal class NoSuchMethodCallHandler : IServerCallHandler
{
public async Task HandleCall(string methodName, CallSafeHandle call, GrpcEnvironment environment)
public static readonly NoSuchMethodCallHandler Instance = new NoSuchMethodCallHandler();
public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
{
// We don't care about the payload type here.
var asyncCall = new AsyncCallServer<byte[], byte[]>(
(payload) => payload, (payload) => payload, environment);
asyncCall.Initialize(call);
asyncCall.Initialize(newRpc.Call);
var finishedTask = asyncCall.ServerSideCallAsync();
var requestStream = new ServerRequestStream<byte[], byte[]>(asyncCall);
var responseStream = new ServerResponseStream<byte[], byte[]>(asyncCall);
......
......@@ -218,16 +218,16 @@ namespace Grpc.Core
/// <summary>
/// Selects corresponding handler for given call and handles the call.
/// </summary>
private async Task InvokeCallHandler(CallSafeHandle call, string method)
private async Task HandleCallAsync(ServerRpcNew newRpc)
{
try
{
IServerCallHandler callHandler;
if (!callHandlers.TryGetValue(method, out callHandler))
if (!callHandlers.TryGetValue(newRpc.Method, out callHandler))
{
callHandler = new NoSuchMethodCallHandler();
callHandler = NoSuchMethodCallHandler.Instance;
}
await callHandler.HandleCall(method, call, environment);
await callHandler.HandleCall(newRpc, environment);
}
catch (Exception e)
{
......@@ -240,14 +240,15 @@ namespace Grpc.Core
/// </summary>
private void HandleNewServerRpc(bool success, BatchContextSafeHandle ctx)
{
// TODO: handle error
if (success)
{
ServerRpcNew newRpc = ctx.GetServerRpcNew();
// after server shutdown, the callback returns with null call
if (!newRpc.Call.IsInvalid)
{
Task.Run(async () => await InvokeCallHandler(newRpc.Call, newRpc.Method));
Task.Run(async () => await HandleCallAsync(newRpc));
}
}
AllowOneRpc();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment