From 337a2ddba59563e7370b133d63ab8bd9ebeb7232 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch <jtattermusch@google.com> Date: Fri, 13 Feb 2015 15:41:41 -0800 Subject: [PATCH] migration to new C API --- src/csharp/GrpcApi/MathGrpc.cs | 6 +- src/csharp/GrpcApi/TestServiceGrpc.cs | 6 +- src/csharp/GrpcCore/Calls.cs | 58 +-- src/csharp/GrpcCore/GrpcCore.csproj | 6 +- src/csharp/GrpcCore/Internal/AsyncCall.cs | 493 ++++++++++-------- .../Internal/BatchContextSafeHandle.cs | 96 ++++ .../GrpcCore/Internal/CallSafeHandle.cs | 138 ++--- ...ver.cs => ClientStreamingInputObserver.cs} | 15 +- .../Internal/CompletionQueueSafeHandle.cs | 16 - src/csharp/GrpcCore/Internal/Event.cs | 224 -------- .../GrpcCore/Internal/GrpcThreadPool.cs | 47 +- .../Internal/SafeHandleZeroIsInvalid.cs | 6 + .../GrpcCore/Internal/ServerSafeHandle.cs | 16 +- ...er.cs => ServerStreamingOutputObserver.cs} | 10 +- src/csharp/GrpcCore/Server.cs | 31 +- src/csharp/GrpcCore/ServerCallHandler.cs | 28 +- src/csharp/GrpcCoreTests/ClientServerTest.cs | 67 ++- src/csharp/ext/grpc_csharp_ext.c | 365 +++++++++++++ 18 files changed, 953 insertions(+), 675 deletions(-) create mode 100644 src/csharp/GrpcCore/Internal/BatchContextSafeHandle.cs rename src/csharp/GrpcCore/Internal/{StreamingInputObserver.cs => ClientStreamingInputObserver.cs} (88%) delete mode 100644 src/csharp/GrpcCore/Internal/Event.cs rename src/csharp/GrpcCore/Internal/{ServerWritingObserver.cs => ServerStreamingOutputObserver.cs} (87%) diff --git a/src/csharp/GrpcApi/MathGrpc.cs b/src/csharp/GrpcApi/MathGrpc.cs index caea1608ec..44e704e496 100644 --- a/src/csharp/GrpcApi/MathGrpc.cs +++ b/src/csharp/GrpcApi/MathGrpc.cs @@ -81,7 +81,7 @@ namespace math Task<DivReply> DivAsync(DivArgs request, CancellationToken token = default(CancellationToken)); - Task Fib(FibArgs request, IObserver<Num> responseObserver, CancellationToken token = default(CancellationToken)); + void Fib(FibArgs request, IObserver<Num> responseObserver, CancellationToken token = default(CancellationToken)); ClientStreamingAsyncResult<Num, Num> Sum(CancellationToken token = default(CancellationToken)); @@ -109,10 +109,10 @@ namespace math return Calls.AsyncUnaryCall(call, request, token); } - public Task Fib(FibArgs request, IObserver<Num> responseObserver, CancellationToken token = default(CancellationToken)) + public void Fib(FibArgs request, IObserver<Num> responseObserver, CancellationToken token = default(CancellationToken)) { var call = new Google.GRPC.Core.Call<FibArgs, Num>(fibMethod, channel); - return Calls.AsyncServerStreamingCall(call, request, responseObserver, token); + Calls.AsyncServerStreamingCall(call, request, responseObserver, token); } public ClientStreamingAsyncResult<Num, Num> Sum(CancellationToken token = default(CancellationToken)) diff --git a/src/csharp/GrpcApi/TestServiceGrpc.cs b/src/csharp/GrpcApi/TestServiceGrpc.cs index 6534a44ef4..64d5c09563 100644 --- a/src/csharp/GrpcApi/TestServiceGrpc.cs +++ b/src/csharp/GrpcApi/TestServiceGrpc.cs @@ -99,7 +99,7 @@ namespace grpc.testing Task<SimpleResponse> UnaryCallAsync(SimpleRequest request, CancellationToken token = default(CancellationToken)); - Task StreamingOutputCall(StreamingOutputCallRequest request, IObserver<StreamingOutputCallResponse> responseObserver, CancellationToken token = default(CancellationToken)); + void StreamingOutputCall(StreamingOutputCallRequest request, IObserver<StreamingOutputCallResponse> responseObserver, CancellationToken token = default(CancellationToken)); ClientStreamingAsyncResult<StreamingInputCallRequest, StreamingInputCallResponse> StreamingInputCall(CancellationToken token = default(CancellationToken)); @@ -141,9 +141,9 @@ namespace grpc.testing return Calls.AsyncUnaryCall(call, request, token); } - public Task StreamingOutputCall(StreamingOutputCallRequest request, IObserver<StreamingOutputCallResponse> responseObserver, CancellationToken token = default(CancellationToken)) { + public void StreamingOutputCall(StreamingOutputCallRequest request, IObserver<StreamingOutputCallResponse> responseObserver, CancellationToken token = default(CancellationToken)) { var call = new Google.GRPC.Core.Call<StreamingOutputCallRequest, StreamingOutputCallResponse>(streamingOutputCallMethod, channel); - return Calls.AsyncServerStreamingCall(call, request, responseObserver, token); + Calls.AsyncServerStreamingCall(call, request, responseObserver, token); } public ClientStreamingAsyncResult<StreamingInputCallRequest, StreamingInputCallResponse> StreamingInputCall(CancellationToken token = default(CancellationToken)) diff --git a/src/csharp/GrpcCore/Calls.cs b/src/csharp/GrpcCore/Calls.cs index d89d9a16f9..e5ddd879d6 100644 --- a/src/csharp/GrpcCore/Calls.cs +++ b/src/csharp/GrpcCore/Calls.cs @@ -47,50 +47,42 @@ namespace Google.GRPC.Core { public static TResponse BlockingUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token) { - //TODO: implement this in real synchronous style once new GRPC C core API is available. - return AsyncUnaryCall(call, req, token).Result; + //TODO: implement this in real synchronous style. + try { + return AsyncUnaryCall(call, req, token).Result; + } catch(AggregateException ae) { + foreach (var e in ae.InnerExceptions) + { + if (e is RpcException) + { + throw e; + } + } + throw; + } } public static async Task<TResponse> AsyncUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token) { var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestSerializer, call.ResponseDeserializer); - asyncCall.Initialize(call.Channel, call.MethodName); - asyncCall.Start(false, GetCompletionQueue()); - - await asyncCall.WriteAsync(req); - await asyncCall.WritesCompletedAsync(); - - TResponse response = await asyncCall.ReadAsync(); - - Status status = await asyncCall.Finished; - - if (status.StatusCode != StatusCode.GRPC_STATUS_OK) - { - throw new RpcException(status); - } - return response; + asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.MethodName); + return await asyncCall.UnaryCallAsync(req); } - public static async Task AsyncServerStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, IObserver<TResponse> outputs, CancellationToken token) + public static void AsyncServerStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, IObserver<TResponse> outputs, CancellationToken token) { var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestSerializer, call.ResponseDeserializer); - asyncCall.Initialize(call.Channel, call.MethodName); - asyncCall.Start(false, GetCompletionQueue()); - asyncCall.StartReadingToStream(outputs); - - await asyncCall.WriteAsync(req); - await asyncCall.WritesCompletedAsync(); + asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.MethodName); + asyncCall.StartServerStreamingCall(req, outputs); } public static ClientStreamingAsyncResult<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, CancellationToken token) { var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestSerializer, call.ResponseDeserializer); - asyncCall.Initialize(call.Channel, call.MethodName); - asyncCall.Start(false, GetCompletionQueue()); - - var task = asyncCall.ReadAsync(); - var inputs = new StreamingInputObserver<TRequest, TResponse>(asyncCall); + asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.MethodName); + var task = asyncCall.ClientStreamingCallAsync(); + var inputs = new ClientStreamingInputObserver<TRequest, TResponse>(asyncCall); return new ClientStreamingAsyncResult<TRequest, TResponse>(task, inputs); } @@ -102,12 +94,10 @@ namespace Google.GRPC.Core public static IObserver<TRequest> DuplexStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, IObserver<TResponse> outputs, CancellationToken token) { var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestSerializer, call.ResponseDeserializer); - asyncCall.Initialize(call.Channel, call.MethodName); - asyncCall.Start(false, GetCompletionQueue()); + asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.MethodName); - asyncCall.StartReadingToStream(outputs); - var inputs = new StreamingInputObserver<TRequest, TResponse>(asyncCall); - return inputs; + asyncCall.StartDuplexStreamingCall(outputs); + return new ClientStreamingInputObserver<TRequest, TResponse>(asyncCall); } private static CompletionQueueSafeHandle GetCompletionQueue() { diff --git a/src/csharp/GrpcCore/GrpcCore.csproj b/src/csharp/GrpcCore/GrpcCore.csproj index 34b9f6dfb8..a574f181c8 100644 --- a/src/csharp/GrpcCore/GrpcCore.csproj +++ b/src/csharp/GrpcCore/GrpcCore.csproj @@ -47,21 +47,21 @@ <Compile Include="Internal\ChannelSafeHandle.cs" /> <Compile Include="Internal\CompletionQueueSafeHandle.cs" /> <Compile Include="Internal\Enums.cs" /> - <Compile Include="Internal\Event.cs" /> <Compile Include="Internal\SafeHandleZeroIsInvalid.cs" /> <Compile Include="Internal\Timespec.cs" /> <Compile Include="Internal\GrpcThreadPool.cs" /> <Compile Include="Internal\AsyncCall.cs" /> <Compile Include="Internal\ServerSafeHandle.cs" /> - <Compile Include="Internal\StreamingInputObserver.cs" /> <Compile Include="Method.cs" /> <Compile Include="ServerCalls.cs" /> <Compile Include="ServerCallHandler.cs" /> - <Compile Include="Internal\ServerWritingObserver.cs" /> <Compile Include="Marshaller.cs" /> <Compile Include="ServerServiceDefinition.cs" /> <Compile Include="Utils\RecordingObserver.cs" /> <Compile Include="Utils\RecordingQueue.cs" /> + <Compile Include="Internal\BatchContextSafeHandle.cs" /> + <Compile Include="Internal\ClientStreamingInputObserver.cs" /> + <Compile Include="Internal\ServerStreamingOutputObserver.cs" /> </ItemGroup> <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" /> <ItemGroup> diff --git a/src/csharp/GrpcCore/Internal/AsyncCall.cs b/src/csharp/GrpcCore/Internal/AsyncCall.cs index d5f3239e1e..ae7428978e 100644 --- a/src/csharp/GrpcCore/Internal/AsyncCall.cs +++ b/src/csharp/GrpcCore/Internal/AsyncCall.cs @@ -2,11 +2,11 @@ // Copyright 2015, Google Inc. // All rights reserved. -// +// // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions are // met: -// +// // * Redistributions of source code must retain the above copyright // notice, this list of conditions and the following disclaimer. // * Redistributions in binary form must reproduce the above @@ -16,7 +16,7 @@ // * Neither the name of Google Inc. nor the names of its // contributors may be used to endorse or promote products derived from // this software without specific prior written permission. -// +// // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR @@ -41,39 +41,28 @@ using Google.GRPC.Core.Internal; namespace Google.GRPC.Core.Internal { - /// <summary> - /// Listener for call events that can be delivered from a completion queue. - /// </summary> - internal interface ICallEventListener { - - void OnClientMetadata(); - - void OnRead(byte[] payload); - - void OnWriteAccepted(GRPCOpError error); - - void OnFinishAccepted(GRPCOpError error); - - // ignore the status on server - void OnFinished(Status status); - } - /// <summary> /// Handle native call lifecycle and provides convenience methods. /// </summary> - internal class AsyncCall<TWrite, TRead>: ICallEventListener, IDisposable + internal class AsyncCall<TWrite, TRead> : IDisposable { readonly Func<TWrite, byte[]> serializer; readonly Func<byte[], TRead> deserializer; - // TODO: make sure the delegate doesn't get garbage collected while + // TODO: make sure the delegate doesn't get garbage collected while // native callbacks are in the completion queue. - readonly EventCallbackDelegate callbackHandler; + readonly CompletionCallbackDelegate unaryResponseHandler; + readonly CompletionCallbackDelegate finishedHandler; + readonly CompletionCallbackDelegate writeFinishedHandler; + readonly CompletionCallbackDelegate readFinishedHandler; + readonly CompletionCallbackDelegate halfclosedHandler; + readonly CompletionCallbackDelegate finishedServersideHandler; object myLock = new object(); bool disposed; CallSafeHandle call; + bool server; bool started; bool errorOccured; @@ -85,54 +74,25 @@ namespace Google.GRPC.Core.Internal TaskCompletionSource<object> writeTcs; TaskCompletionSource<TRead> readTcs; + + TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>(); TaskCompletionSource<object> halfcloseTcs = new TaskCompletionSource<object>(); TaskCompletionSource<Status> finishedTcs = new TaskCompletionSource<Status>(); + TaskCompletionSource<TRead> unaryResponseTcs; + IObserver<TRead> readObserver; public AsyncCall(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer) { this.serializer = serializer; this.deserializer = deserializer; - this.callbackHandler = HandleEvent; - } - - public Task WriteAsync(TWrite msg) - { - return StartWrite(msg, false).Task; - } - - public Task WritesCompletedAsync() - { - WritesDone(); - return halfcloseTcs.Task; - } - - public Task WriteStatusAsync(Status status) - { - WriteStatus(status); - return halfcloseTcs.Task; - } - - public Task<TRead> ReadAsync() - { - return StartRead().Task; - } - - public Task Halfclosed - { - get - { - return halfcloseTcs.Task; - } - } - - public Task<Status> Finished - { - get - { - return finishedTcs.Task; - } + this.unaryResponseHandler = HandleUnaryResponseCompletion; + this.finishedHandler = HandleFinished; + this.writeFinishedHandler = HandleWriteFinished; + this.readFinishedHandler = HandleReadFinished; + this.halfclosedHandler = HandleHalfclosed; + this.finishedServersideHandler = HandleFinishedServerside; } /// <summary> @@ -147,14 +107,14 @@ namespace Google.GRPC.Core.Internal throw new InvalidOperationException("Already registered an observer."); } this.readObserver = readObserver; - StartRead(); + ReceiveMessageAsync(); } } - public void Initialize(Channel channel, String methodName) { + public void Initialize(Channel channel, CompletionQueueSafeHandle cq, String methodName) { lock (myLock) { - this.call = CallSafeHandle.Create(channel.Handle, methodName, channel.Target, Timespec.InfFuture); + this.call = CallSafeHandle.Create(channel.Handle, cq, methodName, channel.Target, Timespec.InfFuture); } } @@ -163,42 +123,75 @@ namespace Google.GRPC.Core.Internal lock(myLock) { this.call = call; + started = true; + server = true; } } - // Client only - public void Start(bool buffered, CompletionQueueSafeHandle cq) + + public Task<TRead> UnaryCallAsync(TWrite msg) { lock (myLock) { - if (started) - { - throw new InvalidOperationException("Already started."); - } - - call.Invoke(cq, buffered, callbackHandler, callbackHandler); started = true; + halfcloseRequested = true; + + // TODO: handle serialization error... + byte[] payload = serializer(msg); + + unaryResponseTcs = new TaskCompletionSource<TRead>(); + call.StartUnary(payload, unaryResponseHandler); + + return unaryResponseTcs.Task; } } - // Server only - public void Accept(CompletionQueueSafeHandle cq) + public Task<TRead> ClientStreamingCallAsync() { lock (myLock) { - if (started) - { - throw new InvalidOperationException("Already started."); - } + started = true; + + unaryResponseTcs = new TaskCompletionSource<TRead>(); + call.StartClientStreaming(unaryResponseHandler); + + return unaryResponseTcs.Task; + } + } - call.ServerAccept(cq, callbackHandler); - call.ServerEndInitialMetadata(0); + public void StartServerStreamingCall(TWrite msg, IObserver<TRead> readObserver) + { + lock (myLock) + { started = true; + halfcloseRequested = true; + + this.readObserver = readObserver; + + // TODO: handle serialization error... + byte[] payload = serializer(msg); + + call.StartServerStreaming(payload, finishedHandler); + + ReceiveMessageAsync(); } } - public TaskCompletionSource<object> StartWrite(TWrite msg, bool buffered) + public void StartDuplexStreamingCall(IObserver<TRead> readObserver) { + lock (myLock) + { + started = true; + + this.readObserver = readObserver; + + call.StartDuplexStreaming(finishedHandler); + + ReceiveMessageAsync(); + } + } + + public Task SendMessageAsync(TWrite msg) { lock (myLock) { CheckStarted(); @@ -219,14 +212,13 @@ namespace Google.GRPC.Core.Internal // TODO: wrap serialization... byte[] payload = serializer(msg); - call.StartWrite(payload, buffered, callbackHandler); + call.StartSendMessage(payload, writeFinishedHandler); writeTcs = new TaskCompletionSource<object>(); - return writeTcs; + return writeTcs.Task; } } - // client only - public void WritesDone() + public Task SendCloseFromClientAsync() { lock (myLock) { @@ -240,13 +232,13 @@ namespace Google.GRPC.Core.Internal throw new InvalidOperationException("Already halfclosed."); } - call.WritesDone(callbackHandler); + call.StartSendCloseFromClient(halfclosedHandler); halfcloseRequested = true; + return halfcloseTcs.Task; } } - // server only - public void WriteStatus(Status status) + public Task SendStatusFromServerAsync(Status status) { lock (myLock) { @@ -260,12 +252,13 @@ namespace Google.GRPC.Core.Internal throw new InvalidOperationException("Already halfclosed."); } - call.StartWriteStatus(status, callbackHandler); + call.StartSendStatusFromServer(status, halfclosedHandler); halfcloseRequested = true; + return halfcloseTcs.Task; } } - public TaskCompletionSource<TRead> StartRead() + public Task<TRead> ReceiveMessageAsync() { lock (myLock) { @@ -285,10 +278,19 @@ namespace Google.GRPC.Core.Internal throw new InvalidOperationException("Only one read can be pending at a time"); } - call.StartRead(callbackHandler); + call.StartReceiveMessage(readFinishedHandler); readTcs = new TaskCompletionSource<TRead>(); - return readTcs; + return readTcs.Task; + } + } + + internal Task StartServerSide() + { + lock (myLock) + { + call.StartServerSide(finishedServersideHandler); + return finishedServersideTcs.Task; } } @@ -317,107 +319,7 @@ namespace Google.GRPC.Core.Internal // grpc_call_cancel_with_status is threadsafe call.CancelWithStatus(status); } - - public void OnClientMetadata() - { - // TODO: implement.... - } - - public void OnRead(byte[] payload) - { - TaskCompletionSource<TRead> oldTcs = null; - IObserver<TRead> observer = null; - lock (myLock) - { - oldTcs = readTcs; - readTcs = null; - if (payload == null) - { - doneWithReading = true; - } - observer = readObserver; - } - - // TODO: wrap deserialization... - TRead msg = payload != null ? deserializer(payload) : default(TRead); - - oldTcs.SetResult(msg); - - // TODO: make sure we deliver reads in the right order. - - if (observer != null) - { - if (payload != null) - { - // TODO: wrap to handle exceptions - observer.OnNext(msg); - - // start a new read - StartRead(); - } - else - { - // TODO: wrap to handle exceptions; - observer.OnCompleted(); - } - - } - } - - public void OnWriteAccepted(GRPCOpError error) - { - TaskCompletionSource<object> oldTcs = null; - lock (myLock) - { - UpdateErrorOccured(error); - oldTcs = writeTcs; - writeTcs = null; - } - - if (errorOccured) - { - // TODO: use the right type of exception... - oldTcs.SetException(new Exception("Write failed")); - } - else - { - // TODO: where does the continuation run? - oldTcs.SetResult(null); - } - } - - public void OnFinishAccepted(GRPCOpError error) - { - lock (myLock) - { - UpdateErrorOccured(error); - halfclosed = true; - } - - if (errorOccured) - { - halfcloseTcs.SetException(new Exception("Halfclose failed")); - - } - else - { - halfcloseTcs.SetResult(null); - } - - } - - public void OnFinished(Status status) - { - lock (myLock) - { - finishedStatus = status; - - DisposeResourcesIfNeeded(); - } - finishedTcs.SetResult(status); - - } - + public void Dispose() { Dispose(true); @@ -434,7 +336,7 @@ namespace Google.GRPC.Core.Internal { call.Dispose(); } - } + } disposed = true; } } @@ -489,38 +391,195 @@ namespace Google.GRPC.Core.Internal } } - private void HandleEvent(IntPtr eventPtr) { + private void CompleteStreamObserver(Status status) { + if (status.StatusCode != StatusCode.GRPC_STATUS_OK) + { + // TODO: wrap to handle exceptions; + readObserver.OnError(new RpcException(status)); + } else { + // TODO: wrap to handle exceptions; + readObserver.OnCompleted(); + } + } + + private void HandleUnaryResponseCompletion(GRPCOpError error, IntPtr batchContextPtr) { + try { + + TaskCompletionSource<TRead> tcs; + lock(myLock) { + tcs = unaryResponseTcs; + } + + // we're done with this call, get rid of the native object. + call.Dispose(); + + var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); + + if (error != GRPCOpError.GRPC_OP_OK) { + tcs.SetException(new RpcException( + new Status(StatusCode.GRPC_STATUS_INTERNAL, "Internal error occured.") + )); + return; + } + + var status = ctx.GetReceivedStatus(); + if (status.StatusCode != StatusCode.GRPC_STATUS_OK) { + tcs.SetException(new RpcException(status)); + return; + } + + // TODO: handle deserialize error... + var msg = deserializer(ctx.GetReceivedMessage()); + tcs.SetResult(msg); + } catch(Exception e) { + Console.WriteLine("Caught exception in a native handler: " + e); + } + } + + private void HandleWriteFinished(GRPCOpError error, IntPtr batchContextPtr) { + try { + + TaskCompletionSource<object> oldTcs = null; + lock (myLock) + { + UpdateErrorOccured(error); + oldTcs = writeTcs; + writeTcs = null; + } + + if (errorOccured) + { + // TODO: use the right type of exception... + oldTcs.SetException(new Exception("Write failed")); + } + else + { + // TODO: where does the continuation run? + oldTcs.SetResult(null); + } + + } catch(Exception e) { + Console.WriteLine("Caught exception in a native handler: " + e); + } + } + + private void HandleHalfclosed(GRPCOpError error, IntPtr batchContextPtr) { + try { + lock (myLock) + { + UpdateErrorOccured(error); + halfclosed = true; + } + + if (errorOccured) + { + halfcloseTcs.SetException(new Exception("Halfclose failed")); + + } + else + { + halfcloseTcs.SetResult(null); + } + } catch(Exception e) { + Console.WriteLine("Caught exception in a native handler: " + e); + } + } + + private void HandleReadFinished(GRPCOpError error, IntPtr batchContextPtr) { try { - var ev = new EventSafeHandleNotOwned(eventPtr); - switch (ev.GetCompletionType()) + + var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); + var payload = ctx.GetReceivedMessage(); + + TaskCompletionSource<TRead> oldTcs = null; + IObserver<TRead> observer = null; + + Nullable<Status> status = null; + + lock (myLock) { - case GRPCCompletionType.GRPC_CLIENT_METADATA_READ: - OnClientMetadata(); - break; + oldTcs = readTcs; + readTcs = null; + if (payload == null) + { + doneWithReading = true; + } + observer = readObserver; + status = finishedStatus; + } + + // TODO: wrap deserialization... + TRead msg = payload != null ? deserializer(payload) : default(TRead); - case GRPCCompletionType.GRPC_READ: - byte[] payload = ev.GetReadData(); - OnRead(payload); - break; + oldTcs.SetResult(msg); - case GRPCCompletionType.GRPC_WRITE_ACCEPTED: - OnWriteAccepted(ev.GetWriteAccepted()); - break; + // TODO: make sure we deliver reads in the right order. - case GRPCCompletionType.GRPC_FINISH_ACCEPTED: - OnFinishAccepted(ev.GetFinishAccepted()); - break; + if (observer != null) { + if (payload != null) + { + // TODO: wrap to handle exceptions + observer.OnNext(msg); + + // start a new read + ReceiveMessageAsync(); + } + else + { + if (!server) { + if (status.HasValue) { + CompleteStreamObserver(status.Value); + } + } else { + // TODO: wrap to handle exceptions.. + observer.OnCompleted(); + } + // TODO: completeStreamObserver serverside... + } + } + } catch(Exception e) { + Console.WriteLine("Caught exception in a native handler: " + e); + } + } + + private void HandleFinished(GRPCOpError error, IntPtr batchContextPtr) { + try { + var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); + var status = ctx.GetReceivedStatus(); + + bool wasDoneWithReading; + + lock (myLock) + { + finishedStatus = status; - case GRPCCompletionType.GRPC_FINISHED: - OnFinished(ev.GetFinished()); - break; + DisposeResourcesIfNeeded(); - default: - throw new ArgumentException("Unexpected completion type"); + wasDoneWithReading = doneWithReading; } + + if (wasDoneWithReading) { + CompleteStreamObserver(status); + } + + } catch(Exception e) { + Console.WriteLine("Caught exception in a native handler: " + e); + } + } + + private void HandleFinishedServerside(GRPCOpError error, IntPtr batchContextPtr) { + try { + var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); + + // TODO: handle error ... + + finishedServersideTcs.SetResult(null); + + call.Dispose(); + } catch(Exception e) { Console.WriteLine("Caught exception in a native handler: " + e); } } } -} +} \ No newline at end of file diff --git a/src/csharp/GrpcCore/Internal/BatchContextSafeHandle.cs b/src/csharp/GrpcCore/Internal/BatchContextSafeHandle.cs new file mode 100644 index 0000000000..ddfd94a3b5 --- /dev/null +++ b/src/csharp/GrpcCore/Internal/BatchContextSafeHandle.cs @@ -0,0 +1,96 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Runtime.InteropServices; +using Google.GRPC.Core; + +namespace Google.GRPC.Core.Internal +{ + /// <summary> + /// Not owned version of + /// grpcsharp_batch_context + /// </summary> + internal class BatchContextSafeHandleNotOwned : SafeHandleZeroIsInvalid + { + [DllImport("grpc_csharp_ext.dll")] + static extern IntPtr grpcsharp_batch_context_recv_message_length(BatchContextSafeHandleNotOwned ctx); + + [DllImport("grpc_csharp_ext.dll")] + static extern void grpcsharp_batch_context_recv_message_to_buffer(BatchContextSafeHandleNotOwned ctx, byte[] buffer, UIntPtr bufferLen); + + [DllImport("grpc_csharp_ext.dll")] + static extern StatusCode grpcsharp_batch_context_recv_status_on_client_status(BatchContextSafeHandleNotOwned ctx); + + [DllImport("grpc_csharp_ext.dll")] + static extern IntPtr grpcsharp_batch_context_recv_status_on_client_details(BatchContextSafeHandleNotOwned ctx); // returns const char* + + [DllImport("grpc_csharp_ext.dll")] + static extern CallSafeHandle grpcsharp_batch_context_server_rpc_new_call(BatchContextSafeHandleNotOwned ctx); + + [DllImport("grpc_csharp_ext.dll")] + static extern IntPtr grpcsharp_batch_context_server_rpc_new_method(BatchContextSafeHandleNotOwned ctx); // returns const char* + + public BatchContextSafeHandleNotOwned(IntPtr handle) : base(false) + { + SetHandle(handle); + } + + public Status GetReceivedStatus() + { + // TODO: can the native method return string directly? + string details = Marshal.PtrToStringAnsi(grpcsharp_batch_context_recv_status_on_client_details(this)); + return new Status(grpcsharp_batch_context_recv_status_on_client_status(this), details); + } + + public byte[] GetReceivedMessage() + { + IntPtr len = grpcsharp_batch_context_recv_message_length(this); + if (len == new IntPtr(-1)) + { + return null; + } + byte[] data = new byte[(int) len]; + grpcsharp_batch_context_recv_message_to_buffer(this, data, new UIntPtr((ulong)data.Length)); + return data; + } + + public CallSafeHandle GetServerRpcNewCall() { + return grpcsharp_batch_context_server_rpc_new_call(this); + } + + public string GetServerRpcNewMethod() { + return Marshal.PtrToStringAnsi(grpcsharp_batch_context_server_rpc_new_method(this)); + } + } +} \ No newline at end of file diff --git a/src/csharp/GrpcCore/Internal/CallSafeHandle.cs b/src/csharp/GrpcCore/Internal/CallSafeHandle.cs index e9ccd8d5f9..55d66a62ca 100644 --- a/src/csharp/GrpcCore/Internal/CallSafeHandle.cs +++ b/src/csharp/GrpcCore/Internal/CallSafeHandle.cs @@ -2,11 +2,11 @@ // Copyright 2015, Google Inc. // All rights reserved. -// +// // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions are // met: -// +// // * Redistributions of source code must retain the above copyright // notice, this list of conditions and the following disclaimer. // * Redistributions in binary form must reproduce the above @@ -16,7 +16,7 @@ // * Neither the name of Google Inc. nor the names of its // contributors may be used to endorse or promote products derived from // this software without specific prior written permission. -// +// // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR @@ -38,8 +38,8 @@ using Google.GRPC.Core; namespace Google.GRPC.Core.Internal { - // TODO: we need to make sure that the delegates are not collected before invoked. - internal delegate void EventCallbackDelegate(IntPtr eventPtr); + //TODO: rename the delegate + internal delegate void CompletionCallbackDelegate(GRPCOpError error, IntPtr batchContextPtr); /// <summary> /// grpc_call from <grpc/grpc.h> @@ -49,142 +49,108 @@ namespace Google.GRPC.Core.Internal const UInt32 GRPC_WRITE_BUFFER_HINT = 1; [DllImport("grpc_csharp_ext.dll")] - static extern CallSafeHandle grpcsharp_channel_create_call_old(ChannelSafeHandle channel, string method, string host, Timespec deadline); + static extern CallSafeHandle grpcsharp_channel_create_call(ChannelSafeHandle channel, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline); [DllImport("grpc_csharp_ext.dll")] - static extern GRPCCallError grpcsharp_call_add_metadata(CallSafeHandle call, IntPtr metadata, UInt32 flags); + static extern GRPCCallError grpcsharp_call_cancel(CallSafeHandle call); [DllImport("grpc_csharp_ext.dll")] - static extern GRPCCallError grpcsharp_call_invoke_old(CallSafeHandle call, CompletionQueueSafeHandle cq, IntPtr metadataReadTag, IntPtr finishedTag, UInt32 flags); - - [DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_call_invoke_old")] - static extern GRPCCallError grpcsharp_call_invoke_old_CALLBACK(CallSafeHandle call, CompletionQueueSafeHandle cq, - [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate metadataReadCallback, - [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate finishedCallback, - UInt32 flags); + static extern GRPCCallError grpcsharp_call_cancel_with_status(CallSafeHandle call, StatusCode status, string description); [DllImport("grpc_csharp_ext.dll")] - static extern GRPCCallError grpcsharp_call_server_accept_old(CallSafeHandle call, CompletionQueueSafeHandle completionQueue, IntPtr finishedTag); - - [DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_call_server_accept_old")] - static extern GRPCCallError grpcsharp_call_server_accept_old_CALLBACK(CallSafeHandle call, CompletionQueueSafeHandle completionQueue, [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate finishedCallback); + static extern GRPCCallError grpcsharp_call_start_unary(CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, + byte[] send_buffer, UIntPtr send_buffer_len); [DllImport("grpc_csharp_ext.dll")] - static extern GRPCCallError grpcsharp_call_server_end_initial_metadata_old(CallSafeHandle call, UInt32 flags); + static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); [DllImport("grpc_csharp_ext.dll")] - static extern GRPCCallError grpcsharp_call_cancel(CallSafeHandle call); + static extern GRPCCallError grpcsharp_call_start_server_streaming(CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, + byte[] send_buffer, UIntPtr send_buffer_len); [DllImport("grpc_csharp_ext.dll")] - static extern GRPCCallError grpcsharp_call_cancel_with_status(CallSafeHandle call, StatusCode status, string description); + static extern GRPCCallError grpcsharp_call_start_duplex_streaming(CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); [DllImport("grpc_csharp_ext.dll")] - static extern GRPCCallError grpcsharp_call_start_write_status_old(CallSafeHandle call, StatusCode statusCode, string statusMessage, IntPtr tag); - - [DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_call_start_write_status_old")] - static extern GRPCCallError grpcsharp_call_start_write_status_old_CALLBACK(CallSafeHandle call, StatusCode statusCode, string statusMessage, [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate callback); + static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, + byte[] send_buffer, UIntPtr send_buffer_len); [DllImport("grpc_csharp_ext.dll")] - static extern GRPCCallError grpcsharp_call_writes_done_old(CallSafeHandle call, IntPtr tag); - - [DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_call_writes_done_old")] - static extern GRPCCallError grpcsharp_call_writes_done_old_CALLBACK(CallSafeHandle call, [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate callback); + static extern GRPCCallError grpcsharp_call_send_close_from_client(CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); [DllImport("grpc_csharp_ext.dll")] - static extern GRPCCallError grpcsharp_call_start_read_old(CallSafeHandle call, IntPtr tag); - - [DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_call_start_read_old")] - static extern GRPCCallError grpcsharp_call_start_read_old_CALLBACK(CallSafeHandle call, [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate callback); + static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, StatusCode statusCode, string statusMessage); [DllImport("grpc_csharp_ext.dll")] - static extern void grpcsharp_call_start_write_from_copied_buffer(CallSafeHandle call, - byte[] buffer, UIntPtr length, - IntPtr tag, UInt32 flags); + static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); - [DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_call_start_write_from_copied_buffer")] - static extern void grpcsharp_call_start_write_from_copied_buffer_CALLBACK(CallSafeHandle call, - byte[] buffer, UIntPtr length, - [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate callback, - UInt32 flags); + [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCallError grpcsharp_call_start_serverside(CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); - [DllImport("grpc_csharp_ext.dll")] + [DllImport("grpc_csharp_ext.dll")] static extern void grpcsharp_call_destroy(IntPtr call); - private CallSafeHandle() - { - } - - /// <summary> - /// Creates a client call. - /// </summary> - public static CallSafeHandle Create(ChannelSafeHandle channel, string method, string host, Timespec deadline) - { - return grpcsharp_channel_create_call_old(channel, method, host, deadline); - } - - public void Invoke(CompletionQueueSafeHandle cq, IntPtr metadataReadTag, IntPtr finishedTag, bool buffered) - { - AssertCallOk(grpcsharp_call_invoke_old(this, cq, metadataReadTag, finishedTag, GetFlags(buffered))); - } - - public void Invoke(CompletionQueueSafeHandle cq, bool buffered, EventCallbackDelegate metadataReadCallback, EventCallbackDelegate finishedCallback) - { - AssertCallOk(grpcsharp_call_invoke_old_CALLBACK(this, cq, metadataReadCallback, finishedCallback, GetFlags(buffered))); - } - public void ServerAccept(CompletionQueueSafeHandle cq, IntPtr finishedTag) + private CallSafeHandle() { - AssertCallOk(grpcsharp_call_server_accept_old(this, cq, finishedTag)); } - public void ServerAccept(CompletionQueueSafeHandle cq, EventCallbackDelegate callback) + public static CallSafeHandle Create(ChannelSafeHandle channel, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline) { - AssertCallOk(grpcsharp_call_server_accept_old_CALLBACK(this, cq, callback)); + return grpcsharp_channel_create_call(channel, cq, method, host, deadline); } - public void ServerEndInitialMetadata(UInt32 flags) + public void StartUnary(byte[] payload, CompletionCallbackDelegate callback) { - AssertCallOk(grpcsharp_call_server_end_initial_metadata_old(this, flags)); + AssertCallOk(grpcsharp_call_start_unary(this, callback, payload, new UIntPtr((ulong) payload.Length))); } - public void StartWrite(byte[] payload, IntPtr tag, bool buffered) + public void StartClientStreaming(CompletionCallbackDelegate callback) { - grpcsharp_call_start_write_from_copied_buffer(this, payload, new UIntPtr((ulong) payload.Length), tag, GetFlags(buffered)); + AssertCallOk(grpcsharp_call_start_client_streaming(this, callback)); } - public void StartWrite(byte[] payload, bool buffered, EventCallbackDelegate callback) + public void StartServerStreaming(byte[] payload, CompletionCallbackDelegate callback) { - grpcsharp_call_start_write_from_copied_buffer_CALLBACK(this, payload, new UIntPtr((ulong) payload.Length), callback, GetFlags(buffered)); + AssertCallOk(grpcsharp_call_start_server_streaming(this, callback, payload, new UIntPtr((ulong) payload.Length))); } - public void StartWriteStatus(Status status, IntPtr tag) + public void StartDuplexStreaming(CompletionCallbackDelegate callback) { - AssertCallOk(grpcsharp_call_start_write_status_old(this, status.StatusCode, status.Detail, tag)); + AssertCallOk(grpcsharp_call_start_duplex_streaming(this, callback)); } - public void StartWriteStatus(Status status, EventCallbackDelegate callback) + public void StartSendMessage(byte[] payload, CompletionCallbackDelegate callback) { - AssertCallOk(grpcsharp_call_start_write_status_old_CALLBACK(this, status.StatusCode, status.Detail, callback)); + AssertCallOk(grpcsharp_call_send_message(this, callback, payload, new UIntPtr((ulong) payload.Length))); } - public void WritesDone(IntPtr tag) + public void StartSendCloseFromClient(CompletionCallbackDelegate callback) { - AssertCallOk(grpcsharp_call_writes_done_old(this, tag)); + AssertCallOk(grpcsharp_call_send_close_from_client(this, callback)); } - public void WritesDone(EventCallbackDelegate callback) + public void StartSendStatusFromServer(Status status, CompletionCallbackDelegate callback) { - AssertCallOk(grpcsharp_call_writes_done_old_CALLBACK(this, callback)); + AssertCallOk(grpcsharp_call_send_status_from_server(this, callback, status.StatusCode, status.Detail)); } - public void StartRead(IntPtr tag) + public void StartReceiveMessage(CompletionCallbackDelegate callback) { - AssertCallOk(grpcsharp_call_start_read_old(this, tag)); + AssertCallOk(grpcsharp_call_recv_message(this, callback)); } - public void StartRead(EventCallbackDelegate callback) + public void StartServerSide(CompletionCallbackDelegate callback) { - AssertCallOk(grpcsharp_call_start_read_old_CALLBACK(this, callback)); + AssertCallOk(grpcsharp_call_start_serverside(this, callback)); } public void Cancel() @@ -212,4 +178,4 @@ namespace Google.GRPC.Core.Internal return buffered ? 0 : GRPC_WRITE_BUFFER_HINT; } } -} +} \ No newline at end of file diff --git a/src/csharp/GrpcCore/Internal/StreamingInputObserver.cs b/src/csharp/GrpcCore/Internal/ClientStreamingInputObserver.cs similarity index 88% rename from src/csharp/GrpcCore/Internal/StreamingInputObserver.cs rename to src/csharp/GrpcCore/Internal/ClientStreamingInputObserver.cs index 60837de5e6..4d10a9bdf9 100644 --- a/src/csharp/GrpcCore/Internal/StreamingInputObserver.cs +++ b/src/csharp/GrpcCore/Internal/ClientStreamingInputObserver.cs @@ -2,11 +2,11 @@ // Copyright 2015, Google Inc. // All rights reserved. -// +// // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions are // met: -// +// // * Redistributions of source code must retain the above copyright // notice, this list of conditions and the following disclaimer. // * Redistributions in binary form must reproduce the above @@ -16,7 +16,7 @@ // * Neither the name of Google Inc. nor the names of its // contributors may be used to endorse or promote products derived from // this software without specific prior written permission. -// +// // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR @@ -36,19 +36,20 @@ using Google.GRPC.Core.Internal; namespace Google.GRPC.Core.Internal { - internal class StreamingInputObserver<TWrite, TRead> : IObserver<TWrite> + internal class ClientStreamingInputObserver<TWrite, TRead> : IObserver<TWrite> { readonly AsyncCall<TWrite, TRead> call; - public StreamingInputObserver(AsyncCall<TWrite, TRead> call) + public ClientStreamingInputObserver(AsyncCall<TWrite, TRead> call) { this.call = call; } public void OnCompleted() { + // TODO: how bad is the Wait here? - call.WritesCompletedAsync().Wait(); + call.SendCloseFromClientAsync().Wait(); } public void OnError(Exception error) @@ -59,7 +60,7 @@ namespace Google.GRPC.Core.Internal public void OnNext(TWrite value) { // TODO: how bad is the Wait here? - call.WriteAsync(value).Wait(); + call.SendMessageAsync(value).Wait(); } } } diff --git a/src/csharp/GrpcCore/Internal/CompletionQueueSafeHandle.cs b/src/csharp/GrpcCore/Internal/CompletionQueueSafeHandle.cs index 666f220b8c..5ea436df19 100644 --- a/src/csharp/GrpcCore/Internal/CompletionQueueSafeHandle.cs +++ b/src/csharp/GrpcCore/Internal/CompletionQueueSafeHandle.cs @@ -45,12 +45,6 @@ namespace Google.GRPC.Core.Internal [DllImport("grpc_csharp_ext.dll")] static extern CompletionQueueSafeHandle grpcsharp_completion_queue_create(); - [DllImport("grpc_csharp_ext.dll")] - static extern EventSafeHandle grpcsharp_completion_queue_pluck(CompletionQueueSafeHandle cq, IntPtr tag, Timespec deadline); - - [DllImport("grpc_csharp_ext.dll")] - static extern EventSafeHandle grpcsharp_completion_queue_next(CompletionQueueSafeHandle cq, Timespec deadline); - [DllImport("grpc_csharp_ext.dll")] static extern void grpcsharp_completion_queue_shutdown(CompletionQueueSafeHandle cq); @@ -69,21 +63,11 @@ namespace Google.GRPC.Core.Internal return grpcsharp_completion_queue_create(); } - public EventSafeHandle Next(Timespec deadline) - { - return grpcsharp_completion_queue_next(this, deadline); - } - public GRPCCompletionType NextWithCallback() { return grpcsharp_completion_queue_next_with_callback(this); } - public EventSafeHandle Pluck(IntPtr tag, Timespec deadline) - { - return grpcsharp_completion_queue_pluck(this, tag, deadline); - } - public void Shutdown() { grpcsharp_completion_queue_shutdown(this); diff --git a/src/csharp/GrpcCore/Internal/Event.cs b/src/csharp/GrpcCore/Internal/Event.cs deleted file mode 100644 index 6116e0975a..0000000000 --- a/src/csharp/GrpcCore/Internal/Event.cs +++ /dev/null @@ -1,224 +0,0 @@ -#region Copyright notice and license - -// Copyright 2015, Google Inc. -// All rights reserved. -// -// Redistribution and use in source and binary forms, with or without -// modification, are permitted provided that the following conditions are -// met: -// -// * Redistributions of source code must retain the above copyright -// notice, this list of conditions and the following disclaimer. -// * Redistributions in binary form must reproduce the above -// copyright notice, this list of conditions and the following disclaimer -// in the documentation and/or other materials provided with the -// distribution. -// * Neither the name of Google Inc. nor the names of its -// contributors may be used to endorse or promote products derived from -// this software without specific prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -#endregion - -using System; -using System.Runtime.InteropServices; -using Google.GRPC.Core; - -namespace Google.GRPC.Core.Internal -{ - /// <summary> - /// grpc_event from grpc/grpc.h - /// </summary> - internal class EventSafeHandle : SafeHandleZeroIsInvalid - { - [DllImport("grpc_csharp_ext.dll")] - static extern void grpcsharp_event_finish(IntPtr ev); - - [DllImport("grpc_csharp_ext.dll")] - static extern GRPCCompletionType grpcsharp_event_type(EventSafeHandle ev); - - [DllImport("grpc_csharp_ext.dll")] - static extern CallSafeHandle grpcsharp_event_call(EventSafeHandle ev); - - [DllImport("grpc_csharp_ext.dll")] - static extern GRPCOpError grpcsharp_event_write_accepted(EventSafeHandle ev); - - [DllImport("grpc_csharp_ext.dll")] - static extern GRPCOpError grpcsharp_event_finish_accepted(EventSafeHandle ev); - - [DllImport("grpc_csharp_ext.dll")] - static extern StatusCode grpcsharp_event_finished_status(EventSafeHandle ev); - - [DllImport("grpc_csharp_ext.dll")] - static extern IntPtr grpcsharp_event_finished_details(EventSafeHandle ev); // returns const char* - - [DllImport("grpc_csharp_ext.dll")] - static extern IntPtr grpcsharp_event_read_length(EventSafeHandle ev); - - [DllImport("grpc_csharp_ext.dll")] - static extern void grpcsharp_event_read_copy_to_buffer(EventSafeHandle ev, byte[] buffer, UIntPtr bufferLen); - - [DllImport("grpc_csharp_ext.dll")] - static extern IntPtr grpcsharp_event_server_rpc_new_method(EventSafeHandle ev); // returns const char* - - public GRPCCompletionType GetCompletionType() - { - return grpcsharp_event_type(this); - } - - public GRPCOpError GetWriteAccepted() - { - return grpcsharp_event_write_accepted(this); - } - - public GRPCOpError GetFinishAccepted() - { - return grpcsharp_event_finish_accepted(this); - } - - public Status GetFinished() - { - // TODO: can the native method return string directly? - string details = Marshal.PtrToStringAnsi(grpcsharp_event_finished_details(this)); - return new Status(grpcsharp_event_finished_status(this), details); - } - - public byte[] GetReadData() - { - IntPtr len = grpcsharp_event_read_length(this); - if (len == new IntPtr(-1)) - { - return null; - } - byte[] data = new byte[(int) len]; - grpcsharp_event_read_copy_to_buffer(this, data, new UIntPtr((ulong)data.Length)); - return data; - } - - public CallSafeHandle GetCall() { - return grpcsharp_event_call(this); - } - - public string GetServerRpcNewMethod() { - // TODO: can the native method return string directly? - return Marshal.PtrToStringAnsi(grpcsharp_event_server_rpc_new_method(this)); - } - - //TODO: client_metadata_read event type - - protected override bool ReleaseHandle() - { - grpcsharp_event_finish(handle); - return true; - } - } - - // TODO: this is basically c&p of EventSafeHandle. Unify! - /// <summary> - /// Not owned version of - /// grpc_event from grpc/grpc.h - /// </summary> - internal class EventSafeHandleNotOwned : SafeHandleZeroIsInvalid - { - [DllImport("grpc_csharp_ext.dll")] - static extern void grpcsharp_event_finish(IntPtr ev); - - [DllImport("grpc_csharp_ext.dll")] - static extern GRPCCompletionType grpcsharp_event_type(EventSafeHandleNotOwned ev); - - [DllImport("grpc_csharp_ext.dll")] - static extern CallSafeHandle grpcsharp_event_call(EventSafeHandleNotOwned ev); - - [DllImport("grpc_csharp_ext.dll")] - static extern GRPCOpError grpcsharp_event_write_accepted(EventSafeHandleNotOwned ev); - - [DllImport("grpc_csharp_ext.dll")] - static extern GRPCOpError grpcsharp_event_finish_accepted(EventSafeHandleNotOwned ev); - - [DllImport("grpc_csharp_ext.dll")] - static extern StatusCode grpcsharp_event_finished_status(EventSafeHandleNotOwned ev); - - [DllImport("grpc_csharp_ext.dll")] - static extern IntPtr grpcsharp_event_finished_details(EventSafeHandleNotOwned ev); // returns const char* - - [DllImport("grpc_csharp_ext.dll")] - static extern IntPtr grpcsharp_event_read_length(EventSafeHandleNotOwned ev); - - [DllImport("grpc_csharp_ext.dll")] - static extern void grpcsharp_event_read_copy_to_buffer(EventSafeHandleNotOwned ev, byte[] buffer, UIntPtr bufferLen); - - [DllImport("grpc_csharp_ext.dll")] - static extern IntPtr grpcsharp_event_server_rpc_new_method(EventSafeHandleNotOwned ev); // returns const char* - - public EventSafeHandleNotOwned() : base(false) - { - } - - public EventSafeHandleNotOwned(IntPtr handle) : base(false) - { - SetHandle(handle); - } - - public GRPCCompletionType GetCompletionType() - { - return grpcsharp_event_type(this); - } - - public GRPCOpError GetWriteAccepted() - { - return grpcsharp_event_write_accepted(this); - } - - public GRPCOpError GetFinishAccepted() - { - return grpcsharp_event_finish_accepted(this); - } - - public Status GetFinished() - { - // TODO: can the native method return string directly? - string details = Marshal.PtrToStringAnsi(grpcsharp_event_finished_details(this)); - return new Status(grpcsharp_event_finished_status(this), details); - } - - public byte[] GetReadData() - { - IntPtr len = grpcsharp_event_read_length(this); - if (len == new IntPtr(-1)) - { - return null; - } - byte[] data = new byte[(int) len]; - grpcsharp_event_read_copy_to_buffer(this, data, new UIntPtr((ulong)data.Length)); - return data; - } - - public CallSafeHandle GetCall() { - return grpcsharp_event_call(this); - } - - public string GetServerRpcNewMethod() { - // TODO: can the native method return string directly? - return Marshal.PtrToStringAnsi(grpcsharp_event_server_rpc_new_method(this)); - } - - //TODO: client_metadata_read event type - - protected override bool ReleaseHandle() - { - grpcsharp_event_finish(handle); - return true; - } - } -} diff --git a/src/csharp/GrpcCore/Internal/GrpcThreadPool.cs b/src/csharp/GrpcCore/Internal/GrpcThreadPool.cs index f8154fa250..634a0b2d72 100644 --- a/src/csharp/GrpcCore/Internal/GrpcThreadPool.cs +++ b/src/csharp/GrpcCore/Internal/GrpcThreadPool.cs @@ -48,7 +48,6 @@ namespace Google.GRPC.Core.Internal readonly object myLock = new object(); readonly List<Thread> threads = new List<Thread>(); readonly int poolSize; - readonly Action<EventSafeHandle> eventHandler; CompletionQueueSafeHandle cq; @@ -56,11 +55,6 @@ namespace Google.GRPC.Core.Internal this.poolSize = poolSize; } - internal GrpcThreadPool(int poolSize, Action<EventSafeHandle> eventHandler) { - this.poolSize = poolSize; - this.eventHandler = eventHandler; - } - public void Start() { lock (myLock) @@ -104,34 +98,19 @@ namespace Google.GRPC.Core.Internal } } - private Thread CreateAndStartThread(int i) { - Action body; - if (eventHandler != null) - { - body = ThreadBodyWithHandler; - } - else - { - body = ThreadBodyNoHandler; - } - var thread = new Thread(new ThreadStart(body)); + private Thread CreateAndStartThread(int i) + { + var thread = new Thread(new ThreadStart(RunHandlerLoop)); thread.IsBackground = false; thread.Start(); - if (eventHandler != null) - { - thread.Name = "grpc_server_newrpc " + i; - } - else - { - thread.Name = "grpc " + i; - } + thread.Name = "grpc " + i; return thread; } /// <summary> /// Body of the polling thread. /// </summary> - private void ThreadBodyNoHandler() + private void RunHandlerLoop() { GRPCCompletionType completionType; do @@ -140,22 +119,6 @@ namespace Google.GRPC.Core.Internal } while(completionType != GRPCCompletionType.GRPC_QUEUE_SHUTDOWN); Console.WriteLine("Completion queue has shutdown successfully, thread " + Thread.CurrentThread.Name + " exiting."); } - - /// <summary> - /// Body of the polling thread. - /// </summary> - private void ThreadBodyWithHandler() - { - GRPCCompletionType completionType; - do - { - using (EventSafeHandle ev = cq.Next(Timespec.InfFuture)) { - completionType = ev.GetCompletionType(); - eventHandler(ev); - } - } while(completionType != GRPCCompletionType.GRPC_QUEUE_SHUTDOWN); - Console.WriteLine("Completion queue has shutdown successfully, thread " + Thread.CurrentThread.Name + " exiting."); - } } } diff --git a/src/csharp/GrpcCore/Internal/SafeHandleZeroIsInvalid.cs b/src/csharp/GrpcCore/Internal/SafeHandleZeroIsInvalid.cs index 74a8ef7b6e..59f08d4ca8 100644 --- a/src/csharp/GrpcCore/Internal/SafeHandleZeroIsInvalid.cs +++ b/src/csharp/GrpcCore/Internal/SafeHandleZeroIsInvalid.cs @@ -56,6 +56,12 @@ namespace Google.GRPC.Core.Internal return handle == IntPtr.Zero; } } + + protected override bool ReleaseHandle() + { + // handle is not owned. + return true; + } } } diff --git a/src/csharp/GrpcCore/Internal/ServerSafeHandle.cs b/src/csharp/GrpcCore/Internal/ServerSafeHandle.cs index c91de97ce3..c096602800 100644 --- a/src/csharp/GrpcCore/Internal/ServerSafeHandle.cs +++ b/src/csharp/GrpcCore/Internal/ServerSafeHandle.cs @@ -38,13 +38,16 @@ using System.Collections.Concurrent; namespace Google.GRPC.Core.Internal { + // TODO: we need to make sure that the delegates are not collected before invoked. + internal delegate void ServerShutdownCallbackDelegate(IntPtr eventPtr); + /// <summary> /// grpc_server from grpc/grpc.h /// </summary> internal sealed class ServerSafeHandle : SafeHandleZeroIsInvalid { - [DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_server_request_call_old")] - static extern GRPCCallError grpcsharp_server_request_call_old_CALLBACK(ServerSafeHandle server, [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate callback); + [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCallError grpcsharp_server_request_call(ServerSafeHandle server, CompletionQueueSafeHandle cq, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); [DllImport("grpc_csharp_ext.dll")] static extern ServerSafeHandle grpcsharp_server_create(CompletionQueueSafeHandle cq, IntPtr args); @@ -63,8 +66,9 @@ namespace Google.GRPC.Core.Internal [DllImport("grpc_csharp_ext.dll")] static extern void grpcsharp_server_shutdown(ServerSafeHandle server); + // TODO: get rid of the old callback style [DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_server_shutdown_and_notify")] - static extern void grpcsharp_server_shutdown_and_notify_CALLBACK(ServerSafeHandle server, [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate callback); + static extern void grpcsharp_server_shutdown_and_notify_CALLBACK(ServerSafeHandle server, [MarshalAs(UnmanagedType.FunctionPtr)] ServerShutdownCallbackDelegate callback); [DllImport("grpc_csharp_ext.dll")] static extern void grpcsharp_server_destroy(IntPtr server); @@ -95,14 +99,14 @@ namespace Google.GRPC.Core.Internal grpcsharp_server_shutdown(this); } - public void ShutdownAndNotify(EventCallbackDelegate callback) + public void ShutdownAndNotify(ServerShutdownCallbackDelegate callback) { grpcsharp_server_shutdown_and_notify_CALLBACK(this, callback); } - public GRPCCallError RequestCall(EventCallbackDelegate callback) + public GRPCCallError RequestCall(CompletionQueueSafeHandle cq, CompletionCallbackDelegate callback) { - return grpcsharp_server_request_call_old_CALLBACK(this, callback); + return grpcsharp_server_request_call(this, cq, callback); } protected override bool ReleaseHandle() diff --git a/src/csharp/GrpcCore/Internal/ServerWritingObserver.cs b/src/csharp/GrpcCore/Internal/ServerStreamingOutputObserver.cs similarity index 87% rename from src/csharp/GrpcCore/Internal/ServerWritingObserver.cs rename to src/csharp/GrpcCore/Internal/ServerStreamingOutputObserver.cs index 1d29864b9f..e9cb65cb3b 100644 --- a/src/csharp/GrpcCore/Internal/ServerWritingObserver.cs +++ b/src/csharp/GrpcCore/Internal/ServerStreamingOutputObserver.cs @@ -40,11 +40,11 @@ namespace Google.GRPC.Core.Internal /// Observer that writes all arriving messages to a call abstraction (in blocking fashion) /// and then halfcloses the call. Used for server-side call handling. /// </summary> - internal class ServerWritingObserver<TWrite, TRead> : IObserver<TWrite> + internal class ServerStreamingOutputObserver<TWrite, TRead> : IObserver<TWrite> { readonly AsyncCall<TWrite, TRead> call; - public ServerWritingObserver(AsyncCall<TWrite, TRead> call) + public ServerStreamingOutputObserver(AsyncCall<TWrite, TRead> call) { this.call = call; } @@ -52,19 +52,19 @@ namespace Google.GRPC.Core.Internal public void OnCompleted() { // TODO: how bad is the Wait here? - call.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_OK, "")).Wait(); + call.SendStatusFromServerAsync(new Status(StatusCode.GRPC_STATUS_OK, "")).Wait(); } public void OnError(Exception error) { - // TODO: handle this... + // TODO: implement this... throw new InvalidOperationException("This should never be called."); } public void OnNext(TWrite value) { // TODO: how bad is the Wait here? - call.WriteAsync(value).Wait(); + call.SendMessageAsync(value).Wait(); } } } diff --git a/src/csharp/GrpcCore/Server.cs b/src/csharp/GrpcCore/Server.cs index 0882a61299..91842d8182 100644 --- a/src/csharp/GrpcCore/Server.cs +++ b/src/csharp/GrpcCore/Server.cs @@ -49,8 +49,8 @@ namespace Google.GRPC.Core { // TODO: make sure the delegate doesn't get garbage collected while // native callbacks are in the completion queue. - readonly EventCallbackDelegate newRpcHandler; - readonly EventCallbackDelegate serverShutdownHandler; + readonly ServerShutdownCallbackDelegate serverShutdownHandler; + readonly CompletionCallbackDelegate newServerRpcHandler; readonly BlockingCollection<NewRpcInfo> newRpcQueue = new BlockingCollection<NewRpcInfo>(); readonly ServerSafeHandle handle; @@ -61,9 +61,8 @@ namespace Google.GRPC.Core public Server() { - // TODO: what is the tag for server shutdown? this.handle = ServerSafeHandle.NewServer(GetCompletionQueue(), IntPtr.Zero); - this.newRpcHandler = HandleNewRpc; + this.newServerRpcHandler = HandleNewServerRpc; this.serverShutdownHandler = HandleServerShutdown; } @@ -99,7 +98,7 @@ namespace Google.GRPC.Core { var rpcInfo = newRpcQueue.Take(); - Console.WriteLine("Server received RPC " + rpcInfo.Method); + //Console.WriteLine("Server received RPC " + rpcInfo.Method); IServerCallHandler callHandler; if (!callHandlers.TryGetValue(rpcInfo.Method, out callHandler)) @@ -138,23 +137,25 @@ namespace Google.GRPC.Core private void AllowOneRpc() { - AssertCallOk(handle.RequestCall(newRpcHandler)); + AssertCallOk(handle.RequestCall(GetCompletionQueue(), newServerRpcHandler)); } - private void HandleNewRpc(IntPtr eventPtr) - { - try - { - var ev = new EventSafeHandleNotOwned(eventPtr); - var rpcInfo = new NewRpcInfo(ev.GetCall(), ev.GetServerRpcNewMethod()); + private void HandleNewServerRpc(GRPCOpError error, IntPtr batchContextPtr) { + try { + var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); + + if (error != GRPCOpError.GRPC_OP_OK) { + // TODO: handle error + } + + var rpcInfo = new NewRpcInfo(ctx.GetServerRpcNewCall(), ctx.GetServerRpcNewMethod()); // after server shutdown, the callback returns with null call if (!rpcInfo.Call.IsInvalid) { newRpcQueue.Add(rpcInfo); } - } - catch (Exception e) - { + + } catch(Exception e) { Console.WriteLine("Caught exception in a native handler: " + e); } } diff --git a/src/csharp/GrpcCore/ServerCallHandler.cs b/src/csharp/GrpcCore/ServerCallHandler.cs index bcce4a091f..3bc3b15396 100644 --- a/src/csharp/GrpcCore/ServerCallHandler.cs +++ b/src/csharp/GrpcCore/ServerCallHandler.cs @@ -59,15 +59,16 @@ namespace Google.GRPC.Core method.RequestMarshaller.Deserializer); asyncCall.InitializeServer(call); - asyncCall.Accept(cq); + + var finishedTask = asyncCall.StartServerSide(); - var request = asyncCall.ReadAsync().Result; + var request = asyncCall.ReceiveMessageAsync().Result; - var responseObserver = new ServerWritingObserver<TResponse, TRequest>(asyncCall); + var responseObserver = new ServerStreamingOutputObserver<TResponse, TRequest>(asyncCall); handler(request, responseObserver); - asyncCall.Halfclosed.Wait(); - asyncCall.Finished.Wait(); + finishedTask.Wait(); + } } @@ -89,16 +90,16 @@ namespace Google.GRPC.Core method.RequestMarshaller.Deserializer); asyncCall.InitializeServer(call); - asyncCall.Accept(cq); - var responseObserver = new ServerWritingObserver<TResponse, TRequest>(asyncCall); + var finishedTask = asyncCall.StartServerSide(); + + var responseObserver = new ServerStreamingOutputObserver<TResponse, TRequest>(asyncCall); var requestObserver = handler(responseObserver); // feed the requests asyncCall.StartReadingToStream(requestObserver); - asyncCall.Halfclosed.Wait(); - asyncCall.Finished.Wait(); + finishedTask.Wait(); } } @@ -110,11 +111,14 @@ namespace Google.GRPC.Core AsyncCall<byte[], byte[]> asyncCall = new AsyncCall<byte[], byte[]>( (payload) => payload, (payload) => payload); + asyncCall.InitializeServer(call); - asyncCall.Accept(cq); - asyncCall.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_UNIMPLEMENTED, "No such method.")).Wait(); - asyncCall.Finished.Wait(); + var finishedTask = asyncCall.StartServerSide(); + + asyncCall.SendStatusFromServerAsync(new Status(StatusCode.GRPC_STATUS_UNIMPLEMENTED, "No such method.")).Wait(); + + finishedTask.Wait(); } } } diff --git a/src/csharp/GrpcCoreTests/ClientServerTest.cs b/src/csharp/GrpcCoreTests/ClientServerTest.cs index 4401156520..dd3fc7038e 100644 --- a/src/csharp/GrpcCoreTests/ClientServerTest.cs +++ b/src/csharp/GrpcCoreTests/ClientServerTest.cs @@ -36,6 +36,7 @@ using NUnit.Framework; using Google.GRPC.Core; using Google.GRPC.Core.Internal; using System.Threading; +using System.Diagnostics; using System.Threading.Tasks; using Google.GRPC.Core.Utils; @@ -52,7 +53,7 @@ namespace Google.GRPC.Core.Tests Marshallers.StringMarshaller); [Test] - public void EmptyCall() + public void UnaryCall() { GrpcEnvironment.Initialize(); @@ -69,6 +70,7 @@ namespace Google.GRPC.Core.Tests var call = new Call<string, string>(unaryEchoStringMethod, channel); Assert.AreEqual("ABC", Calls.BlockingUnaryCall(call, "ABC", default(CancellationToken))); + Assert.AreEqual("abcdef", Calls.BlockingUnaryCall(call, "abcdef", default(CancellationToken))); } @@ -77,11 +79,72 @@ namespace Google.GRPC.Core.Tests GrpcEnvironment.Shutdown(); } + [Test] + public void UnaryCallPerformance() + { + GrpcEnvironment.Initialize(); + + Server server = new Server(); + server.AddServiceDefinition( + ServerServiceDefinition.CreateBuilder("someService") + .AddMethod(unaryEchoStringMethod, HandleUnaryEchoString).Build()); + + int port = server.AddPort(host + ":0"); + server.Start(); + + using (Channel channel = new Channel(host + ":" + port)) + { + var call = new Call<string, string>(unaryEchoStringMethod, channel); + + var stopwatch = new Stopwatch(); + stopwatch.Start(); + for (int i = 0; i < 1000; i++) + { + Calls.BlockingUnaryCall(call, "ABC", default(CancellationToken)); + } + stopwatch.Stop(); + Console.WriteLine("Elapsed time: " + stopwatch.ElapsedMilliseconds + "ms"); + } + + server.ShutdownAsync().Wait(); + + GrpcEnvironment.Shutdown(); + } + + + [Test] + public void UnknownMethodHandler() + { + GrpcEnvironment.Initialize(); + + Server server = new Server(); + server.AddServiceDefinition( + ServerServiceDefinition.CreateBuilder("someService").Build()); + + int port = server.AddPort(host + ":0"); + server.Start(); + + using (Channel channel = new Channel(host + ":" + port)) + { + var call = new Call<string, string>(unaryEchoStringMethod, channel); + + try { + Calls.BlockingUnaryCall(call, "ABC", default(CancellationToken)); + Assert.Fail(); + } catch(RpcException e) { + Assert.AreEqual(StatusCode.GRPC_STATUS_UNIMPLEMENTED, e.Status.StatusCode); + } + } + + server.ShutdownAsync().Wait(); + + GrpcEnvironment.Shutdown(); + } + private void HandleUnaryEchoString(string request, IObserver<string> responseObserver) { responseObserver.OnNext(request); responseObserver.OnCompleted(); } - } } diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c index c7949af44e..eff862537b 100644 --- a/src/csharp/ext/grpc_csharp_ext.c +++ b/src/csharp/ext/grpc_csharp_ext.c @@ -32,9 +32,11 @@ */ #include <grpc/support/port_platform.h> +#include <grpc/support/alloc.h> #include <grpc/grpc.h> #include <grpc/support/log.h> #include <grpc/support/slice.h> +#include <grpc/support/string.h> #include <string.h> @@ -58,6 +60,134 @@ grpc_byte_buffer *string_to_byte_buffer(const char *buffer, size_t len) { return bb; } +typedef void(GPR_CALLTYPE * callback_funcptr)(grpc_op_error op_error, void *batch_context); + +/* + * Helper to maintain lifetime of batch op inputs and store batch op outputs. + */ +typedef struct gprcsharp_batch_context { + grpc_metadata_array send_initial_metadata; + grpc_byte_buffer *send_message; + struct { + grpc_metadata_array trailing_metadata; + char *status_details; + } send_status_from_server; + grpc_metadata_array recv_initial_metadata; + grpc_byte_buffer *recv_message; + struct { + grpc_metadata_array trailing_metadata; + grpc_status_code status; + char *status_details; + size_t status_details_capacity; + } recv_status_on_client; + int recv_close_on_server_cancelled; + struct { + grpc_call *call; + grpc_call_details call_details; + grpc_metadata_array request_metadata; + } server_rpc_new; + + /* callback will be called upon completion */ + callback_funcptr callback; + +} grpcsharp_batch_context; + +grpcsharp_batch_context *grpcsharp_batch_context_create() { + grpcsharp_batch_context *ctx = gpr_malloc(sizeof(grpcsharp_batch_context)); + memset(ctx, 0, sizeof(grpcsharp_batch_context)); + return ctx; +} + +/** + * Destroys metadata array including keys and values. + */ +void grpcsharp_metadata_array_destroy_recursive(grpc_metadata_array *array) { + if (!array->metadata) { + return; + } + /* TODO: destroy also keys and values */ + grpc_metadata_array_destroy(array); +} + +void grpcsharp_batch_context_destroy(grpcsharp_batch_context *ctx) { + if (!ctx) { + return; + } + grpcsharp_metadata_array_destroy_recursive(&(ctx->send_initial_metadata)); + + grpc_byte_buffer_destroy(ctx->send_message); + + grpcsharp_metadata_array_destroy_recursive(&(ctx->send_status_from_server.trailing_metadata)); + gpr_free(ctx->send_status_from_server.status_details); + + grpc_metadata_array_destroy(&(ctx->recv_initial_metadata)); + + grpc_byte_buffer_destroy(ctx->recv_message); + + grpc_metadata_array_destroy(&(ctx->recv_status_on_client.trailing_metadata)); + gpr_free((void*) ctx->recv_status_on_client.status_details); + + /* NOTE: ctx->server_rpc_new.call is not destroyed because callback handler is supposed + to take its ownership. */ + + grpc_call_details_destroy(&(ctx->server_rpc_new.call_details)); + grpc_metadata_array_destroy(&(ctx->server_rpc_new.request_metadata)); + + gpr_free(ctx); +} + +GPR_EXPORT gpr_intptr GPR_CALLTYPE grpcsharp_batch_context_recv_message_length(const grpcsharp_batch_context *ctx) { + if (!ctx->recv_message) { + return -1; + } + return grpc_byte_buffer_length(ctx->recv_message); +} + +/* + * Copies data from recv_message to a buffer. Fatal error occurs if + * buffer is too small. + */ +GPR_EXPORT void GPR_CALLTYPE +grpcsharp_batch_context_recv_message_to_buffer(const grpcsharp_batch_context *ctx, char *buffer, + size_t buffer_len) { + grpc_byte_buffer_reader *reader; + gpr_slice slice; + size_t offset = 0; + + reader = grpc_byte_buffer_reader_create(ctx->recv_message); + + while (grpc_byte_buffer_reader_next(reader, &slice)) { + size_t len = GPR_SLICE_LENGTH(slice); + GPR_ASSERT(offset + len <= buffer_len); + memcpy(buffer + offset, GPR_SLICE_START_PTR(slice), + GPR_SLICE_LENGTH(slice)); + offset += len; + gpr_slice_unref(slice); + } + grpc_byte_buffer_reader_destroy(reader); +} + +GPR_EXPORT grpc_status_code GPR_CALLTYPE +grpcsharp_batch_context_recv_status_on_client_status(const grpcsharp_batch_context *ctx) { + return ctx->recv_status_on_client.status; +} + +GPR_EXPORT const char *GPR_CALLTYPE +grpcsharp_batch_context_recv_status_on_client_details(const grpcsharp_batch_context *ctx) { + return ctx->recv_status_on_client.status_details; +} + +GPR_EXPORT grpc_call* GPR_CALLTYPE +grpcsharp_batch_context_server_rpc_new_call(const grpcsharp_batch_context *ctx) { + return ctx->server_rpc_new.call; +} + +GPR_EXPORT const char *GPR_CALLTYPE +grpcsharp_batch_context_server_rpc_new_method(const grpcsharp_batch_context *ctx) { + return ctx->server_rpc_new.call_details.method; +} + + /* Init & shutdown */ GPR_EXPORT void GPR_CALLTYPE grpcsharp_init(void) { grpc_init(); } @@ -96,11 +226,18 @@ grpcsharp_completion_queue_destroy(grpc_completion_queue *cq) { GPR_EXPORT grpc_completion_type GPR_CALLTYPE grpcsharp_completion_queue_next_with_callback(grpc_completion_queue *cq) { grpc_event *ev; + grpcsharp_batch_context *batch_context; grpc_completion_type t; void(GPR_CALLTYPE * callback)(grpc_event *); ev = grpc_completion_queue_next(cq, gpr_inf_future); t = ev->type; + if (t == GRPC_OP_COMPLETE && ev->tag) { + /* NEW API handler */ + batch_context = (grpcsharp_batch_context *) ev->tag; + batch_context->callback(ev->data.op_complete, batch_context); + grpcsharp_batch_context_destroy(batch_context); + } else if (ev->tag) { /* call the callback in ev->tag */ /* C forbids to cast object pointers to function pointers, so @@ -128,6 +265,12 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_channel_destroy(grpc_channel *channel) { grpc_channel_destroy(channel); } +GPR_EXPORT grpc_call *GPR_CALLTYPE grpcsharp_channel_create_call(grpc_channel *channel, grpc_completion_queue *cq, + const char *method, + const char *host, gpr_timespec deadline) { + return grpc_channel_create_call(channel, cq, method, host, deadline); +} + GPR_EXPORT grpc_call *GPR_CALLTYPE grpcsharp_channel_create_call_old(grpc_channel *channel, const char *method, const char *host, gpr_timespec deadline) { @@ -145,6 +288,12 @@ grpcsharp_event_type(const grpc_event *event) { return event->type; } +GPR_EXPORT grpc_op_error GPR_CALLTYPE +grpcsharp_event_op_complete(const grpc_event *event) { + GPR_ASSERT(event->type == GRPC_OP_COMPLETE); + return event->data.op_complete; +} + GPR_EXPORT grpc_op_error GPR_CALLTYPE grpcsharp_event_write_accepted(const grpc_event *event) { GPR_ASSERT(event->type == GRPC_WRITE_ACCEPTED); @@ -343,3 +492,219 @@ grpcsharp_server_shutdown_and_notify(grpc_server *server, void *tag) { GPR_EXPORT void GPR_CALLTYPE grpcsharp_server_destroy(grpc_server *server) { grpc_server_destroy(server); } + +/* New API Experiments */ + +GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_unary(grpc_call *call, + callback_funcptr callback, + const char *send_buffer, size_t send_buffer_len) { + /* TODO: don't use magic number */ + grpc_op ops[6]; + grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); + ctx->callback = callback; + + /* TODO: implement sending the metadata... */ + ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; + /* ctx->send_initial_metadata is already zeroed out. */ + ops[0].data.send_initial_metadata.count = 0; + ops[0].data.send_initial_metadata.metadata = NULL; + + ops[1].op = GRPC_OP_SEND_MESSAGE; + ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len); + ops[1].data.send_message = ctx->send_message; + + ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + + ops[3].op = GRPC_OP_RECV_INITIAL_METADATA; + ops[3].data.recv_initial_metadata = &(ctx->recv_initial_metadata); + + ops[4].op = GRPC_OP_RECV_MESSAGE; + ops[4].data.recv_message = &(ctx->recv_message); + + ops[5].op = GRPC_OP_RECV_STATUS_ON_CLIENT; + ops[5].data.recv_status_on_client.trailing_metadata = &(ctx->recv_status_on_client.trailing_metadata); + ops[5].data.recv_status_on_client.status = &(ctx->recv_status_on_client.status); + /* not using preallocation for status_details */ + ops[5].data.recv_status_on_client.status_details = &(ctx->recv_status_on_client.status_details); + ops[5].data.recv_status_on_client.status_details_capacity = &(ctx->recv_status_on_client.status_details_capacity); + + return grpc_call_start_batch(call, ops, sizeof(ops)/sizeof(ops[0]), ctx); +} + +GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_client_streaming(grpc_call *call, + callback_funcptr callback) { + /* TODO: don't use magic number */ + grpc_op ops[4]; + grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); + ctx->callback = callback; + + /* TODO: implement sending the metadata... */ + ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; + /* ctx->send_initial_metadata is already zeroed out. */ + ops[0].data.send_initial_metadata.count = 0; + ops[0].data.send_initial_metadata.metadata = NULL; + + ops[1].op = GRPC_OP_RECV_INITIAL_METADATA; + ops[1].data.recv_initial_metadata = &(ctx->recv_initial_metadata); + + ops[2].op = GRPC_OP_RECV_MESSAGE; + ops[2].data.recv_message = &(ctx->recv_message); + + ops[3].op = GRPC_OP_RECV_STATUS_ON_CLIENT; + ops[3].data.recv_status_on_client.trailing_metadata = &(ctx->recv_status_on_client.trailing_metadata); + ops[3].data.recv_status_on_client.status = &(ctx->recv_status_on_client.status); + /* not using preallocation for status_details */ + ops[3].data.recv_status_on_client.status_details = &(ctx->recv_status_on_client.status_details); + ops[3].data.recv_status_on_client.status_details_capacity = &(ctx->recv_status_on_client.status_details_capacity); + + return grpc_call_start_batch(call, ops, sizeof(ops)/sizeof(ops[0]), ctx); +} + +GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming(grpc_call *call, + callback_funcptr callback, + const char *send_buffer, size_t send_buffer_len) { + /* TODO: don't use magic number */ + grpc_op ops[5]; + grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); + ctx->callback = callback; + + /* TODO: implement sending the metadata... */ + ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; + /* ctx->send_initial_metadata is already zeroed out. */ + ops[0].data.send_initial_metadata.count = 0; + ops[0].data.send_initial_metadata.metadata = NULL; + + ops[1].op = GRPC_OP_SEND_MESSAGE; + ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len); + ops[1].data.send_message = ctx->send_message; + + ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + + ops[3].op = GRPC_OP_RECV_INITIAL_METADATA; + ops[3].data.recv_initial_metadata = &(ctx->recv_initial_metadata); + + ops[4].op = GRPC_OP_RECV_STATUS_ON_CLIENT; + ops[4].data.recv_status_on_client.trailing_metadata = &(ctx->recv_status_on_client.trailing_metadata); + ops[4].data.recv_status_on_client.status = &(ctx->recv_status_on_client.status); + /* not using preallocation for status_details */ + ops[4].data.recv_status_on_client.status_details = &(ctx->recv_status_on_client.status_details); + ops[4].data.recv_status_on_client.status_details_capacity = &(ctx->recv_status_on_client.status_details_capacity); + + return grpc_call_start_batch(call, ops, sizeof(ops)/sizeof(ops[0]), ctx); +} + +GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_duplex_streaming(grpc_call *call, + callback_funcptr callback) { + /* TODO: don't use magic number */ + grpc_op ops[3]; + grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); + ctx->callback = callback; + + /* TODO: implement sending the metadata... */ + ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; + /* ctx->send_initial_metadata is already zeroed out. */ + ops[0].data.send_initial_metadata.count = 0; + ops[0].data.send_initial_metadata.metadata = NULL; + + ops[1].op = GRPC_OP_RECV_INITIAL_METADATA; + ops[1].data.recv_initial_metadata = &(ctx->recv_initial_metadata); + + ops[2].op = GRPC_OP_RECV_STATUS_ON_CLIENT; + ops[2].data.recv_status_on_client.trailing_metadata = &(ctx->recv_status_on_client.trailing_metadata); + ops[2].data.recv_status_on_client.status = &(ctx->recv_status_on_client.status); + /* not using preallocation for status_details */ + ops[2].data.recv_status_on_client.status_details = &(ctx->recv_status_on_client.status_details); + ops[2].data.recv_status_on_client.status_details_capacity = &(ctx->recv_status_on_client.status_details_capacity); + + return grpc_call_start_batch(call, ops, sizeof(ops)/sizeof(ops[0]), ctx); +} + +GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_message(grpc_call *call, + callback_funcptr callback, + const char *send_buffer, size_t send_buffer_len) { + /* TODO: don't use magic number */ + grpc_op ops[1]; + grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); + ctx->callback = callback; + + ops[0].op = GRPC_OP_SEND_MESSAGE; + ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len); + ops[0].data.send_message = ctx->send_message; + + return grpc_call_start_batch(call, ops, sizeof(ops)/sizeof(ops[0]), ctx); +} + +GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_close_from_client(grpc_call *call, + callback_funcptr callback) { + /* TODO: don't use magic number */ + grpc_op ops[1]; + grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); + ctx->callback = callback; + + ops[0].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + + return grpc_call_start_batch(call, ops, sizeof(ops)/sizeof(ops[0]), ctx); +} + +GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server(grpc_call *call, + callback_funcptr callback, grpc_status_code status_code, const char* status_details) { + /* TODO: don't use magic number */ + grpc_op ops[1]; + grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); + ctx->callback = callback; + + ops[0].op = GRPC_OP_SEND_STATUS_FROM_SERVER; + ops[0].data.send_status_from_server.status = status_code; + ops[0].data.send_status_from_server.status_details = gpr_strdup(status_details); + ops[0].data.send_status_from_server.trailing_metadata = NULL; + ops[0].data.send_status_from_server.trailing_metadata_count = 0; + + return grpc_call_start_batch(call, ops, sizeof(ops)/sizeof(ops[0]), ctx); +} + +GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_recv_message(grpc_call *call, + callback_funcptr callback) { + /* TODO: don't use magic number */ + grpc_op ops[1]; + grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); + ctx->callback = callback; + + ops[0].op = GRPC_OP_RECV_MESSAGE; + ops[0].data.recv_message = &(ctx->recv_message); + return grpc_call_start_batch(call, ops, sizeof(ops)/sizeof(ops[0]), ctx); +} + +GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_serverside(grpc_call *call, + callback_funcptr callback) { + /* TODO: don't use magic number */ + grpc_op ops[2]; + + grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); + ctx->callback = callback; + + ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; + ops[0].data.send_initial_metadata.count = 0; + ops[0].data.send_initial_metadata.metadata = NULL; + + ops[1].op = GRPC_OP_RECV_CLOSE_ON_SERVER; + ops[1].data.recv_close_on_server.cancelled = (&ctx->recv_close_on_server_cancelled); + + return grpc_call_start_batch(call, ops, sizeof(ops)/sizeof(ops[0]), ctx); +} + + +GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_server_request_call(grpc_server *server, + grpc_completion_queue *cq, callback_funcptr callback) { + + grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); + ctx->callback = callback; + + return grpc_server_request_call(server, &(ctx->server_rpc_new.call), + &(ctx->server_rpc_new.call_details), + &(ctx->server_rpc_new.request_metadata), + cq, ctx); +} + + + + -- GitLab