diff --git a/src/csharp/.gitignore b/src/csharp/.gitignore index 4f4cd1f7d1ad47282abab67f2c431147e8dd92b4..dbaf60de0cf6dfbeb27fa4948a42a70982b10ef6 100644 --- a/src/csharp/.gitignore +++ b/src/csharp/.gitignore @@ -1,4 +1,5 @@ *.userprefs +StyleCop.Cache test-results packages Grpc.v12.suo diff --git a/src/csharp/Grpc.Core.Tests/PInvokeTest.cs b/src/csharp/Grpc.Core.Tests/PInvokeTest.cs index 282d521ba3798004837e5c593d268c1103cd39f0..9db08d2f02f7ba5042a694cdd30ef065eedf1c9a 100644 --- a/src/csharp/Grpc.Core.Tests/PInvokeTest.cs +++ b/src/csharp/Grpc.Core.Tests/PInvokeTest.cs @@ -127,8 +127,6 @@ namespace Grpc.Core.Tests [Test] public void NopPInvokeBenchmark() { - CompletionCallbackDelegate handler = Handler; - BenchmarkUtil.RunBenchmark( 1000000, 100000000, () => { diff --git a/src/csharp/Grpc.Core/ChannelArgs.cs b/src/csharp/Grpc.Core/ChannelArgs.cs index 653a5780a33340be868f96efe22b1a283c9129cf..298b6edf2066f78e031ea6ca990a10bd47961364 100644 --- a/src/csharp/Grpc.Core/ChannelArgs.cs +++ b/src/csharp/Grpc.Core/ChannelArgs.cs @@ -99,7 +99,7 @@ namespace Grpc.Core } return nativeArgs; } - catch (Exception e) + catch (Exception) { if (nativeArgs != null) { diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj index 93d5430591b5018bb7d52b1f414db3c013f7fec7..78b6cdde59bc11f7052a290d3d15c04e8141b310 100644 --- a/src/csharp/Grpc.Core/Grpc.Core.csproj +++ b/src/csharp/Grpc.Core/Grpc.Core.csproj @@ -51,7 +51,6 @@ <Compile Include="Internal\SafeHandleZeroIsInvalid.cs" /> <Compile Include="Internal\Timespec.cs" /> <Compile Include="Internal\GrpcThreadPool.cs" /> - <Compile Include="Internal\AsyncCall.cs" /> <Compile Include="Internal\ServerSafeHandle.cs" /> <Compile Include="Method.cs" /> <Compile Include="ServerCalls.cs" /> @@ -69,6 +68,12 @@ <Compile Include="Credentials.cs" /> <Compile Include="Internal\ChannelArgsSafeHandle.cs" /> <Compile Include="ChannelArgs.cs" /> + <Compile Include="Internal\AsyncCompletion.cs" /> + <Compile Include="Internal\AsyncCallBase.cs" /> + <Compile Include="Internal\AsyncCallServer.cs" /> + <Compile Include="OperationFailedException.cs" /> + <Compile Include="Internal\AsyncCall.cs" /> + <Compile Include="Utils\Preconditions.cs" /> </ItemGroup> <Choose> <!-- Under older versions of Monodevelop, Choose is not supported and is just diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index 6f37b059f75dcc22b1d85b0dbf85ef7d95d68f38..5ae036298b00c16f032dc6832f1ba7bf91b76d7f 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -43,84 +43,47 @@ using Grpc.Core.Utils; namespace Grpc.Core.Internal { /// <summary> - /// Handles native call lifecycle and provides convenience methods. + /// Handles client side native call lifecycle. /// </summary> - internal class AsyncCall<TWrite, TRead> + internal class AsyncCall<TRequest, TResponse> : AsyncCallBase<TRequest, TResponse> { - readonly Func<TWrite, byte[]> serializer; - readonly Func<byte[], TRead> deserializer; - readonly CompletionCallbackDelegate unaryResponseHandler; readonly CompletionCallbackDelegate finishedHandler; - readonly CompletionCallbackDelegate writeFinishedHandler; - readonly CompletionCallbackDelegate readFinishedHandler; - readonly CompletionCallbackDelegate halfclosedHandler; - readonly CompletionCallbackDelegate finishedServersideHandler; - - object myLock = new object(); - GCHandle gchandle; - CallSafeHandle call; - bool disposed; - - bool server; - - bool started; - bool errorOccured; - bool cancelRequested; - bool readingDone; - bool halfcloseRequested; - bool halfclosed; - bool finished; - - // Completion of a pending write if not null. - TaskCompletionSource<object> writeTcs; - - // Completion of a pending read if not null. - TaskCompletionSource<TRead> readTcs; - - // Completion of a pending halfclose if not null. - TaskCompletionSource<object> halfcloseTcs; // Completion of a pending unary response if not null. - TaskCompletionSource<TRead> unaryResponseTcs; + TaskCompletionSource<TResponse> unaryResponseTcs; - // Set after status is received on client. Only used for server streaming and duplex streaming calls. + // Set after status is received. Only used for streaming response calls. Nullable<Status> finishedStatus; - TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>(); - // For streaming, the reads will be delivered to this observer. - IObserver<TRead> readObserver; + bool readObserverCompleted; // True if readObserver has already been completed. - public AsyncCall(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer) + public AsyncCall(Func<TRequest, byte[]> serializer, Func<byte[], TResponse> deserializer) : base(serializer, deserializer) { - this.serializer = serializer; - this.deserializer = deserializer; - this.unaryResponseHandler = HandleUnaryResponse; - this.finishedHandler = HandleFinished; - this.writeFinishedHandler = HandleWriteFinished; - this.readFinishedHandler = HandleReadFinished; - this.halfclosedHandler = HandleHalfclosed; - this.finishedServersideHandler = HandleFinishedServerside; + this.unaryResponseHandler = CreateBatchCompletionCallback(HandleUnaryResponse); + this.finishedHandler = CreateBatchCompletionCallback(HandleFinished); } public void Initialize(Channel channel, CompletionQueueSafeHandle cq, String methodName) { - InitializeInternal(CallSafeHandle.Create(channel.Handle, cq, methodName, channel.Target, Timespec.InfFuture), false); + var call = CallSafeHandle.Create(channel.Handle, cq, methodName, channel.Target, Timespec.InfFuture); + InitializeInternal(call); } - public void InitializeServer(CallSafeHandle call) - { - InitializeInternal(call, true); - } - - public TRead UnaryCall(Channel channel, String methodName, TWrite msg) + // TODO: this method is not Async, so it shouldn't be in AsyncCall class, but + // it is reusing fair amount of code in this class, so we are leaving it here. + // TODO: for other calls, you need to call Initialize, this methods calls initialize + // on its own, so there's a usage inconsistency. + /// <summary> + /// Blocking unary request - unary response call. + /// </summary> + public TResponse UnaryCall(Channel channel, String methodName, TRequest msg) { using(CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.Create()) { - // TODO: handle serialization error... - byte[] payload = serializer(msg); + byte[] payload = UnsafeSerialize(msg); - unaryResponseTcs = new TaskCompletionSource<TRead>(); + unaryResponseTcs = new TaskCompletionSource<TResponse>(); lock (myLock) { @@ -143,508 +106,200 @@ namespace Grpc.Core.Internal } } - public Task<TRead> UnaryCallAsync(TWrite msg) + /// <summary> + /// Starts a unary request - unary response call. + /// </summary> + public Task<TResponse> UnaryCallAsync(TRequest msg) { lock (myLock) { + Preconditions.CheckNotNull(call); + started = true; halfcloseRequested = true; readingDone = true; - // TODO: handle serialization error... - byte[] payload = serializer(msg); + byte[] payload = UnsafeSerialize(msg); - unaryResponseTcs = new TaskCompletionSource<TRead>(); + unaryResponseTcs = new TaskCompletionSource<TResponse>(); call.StartUnary(payload, unaryResponseHandler); return unaryResponseTcs.Task; } } - public Task<TRead> ClientStreamingCallAsync() + /// <summary> + /// Starts a streamed request - unary response call. + /// Use StartSendMessage and StartSendCloseFromClient to stream requests. + /// </summary> + public Task<TResponse> ClientStreamingCallAsync() { lock (myLock) { + Preconditions.CheckNotNull(call); + started = true; readingDone = true; - unaryResponseTcs = new TaskCompletionSource<TRead>(); + unaryResponseTcs = new TaskCompletionSource<TResponse>(); call.StartClientStreaming(unaryResponseHandler); return unaryResponseTcs.Task; } } - public void StartServerStreamingCall(TWrite msg, IObserver<TRead> readObserver) + /// <summary> + /// Starts a unary request - streamed response call. + /// </summary> + public void StartServerStreamingCall(TRequest msg, IObserver<TResponse> readObserver) { lock (myLock) { + Preconditions.CheckNotNull(call); + started = true; halfcloseRequested = true; halfclosed = true; // halfclose not confirmed yet, but it will be once finishedHandler is called. this.readObserver = readObserver; - // TODO: handle serialization error... - byte[] payload = serializer(msg); + byte[] payload = UnsafeSerialize(msg); call.StartServerStreaming(payload, finishedHandler); - ReceiveMessageAsync(); + StartReceiveMessage(); } } - public void StartDuplexStreamingCall(IObserver<TRead> readObserver) + /// <summary> + /// Starts a streaming request - streaming response call. + /// Use StartSendMessage and StartSendCloseFromClient to stream requests. + /// </summary> + public void StartDuplexStreamingCall(IObserver<TResponse> readObserver) { lock (myLock) { + Preconditions.CheckNotNull(call); + started = true; this.readObserver = readObserver; call.StartDuplexStreaming(finishedHandler); - ReceiveMessageAsync(); + StartReceiveMessage(); } } - public Task ServerSideUnaryRequestCallAsync() - { - lock (myLock) - { - started = true; - call.StartServerSide(finishedServersideHandler); - return finishedServersideTcs.Task; - } - } - - public Task ServerSideStreamingRequestCallAsync(IObserver<TRead> readObserver) - { - lock (myLock) - { - started = true; - call.StartServerSide(finishedServersideHandler); - - if (this.readObserver != null) - { - throw new InvalidOperationException("Already registered an observer."); - } - this.readObserver = readObserver; - ReceiveMessageAsync(); - - return finishedServersideTcs.Task; - } - } - - public Task SendMessageAsync(TWrite msg) + /// <summary> + /// Sends a streaming request. Only one pending send action is allowed at any given time. + /// completionDelegate is called when the operation finishes. + /// </summary> + public void StartSendMessage(TRequest msg, AsyncCompletionDelegate completionDelegate) { - lock (myLock) - { - CheckNotDisposed(); - CheckStarted(); - CheckNoError(); - - if (halfcloseRequested) - { - throw new InvalidOperationException("Already halfclosed."); - } - - if (writeTcs != null) - { - throw new InvalidOperationException("Only one write can be pending at a time"); - } - - // TODO: wrap serialization... - byte[] payload = serializer(msg); - - call.StartSendMessage(payload, writeFinishedHandler); - writeTcs = new TaskCompletionSource<object>(); - return writeTcs.Task; - } + StartSendMessageInternal(msg, completionDelegate); } - public Task SendCloseFromClientAsync() + /// <summary> + /// Sends halfclose, indicating client is done with streaming requests. + /// Only one pending send action is allowed at any given time. + /// completionDelegate is called when the operation finishes. + /// </summary> + public void StartSendCloseFromClient(AsyncCompletionDelegate completionDelegate) { lock (myLock) { - CheckNotDisposed(); - CheckStarted(); - CheckNoError(); - - if (halfcloseRequested) - { - throw new InvalidOperationException("Already halfclosed."); - } + Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); + CheckSendingAllowed(); call.StartSendCloseFromClient(halfclosedHandler); halfcloseRequested = true; - halfcloseTcs = new TaskCompletionSource<object>(); - return halfcloseTcs.Task; - } - } - - public Task SendStatusFromServerAsync(Status status) - { - lock (myLock) - { - CheckNotDisposed(); - CheckStarted(); - CheckNoError(); - - if (halfcloseRequested) - { - throw new InvalidOperationException("Already halfclosed."); - } - - call.StartSendStatusFromServer(status, halfclosedHandler); - halfcloseRequested = true; - halfcloseTcs = new TaskCompletionSource<object>(); - return halfcloseTcs.Task; + sendCompletionDelegate = completionDelegate; } } - public Task<TRead> ReceiveMessageAsync() + /// <summary> + /// On client-side, we only fire readObserver.OnCompleted once all messages have been read + /// and status has been received. + /// </summary> + protected override void CompleteReadObserver() { - lock (myLock) + if (readingDone && finishedStatus.HasValue) { - CheckNotDisposed(); - CheckStarted(); - CheckNoError(); - - if (readingDone) - { - throw new InvalidOperationException("Already read the last message."); - } - - if (readTcs != null) + bool shouldComplete; + lock (myLock) { - throw new InvalidOperationException("Only one read can be pending at a time"); + shouldComplete = !readObserverCompleted; + readObserverCompleted = true; } - call.StartReceiveMessage(readFinishedHandler); - - readTcs = new TaskCompletionSource<TRead>(); - return readTcs.Task; - } - } - - public void Cancel() - { - lock (myLock) - { - CheckNotDisposed(); - CheckStarted(); - cancelRequested = true; - } - // grpc_call_cancel is threadsafe - call.Cancel(); - } - - public void CancelWithStatus(Status status) - { - lock (myLock) - { - CheckNotDisposed(); - CheckStarted(); - cancelRequested = true; - } - // grpc_call_cancel_with_status is threadsafe - call.CancelWithStatus(status); - } - - private void InitializeInternal(CallSafeHandle call, bool server) - { - lock (myLock) - { - // Make sure this object and the delegated held by it will not be garbage collected - // before we release this handle. - gchandle = GCHandle.Alloc(this); - this.call = call; - this.server = server; - } - } - - private void CheckStarted() - { - if (!started) - { - throw new InvalidOperationException("Call not started"); - } - } - - private void CheckNotDisposed() - { - if (disposed) - { - throw new InvalidOperationException("Call has already been disposed."); - } - } - - private void CheckNoError() - { - if (errorOccured) - { - throw new InvalidOperationException("Error occured when processing call."); - } - } - - private bool ReleaseResourcesIfPossible() - { - if (!disposed && call != null) - { - if (halfclosed && readingDone && finished) + if (shouldComplete) { - ReleaseResources(); - return true; + var status = finishedStatus.Value; + if (status.StatusCode != StatusCode.OK) + { + FireReadObserverOnError(new RpcException(status)); + } + else + { + FireReadObserverOnCompleted(); + } } } - return false; - } - - private void ReleaseResources() - { - if (call != null) { - call.Dispose(); - } - gchandle.Free(); - disposed = true; - } - - private void CompleteStreamObserver(Status status) - { - if (status.StatusCode != StatusCode.OK) - { - // TODO: wrap to handle exceptions; - readObserver.OnError(new RpcException(status)); - } else { - // TODO: wrap to handle exceptions; - readObserver.OnCompleted(); - } } /// <summary> /// Handler for unary response completion. /// </summary> - private void HandleUnaryResponse(GRPCOpError error, IntPtr batchContextPtr) + private void HandleUnaryResponse(bool wasError, BatchContextSafeHandleNotOwned ctx) { - try + lock(myLock) { - TaskCompletionSource<TRead> tcs; - lock(myLock) - { - finished = true; - halfclosed = true; - tcs = unaryResponseTcs; - - ReleaseResourcesIfPossible(); - } - - var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); + finished = true; + halfclosed = true; - if (error != GRPCOpError.GRPC_OP_OK) - { - tcs.SetException(new RpcException( - new Status(StatusCode.Internal, "Internal error occured.") - )); - return; - } - - var status = ctx.GetReceivedStatus(); - if (status.StatusCode != StatusCode.OK) - { - tcs.SetException(new RpcException(status)); - return; - } - - // TODO: handle deserialize error... - var msg = deserializer(ctx.GetReceivedMessage()); - tcs.SetResult(msg); - } - catch(Exception e) - { - Console.WriteLine("Caught exception in a native handler: " + e); + ReleaseResourcesIfPossible(); } - } - - private void HandleWriteFinished(GRPCOpError error, IntPtr batchContextPtr) - { - try - { - TaskCompletionSource<object> oldTcs = null; - lock (myLock) - { - oldTcs = writeTcs; - writeTcs = null; - } - - if (errorOccured) - { - // TODO: use the right type of exception... - oldTcs.SetException(new Exception("Write failed")); - } - else - { - // TODO: where does the continuation run? - oldTcs.SetResult(null); - } - } - catch(Exception e) + if (wasError) { - Console.WriteLine("Caught exception in a native handler: " + e); + unaryResponseTcs.SetException(new RpcException( + new Status(StatusCode.Internal, "Internal error occured.") + )); + return; } - } - - private void HandleHalfclosed(GRPCOpError error, IntPtr batchContextPtr) - { - try - { - lock (myLock) - { - halfclosed = true; - ReleaseResourcesIfPossible(); - } - - if (error != GRPCOpError.GRPC_OP_OK) - { - halfcloseTcs.SetException(new Exception("Halfclose failed")); - - } - else - { - halfcloseTcs.SetResult(null); - } - } - catch(Exception e) + var status = ctx.GetReceivedStatus(); + if (status.StatusCode != StatusCode.OK) { - Console.WriteLine("Caught exception in a native handler: " + e); + unaryResponseTcs.SetException(new RpcException(status)); + return; } - } - - private void HandleReadFinished(GRPCOpError error, IntPtr batchContextPtr) - { - try - { - var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); - var payload = ctx.GetReceivedMessage(); - - TaskCompletionSource<TRead> oldTcs = null; - IObserver<TRead> observer = null; - - Nullable<Status> status = null; - - lock (myLock) - { - oldTcs = readTcs; - readTcs = null; - if (payload == null) - { - readingDone = true; - } - observer = readObserver; - status = finishedStatus; - - ReleaseResourcesIfPossible(); - } - - // TODO: wrap deserialization... - TRead msg = payload != null ? deserializer(payload) : default(TRead); - oldTcs.SetResult(msg); + // TODO: handle deserialization error + TResponse msg; + TryDeserialize(ctx.GetReceivedMessage(), out msg); - // TODO: make sure we deliver reads in the right order. - - if (observer != null) - { - if (payload != null) - { - // TODO: wrap to handle exceptions - observer.OnNext(msg); - - // start a new read - ReceiveMessageAsync(); - } - else - { - if (!server) - { - if (status.HasValue) - { - CompleteStreamObserver(status.Value); - } - } - else - { - // TODO: wrap to handle exceptions.. - observer.OnCompleted(); - } - // TODO: completeStreamObserver serverside... - } - } - } - catch(Exception e) - { - Console.WriteLine("Caught exception in a native handler: " + e); - } + unaryResponseTcs.SetResult(msg); } - private void HandleFinished(GRPCOpError error, IntPtr batchContextPtr) + /// <summary> + /// Handles receive status completion for calls with streaming response. + /// </summary> + private void HandleFinished(bool wasError, BatchContextSafeHandleNotOwned ctx) { - try - { - var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); - var status = ctx.GetReceivedStatus(); - - bool wasReadingDone; - - lock (myLock) - { - finished = true; - finishedStatus = status; - - wasReadingDone = readingDone; - - ReleaseResourcesIfPossible(); - } - - if (wasReadingDone) { - CompleteStreamObserver(status); - } - - } - catch(Exception e) - { - Console.WriteLine("Caught exception in a native handler: " + e); - } - } + var status = ctx.GetReceivedStatus(); - private void HandleFinishedServerside(GRPCOpError error, IntPtr batchContextPtr) - { - try + lock (myLock) { - var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); - - lock(myLock) - { - finished = true; - - // TODO: because of the way server calls are implemented, we need to set - // reading done to true here. Should be fixed in the future. - readingDone = true; - - ReleaseResourcesIfPossible(); - } - // TODO: handle error ... - - finishedServersideTcs.SetResult(null); + finished = true; + finishedStatus = status; + ReleaseResourcesIfPossible(); } - catch(Exception e) - { - Console.WriteLine("Caught exception in a native handler: " + e); - } + + CompleteReadObserver(); } } } \ No newline at end of file diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs new file mode 100644 index 0000000000000000000000000000000000000000..44d66b394ca6929bc1fea07c0bf85e0de5e0fcf7 --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs @@ -0,0 +1,407 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Diagnostics; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core.Internal; +using Grpc.Core.Utils; + +namespace Grpc.Core.Internal +{ + /// <summary> + /// Base for handling both client side and server side calls. + /// Handles native call lifecycle and provides convenience methods. + /// </summary> + internal abstract class AsyncCallBase<TWrite, TRead> + { + readonly Func<TWrite, byte[]> serializer; + readonly Func<byte[], TRead> deserializer; + + protected readonly CompletionCallbackDelegate sendFinishedHandler; + protected readonly CompletionCallbackDelegate readFinishedHandler; + protected readonly CompletionCallbackDelegate halfclosedHandler; + + protected readonly object myLock = new object(); + + protected GCHandle gchandle; + protected CallSafeHandle call; + protected bool disposed; + + protected bool started; + protected bool errorOccured; + protected bool cancelRequested; + + protected AsyncCompletionDelegate sendCompletionDelegate; // Completion of a pending send or sendclose if not null. + protected bool readPending; // True if there is a read in progress. + protected bool readingDone; + protected bool halfcloseRequested; + protected bool halfclosed; + protected bool finished; // True if close has been received from the peer. + + // Streaming reads will be delivered to this observer. For a call that only does unary read it may remain null. + protected IObserver<TRead> readObserver; + + public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer) + { + this.serializer = Preconditions.CheckNotNull(serializer); + this.deserializer = Preconditions.CheckNotNull(deserializer); + + this.sendFinishedHandler = CreateBatchCompletionCallback(HandleSendFinished); + this.readFinishedHandler = CreateBatchCompletionCallback(HandleReadFinished); + this.halfclosedHandler = CreateBatchCompletionCallback(HandleHalfclosed); + } + + /// <summary> + /// Requests cancelling the call. + /// </summary> + public void Cancel() + { + lock (myLock) + { + Preconditions.CheckState(started); + cancelRequested = true; + + if (!disposed) + { + call.Cancel(); + } + } + } + + /// <summary> + /// Requests cancelling the call with given status. + /// </summary> + public void CancelWithStatus(Status status) + { + lock (myLock) + { + Preconditions.CheckState(started); + cancelRequested = true; + + if (!disposed) + { + call.CancelWithStatus(status); + } + } + } + + protected void InitializeInternal(CallSafeHandle call) + { + lock (myLock) + { + // Make sure this object and the delegated held by it will not be garbage collected + // before we release this handle. + gchandle = GCHandle.Alloc(this); + this.call = call; + } + } + + /// <summary> + /// Initiates sending a message. Only once send operation can be active at a time. + /// completionDelegate is invoked upon completion. + /// </summary> + protected void StartSendMessageInternal(TWrite msg, AsyncCompletionDelegate completionDelegate) + { + byte[] payload = UnsafeSerialize(msg); + + lock (myLock) + { + Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); + CheckSendingAllowed(); + + call.StartSendMessage(payload, sendFinishedHandler); + sendCompletionDelegate = completionDelegate; + } + } + + /// <summary> + /// Requests receiving a next message. + /// </summary> + protected void StartReceiveMessage() + { + lock (myLock) + { + Preconditions.CheckState(started); + Preconditions.CheckState(!disposed); + Preconditions.CheckState(!errorOccured); + + Preconditions.CheckState(!readingDone); + Preconditions.CheckState(!readPending); + + call.StartReceiveMessage(readFinishedHandler); + readPending = true; + } + } + + /// <summary> + /// Default behavior just completes the read observer, but more sofisticated behavior might be required + /// by subclasses. + /// </summary> + protected virtual void CompleteReadObserver() + { + FireReadObserverOnCompleted(); + } + + /// <summary> + /// If there are no more pending actions and no new actions can be started, releases + /// the underlying native resources. + /// </summary> + protected bool ReleaseResourcesIfPossible() + { + if (!disposed && call != null) + { + if (halfclosed && readingDone && finished) + { + ReleaseResources(); + return true; + } + } + return false; + } + + private void ReleaseResources() + { + if (call != null) + { + call.Dispose(); + } + gchandle.Free(); + disposed = true; + } + + protected void CheckSendingAllowed() + { + Preconditions.CheckState(started); + Preconditions.CheckState(!disposed); + Preconditions.CheckState(!errorOccured); + + Preconditions.CheckState(!halfcloseRequested, "Already halfclosed."); + Preconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time"); + } + + protected byte[] UnsafeSerialize(TWrite msg) + { + return serializer(msg); + } + + protected bool TrySerialize(TWrite msg, out byte[] payload) + { + try + { + payload = serializer(msg); + return true; + } + catch(Exception) + { + Console.WriteLine("Exception occured while trying to serialize message"); + payload = null; + return false; + } + } + + protected bool TryDeserialize(byte[] payload, out TRead msg) + { + try + { + msg = deserializer(payload); + return true; + } + catch(Exception) + { + Console.WriteLine("Exception occured while trying to deserialize message"); + msg = default(TRead); + return false; + } + } + + protected void FireReadObserverOnNext(TRead value) + { + try + { + readObserver.OnNext(value); + } + catch(Exception e) + { + Console.WriteLine("Exception occured while invoking readObserver.OnNext: " + e); + } + } + + protected void FireReadObserverOnCompleted() + { + try + { + readObserver.OnCompleted(); + } + catch(Exception e) + { + Console.WriteLine("Exception occured while invoking readObserver.OnCompleted: " + e); + } + } + + protected void FireReadObserverOnError(Exception error) + { + try + { + readObserver.OnError(error); + } + catch(Exception e) + { + Console.WriteLine("Exception occured while invoking readObserver.OnError: " + e); + } + } + + protected void FireCompletion(AsyncCompletionDelegate completionDelegate, Exception error) + { + try + { + completionDelegate(error); + } + catch(Exception e) + { + Console.WriteLine("Exception occured while invoking completion delegate: " + e); + } + } + + /// <summary> + /// Creates completion callback delegate that wraps the batch completion handler in a try catch block to + /// prevent propagating exceptions accross managed/unmanaged boundary. + /// </summary> + protected CompletionCallbackDelegate CreateBatchCompletionCallback(Action<bool, BatchContextSafeHandleNotOwned> handler) + { + return new CompletionCallbackDelegate( (error, batchContextPtr) => { + try + { + var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); + bool wasError = (error != GRPCOpError.GRPC_OP_OK); + handler(wasError, ctx); + } + catch(Exception e) + { + Console.WriteLine("Caught exception in a native handler: " + e); + } + }); + } + + /// <summary> + /// Handles send completion. + /// </summary> + private void HandleSendFinished(bool wasError, BatchContextSafeHandleNotOwned ctx) + { + AsyncCompletionDelegate origCompletionDelegate = null; + lock (myLock) + { + origCompletionDelegate = sendCompletionDelegate; + sendCompletionDelegate = null; + + ReleaseResourcesIfPossible(); + } + + if (wasError) + { + FireCompletion(origCompletionDelegate, new OperationFailedException("Send failed")); + } + else + { + FireCompletion(origCompletionDelegate, null); + } + } + + /// <summary> + /// Handles halfclose completion. + /// </summary> + private void HandleHalfclosed(bool wasError, BatchContextSafeHandleNotOwned ctx) + { + AsyncCompletionDelegate origCompletionDelegate = null; + lock (myLock) + { + halfclosed = true; + origCompletionDelegate = sendCompletionDelegate; + sendCompletionDelegate = null; + + ReleaseResourcesIfPossible(); + } + + if (wasError) + { + FireCompletion(origCompletionDelegate, new OperationFailedException("Halfclose failed")); + } + else + { + FireCompletion(origCompletionDelegate, null); + } + + } + + /// <summary> + /// Handles streaming read completion. + /// </summary> + private void HandleReadFinished(bool wasError, BatchContextSafeHandleNotOwned ctx) + { + var payload = ctx.GetReceivedMessage(); + + lock (myLock) + { + readPending = false; + if (payload == null) + { + readingDone = true; + } + + ReleaseResourcesIfPossible(); + } + + // TODO: handle the case when error occured... + + if (payload != null) + { + // TODO: handle deserialization error + TRead msg; + TryDeserialize(payload, out msg); + + FireReadObserverOnNext(msg); + + // Start a new read. The current one has already been delivered, + // so correct ordering of reads is assured. + StartReceiveMessage(); + } + else + { + CompleteReadObserver(); + } + } + } +} \ No newline at end of file diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs new file mode 100644 index 0000000000000000000000000000000000000000..d3a2be553fc6ac21cc1f476c5d7f16d9692be0d3 --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -0,0 +1,125 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Diagnostics; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core.Internal; +using Grpc.Core.Utils; + +namespace Grpc.Core.Internal +{ + /// <summary> + /// Handles server side native call lifecycle. + /// </summary> + internal class AsyncCallServer<TRequest, TResponse> : AsyncCallBase<TResponse, TRequest> + { + readonly CompletionCallbackDelegate finishedServersideHandler; + readonly TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>(); + + public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer) : base(serializer, deserializer) + { + this.finishedServersideHandler = CreateBatchCompletionCallback(HandleFinishedServerside); + } + + public void Initialize(CallSafeHandle call) + { + InitializeInternal(call); + } + + /// <summary> + /// Starts a server side call. Currently, all server side calls are implemented as duplex + /// streaming call and they are adapted to the appropriate streaming arity. + /// </summary> + public Task ServerSideCallAsync(IObserver<TRequest> readObserver) + { + lock (myLock) + { + Preconditions.CheckNotNull(call); + + started = true; + this.readObserver = readObserver; + + call.StartServerSide(finishedServersideHandler); + StartReceiveMessage(); + return finishedServersideTcs.Task; + } + } + + /// <summary> + /// Sends a streaming response. Only one pending send action is allowed at any given time. + /// completionDelegate is called when the operation finishes. + /// </summary> + public void StartSendMessage(TResponse msg, AsyncCompletionDelegate completionDelegate) + { + StartSendMessageInternal(msg, completionDelegate); + } + + /// <summary> + /// Sends call result status, also indicating server is done with streaming responses. + /// Only one pending send action is allowed at any given time. + /// completionDelegate is called when the operation finishes. + /// </summary> + public void StartSendStatusFromServer(Status status, AsyncCompletionDelegate completionDelegate) + { + lock (myLock) + { + Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); + CheckSendingAllowed(); + + call.StartSendStatusFromServer(status, halfclosedHandler); + halfcloseRequested = true; + sendCompletionDelegate = completionDelegate; + } + } + + /// <summary> + /// Handles the server side close completion. + /// </summary> + private void HandleFinishedServerside(bool wasError, BatchContextSafeHandleNotOwned ctx) + { + lock (myLock) + { + finished = true; + + ReleaseResourcesIfPossible(); + } + // TODO: handle error ... + + finishedServersideTcs.SetResult(null); + } + } +} \ No newline at end of file diff --git a/src/csharp/Grpc.Core/Internal/AsyncCompletion.cs b/src/csharp/Grpc.Core/Internal/AsyncCompletion.cs new file mode 100644 index 0000000000000000000000000000000000000000..b78bb497fa71efdc0a01340f50bc990a8c454ecd --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/AsyncCompletion.cs @@ -0,0 +1,95 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Diagnostics; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core.Internal; +using Grpc.Core.Utils; + +namespace Grpc.Core.Internal +{ + /// <summary> + /// If error != null, there's been an error or operation has been cancelled. + /// </summary> + internal delegate void AsyncCompletionDelegate(Exception error); + + /// <summary> + /// Helper for transforming AsyncCompletionDelegate into full-fledged Task. + /// </summary> + internal class AsyncCompletionTaskSource + { + readonly TaskCompletionSource<object> tcs = new TaskCompletionSource<object>(); + readonly AsyncCompletionDelegate completionDelegate; + + public AsyncCompletionTaskSource() + { + completionDelegate = new AsyncCompletionDelegate(HandleCompletion); + } + + public Task Task + { + get + { + return tcs.Task; + } + } + + public AsyncCompletionDelegate CompletionDelegate + { + get + { + return completionDelegate; + } + } + + private void HandleCompletion(Exception error) + { + if (error == null) + { + tcs.SetResult(null); + return; + } + if (error is OperationCanceledException) + { + tcs.SetCanceled(); + return; + } + tcs.SetException(error); + } + } + +} \ No newline at end of file diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs index 1c0bc98f062db57fd27bca50e8ade5d966e32047..61566b54072bbb8ab11f9bde2240d4b3760bd0a9 100644 --- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs @@ -1,5 +1,4 @@ #region Copyright notice and license - // Copyright 2015, Google Inc. // All rights reserved. // @@ -30,7 +29,6 @@ // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #endregion - using System; using System.Diagnostics; using System.Runtime.InteropServices; @@ -38,14 +36,12 @@ using Grpc.Core; namespace Grpc.Core.Internal { - //TODO: rename the delegate - internal delegate void CompletionCallbackDelegate(GRPCOpError error, IntPtr batchContextPtr); - + internal delegate void CompletionCallbackDelegate(GRPCOpError error,IntPtr batchContextPtr); /// <summary> /// grpc_call from <grpc/grpc.h> /// </summary> - internal class CallSafeHandle : SafeHandleZeroIsInvalid - { + internal class CallSafeHandle : SafeHandleZeroIsInvalid + { const UInt32 GRPC_WRITE_BUFFER_HINT = 1; [DllImport("grpc_csharp_ext.dll")] @@ -59,22 +55,22 @@ namespace Grpc.Core.Internal [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_start_unary(CallSafeHandle call, - [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, - byte[] send_buffer, UIntPtr send_buffer_len); + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, + byte[] send_buffer, UIntPtr send_buffer_len); [DllImport("grpc_csharp_ext.dll")] static extern void grpcsharp_call_blocking_unary(CallSafeHandle call, CompletionQueueSafeHandle dedicatedCq, - [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, - byte[] send_buffer, UIntPtr send_buffer_len); + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, + byte[] send_buffer, UIntPtr send_buffer_len); [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call, - [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_start_server_streaming(CallSafeHandle call, - [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, - byte[] send_buffer, UIntPtr send_buffer_len); + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, + byte[] send_buffer, UIntPtr send_buffer_len); [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_start_duplex_streaming(CallSafeHandle call, @@ -82,28 +78,27 @@ namespace Grpc.Core.Internal [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call, - [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, - byte[] send_buffer, UIntPtr send_buffer_len); + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, + byte[] send_buffer, UIntPtr send_buffer_len); [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_send_close_from_client(CallSafeHandle call, - [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, StatusCode statusCode, string statusMessage); [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call, - [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_start_serverside(CallSafeHandle call, - [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); [DllImport("grpc_csharp_ext.dll")] static extern void grpcsharp_call_destroy(IntPtr call); - private CallSafeHandle() { } @@ -115,12 +110,12 @@ namespace Grpc.Core.Internal public void StartUnary(byte[] payload, CompletionCallbackDelegate callback) { - AssertCallOk(grpcsharp_call_start_unary(this, callback, payload, new UIntPtr((ulong) payload.Length))); + AssertCallOk(grpcsharp_call_start_unary(this, callback, payload, new UIntPtr((ulong)payload.Length))); } public void BlockingUnary(CompletionQueueSafeHandle dedicatedCq, byte[] payload, CompletionCallbackDelegate callback) { - grpcsharp_call_blocking_unary(this, dedicatedCq, callback, payload, new UIntPtr((ulong) payload.Length)); + grpcsharp_call_blocking_unary(this, dedicatedCq, callback, payload, new UIntPtr((ulong)payload.Length)); } public void StartClientStreaming(CompletionCallbackDelegate callback) @@ -130,7 +125,7 @@ namespace Grpc.Core.Internal public void StartServerStreaming(byte[] payload, CompletionCallbackDelegate callback) { - AssertCallOk(grpcsharp_call_start_server_streaming(this, callback, payload, new UIntPtr((ulong) payload.Length))); + AssertCallOk(grpcsharp_call_start_server_streaming(this, callback, payload, new UIntPtr((ulong)payload.Length))); } public void StartDuplexStreaming(CompletionCallbackDelegate callback) @@ -140,7 +135,7 @@ namespace Grpc.Core.Internal public void StartSendMessage(byte[] payload, CompletionCallbackDelegate callback) { - AssertCallOk(grpcsharp_call_send_message(this, callback, payload, new UIntPtr((ulong) payload.Length))); + AssertCallOk(grpcsharp_call_send_message(this, callback, payload, new UIntPtr((ulong)payload.Length))); } public void StartSendCloseFromClient(CompletionCallbackDelegate callback) @@ -173,19 +168,20 @@ namespace Grpc.Core.Internal AssertCallOk(grpcsharp_call_cancel_with_status(this, status.StatusCode, status.Detail)); } - protected override bool ReleaseHandle() - { + protected override bool ReleaseHandle() + { grpcsharp_call_destroy(handle); - return true; - } + return true; + } private static void AssertCallOk(GRPCCallError callError) { Trace.Assert(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK"); } - private static UInt32 GetFlags(bool buffered) { + private static UInt32 GetFlags(bool buffered) + { return buffered ? 0 : GRPC_WRITE_BUFFER_HINT; } - } + } } \ No newline at end of file diff --git a/src/csharp/Grpc.Core/Internal/ClientStreamingInputObserver.cs b/src/csharp/Grpc.Core/Internal/ClientStreamingInputObserver.cs index fb59e86e2d7f8e0d017abc4b26910ac8cc2fcb65..286c54f2c47811f458d4b1543b06007e7b836378 100644 --- a/src/csharp/Grpc.Core/Internal/ClientStreamingInputObserver.cs +++ b/src/csharp/Grpc.Core/Internal/ClientStreamingInputObserver.cs @@ -1,5 +1,4 @@ #region Copyright notice and license - // Copyright 2015, Google Inc. // All rights reserved. // @@ -28,40 +27,40 @@ // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - #endregion - using System; using Grpc.Core.Internal; namespace Grpc.Core.Internal { internal class ClientStreamingInputObserver<TWrite, TRead> : IObserver<TWrite> - { + { readonly AsyncCall<TWrite, TRead> call; public ClientStreamingInputObserver(AsyncCall<TWrite, TRead> call) - { + { this.call = call; - } - - public void OnCompleted() - { + } + public void OnCompleted() + { + var taskSource = new AsyncCompletionTaskSource(); + call.StartSendCloseFromClient(taskSource.CompletionDelegate); // TODO: how bad is the Wait here? - call.SendCloseFromClientAsync().Wait(); - } + taskSource.Task.Wait(); + } - public void OnError(Exception error) - { - throw new InvalidOperationException("This should never be called."); - } + public void OnError(Exception error) + { + throw new InvalidOperationException("This should never be called."); + } - public void OnNext(TWrite value) - { + public void OnNext(TWrite value) + { + var taskSource = new AsyncCompletionTaskSource(); + call.StartSendMessage(value, taskSource.CompletionDelegate); // TODO: how bad is the Wait here? - call.SendMessageAsync(value).Wait(); - } - } + taskSource.Task.Wait(); + } + } } - diff --git a/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs index 3f01fdbfd05c70d4ac130681d6af1ab91a6530c7..6bff923c55229166189b497d666b797436dc5d3b 100644 --- a/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs @@ -1,5 +1,4 @@ #region Copyright notice and license - // Copyright 2015, Google Inc. // All rights reserved. // @@ -28,9 +27,7 @@ // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - #endregion - using System; using System.Runtime.InteropServices; using System.Threading.Tasks; @@ -40,8 +37,8 @@ namespace Grpc.Core.Internal /// <summary> /// grpc_completion_queue from <grpc/grpc.h> /// </summary> - internal class CompletionQueueSafeHandle : SafeHandleZeroIsInvalid - { + internal class CompletionQueueSafeHandle : SafeHandleZeroIsInvalid + { [DllImport("grpc_csharp_ext.dll")] static extern CompletionQueueSafeHandle grpcsharp_completion_queue_create(); @@ -73,11 +70,11 @@ namespace Grpc.Core.Internal grpcsharp_completion_queue_shutdown(this); } - protected override bool ReleaseHandle() + protected override bool ReleaseHandle() { grpcsharp_completion_queue_destroy(handle); - return true; - } - } + return true; + } + } } diff --git a/src/csharp/Grpc.Core/Internal/ServerStreamingOutputObserver.cs b/src/csharp/Grpc.Core/Internal/ServerStreamingOutputObserver.cs index 08d99214754bd9316a75f5a252fc0861694bbe7a..9873dc9c71077b0d8650199e288e743f8f348896 100644 --- a/src/csharp/Grpc.Core/Internal/ServerStreamingOutputObserver.cs +++ b/src/csharp/Grpc.Core/Internal/ServerStreamingOutputObserver.cs @@ -1,5 +1,4 @@ #region Copyright notice and license - // Copyright 2015, Google Inc. // All rights reserved. // @@ -28,9 +27,7 @@ // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - #endregion - using System; using Grpc.Core.Internal; @@ -40,32 +37,36 @@ namespace Grpc.Core.Internal /// Observer that writes all arriving messages to a call abstraction (in blocking fashion) /// and then halfcloses the call. Used for server-side call handling. /// </summary> - internal class ServerStreamingOutputObserver<TWrite, TRead> : IObserver<TWrite> - { - readonly AsyncCall<TWrite, TRead> call; + internal class ServerStreamingOutputObserver<TRequest, TResponse> : IObserver<TResponse> + { + readonly AsyncCallServer<TRequest, TResponse> call; - public ServerStreamingOutputObserver(AsyncCall<TWrite, TRead> call) - { + public ServerStreamingOutputObserver(AsyncCallServer<TRequest, TResponse> call) + { this.call = call; - } + } - public void OnCompleted() - { + public void OnCompleted() + { + var taskSource = new AsyncCompletionTaskSource(); + call.StartSendStatusFromServer(new Status(StatusCode.OK, ""), taskSource.CompletionDelegate); // TODO: how bad is the Wait here? - call.SendStatusFromServerAsync(new Status(StatusCode.OK, "")).Wait(); - } + taskSource.Task.Wait(); + } - public void OnError(Exception error) - { + public void OnError(Exception error) + { // TODO: implement this... - throw new InvalidOperationException("This should never be called."); - } + throw new InvalidOperationException("This should never be called."); + } - public void OnNext(TWrite value) - { + public void OnNext(TResponse value) + { + var taskSource = new AsyncCompletionTaskSource(); + call.StartSendMessage(value, taskSource.CompletionDelegate); // TODO: how bad is the Wait here? - call.SendMessageAsync(value).Wait(); - } - } + taskSource.Task.Wait(); + } + } } diff --git a/src/csharp/Grpc.Core/Internal/Timespec.cs b/src/csharp/Grpc.Core/Internal/Timespec.cs index b191ecde94cc6e3e96cf966de8fe6813ab5b2628..e6efd66f13b9eb717d573f4469927d56c749d9f7 100644 --- a/src/csharp/Grpc.Core/Internal/Timespec.cs +++ b/src/csharp/Grpc.Core/Internal/Timespec.cs @@ -1,5 +1,4 @@ #region Copyright notice and license - // Copyright 2015, Google Inc. // All rights reserved. // @@ -28,21 +27,19 @@ // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - #endregion - using System; using System.Runtime.InteropServices; using System.Threading; namespace Grpc.Core.Internal { - /// <summary> - /// gpr_timespec from grpc/support/time.h - /// </summary> - [StructLayout(LayoutKind.Sequential)] - internal struct Timespec - { + /// <summary> + /// gpr_timespec from grpc/support/time.h + /// </summary> + [StructLayout(LayoutKind.Sequential)] + internal struct Timespec + { const int nanosPerSecond = 1000 * 1000 * 1000; const int nanosPerTick = 100; @@ -54,23 +51,22 @@ namespace Grpc.Core.Internal [DllImport("grpc_csharp_ext.dll")] static extern int gprsharp_sizeof_timespec(); - // TODO: revisit this. - // NOTE: on linux 64bit sizeof(gpr_timespec) = 16, on windows 32bit sizeof(gpr_timespec) = 8 + // NOTE: on linux 64bit sizeof(gpr_timespec) = 16, on windows 32bit sizeof(gpr_timespec) = 8 // so IntPtr seems to have the right size to work on both. - public System.IntPtr tv_sec; - public System.IntPtr tv_nsec; + public System.IntPtr tv_sec; + public System.IntPtr tv_nsec; - /// <summary> - /// Timespec a long time in the future. - /// </summary> - public static Timespec InfFuture - { - get - { + /// <summary> + /// Timespec a long time in the future. + /// </summary> + public static Timespec InfFuture + { + get + { return gprsharp_inf_future(); - } - } + } + } public static Timespec Now { @@ -92,7 +88,8 @@ namespace Grpc.Core.Internal /// Creates a GPR deadline from current instant and given timeout. /// </summary> /// <returns>The from timeout.</returns> - public static Timespec DeadlineFromTimeout(TimeSpan timeout) { + public static Timespec DeadlineFromTimeout(TimeSpan timeout) + { if (timeout == Timeout.InfiniteTimeSpan) { return Timespec.InfFuture; @@ -100,7 +97,8 @@ namespace Grpc.Core.Internal return Timespec.Now.Add(timeout); } - public Timespec Add(TimeSpan timeSpan) { + public Timespec Add(TimeSpan timeSpan) + { long nanos = tv_nsec.ToInt64() + (timeSpan.Ticks % TimeSpan.TicksPerSecond) * nanosPerTick; long overflow_sec = (nanos > nanosPerSecond) ? 1 : 0; @@ -109,6 +107,6 @@ namespace Grpc.Core.Internal result.tv_sec = new IntPtr(tv_sec.ToInt64() + (timeSpan.Ticks / TimeSpan.TicksPerSecond) + overflow_sec); return result; } - } + } } diff --git a/src/csharp/Grpc.Core/OperationFailedException.cs b/src/csharp/Grpc.Core/OperationFailedException.cs new file mode 100644 index 0000000000000000000000000000000000000000..34a8c95a85c3790f49582bdcd0e1d627e837a4c7 --- /dev/null +++ b/src/csharp/Grpc.Core/OperationFailedException.cs @@ -0,0 +1,48 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; + +namespace Grpc.Core +{ + /// <summary> + /// Thrown when gRPC operation fails. + /// </summary> + public class OperationFailedException : Exception + { + public OperationFailedException(string message) : base(message) + { + } + } +} + diff --git a/src/csharp/Grpc.Core/ServerCallHandler.cs b/src/csharp/Grpc.Core/ServerCallHandler.cs index 289f97aecee2777f6a7718bed807d1cacb3278c4..3eb8422f575e3b4f745e03f13856c254d76de10b 100644 --- a/src/csharp/Grpc.Core/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/ServerCallHandler.cs @@ -32,7 +32,9 @@ #endregion using System; +using System.Linq; using Grpc.Core.Internal; +using Grpc.Core.Utils; namespace Grpc.Core { @@ -54,17 +56,17 @@ namespace Grpc.Core public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq) { - var asyncCall = new AsyncCall<TResponse, TRequest>( + var asyncCall = new AsyncCallServer<TRequest, TResponse>( method.ResponseMarshaller.Serializer, method.RequestMarshaller.Deserializer); - asyncCall.InitializeServer(call); + asyncCall.Initialize(call); - var finishedTask = asyncCall.ServerSideUnaryRequestCallAsync(); + var requestObserver = new RecordingObserver<TRequest>(); + var finishedTask = asyncCall.ServerSideCallAsync(requestObserver); - var request = asyncCall.ReceiveMessageAsync().Result; - - var responseObserver = new ServerStreamingOutputObserver<TResponse, TRequest>(asyncCall); + var request = requestObserver.ToList().Result.Single(); + var responseObserver = new ServerStreamingOutputObserver<TRequest, TResponse>(asyncCall); handler(request, responseObserver); finishedTask.Wait(); @@ -85,15 +87,15 @@ namespace Grpc.Core public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq) { - var asyncCall = new AsyncCall<TResponse, TRequest>( + var asyncCall = new AsyncCallServer<TRequest, TResponse>( method.ResponseMarshaller.Serializer, method.RequestMarshaller.Deserializer); - asyncCall.InitializeServer(call); + asyncCall.Initialize(call); - var responseObserver = new ServerStreamingOutputObserver<TResponse, TRequest>(asyncCall); + var responseObserver = new ServerStreamingOutputObserver<TRequest,TResponse>(asyncCall); var requestObserver = handler(responseObserver); - var finishedTask = asyncCall.ServerSideStreamingRequestCallAsync(requestObserver); + var finishedTask = asyncCall.ServerSideCallAsync(requestObserver); finishedTask.Wait(); } } @@ -103,17 +105,15 @@ namespace Grpc.Core public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq) { // We don't care about the payload type here. - AsyncCall<byte[], byte[]> asyncCall = new AsyncCall<byte[], byte[]>( + var asyncCall = new AsyncCallServer<byte[], byte[]>( (payload) => payload, (payload) => payload); + asyncCall.Initialize(call); - asyncCall.InitializeServer(call); - - var finishedTask = asyncCall.ServerSideStreamingRequestCallAsync(new NullObserver<byte[]>()); + var finishedTask = asyncCall.ServerSideCallAsync(new NullObserver<byte[]>()); - // TODO: this makes the call finish before all reads can be done which causes trouble - // in AsyncCall.HandleReadFinished callback. Revisit this. - asyncCall.SendStatusFromServerAsync(new Status(StatusCode.Unimplemented, "No such method.")).Wait(); + // TODO: check result of the completion status. + asyncCall.StartSendStatusFromServer(new Status(StatusCode.Unimplemented, "No such method."), new AsyncCompletionDelegate((error) => {})); finishedTask.Wait(); } diff --git a/src/csharp/Grpc.Core/Status.cs b/src/csharp/Grpc.Core/Status.cs index 5ea1df7b481c2bec149fb0550f04b152a427b336..080bbdc2f5b3544b5917126837490732ae1a3f9f 100644 --- a/src/csharp/Grpc.Core/Status.cs +++ b/src/csharp/Grpc.Core/Status.cs @@ -1,5 +1,4 @@ #region Copyright notice and license - // Copyright 2015, Google Inc. // All rights reserved. // @@ -28,7 +27,6 @@ // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - #endregion using System; @@ -36,34 +34,40 @@ using System.Runtime.InteropServices; namespace Grpc.Core { - /// <summary> - /// Represents RPC result. - /// </summary> - public struct Status - { - readonly StatusCode statusCode; - readonly string detail; + /// <summary> + /// Represents RPC result. + /// </summary> + public struct Status + { + readonly StatusCode statusCode; + readonly string detail; - public Status(StatusCode statusCode, string detail) - { - this.statusCode = statusCode; - this.detail = detail; - } + public Status(StatusCode statusCode, string detail) + { + this.statusCode = statusCode; + this.detail = detail; + } - public StatusCode StatusCode - { - get - { - return statusCode; - } - } + /// <summary> + /// Gets the gRPC status code. OK indicates success, all other values indicate an error. + /// </summary> + public StatusCode StatusCode + { + get + { + return statusCode; + } + } - public string Detail - { - get - { - return detail; - } - } - } + /// <summary> + /// Gets the detail. + /// </summary> + public string Detail + { + get + { + return detail; + } + } + } } diff --git a/src/csharp/Grpc.Core/Utils/Preconditions.cs b/src/csharp/Grpc.Core/Utils/Preconditions.cs new file mode 100644 index 0000000000000000000000000000000000000000..b17ce4211783701b6b4ab8d6e9fb84057fe0fc75 --- /dev/null +++ b/src/csharp/Grpc.Core/Utils/Preconditions.cs @@ -0,0 +1,113 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Threading.Tasks; +using System.Collections.Generic; +using System.Collections.Concurrent; +using System.Diagnostics; + +namespace Grpc.Core.Utils +{ + public static class Preconditions + { + /// <summary> + /// Throws ArgumentException if condition is false. + /// </summary> + public static void CheckArgument(bool condition) + { + if (!condition) + { + throw new ArgumentException(); + } + } + + /// <summary> + /// Throws ArgumentException with given message if condition is false. + /// </summary> + public static void CheckArgument(bool condition, string errorMessage) + { + if (!condition) + { + throw new ArgumentException(errorMessage); + } + } + + /// <summary> + /// Throws NullReferenceException if reference is null. + /// </summary> + public static T CheckNotNull<T> (T reference) + { + if (reference == null) + { + throw new NullReferenceException(); + } + return reference; + } + + /// <summary> + /// Throws NullReferenceException with given message if reference is null. + /// </summary> + public static T CheckNotNull<T> (T reference, string errorMessage) + { + if (reference == null) + { + throw new NullReferenceException(errorMessage); + } + return reference; + } + + /// <summary> + /// Throws InvalidOperationException if condition is false. + /// </summary> + public static void CheckState(bool condition) + { + if (!condition) + { + throw new InvalidOperationException(); + } + } + + /// <summary> + /// Throws InvalidOperationException with given message if condition is false. + /// </summary> + public static void CheckState(bool condition, string errorMessage) + { + if (!condition) + { + throw new InvalidOperationException(errorMessage); + } + } + } +} + diff --git a/src/csharp/Grpc.Examples.MathClient/MathClient.cs b/src/csharp/Grpc.Examples.MathClient/MathClient.cs index 95a4678bb8f63e623bd0c3a3c78688922fe7e07a..f5956bd33e008cdafe2e0116d76640645e793570 100644 --- a/src/csharp/Grpc.Examples.MathClient/MathClient.cs +++ b/src/csharp/Grpc.Examples.MathClient/MathClient.cs @@ -1,5 +1,4 @@ #region Copyright notice and license - // Copyright 2015, Google Inc. // All rights reserved. // @@ -28,9 +27,7 @@ // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - #endregion - using System; using System.Runtime.InteropServices; using System.Threading; @@ -38,25 +35,25 @@ using Grpc.Core; namespace math { - class MathClient + class MathClient { - public static void Main (string[] args) - { + public static void Main(string[] args) + { GrpcEnvironment.Initialize(); - using (Channel channel = new Channel("127.0.0.1:23456")) - { - MathGrpc.IMathServiceClient stub = new MathGrpc.MathServiceClientStub(channel); - MathExamples.DivExample(stub); + using (Channel channel = new Channel("127.0.0.1:23456")) + { + MathGrpc.IMathServiceClient stub = new MathGrpc.MathServiceClientStub(channel); + MathExamples.DivExample(stub); MathExamples.FibExample(stub); - MathExamples.SumExample(stub); + MathExamples.SumExample(stub); - MathExamples.DivManyExample(stub); - } + MathExamples.DivManyExample(stub); + } GrpcEnvironment.Shutdown(); - } - } + } + } } diff --git a/src/csharp/Grpc.Examples/MathExamples.cs b/src/csharp/Grpc.Examples/MathExamples.cs index 97c91b1b1b586248b03e4d94200e7a9629627fef..134270f6f76c8e20c367d132c9e5bffe009df3db 100644 --- a/src/csharp/Grpc.Examples/MathExamples.cs +++ b/src/csharp/Grpc.Examples/MathExamples.cs @@ -1,5 +1,4 @@ #region Copyright notice and license - // Copyright 2015, Google Inc. // All rights reserved. // @@ -28,7 +27,6 @@ // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - #endregion using System; @@ -39,59 +37,63 @@ using Grpc.Core.Utils; namespace math { - public static class MathExamples - { - public static void DivExample(MathGrpc.IMathServiceClient stub) - { - DivReply result = stub.Div(new DivArgs.Builder { Dividend = 10, Divisor = 3 }.Build()); - Console.WriteLine("Div Result: " + result); - } - - public static void DivAsyncExample(MathGrpc.IMathServiceClient stub) - { - Task<DivReply> call = stub.DivAsync(new DivArgs.Builder { Dividend = 4, Divisor = 5 }.Build()); - DivReply result = call.Result; - Console.WriteLine(result); - } - - public static void DivAsyncWithCancellationExample(MathGrpc.IMathServiceClient stub) - { - Task<DivReply> call = stub.DivAsync(new DivArgs.Builder { Dividend = 4, Divisor = 5 }.Build()); - DivReply result = call.Result; - Console.WriteLine(result); - } - - public static void FibExample(MathGrpc.IMathServiceClient stub) - { + public static class MathExamples + { + public static void DivExample(MathGrpc.IMathServiceClient stub) + { + DivReply result = stub.Div(new DivArgs.Builder { Dividend = 10, Divisor = 3 }.Build()); + Console.WriteLine("Div Result: " + result); + } + + public static void DivAsyncExample(MathGrpc.IMathServiceClient stub) + { + Task<DivReply> call = stub.DivAsync(new DivArgs.Builder { Dividend = 4, Divisor = 5 }.Build()); + DivReply result = call.Result; + Console.WriteLine(result); + } + + public static void DivAsyncWithCancellationExample(MathGrpc.IMathServiceClient stub) + { + Task<DivReply> call = stub.DivAsync(new DivArgs.Builder { Dividend = 4, Divisor = 5 }.Build()); + DivReply result = call.Result; + Console.WriteLine(result); + } + + public static void FibExample(MathGrpc.IMathServiceClient stub) + { var recorder = new RecordingObserver<Num>(); stub.Fib(new FibArgs.Builder { Limit = 5 }.Build(), recorder); - List<Num> numbers = recorder.ToList().Result; + List<Num> numbers = recorder.ToList().Result; Console.WriteLine("Fib Result: " + string.Join("|", recorder.ToList().Result)); - } + } - public static void SumExample(MathGrpc.IMathServiceClient stub) - { - List<Num> numbers = new List<Num>{new Num.Builder { Num_ = 1 }.Build(), - new Num.Builder { Num_ = 2 }.Build(), - new Num.Builder { Num_ = 3 }.Build()}; + public static void SumExample(MathGrpc.IMathServiceClient stub) + { + List<Num> numbers = new List<Num> + {new Num.Builder { Num_ = 1 }.Build(), + new Num.Builder { Num_ = 2 }.Build(), + new Num.Builder { Num_ = 3 }.Build() + }; var res = stub.Sum(); - foreach (var num in numbers) { + foreach (var num in numbers) + { res.Inputs.OnNext(num); } res.Inputs.OnCompleted(); - Console.WriteLine("Sum Result: " + res.Task.Result); - } + Console.WriteLine("Sum Result: " + res.Task.Result); + } - public static void DivManyExample(MathGrpc.IMathServiceClient stub) - { - List<DivArgs> divArgsList = new List<DivArgs>{ - new DivArgs.Builder { Dividend = 10, Divisor = 3 }.Build(), - new DivArgs.Builder { Dividend = 100, Divisor = 21 }.Build(), - new DivArgs.Builder { Dividend = 7, Divisor = 2 }.Build() - }; + public static void DivManyExample(MathGrpc.IMathServiceClient stub) + { + List<DivArgs> divArgsList = new List<DivArgs> + { + new DivArgs.Builder { Dividend = 10, Divisor = 3 }.Build(), + new DivArgs.Builder { Dividend = 100, Divisor = 21 }.Build(), + new DivArgs.Builder { Dividend = 7, Divisor = 2 }.Build() + }; var recorder = new RecordingObserver<DivReply>(); @@ -102,30 +104,30 @@ namespace math } inputs.OnCompleted(); - Console.WriteLine("DivMany Result: " + string.Join("|", recorder.ToList().Result)); - } + Console.WriteLine("DivMany Result: " + string.Join("|", recorder.ToList().Result)); + } - public static void DependendRequestsExample(MathGrpc.IMathServiceClient stub) - { - var numberList = new List<Num> - { new Num.Builder{ Num_ = 1 }.Build(), - new Num.Builder{ Num_ = 2 }.Build(), new Num.Builder{ Num_ = 3 }.Build() - }; + public static void DependendRequestsExample(MathGrpc.IMathServiceClient stub) + { + var numberList = new List<Num> + { new Num.Builder{ Num_ = 1 }.Build(), + new Num.Builder{ Num_ = 2 }.Build(), new Num.Builder{ Num_ = 3 }.Build() + }; - numberList.ToObservable(); + numberList.ToObservable(); - //IObserver<Num> numbers; - //Task<Num> call = stub.Sum(out numbers); - //foreach (var num in numberList) - //{ - // numbers.OnNext(num); - //} - //numbers.OnCompleted(); + //IObserver<Num> numbers; + //Task<Num> call = stub.Sum(out numbers); + //foreach (var num in numberList) + //{ + // numbers.OnNext(num); + //} + //numbers.OnCompleted(); - //Num sum = call.Result; + //Num sum = call.Result; - //DivReply result = stub.Div(new DivArgs.Builder { Dividend = sum.Num_, Divisor = numberList.Count }.Build()); - } - } + //DivReply result = stub.Div(new DivArgs.Builder { Dividend = sum.Num_, Divisor = numberList.Count }.Build()); + } + } }