diff --git a/src/csharp/GrpcCore/Call.cs b/src/csharp/GrpcCore/Call.cs index bf257e5d5981a1ccfc2775256c6c33e9d0501f6f..d3847a800917d6fae1ceca33338cb77b9e37eaa0 100644 --- a/src/csharp/GrpcCore/Call.cs +++ b/src/csharp/GrpcCore/Call.cs @@ -8,10 +8,8 @@ namespace Google.GRPC.Core readonly string methodName; readonly Func<TRequest, byte[]> requestSerializer; readonly Func<byte[], TResponse> responseDeserializer; - readonly TimeSpan timeout; readonly Channel channel; - // TODO: channel param should be removed in the future. public Call(string methodName, Func<TRequest, byte[]> requestSerializer, Func<byte[], TResponse> responseDeserializer, @@ -20,24 +18,22 @@ namespace Google.GRPC.Core this.methodName = methodName; this.requestSerializer = requestSerializer; this.responseDeserializer = responseDeserializer; - this.timeout = timeout; this.channel = channel; } - - public Channel Channel + public Call(Method<TRequest, TResponse> method, Channel channel) { - get - { - return this.channel; - } + this.methodName = method.Name; + this.requestSerializer = method.RequestMarshaller.Serialize; + this.responseDeserializer = method.ResponseMarshaller.Deserialize; + this.channel = channel; } - public TimeSpan Timeout + public Channel Channel { get { - return this.timeout; + return this.channel; } } diff --git a/src/csharp/GrpcCore/GrpcCore.csproj b/src/csharp/GrpcCore/GrpcCore.csproj index f0c84e78ea6c954251e7135fa260964e1eb569a4..2ad0f9154cc00e98563b06c6c7e25ec865f5d39d 100644 --- a/src/csharp/GrpcCore/GrpcCore.csproj +++ b/src/csharp/GrpcCore/GrpcCore.csproj @@ -54,6 +54,11 @@ <Compile Include="Internal\AsyncCall.cs" /> <Compile Include="Internal\ServerSafeHandle.cs" /> <Compile Include="Internal\StreamingInputObserver.cs" /> + <Compile Include="Method.cs" /> + <Compile Include="IMarshaller.cs" /> + <Compile Include="ServerCalls.cs" /> + <Compile Include="ServerCallHandler.cs" /> + <Compile Include="Internal\ServerWritingObserver.cs" /> </ItemGroup> <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" /> <ItemGroup> diff --git a/src/csharp/GrpcCore/IMarshaller.cs b/src/csharp/GrpcCore/IMarshaller.cs new file mode 100644 index 0000000000000000000000000000000000000000..eb08d8d3860dfb95238ca2f68075524cf23574f1 --- /dev/null +++ b/src/csharp/GrpcCore/IMarshaller.cs @@ -0,0 +1,31 @@ +using System; + +namespace Google.GRPC.Core +{ + /// <summary> + /// For serializing and deserializing messages. + /// </summary> + public interface IMarshaller<T> + { + byte[] Serialize(T value); + + T Deserialize(byte[] payload); + } + + /// <summary> + /// UTF-8 Marshalling for string. Useful for testing. + /// </summary> + internal class StringMarshaller : IMarshaller<string> { + + public byte[] Serialize(string value) + { + return System.Text.Encoding.UTF8.GetBytes(value); + } + + public string Deserialize(byte[] payload) + { + return System.Text.Encoding.UTF8.GetString(payload); + } + } +} + diff --git a/src/csharp/GrpcCore/Internal/AsyncCall.cs b/src/csharp/GrpcCore/Internal/AsyncCall.cs index e83ca0eaa9619bea73c4d0c7a011c376e3ff349e..c38363bb2bcce89cbbfe302527dc19d22f98069d 100644 --- a/src/csharp/GrpcCore/Internal/AsyncCall.cs +++ b/src/csharp/GrpcCore/Internal/AsyncCall.cs @@ -86,6 +86,14 @@ namespace Google.GRPC.Core.Internal return StartRead().Task; } + public Task Halfclosed + { + get + { + return halfcloseTcs.Task; + } + } + public Task<Status> Finished { get diff --git a/src/csharp/GrpcCore/Internal/ServerSafeHandle.cs b/src/csharp/GrpcCore/Internal/ServerSafeHandle.cs index 0d38bce63e3a5c35db82c384a4884cc91f7be3eb..08d4cf01927c85920bcea6bb640b950b77c0fe51 100644 --- a/src/csharp/GrpcCore/Internal/ServerSafeHandle.cs +++ b/src/csharp/GrpcCore/Internal/ServerSafeHandle.cs @@ -30,8 +30,8 @@ namespace Google.GRPC.Core.Internal [DllImport("libgrpc.so")] static extern void grpc_server_shutdown(ServerSafeHandle server); - [DllImport("libgrpc.so")] - static extern void grpc_server_shutdown_and_notify(ServerSafeHandle server, IntPtr tag); + [DllImport("libgrpc.so", EntryPoint = "grpc_server_shutdown_and_notify")] + static extern void grpc_server_shutdown_and_notify_CALLBACK(ServerSafeHandle server, [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate callback); [DllImport("libgrpc.so")] static extern void grpc_server_destroy(IntPtr server); @@ -62,6 +62,11 @@ namespace Google.GRPC.Core.Internal grpc_server_shutdown(this); } + public void ShutdownAndNotify(EventCallbackDelegate callback) + { + grpc_server_shutdown_and_notify_CALLBACK(this, callback); + } + public GRPCCallError RequestCall(EventCallbackDelegate callback) { return grpc_server_request_call_old_CALLBACK(this, callback); diff --git a/src/csharp/GrpcCore/Internal/ServerWritingObserver.cs b/src/csharp/GrpcCore/Internal/ServerWritingObserver.cs new file mode 100644 index 0000000000000000000000000000000000000000..2b46e9c53d6c9074c02be99e9c88280cacc93a58 --- /dev/null +++ b/src/csharp/GrpcCore/Internal/ServerWritingObserver.cs @@ -0,0 +1,38 @@ +using System; +using Google.GRPC.Core.Internal; + +namespace Google.GRPC.Core.Internal +{ + /// <summary> + /// Observer that writes all arriving messages to a call abstraction (in blocking fashion) + /// and then halfcloses the call. Used for server-side call handling. + /// </summary> + internal class ServerWritingObserver<TWrite, TRead> : IObserver<TWrite> + { + readonly AsyncCall<TWrite, TRead> call; + + public ServerWritingObserver(AsyncCall<TWrite, TRead> call) + { + this.call = call; + } + + public void OnCompleted() + { + // TODO: how bad is the Wait here? + call.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_OK, "")).Wait(); + } + + public void OnError(Exception error) + { + // TODO: handle this... + throw new InvalidOperationException("This should never be called."); + } + + public void OnNext(TWrite value) + { + // TODO: how bad is the Wait here? + call.WriteAsync(value).Wait(); + } + } +} + diff --git a/src/csharp/GrpcCore/Internal/StreamingInputObserver.cs b/src/csharp/GrpcCore/Internal/StreamingInputObserver.cs index d483e53a2db1e43440903cfa10e2366d10896ad7..c5de979351a14058405d685914a81d41d2a733d5 100644 --- a/src/csharp/GrpcCore/Internal/StreamingInputObserver.cs +++ b/src/csharp/GrpcCore/Internal/StreamingInputObserver.cs @@ -1,7 +1,7 @@ using System; using Google.GRPC.Core.Internal; -namespace Google.GRPC.Core +namespace Google.GRPC.Core.Internal { internal class StreamingInputObserver<TWrite, TRead> : IObserver<TWrite> { diff --git a/src/csharp/GrpcCore/Method.cs b/src/csharp/GrpcCore/Method.cs new file mode 100644 index 0000000000000000000000000000000000000000..27901156950edc954016178627f0a545132e37e1 --- /dev/null +++ b/src/csharp/GrpcCore/Method.cs @@ -0,0 +1,64 @@ +using System; + +namespace Google.GRPC.Core +{ + public enum MethodType + { + Unary, + ClientStreaming, + ServerStreaming, + DuplexStreaming + } + + /// <summary> + /// A description of a service method. + /// </summary> + public class Method<TRequest, TResponse> + { + readonly MethodType type; + readonly string name; + readonly IMarshaller<TRequest> requestMarshaller; + readonly IMarshaller<TResponse> responseMarshaller; + + public Method(MethodType type, string name, IMarshaller<TRequest> requestMarshaller, IMarshaller<TResponse> responseMarshaller) + { + this.type = type; + this.name = name; + this.requestMarshaller = requestMarshaller; + this.responseMarshaller = responseMarshaller; + } + + public MethodType Type + { + get + { + return this.type; + } + } + + public string Name + { + get + { + return this.name; + } + } + + public IMarshaller<TRequest> RequestMarshaller + { + get + { + return this.requestMarshaller; + } + } + + public IMarshaller<TResponse> ResponseMarshaller + { + get + { + return this.responseMarshaller; + } + } + } +} + diff --git a/src/csharp/GrpcCore/Server.cs b/src/csharp/GrpcCore/Server.cs index 68da1a8300813a98ad76aef285e60850295f3080..4e9d114f850af8ab1acc35608a9cd79aa6d82d19 100644 --- a/src/csharp/GrpcCore/Server.cs +++ b/src/csharp/GrpcCore/Server.cs @@ -1,7 +1,9 @@ using System; using System.Runtime.InteropServices; using System.Diagnostics; +using System.Threading.Tasks; using System.Collections.Concurrent; +using System.Collections.Generic; using Google.GRPC.Core.Internal; namespace Google.GRPC.Core @@ -15,10 +17,15 @@ namespace Google.GRPC.Core // TODO: make sure the delegate doesn't get garbage collected while // native callbacks are in the completion queue. readonly EventCallbackDelegate newRpcHandler; + readonly EventCallbackDelegate serverShutdownHandler; readonly BlockingCollection<NewRpcInfo> newRpcQueue = new BlockingCollection<NewRpcInfo>(); readonly ServerSafeHandle handle; + readonly Dictionary<string, IServerCallHandler> callHandlers = new Dictionary<string, IServerCallHandler>(); + + readonly TaskCompletionSource<object> shutdownTcs = new TaskCompletionSource<object>(); + static Server() { GrpcEnvironment.EnsureInitialized(); } @@ -28,8 +35,14 @@ namespace Google.GRPC.Core // TODO: what is the tag for server shutdown? this.handle = ServerSafeHandle.NewServer(GetCompletionQueue(), IntPtr.Zero); this.newRpcHandler = HandleNewRpc; + this.serverShutdownHandler = HandleServerShutdown; } + // only call before Start(), this will be in server builder in the future. + internal void AddCallHandler(string methodName, IServerCallHandler handler) { + callHandlers.Add(methodName, handler); + } + // only call before Start() public int AddPort(string addr) { return handle.AddPort(addr); } @@ -37,49 +50,57 @@ namespace Google.GRPC.Core public void Start() { handle.Start(); + + // TODO: this basically means the server is single threaded.... + StartHandlingRpcs(); } - public void RunRpc() + /// <summary> + /// Requests and handles single RPC call. + /// </summary> + internal void RunRpc() { AllowOneRpc(); - try { - var rpcInfo = newRpcQueue.Take(); - - Console.WriteLine("Server received RPC " + rpcInfo.Method); - - AsyncCall<byte[], byte[]> asyncCall = new AsyncCall<byte[], byte[]>( - (payload) => payload, (payload) => payload); - - asyncCall.InitializeServer(rpcInfo.Call); + try + { + var rpcInfo = newRpcQueue.Take(); - asyncCall.Accept(GetCompletionQueue()); + Console.WriteLine("Server received RPC " + rpcInfo.Method); - while(true) { - byte[] payload = asyncCall.ReadAsync().Result; - if (payload == null) + IServerCallHandler callHandler; + if (!callHandlers.TryGetValue(rpcInfo.Method, out callHandler)) { - break; - } + callHandler = new NoSuchMethodCallHandler(); + } + callHandler.StartCall(rpcInfo.Method, rpcInfo.Call, GetCompletionQueue()); } - - asyncCall.WriteAsync(new byte[] { }).Wait(); - - // TODO: what should be the details? - asyncCall.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_OK, "")).Wait(); - - asyncCall.Finished.Wait(); - } catch(Exception e) { + catch(Exception e) + { Console.WriteLine("Exception while handling RPC: " + e); } } - // TODO: implement disposal properly... - public void Shutdown() { - handle.Shutdown(); + /// <summary> + /// Requests server shutdown and when there are no more calls being serviced, + /// cleans up used resources. + /// </summary> + /// <returns>The async.</returns> + public async Task ShutdownAsync() { + handle.ShutdownAndNotify(serverShutdownHandler); + await shutdownTcs.Task; + handle.Dispose(); + } + public void Kill() { + handle.Dispose(); + } - //handle.Dispose(); + private async Task StartHandlingRpcs() { + while (true) + { + await Task.Factory.StartNew(RunRpc); + } } private void AllowOneRpc() @@ -100,6 +121,18 @@ namespace Google.GRPC.Core } } + private void HandleServerShutdown(IntPtr eventPtr) + { + try + { + shutdownTcs.SetResult(null); + } + catch (Exception e) + { + Console.WriteLine("Caught exception in a native handler: " + e); + } + } + private static void AssertCallOk(GRPCCallError callError) { Trace.Assert(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK"); diff --git a/src/csharp/GrpcCore/ServerCallHandler.cs b/src/csharp/GrpcCore/ServerCallHandler.cs new file mode 100644 index 0000000000000000000000000000000000000000..08d527a019f991842d201d1d13981f19f95404be --- /dev/null +++ b/src/csharp/GrpcCore/ServerCallHandler.cs @@ -0,0 +1,93 @@ +using System; +using Google.GRPC.Core.Internal; + +namespace Google.GRPC.Core +{ + internal interface IServerCallHandler + { + void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq); + } + + internal class UnaryRequestServerCallHandler<TRequest, TResponse> : IServerCallHandler + { + readonly Method<TRequest, TResponse> method; + readonly UnaryRequestServerMethod<TRequest, TResponse> handler; + + public UnaryRequestServerCallHandler(Method<TRequest, TResponse> method, UnaryRequestServerMethod<TRequest, TResponse> handler) + { + this.method = method; + this.handler = handler; + } + + public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq) + { + var asyncCall = new AsyncCall<TResponse, TRequest>( + (msg) => method.ResponseMarshaller.Serialize(msg), + (payload) => method.RequestMarshaller.Deserialize(payload)); + + asyncCall.InitializeServer(call); + asyncCall.Accept(cq); + + var request = asyncCall.ReadAsync().Result; + + var responseObserver = new ServerWritingObserver<TResponse, TRequest>(asyncCall); + handler(request, responseObserver); + + asyncCall.Halfclosed.Wait(); + // TODO: wait until writing is finished + + asyncCall.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_OK, "")).Wait(); + asyncCall.Finished.Wait(); + } + } + + internal class StreamingRequestServerCallHandler<TRequest, TResponse> : IServerCallHandler + { + readonly Method<TRequest, TResponse> method; + readonly StreamingRequestServerMethod<TRequest, TResponse> handler; + + public StreamingRequestServerCallHandler(Method<TRequest, TResponse> method, StreamingRequestServerMethod<TRequest, TResponse> handler) + { + this.method = method; + this.handler = handler; + } + + public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq) + { + var asyncCall = new AsyncCall<TResponse, TRequest>( + (msg) => method.ResponseMarshaller.Serialize(msg), + (payload) => method.RequestMarshaller.Deserialize(payload)); + + asyncCall.InitializeServer(call); + asyncCall.Accept(cq); + + var responseObserver = new ServerWritingObserver<TResponse, TRequest>(asyncCall); + var requestObserver = handler(responseObserver); + + // feed the requests + asyncCall.StartReadingToStream(requestObserver); + + asyncCall.Halfclosed.Wait(); + + asyncCall.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_OK, "")).Wait(); + asyncCall.Finished.Wait(); + } + } + + internal class NoSuchMethodCallHandler : IServerCallHandler + { + public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq) + { + // We don't care about the payload type here. + AsyncCall<byte[], byte[]> asyncCall = new AsyncCall<byte[], byte[]>( + (payload) => payload, (payload) => payload); + + asyncCall.InitializeServer(call); + asyncCall.Accept(cq); + asyncCall.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_UNIMPLEMENTED, "No such method.")).Wait(); + + asyncCall.Finished.Wait(); + } + } +} + diff --git a/src/csharp/GrpcCore/ServerCalls.cs b/src/csharp/GrpcCore/ServerCalls.cs new file mode 100644 index 0000000000000000000000000000000000000000..86c47189320259e9dd07c16a8cf2a5e139227931 --- /dev/null +++ b/src/csharp/GrpcCore/ServerCalls.cs @@ -0,0 +1,25 @@ +using System; + +namespace Google.GRPC.Core +{ + // TODO: perhaps add also serverSideStreaming and clientSideStreaming + + public delegate void UnaryRequestServerMethod<TRequest, TResponse> (TRequest request, IObserver<TResponse> responseObserver); + + public delegate IObserver<TRequest> StreamingRequestServerMethod<TRequest, TResponse> (IObserver<TResponse> responseObserver); + + internal static class ServerCalls { + + public static IServerCallHandler UnaryRequestCall<TRequest, TResponse>(Method<TRequest, TResponse> method, UnaryRequestServerMethod<TRequest, TResponse> handler) + { + return new UnaryRequestServerCallHandler<TRequest, TResponse>(method, handler); + } + + public static IServerCallHandler StreamingRequestCall<TRequest, TResponse>(Method<TRequest, TResponse> method, StreamingRequestServerMethod<TRequest, TResponse> handler) + { + return new StreamingRequestServerCallHandler<TRequest, TResponse>(method, handler); + } + + } +} + diff --git a/src/csharp/GrpcCoreTests/ClientServerTest.cs b/src/csharp/GrpcCoreTests/ClientServerTest.cs index 823ee942882993a521df18f95608bf48f2bc2fd5..511683b00386b88ee610fa70cba244aa4c663027 100644 --- a/src/csharp/GrpcCoreTests/ClientServerTest.cs +++ b/src/csharp/GrpcCoreTests/ClientServerTest.cs @@ -8,41 +8,48 @@ namespace Google.GRPC.Core.Tests { public class ClientServerTest { - string request = "REQUEST"; string serverAddr = "localhost:" + Utils.PickUnusedPort(); + private Method<string, string> unaryEchoStringMethod = new Method<string, string>( + MethodType.Unary, + "/tests.Test/UnaryEchoString", + new StringMarshaller(), + new StringMarshaller()); + [Test] public void EmptyCall() { Server server = new Server(); + + server.AddCallHandler(unaryEchoStringMethod.Name, + ServerCalls.UnaryRequestCall(unaryEchoStringMethod, HandleUnaryEchoString)); + server.AddPort(serverAddr); server.Start(); - Task.Factory.StartNew( - () => { - server.RunRpc(); - } - ); - using (Channel channel = new Channel(serverAddr)) { - CreateCall(channel); - string response = Calls.BlockingUnaryCall(CreateCall(channel), request, default(CancellationToken)); - Console.WriteLine("Received response: " + response); + var call = CreateUnaryEchoStringCall(channel); + + Assert.AreEqual("ABC", Calls.BlockingUnaryCall(call, "ABC", default(CancellationToken))); + Assert.AreEqual("abcdef", Calls.BlockingUnaryCall(call, "abcdef", default(CancellationToken))); } - server.Shutdown(); + server.ShutdownAsync().Wait(); GrpcEnvironment.Shutdown(); } - private Call<string, string> CreateCall(Channel channel) + private Call<string, string> CreateUnaryEchoStringCall(Channel channel) { - return new Call<string, string>("/tests.Test/EmptyCall", - (s) => System.Text.Encoding.ASCII.GetBytes(s), - (b) => System.Text.Encoding.ASCII.GetString(b), - Timeout.InfiniteTimeSpan, channel); + return new Call<string, string>(unaryEchoStringMethod, channel); + } + + private void HandleUnaryEchoString(string request, IObserver<string> responseObserver) { + responseObserver.OnNext(request); + responseObserver.OnCompleted(); } + } } diff --git a/src/csharp/GrpcCoreTests/ServerTest.cs b/src/csharp/GrpcCoreTests/ServerTest.cs index b34101bbf597e7baa50b1bf94bc5cfac8a5dbb04..e6de95c3363292aada89742d61293d27f0f79f93 100644 --- a/src/csharp/GrpcCoreTests/ServerTest.cs +++ b/src/csharp/GrpcCoreTests/ServerTest.cs @@ -12,7 +12,7 @@ namespace Google.GRPC.Core.Tests Server server = new Server(); server.AddPort("localhost:" + Utils.PickUnusedPort()); server.Start(); - server.Shutdown(); + server.ShutdownAsync().Wait(); GrpcEnvironment.Shutdown(); }