Skip to content
Snippets Groups Projects
Commit 3d6fa14f authored by Michael Lumish's avatar Michael Lumish
Browse files

Merge pull request #411 from jtattermusch/csharp_server

Improved C# grpc server implementation to be able to register call handlers
parents e83fe4c9 caa36146
No related branches found
No related tags found
No related merge requests found
Showing with 364 additions and 59 deletions
...@@ -8,10 +8,8 @@ namespace Google.GRPC.Core ...@@ -8,10 +8,8 @@ namespace Google.GRPC.Core
readonly string methodName; readonly string methodName;
readonly Func<TRequest, byte[]> requestSerializer; readonly Func<TRequest, byte[]> requestSerializer;
readonly Func<byte[], TResponse> responseDeserializer; readonly Func<byte[], TResponse> responseDeserializer;
readonly TimeSpan timeout;
readonly Channel channel; readonly Channel channel;
// TODO: channel param should be removed in the future.
public Call(string methodName, public Call(string methodName,
Func<TRequest, byte[]> requestSerializer, Func<TRequest, byte[]> requestSerializer,
Func<byte[], TResponse> responseDeserializer, Func<byte[], TResponse> responseDeserializer,
...@@ -20,24 +18,22 @@ namespace Google.GRPC.Core ...@@ -20,24 +18,22 @@ namespace Google.GRPC.Core
this.methodName = methodName; this.methodName = methodName;
this.requestSerializer = requestSerializer; this.requestSerializer = requestSerializer;
this.responseDeserializer = responseDeserializer; this.responseDeserializer = responseDeserializer;
this.timeout = timeout;
this.channel = channel; this.channel = channel;
} }
public Call(Method<TRequest, TResponse> method, Channel channel)
public Channel Channel
{ {
get this.methodName = method.Name;
{ this.requestSerializer = method.RequestMarshaller.Serialize;
return this.channel; this.responseDeserializer = method.ResponseMarshaller.Deserialize;
} this.channel = channel;
} }
public TimeSpan Timeout public Channel Channel
{ {
get get
{ {
return this.timeout; return this.channel;
} }
} }
......
...@@ -54,6 +54,11 @@ ...@@ -54,6 +54,11 @@
<Compile Include="Internal\AsyncCall.cs" /> <Compile Include="Internal\AsyncCall.cs" />
<Compile Include="Internal\ServerSafeHandle.cs" /> <Compile Include="Internal\ServerSafeHandle.cs" />
<Compile Include="Internal\StreamingInputObserver.cs" /> <Compile Include="Internal\StreamingInputObserver.cs" />
<Compile Include="Method.cs" />
<Compile Include="IMarshaller.cs" />
<Compile Include="ServerCalls.cs" />
<Compile Include="ServerCallHandler.cs" />
<Compile Include="Internal\ServerWritingObserver.cs" />
</ItemGroup> </ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" /> <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup> <ItemGroup>
......
using System;
namespace Google.GRPC.Core
{
/// <summary>
/// For serializing and deserializing messages.
/// </summary>
public interface IMarshaller<T>
{
byte[] Serialize(T value);
T Deserialize(byte[] payload);
}
/// <summary>
/// UTF-8 Marshalling for string. Useful for testing.
/// </summary>
internal class StringMarshaller : IMarshaller<string> {
public byte[] Serialize(string value)
{
return System.Text.Encoding.UTF8.GetBytes(value);
}
public string Deserialize(byte[] payload)
{
return System.Text.Encoding.UTF8.GetString(payload);
}
}
}
...@@ -86,6 +86,14 @@ namespace Google.GRPC.Core.Internal ...@@ -86,6 +86,14 @@ namespace Google.GRPC.Core.Internal
return StartRead().Task; return StartRead().Task;
} }
public Task Halfclosed
{
get
{
return halfcloseTcs.Task;
}
}
public Task<Status> Finished public Task<Status> Finished
{ {
get get
......
...@@ -30,8 +30,8 @@ namespace Google.GRPC.Core.Internal ...@@ -30,8 +30,8 @@ namespace Google.GRPC.Core.Internal
[DllImport("libgrpc.so")] [DllImport("libgrpc.so")]
static extern void grpc_server_shutdown(ServerSafeHandle server); static extern void grpc_server_shutdown(ServerSafeHandle server);
[DllImport("libgrpc.so")] [DllImport("libgrpc.so", EntryPoint = "grpc_server_shutdown_and_notify")]
static extern void grpc_server_shutdown_and_notify(ServerSafeHandle server, IntPtr tag); static extern void grpc_server_shutdown_and_notify_CALLBACK(ServerSafeHandle server, [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate callback);
[DllImport("libgrpc.so")] [DllImport("libgrpc.so")]
static extern void grpc_server_destroy(IntPtr server); static extern void grpc_server_destroy(IntPtr server);
...@@ -62,6 +62,11 @@ namespace Google.GRPC.Core.Internal ...@@ -62,6 +62,11 @@ namespace Google.GRPC.Core.Internal
grpc_server_shutdown(this); grpc_server_shutdown(this);
} }
public void ShutdownAndNotify(EventCallbackDelegate callback)
{
grpc_server_shutdown_and_notify_CALLBACK(this, callback);
}
public GRPCCallError RequestCall(EventCallbackDelegate callback) public GRPCCallError RequestCall(EventCallbackDelegate callback)
{ {
return grpc_server_request_call_old_CALLBACK(this, callback); return grpc_server_request_call_old_CALLBACK(this, callback);
......
using System;
using Google.GRPC.Core.Internal;
namespace Google.GRPC.Core.Internal
{
/// <summary>
/// Observer that writes all arriving messages to a call abstraction (in blocking fashion)
/// and then halfcloses the call. Used for server-side call handling.
/// </summary>
internal class ServerWritingObserver<TWrite, TRead> : IObserver<TWrite>
{
readonly AsyncCall<TWrite, TRead> call;
public ServerWritingObserver(AsyncCall<TWrite, TRead> call)
{
this.call = call;
}
public void OnCompleted()
{
// TODO: how bad is the Wait here?
call.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_OK, "")).Wait();
}
public void OnError(Exception error)
{
// TODO: handle this...
throw new InvalidOperationException("This should never be called.");
}
public void OnNext(TWrite value)
{
// TODO: how bad is the Wait here?
call.WriteAsync(value).Wait();
}
}
}
using System; using System;
using Google.GRPC.Core.Internal; using Google.GRPC.Core.Internal;
namespace Google.GRPC.Core namespace Google.GRPC.Core.Internal
{ {
internal class StreamingInputObserver<TWrite, TRead> : IObserver<TWrite> internal class StreamingInputObserver<TWrite, TRead> : IObserver<TWrite>
{ {
......
using System;
namespace Google.GRPC.Core
{
public enum MethodType
{
Unary,
ClientStreaming,
ServerStreaming,
DuplexStreaming
}
/// <summary>
/// A description of a service method.
/// </summary>
public class Method<TRequest, TResponse>
{
readonly MethodType type;
readonly string name;
readonly IMarshaller<TRequest> requestMarshaller;
readonly IMarshaller<TResponse> responseMarshaller;
public Method(MethodType type, string name, IMarshaller<TRequest> requestMarshaller, IMarshaller<TResponse> responseMarshaller)
{
this.type = type;
this.name = name;
this.requestMarshaller = requestMarshaller;
this.responseMarshaller = responseMarshaller;
}
public MethodType Type
{
get
{
return this.type;
}
}
public string Name
{
get
{
return this.name;
}
}
public IMarshaller<TRequest> RequestMarshaller
{
get
{
return this.requestMarshaller;
}
}
public IMarshaller<TResponse> ResponseMarshaller
{
get
{
return this.responseMarshaller;
}
}
}
}
using System; using System;
using System.Runtime.InteropServices; using System.Runtime.InteropServices;
using System.Diagnostics; using System.Diagnostics;
using System.Threading.Tasks;
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Collections.Generic;
using Google.GRPC.Core.Internal; using Google.GRPC.Core.Internal;
namespace Google.GRPC.Core namespace Google.GRPC.Core
...@@ -15,10 +17,15 @@ namespace Google.GRPC.Core ...@@ -15,10 +17,15 @@ namespace Google.GRPC.Core
// TODO: make sure the delegate doesn't get garbage collected while // TODO: make sure the delegate doesn't get garbage collected while
// native callbacks are in the completion queue. // native callbacks are in the completion queue.
readonly EventCallbackDelegate newRpcHandler; readonly EventCallbackDelegate newRpcHandler;
readonly EventCallbackDelegate serverShutdownHandler;
readonly BlockingCollection<NewRpcInfo> newRpcQueue = new BlockingCollection<NewRpcInfo>(); readonly BlockingCollection<NewRpcInfo> newRpcQueue = new BlockingCollection<NewRpcInfo>();
readonly ServerSafeHandle handle; readonly ServerSafeHandle handle;
readonly Dictionary<string, IServerCallHandler> callHandlers = new Dictionary<string, IServerCallHandler>();
readonly TaskCompletionSource<object> shutdownTcs = new TaskCompletionSource<object>();
static Server() { static Server() {
GrpcEnvironment.EnsureInitialized(); GrpcEnvironment.EnsureInitialized();
} }
...@@ -28,8 +35,14 @@ namespace Google.GRPC.Core ...@@ -28,8 +35,14 @@ namespace Google.GRPC.Core
// TODO: what is the tag for server shutdown? // TODO: what is the tag for server shutdown?
this.handle = ServerSafeHandle.NewServer(GetCompletionQueue(), IntPtr.Zero); this.handle = ServerSafeHandle.NewServer(GetCompletionQueue(), IntPtr.Zero);
this.newRpcHandler = HandleNewRpc; this.newRpcHandler = HandleNewRpc;
this.serverShutdownHandler = HandleServerShutdown;
} }
// only call before Start(), this will be in server builder in the future.
internal void AddCallHandler(string methodName, IServerCallHandler handler) {
callHandlers.Add(methodName, handler);
}
// only call before Start()
public int AddPort(string addr) { public int AddPort(string addr) {
return handle.AddPort(addr); return handle.AddPort(addr);
} }
...@@ -37,49 +50,57 @@ namespace Google.GRPC.Core ...@@ -37,49 +50,57 @@ namespace Google.GRPC.Core
public void Start() public void Start()
{ {
handle.Start(); handle.Start();
// TODO: this basically means the server is single threaded....
StartHandlingRpcs();
} }
public void RunRpc() /// <summary>
/// Requests and handles single RPC call.
/// </summary>
internal void RunRpc()
{ {
AllowOneRpc(); AllowOneRpc();
try { try
var rpcInfo = newRpcQueue.Take(); {
var rpcInfo = newRpcQueue.Take();
Console.WriteLine("Server received RPC " + rpcInfo.Method);
AsyncCall<byte[], byte[]> asyncCall = new AsyncCall<byte[], byte[]>(
(payload) => payload, (payload) => payload);
asyncCall.InitializeServer(rpcInfo.Call);
asyncCall.Accept(GetCompletionQueue()); Console.WriteLine("Server received RPC " + rpcInfo.Method);
while(true) { IServerCallHandler callHandler;
byte[] payload = asyncCall.ReadAsync().Result; if (!callHandlers.TryGetValue(rpcInfo.Method, out callHandler))
if (payload == null)
{ {
break; callHandler = new NoSuchMethodCallHandler();
} }
callHandler.StartCall(rpcInfo.Method, rpcInfo.Call, GetCompletionQueue());
} }
catch(Exception e)
asyncCall.WriteAsync(new byte[] { }).Wait(); {
// TODO: what should be the details?
asyncCall.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_OK, "")).Wait();
asyncCall.Finished.Wait();
} catch(Exception e) {
Console.WriteLine("Exception while handling RPC: " + e); Console.WriteLine("Exception while handling RPC: " + e);
} }
} }
// TODO: implement disposal properly... /// <summary>
public void Shutdown() { /// Requests server shutdown and when there are no more calls being serviced,
handle.Shutdown(); /// cleans up used resources.
/// </summary>
/// <returns>The async.</returns>
public async Task ShutdownAsync() {
handle.ShutdownAndNotify(serverShutdownHandler);
await shutdownTcs.Task;
handle.Dispose();
}
public void Kill() {
handle.Dispose();
}
//handle.Dispose(); private async Task StartHandlingRpcs() {
while (true)
{
await Task.Factory.StartNew(RunRpc);
}
} }
private void AllowOneRpc() private void AllowOneRpc()
...@@ -100,6 +121,18 @@ namespace Google.GRPC.Core ...@@ -100,6 +121,18 @@ namespace Google.GRPC.Core
} }
} }
private void HandleServerShutdown(IntPtr eventPtr)
{
try
{
shutdownTcs.SetResult(null);
}
catch (Exception e)
{
Console.WriteLine("Caught exception in a native handler: " + e);
}
}
private static void AssertCallOk(GRPCCallError callError) private static void AssertCallOk(GRPCCallError callError)
{ {
Trace.Assert(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK"); Trace.Assert(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK");
......
using System;
using Google.GRPC.Core.Internal;
namespace Google.GRPC.Core
{
internal interface IServerCallHandler
{
void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq);
}
internal class UnaryRequestServerCallHandler<TRequest, TResponse> : IServerCallHandler
{
readonly Method<TRequest, TResponse> method;
readonly UnaryRequestServerMethod<TRequest, TResponse> handler;
public UnaryRequestServerCallHandler(Method<TRequest, TResponse> method, UnaryRequestServerMethod<TRequest, TResponse> handler)
{
this.method = method;
this.handler = handler;
}
public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq)
{
var asyncCall = new AsyncCall<TResponse, TRequest>(
(msg) => method.ResponseMarshaller.Serialize(msg),
(payload) => method.RequestMarshaller.Deserialize(payload));
asyncCall.InitializeServer(call);
asyncCall.Accept(cq);
var request = asyncCall.ReadAsync().Result;
var responseObserver = new ServerWritingObserver<TResponse, TRequest>(asyncCall);
handler(request, responseObserver);
asyncCall.Halfclosed.Wait();
// TODO: wait until writing is finished
asyncCall.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_OK, "")).Wait();
asyncCall.Finished.Wait();
}
}
internal class StreamingRequestServerCallHandler<TRequest, TResponse> : IServerCallHandler
{
readonly Method<TRequest, TResponse> method;
readonly StreamingRequestServerMethod<TRequest, TResponse> handler;
public StreamingRequestServerCallHandler(Method<TRequest, TResponse> method, StreamingRequestServerMethod<TRequest, TResponse> handler)
{
this.method = method;
this.handler = handler;
}
public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq)
{
var asyncCall = new AsyncCall<TResponse, TRequest>(
(msg) => method.ResponseMarshaller.Serialize(msg),
(payload) => method.RequestMarshaller.Deserialize(payload));
asyncCall.InitializeServer(call);
asyncCall.Accept(cq);
var responseObserver = new ServerWritingObserver<TResponse, TRequest>(asyncCall);
var requestObserver = handler(responseObserver);
// feed the requests
asyncCall.StartReadingToStream(requestObserver);
asyncCall.Halfclosed.Wait();
asyncCall.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_OK, "")).Wait();
asyncCall.Finished.Wait();
}
}
internal class NoSuchMethodCallHandler : IServerCallHandler
{
public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq)
{
// We don't care about the payload type here.
AsyncCall<byte[], byte[]> asyncCall = new AsyncCall<byte[], byte[]>(
(payload) => payload, (payload) => payload);
asyncCall.InitializeServer(call);
asyncCall.Accept(cq);
asyncCall.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_UNIMPLEMENTED, "No such method.")).Wait();
asyncCall.Finished.Wait();
}
}
}
using System;
namespace Google.GRPC.Core
{
// TODO: perhaps add also serverSideStreaming and clientSideStreaming
public delegate void UnaryRequestServerMethod<TRequest, TResponse> (TRequest request, IObserver<TResponse> responseObserver);
public delegate IObserver<TRequest> StreamingRequestServerMethod<TRequest, TResponse> (IObserver<TResponse> responseObserver);
internal static class ServerCalls {
public static IServerCallHandler UnaryRequestCall<TRequest, TResponse>(Method<TRequest, TResponse> method, UnaryRequestServerMethod<TRequest, TResponse> handler)
{
return new UnaryRequestServerCallHandler<TRequest, TResponse>(method, handler);
}
public static IServerCallHandler StreamingRequestCall<TRequest, TResponse>(Method<TRequest, TResponse> method, StreamingRequestServerMethod<TRequest, TResponse> handler)
{
return new StreamingRequestServerCallHandler<TRequest, TResponse>(method, handler);
}
}
}
...@@ -8,41 +8,48 @@ namespace Google.GRPC.Core.Tests ...@@ -8,41 +8,48 @@ namespace Google.GRPC.Core.Tests
{ {
public class ClientServerTest public class ClientServerTest
{ {
string request = "REQUEST";
string serverAddr = "localhost:" + Utils.PickUnusedPort(); string serverAddr = "localhost:" + Utils.PickUnusedPort();
private Method<string, string> unaryEchoStringMethod = new Method<string, string>(
MethodType.Unary,
"/tests.Test/UnaryEchoString",
new StringMarshaller(),
new StringMarshaller());
[Test] [Test]
public void EmptyCall() public void EmptyCall()
{ {
Server server = new Server(); Server server = new Server();
server.AddCallHandler(unaryEchoStringMethod.Name,
ServerCalls.UnaryRequestCall(unaryEchoStringMethod, HandleUnaryEchoString));
server.AddPort(serverAddr); server.AddPort(serverAddr);
server.Start(); server.Start();
Task.Factory.StartNew(
() => {
server.RunRpc();
}
);
using (Channel channel = new Channel(serverAddr)) using (Channel channel = new Channel(serverAddr))
{ {
CreateCall(channel); var call = CreateUnaryEchoStringCall(channel);
string response = Calls.BlockingUnaryCall(CreateCall(channel), request, default(CancellationToken));
Console.WriteLine("Received response: " + response); Assert.AreEqual("ABC", Calls.BlockingUnaryCall(call, "ABC", default(CancellationToken)));
Assert.AreEqual("abcdef", Calls.BlockingUnaryCall(call, "abcdef", default(CancellationToken)));
} }
server.Shutdown(); server.ShutdownAsync().Wait();
GrpcEnvironment.Shutdown(); GrpcEnvironment.Shutdown();
} }
private Call<string, string> CreateCall(Channel channel) private Call<string, string> CreateUnaryEchoStringCall(Channel channel)
{ {
return new Call<string, string>("/tests.Test/EmptyCall", return new Call<string, string>(unaryEchoStringMethod, channel);
(s) => System.Text.Encoding.ASCII.GetBytes(s), }
(b) => System.Text.Encoding.ASCII.GetString(b),
Timeout.InfiniteTimeSpan, channel); private void HandleUnaryEchoString(string request, IObserver<string> responseObserver) {
responseObserver.OnNext(request);
responseObserver.OnCompleted();
} }
} }
} }
...@@ -12,7 +12,7 @@ namespace Google.GRPC.Core.Tests ...@@ -12,7 +12,7 @@ namespace Google.GRPC.Core.Tests
Server server = new Server(); Server server = new Server();
server.AddPort("localhost:" + Utils.PickUnusedPort()); server.AddPort("localhost:" + Utils.PickUnusedPort());
server.Start(); server.Start();
server.Shutdown(); server.ShutdownAsync().Wait();
GrpcEnvironment.Shutdown(); GrpcEnvironment.Shutdown();
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment