Skip to content
Snippets Groups Projects
Commit a7608b08 authored by Jan Tattermusch's avatar Jan Tattermusch
Browse files

Prototype of gRPC C# library (Core and API examples)

parent ea6f6d99
No related branches found
No related tags found
No related merge requests found
Showing
with 2935 additions and 0 deletions

Microsoft Visual Studio Solution File, Format Version 11.00
# Visual Studio 2010
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "GrpcDemo", "GrpcDemo\GrpcDemo.csproj", "{61ECB8EE-0C96-4F8E-B187-8E4D227417C0}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "GrpcApi", "GrpcApi\GrpcApi.csproj", "{7DC1433E-3225-42C7-B7EA-546D56E27A4B}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "GrpcCore", "GrpcCore\GrpcCore.csproj", "{CCC4440E-49F7-4790-B0AF-FEABB0837AE7}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "GrpcCoreTests", "GrpcCoreTests\GrpcCoreTests.csproj", "{86EC5CB4-4EA2-40A2-8057-86542A0353BB}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|x86 = Debug|x86
Release|x86 = Release|x86
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{61ECB8EE-0C96-4F8E-B187-8E4D227417C0}.Debug|x86.ActiveCfg = Debug|x86
{61ECB8EE-0C96-4F8E-B187-8E4D227417C0}.Debug|x86.Build.0 = Debug|x86
{61ECB8EE-0C96-4F8E-B187-8E4D227417C0}.Release|x86.ActiveCfg = Release|x86
{61ECB8EE-0C96-4F8E-B187-8E4D227417C0}.Release|x86.Build.0 = Release|x86
{7DC1433E-3225-42C7-B7EA-546D56E27A4B}.Debug|x86.ActiveCfg = Debug|Any CPU
{7DC1433E-3225-42C7-B7EA-546D56E27A4B}.Debug|x86.Build.0 = Debug|Any CPU
{7DC1433E-3225-42C7-B7EA-546D56E27A4B}.Release|x86.ActiveCfg = Release|Any CPU
{7DC1433E-3225-42C7-B7EA-546D56E27A4B}.Release|x86.Build.0 = Release|Any CPU
{86EC5CB4-4EA2-40A2-8057-86542A0353BB}.Debug|x86.ActiveCfg = Debug|Any CPU
{86EC5CB4-4EA2-40A2-8057-86542A0353BB}.Debug|x86.Build.0 = Debug|Any CPU
{86EC5CB4-4EA2-40A2-8057-86542A0353BB}.Release|x86.ActiveCfg = Release|Any CPU
{86EC5CB4-4EA2-40A2-8057-86542A0353BB}.Release|x86.Build.0 = Release|Any CPU
{CCC4440E-49F7-4790-B0AF-FEABB0837AE7}.Debug|x86.ActiveCfg = Debug|Any CPU
{CCC4440E-49F7-4790-B0AF-FEABB0837AE7}.Debug|x86.Build.0 = Debug|Any CPU
{CCC4440E-49F7-4790-B0AF-FEABB0837AE7}.Release|x86.ActiveCfg = Release|Any CPU
{CCC4440E-49F7-4790-B0AF-FEABB0837AE7}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(MonoDevelopProperties) = preSolution
StartupItem = GrpcDemo\GrpcDemo.csproj
EndGlobalSection
EndGlobal
test-results
bin
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Reactive.Linq;
namespace math
{
// /// <summary>
// /// Dummy local implementation of math service.
// /// </summary>
// public class DummyMathServiceClient : IMathServiceClient
// {
// public DivReply Div(DivArgs args, CancellationToken token = default(CancellationToken))
// {
// // TODO: cancellation...
// return DivInternal(args);
// }
//
// public Task<DivReply> DivAsync(DivArgs args, CancellationToken token = default(CancellationToken))
// {
// return Task.Factory.StartNew(() => DivInternal(args), token);
// }
//
// public IObservable<Num> Fib(FibArgs args, CancellationToken token = default(CancellationToken))
// {
// if (args.Limit > 0)
// {
// // TODO: cancellation
// return FibInternal(args.Limit).ToObservable();
// }
//
// throw new NotImplementedException("Not implemented yet");
// }
//
// public Task<Num> Sum(IObservable<Num> inputs, CancellationToken token = default(CancellationToken))
// {
// // TODO: implement
// inputs = null;
// return Task.Factory.StartNew(() => Num.CreateBuilder().Build(), token);
// }
//
// public IObservable<DivReply> DivMany(IObservable<DivArgs> inputs, CancellationToken token = default(CancellationToken))
// {
// // TODO: implement
// inputs = null;
// return new List<DivReply> { }.ToObservable ();
// }
//
//
// DivReply DivInternal(DivArgs args)
// {
// long quotient = args.Dividend / args.Divisor;
// long remainder = args.Dividend % args.Divisor;
// return new DivReply.Builder{ Quotient = quotient, Remainder = remainder }.Build();
// }
//
// IEnumerable<Num> FibInternal(long n)
// {
// long a = 0;
// yield return new Num.Builder{Num_=a}.Build();
//
// long b = 1;
// for (long i = 0; i < n - 1; i++)
// {
// long temp = a;
// a = b;
// b = temp + b;
// yield return new Num.Builder{Num_=a}.Build();
// }
// }
// }
}
using System;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Reactive.Linq;
namespace math
{
public class Examples
{
public static void DivExample(IMathServiceClient stub)
{
DivReply result = stub.Div(new DivArgs.Builder { Dividend = 10, Divisor = 3 }.Build());
Console.WriteLine("Div Result: " + result);
}
public static void DivAsyncExample(IMathServiceClient stub)
{
Task<DivReply> call = stub.DivAsync(new DivArgs.Builder { Dividend = 4, Divisor = 5 }.Build());
DivReply result = call.Result;
Console.WriteLine(result);
}
public static void DivAsyncWithCancellationExample(IMathServiceClient stub)
{
Task<DivReply> call = stub.DivAsync(new DivArgs.Builder { Dividend = 4, Divisor = 5 }.Build());
DivReply result = call.Result;
Console.WriteLine(result);
}
public static void FibExample(IMathServiceClient stub)
{
var recorder = new RecordingObserver<Num>();
stub.Fib(new FibArgs.Builder { Limit = 5 }.Build(), recorder);
List<Num> numbers = recorder.ToList().Result;
Console.WriteLine("Fib Result: " + string.Join("|", recorder.ToList().Result));
}
public static void SumExample(IMathServiceClient stub)
{
List<Num> numbers = new List<Num>{new Num.Builder { Num_ = 1 }.Build(),
new Num.Builder { Num_ = 2 }.Build(),
new Num.Builder { Num_ = 3 }.Build()};
var res = stub.Sum();
foreach (var num in numbers) {
res.Inputs.OnNext(num);
}
res.Inputs.OnCompleted();
Console.WriteLine("Sum Result: " + res.Task.Result);
}
public static void DivManyExample(IMathServiceClient stub)
{
List<DivArgs> divArgsList = new List<DivArgs>{
new DivArgs.Builder { Dividend = 10, Divisor = 3 }.Build(),
new DivArgs.Builder { Dividend = 100, Divisor = 21 }.Build(),
new DivArgs.Builder { Dividend = 7, Divisor = 2 }.Build()
};
var recorder = new RecordingObserver<DivReply>();
var inputs = stub.DivMany(recorder);
foreach (var input in divArgsList)
{
inputs.OnNext(input);
}
inputs.OnCompleted();
Console.WriteLine("DivMany Result: " + string.Join("|", recorder.ToList().Result));
}
public static void DependendRequestsExample(IMathServiceClient stub)
{
var numberList = new List<Num>
{ new Num.Builder{ Num_ = 1 }.Build(),
new Num.Builder{ Num_ = 2 }.Build(), new Num.Builder{ Num_ = 3 }.Build()
};
numberList.ToObservable();
//IObserver<Num> numbers;
//Task<Num> call = stub.Sum(out numbers);
//foreach (var num in numberList)
//{
// numbers.OnNext(num);
//}
//numbers.OnCompleted();
//Num sum = call.Result;
//DivReply result = stub.Div(new DivArgs.Builder { Dividend = sum.Num_, Divisor = numberList.Count }.Build());
}
}
}
<?xml version="1.0" encoding="utf-8"?>
<Project DefaultTargets="Build" ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
<ProductVersion>10.0.0</ProductVersion>
<SchemaVersion>2.0</SchemaVersion>
<ProjectGuid>{7DC1433E-3225-42C7-B7EA-546D56E27A4B}</ProjectGuid>
<OutputType>Library</OutputType>
<RootNamespace>GrpcApi</RootNamespace>
<AssemblyName>GrpcApi</AssemblyName>
<TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<DebugSymbols>true</DebugSymbols>
<DebugType>full</DebugType>
<Optimize>false</Optimize>
<OutputPath>bin\Debug</OutputPath>
<DefineConstants>DEBUG;</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
<ConsolePause>false</ConsolePause>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
<DebugType>full</DebugType>
<Optimize>true</Optimize>
<OutputPath>bin\Release</OutputPath>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
<ConsolePause>false</ConsolePause>
</PropertyGroup>
<ItemGroup>
<Reference Include="System" />
<Reference Include="System.Reactive.Linq, Version=2.2.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35">
<Private>False</Private>
</Reference>
<Reference Include="System.Data.Linq" />
<Reference Include="System.Reactive.Interfaces, Version=2.2.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35">
<Private>False</Private>
</Reference>
<Reference Include="System.Reactive.Core, Version=2.2.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35">
<Private>False</Private>
</Reference>
<Reference Include="Google.ProtocolBuffers">
<HintPath>..\lib\Google.ProtocolBuffers.dll</HintPath>
</Reference>
</ItemGroup>
<ItemGroup>
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="Examples.cs" />
<Compile Include="IMathServiceClient.cs" />
<Compile Include="Math.cs" />
<Compile Include="DummyMathServiceClient.cs" />
<Compile Include="MathServiceClientStub.cs" />
<Compile Include="RecordingObserver.cs" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup>
<ProjectReference Include="..\GrpcCore\GrpcCore.csproj">
<Project>{CCC4440E-49F7-4790-B0AF-FEABB0837AE7}</Project>
<Name>GrpcCore</Name>
</ProjectReference>
</ItemGroup>
</Project>
\ No newline at end of file
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Reactive.Linq;
using Google.GRPC.Core;
namespace math
{
/// <summary>
/// Hand-written stub for MathService defined in math.proto.
/// This code will be generated by gRPC codegen in the future.
/// </summary>
public interface IMathServiceClient
{
DivReply Div(DivArgs args, CancellationToken token = default(CancellationToken));
Task<DivReply> DivAsync(DivArgs args, CancellationToken token = default(CancellationToken));
Task Fib(FibArgs args, IObserver<Num> outputs, CancellationToken token = default(CancellationToken));
ClientStreamingAsyncResult<Num, Num> Sum(CancellationToken token = default(CancellationToken));
IObserver<DivArgs> DivMany(IObserver<DivReply> outputs, CancellationToken token = default(CancellationToken));
}
}
\ No newline at end of file
This diff is collapsed.
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Reactive.Linq;
using Google.GRPC.Core;
namespace math
{
/// <summary>
/// Implementation of math service stub (this is handwritten version of code
/// that will normally be generated).
/// </summary>
public class MathServiceClientStub : IMathServiceClient
{
readonly Channel channel;
readonly TimeSpan methodTimeout;
public MathServiceClientStub(Channel channel, TimeSpan methodTimeout)
{
this.channel = channel;
this.methodTimeout = methodTimeout;
}
public DivReply Div(DivArgs args, CancellationToken token = default(CancellationToken))
{
var call = new Google.GRPC.Core.Call<DivArgs, DivReply>("/math.Math/Div", Serialize_DivArgs, Deserialize_DivReply, methodTimeout, channel);
return Calls.BlockingUnaryCall(call, args, token);
}
public Task<DivReply> DivAsync(DivArgs args, CancellationToken token = default(CancellationToken))
{
var call = new Google.GRPC.Core.Call<DivArgs, DivReply>("/math.Math/Div", Serialize_DivArgs, Deserialize_DivReply, methodTimeout, channel);
return Calls.AsyncUnaryCall(call, args, token);
}
public Task Fib(FibArgs args, IObserver<Num> outputs, CancellationToken token = default(CancellationToken))
{
var call = new Google.GRPC.Core.Call<FibArgs, Num>("/math.Math/Fib", Serialize_FibArgs, Deserialize_Num, methodTimeout, channel);
return Calls.AsyncServerStreamingCall(call, args, outputs, token);
}
public ClientStreamingAsyncResult<Num, Num> Sum(CancellationToken token = default(CancellationToken))
{
var call = new Google.GRPC.Core.Call<Num, Num>("/math.Math/Sum", Serialize_Num, Deserialize_Num, methodTimeout, channel);
return Calls.AsyncClientStreamingCall(call, token);
}
public IObserver<DivArgs> DivMany(IObserver<DivReply> outputs, CancellationToken token = default(CancellationToken))
{
var call = new Google.GRPC.Core.Call<DivArgs, DivReply>("/math.Math/DivMany", Serialize_DivArgs, Deserialize_DivReply, methodTimeout, channel);
return Calls.DuplexStreamingCall(call, outputs, token);
}
private static byte[] Serialize_DivArgs(DivArgs arg) {
return arg.ToByteArray();
}
private static byte[] Serialize_FibArgs(FibArgs arg) {
return arg.ToByteArray();
}
private static byte[] Serialize_Num(Num arg) {
return arg.ToByteArray();
}
private static DivReply Deserialize_DivReply(byte[] payload) {
return DivReply.CreateBuilder().MergeFrom(payload).Build();
}
private static Num Deserialize_Num(byte[] payload) {
return Num.CreateBuilder().MergeFrom(payload).Build();
}
}
}
\ No newline at end of file
//using System;
//namespace Google.GRPC.Examples.Math
//{
// // Messages in this file are placeholders for actual protobuf message classes
// // that will be generated from math.proto file.
//
// public class DivArgs
// {
// public long Dividend{ get; set; }
// public long Divisor { get; set; }
// }
//
// public class DivReply
// {
// public long Quotient { get; set; }
// public long Remainder { get; set; }
// }
//
// public class FibArgs
// {
// public long Limit { get; set; }
// }
//
// public class Number
// {
// public long Num { get; set; }
// }
//
// public class FibReply
// {
// public long Count { get; set; }
// }
//}
using System.Reflection;
using System.Runtime.CompilerServices;
// Information about this assembly is defined by the following attributes.
// Change them to the values specific to your project.
[assembly: AssemblyTitle ("GrpcApi")]
[assembly: AssemblyDescription ("")]
[assembly: AssemblyConfiguration ("")]
[assembly: AssemblyCompany ("")]
[assembly: AssemblyProduct ("")]
[assembly: AssemblyCopyright ("jtattermusch")]
[assembly: AssemblyTrademark ("")]
[assembly: AssemblyCulture ("")]
// The assembly version has the format "{Major}.{Minor}.{Build}.{Revision}".
// The form "{Major}.{Minor}.*" will automatically update the build and revision,
// and "{Major}.{Minor}.{Build}.*" will update just the revision.
[assembly: AssemblyVersion ("1.0.*")]
// The following attributes are used to specify the signing key for the assembly,
// if desired. See the Mono documentation for more information about signing.
//[assembly: AssemblyDelaySign(false)]
//[assembly: AssemblyKeyFile("")]
using System;
using System.Threading.Tasks;
using System.Collections.Generic;
namespace math
{
public class RecordingObserver<T> : IObserver<T>
{
TaskCompletionSource<List<T>> tcs = new TaskCompletionSource<List<T>>();
List<T> data = new List<T>();
public void OnCompleted()
{
tcs.SetResult(data);
}
public void OnError(Exception error)
{
tcs.SetException(error);
}
public void OnNext(T value)
{
data.Add(value);
}
public Task<List<T>> ToList() {
return tcs.Task;
}
}
}
syntax = "proto2";
package math;
message DivArgs {
optional int64 dividend = 1;
optional int64 divisor = 2;
}
message DivReply {
optional int64 quotient = 1;
optional int64 remainder = 2;
}
message FibArgs {
optional int64 limit = 1;
}
message Num {
optional int64 num = 1;
}
message FibReply {
optional int64 count = 1;
}
service Math {
// Div divides args.dividend by args.divisor and returns the quotient and
// remainder.
rpc Div (DivArgs) returns (DivReply) {
}
// DivMany accepts an arbitrary number of division args from the client stream
// and sends back the results in the reply stream. The stream continues until
// the client closes its end; the server does the same after sending all the
// replies. The stream ends immediately if either end aborts.
rpc DivMany (stream DivArgs) returns (stream DivReply) {
}
// Fib generates numbers in the Fibonacci sequence. If args.limit > 0, Fib
// generates up to limit numbers; otherwise it continues until the call is
// canceled. Unlike Fib above, Fib has no final FibReply.
rpc Fib (FibArgs) returns (stream Num) {
}
// Sum sums a stream of numbers, returning the final result once the stream
// is closed.
rpc Sum (stream Num) returns (Num) {
}
}
bin
using System;
using Google.GRPC.Core.Internal;
namespace Google.GRPC.Core
{
public class Call<TRequest, TResponse>
{
readonly string methodName;
readonly Func<TRequest, byte[]> requestSerializer;
readonly Func<byte[], TResponse> responseDeserializer;
readonly TimeSpan timeout;
readonly Channel channel;
// TODO: channel param should be removed in the future.
public Call(string methodName,
Func<TRequest, byte[]> requestSerializer,
Func<byte[], TResponse> responseDeserializer,
TimeSpan timeout,
Channel channel) {
this.methodName = methodName;
this.requestSerializer = requestSerializer;
this.responseDeserializer = responseDeserializer;
this.timeout = timeout;
this.channel = channel;
}
public Channel Channel
{
get
{
return this.channel;
}
}
public TimeSpan Timeout
{
get
{
return this.timeout;
}
}
public string MethodName
{
get
{
return this.methodName;
}
}
public Func<TRequest, byte[]> RequestSerializer
{
get
{
return this.requestSerializer;
}
}
public Func<byte[], TResponse> ResponseDeserializer
{
get
{
return this.responseDeserializer;
}
}
}
}
using System;
using System.Threading;
using System.Threading.Tasks;
using Google.GRPC.Core.Internal;
namespace Google.GRPC.Core
{
// NOTE: this class is work-in-progress
/// <summary>
/// Helper methods for generated stubs to make RPC calls.
/// </summary>
public static class Calls
{
public static TResponse BlockingUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token)
{
//TODO: implement this in real synchronous style once new GRPC C core API is available.
return AsyncUnaryCall(call, req, token).Result;
}
public static async Task<TResponse> AsyncUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token)
{
var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestSerializer, call.ResponseDeserializer);
asyncCall.Initialize(call.Channel, call.MethodName);
asyncCall.Start(false, GetCompletionQueue());
await asyncCall.WriteAsync(req);
await asyncCall.WritesCompletedAsync();
TResponse response = await asyncCall.ReadAsync();
Status status = await asyncCall.Finished;
if (status.StatusCode != StatusCode.GRPC_STATUS_OK)
{
throw new RpcException(status);
}
return response;
}
public static async Task AsyncServerStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, IObserver<TResponse> outputs, CancellationToken token)
{
var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestSerializer, call.ResponseDeserializer);
asyncCall.Initialize(call.Channel, call.MethodName);
asyncCall.Start(false, GetCompletionQueue());
asyncCall.StartReadingToStream(outputs);
await asyncCall.WriteAsync(req);
await asyncCall.WritesCompletedAsync();
}
public static ClientStreamingAsyncResult<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, CancellationToken token)
{
var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestSerializer, call.ResponseDeserializer);
asyncCall.Initialize(call.Channel, call.MethodName);
asyncCall.Start(false, GetCompletionQueue());
var task = asyncCall.ReadAsync();
var inputs = new StreamingInputObserver<TRequest, TResponse>(asyncCall);
return new ClientStreamingAsyncResult<TRequest, TResponse>(task, inputs);
}
public static TResponse BlockingClientStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, IObservable<TRequest> inputs, CancellationToken token)
{
throw new NotImplementedException();
}
public static IObserver<TRequest> DuplexStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, IObserver<TResponse> outputs, CancellationToken token)
{
var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestSerializer, call.ResponseDeserializer);
asyncCall.Initialize(call.Channel, call.MethodName);
asyncCall.Start(false, GetCompletionQueue());
asyncCall.StartReadingToStream(outputs);
var inputs = new StreamingInputObserver<TRequest, TResponse>(asyncCall);
return inputs;
}
private static CompletionQueueSafeHandle GetCompletionQueue() {
return GrpcEnvironment.ThreadPool.CompletionQueue;
}
}
}
using System;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using Google.GRPC.Core.Internal;
namespace Google.GRPC.Core
{
public class Channel : IDisposable
{
/// <summary>
/// Make sure GPRC environment is initialized before any channels get used.
/// </summary>
static Channel() {
GrpcEnvironment.EnsureInitialized();
}
readonly ChannelSafeHandle handle;
readonly String target;
// TODO: add way how to create grpc_secure_channel....
// TODO: add support for channel args...
public Channel(string target)
{
this.handle = ChannelSafeHandle.Create(target, IntPtr.Zero);
this.target = target;
}
internal ChannelSafeHandle Handle
{
get
{
return this.handle;
}
}
public string Target
{
get
{
return this.target;
}
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (handle != null && !handle.IsInvalid)
{
handle.Dispose();
}
}
}
}
\ No newline at end of file
using System;
using System.Threading.Tasks;
namespace Google.GRPC.Core
{
/// <summary>
/// Return type for client streaming async method.
/// </summary>
public struct ClientStreamingAsyncResult<TRequest, TResponse>
{
readonly Task<TResponse> task;
readonly IObserver<TRequest> inputs;
public ClientStreamingAsyncResult(Task<TResponse> task, IObserver<TRequest> inputs)
{
this.task = task;
this.inputs = inputs;
}
public Task<TResponse> Task
{
get
{
return this.task;
}
}
public IObserver<TRequest> Inputs
{
get
{
return this.inputs;
}
}
}
}
<?xml version="1.0" encoding="utf-8"?>
<Project DefaultTargets="Build" ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
<ProductVersion>10.0.0</ProductVersion>
<SchemaVersion>2.0</SchemaVersion>
<ProjectGuid>{CCC4440E-49F7-4790-B0AF-FEABB0837AE7}</ProjectGuid>
<OutputType>Library</OutputType>
<RootNamespace>GrpcCore</RootNamespace>
<AssemblyName>GrpcCore</AssemblyName>
<TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<DebugSymbols>true</DebugSymbols>
<DebugType>full</DebugType>
<Optimize>false</Optimize>
<OutputPath>bin\Debug</OutputPath>
<DefineConstants>DEBUG;</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
<ConsolePause>false</ConsolePause>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
<DebugType>full</DebugType>
<Optimize>true</Optimize>
<OutputPath>bin\Release</OutputPath>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
<ConsolePause>false</ConsolePause>
</PropertyGroup>
<ItemGroup>
<Reference Include="System" />
</ItemGroup>
<ItemGroup>
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="RpcException.cs" />
<Compile Include="Calls.cs" />
<Compile Include="Call.cs" />
<Compile Include="ClientStreamingAsyncResult.cs" />
<Compile Include="GrpcEnvironment.cs" />
<Compile Include="Status.cs" />
<Compile Include="StatusCode.cs" />
<Compile Include="Server.cs" />
<Compile Include="Channel.cs" />
<Compile Include="Internal\CallSafeHandle.cs" />
<Compile Include="Internal\ChannelSafeHandle.cs" />
<Compile Include="Internal\CompletionQueueSafeHandle.cs" />
<Compile Include="Internal\Enums.cs" />
<Compile Include="Internal\Event.cs" />
<Compile Include="Internal\SafeHandleZeroIsInvalid.cs" />
<Compile Include="Internal\Timespec.cs" />
<Compile Include="Internal\GrpcThreadPool.cs" />
<Compile Include="Internal\AsyncCall.cs" />
<Compile Include="Internal\ServerSafeHandle.cs" />
<Compile Include="Internal\StreamingInputObserver.cs" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup>
<Folder Include="Internal\" />
</ItemGroup>
</Project>
\ No newline at end of file
using System;
using Google.GRPC.Core.Internal;
using System.Runtime.InteropServices;
namespace Google.GRPC.Core
{
/// <summary>
/// Encapsulates initialization and shutdown of GRPC C core library.
/// You should not need to initialize it manually, as static constructors
/// should load the library when needed.
/// </summary>
public static class GrpcEnvironment
{
const int THREAD_POOL_SIZE = 1;
[DllImport("libgrpc.so")]
static extern void grpc_init();
[DllImport("libgrpc.so")]
static extern void grpc_shutdown();
static object staticLock = new object();
static bool initCalled = false;
static bool shutdownCalled = false;
static GrpcThreadPool threadPool = new GrpcThreadPool(THREAD_POOL_SIZE);
/// <summary>
/// Makes sure GRPC environment is initialized.
/// </summary>
public static void EnsureInitialized() {
lock(staticLock)
{
if (!initCalled)
{
initCalled = true;
GrpcInit();
}
}
}
/// <summary>
/// Shuts down the GRPC environment if it was initialized before.
/// Repeated invocations have no effect.
/// </summary>
public static void Shutdown()
{
lock(staticLock)
{
if (initCalled && !shutdownCalled)
{
shutdownCalled = true;
GrpcShutdown();
}
}
}
/// <summary>
/// Initializes GRPC C Core library.
/// </summary>
private static void GrpcInit()
{
grpc_init();
threadPool.Start();
// TODO: use proper logging here
Console.WriteLine("GRPC initialized.");
}
/// <summary>
/// Shutdown GRPC C Core library.
/// </summary>
private static void GrpcShutdown()
{
threadPool.Stop();
grpc_shutdown();
// TODO: use proper logging here
Console.WriteLine("GRPC shutdown.");
}
internal static GrpcThreadPool ThreadPool
{
get
{
return threadPool;
}
}
}
}
using System;
using System.Runtime.InteropServices;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using System.Runtime.CompilerServices;
using Google.GRPC.Core.Internal;
namespace Google.GRPC.Core.Internal
{
/// <summary>
/// Listener for call events that can be delivered from a completion queue.
/// </summary>
internal interface ICallEventListener {
void OnClientMetadata();
void OnRead(byte[] payload);
void OnWriteAccepted(GRPCOpError error);
void OnFinishAccepted(GRPCOpError error);
// ignore the status on server
void OnFinished(Status status);
}
/// <summary>
/// Handle native call lifecycle and provides convenience methods.
/// </summary>
internal class AsyncCall<TWrite, TRead>: ICallEventListener, IDisposable
{
readonly Func<TWrite, byte[]> serializer;
readonly Func<byte[], TRead> deserializer;
// TODO: make sure the delegate doesn't get garbage collected while
// native callbacks are in the completion queue.
readonly EventCallbackDelegate callbackHandler;
object myLock = new object();
bool disposed;
CallSafeHandle call;
bool started;
bool errorOccured;
bool cancelRequested;
bool halfcloseRequested;
bool halfclosed;
bool doneWithReading;
Nullable<Status> finishedStatus;
TaskCompletionSource<object> writeTcs;
TaskCompletionSource<TRead> readTcs;
TaskCompletionSource<object> halfcloseTcs = new TaskCompletionSource<object>();
TaskCompletionSource<Status> finishedTcs = new TaskCompletionSource<Status>();
IObserver<TRead> readObserver;
public AsyncCall(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer)
{
this.serializer = serializer;
this.deserializer = deserializer;
this.callbackHandler = HandleEvent;
}
public Task WriteAsync(TWrite msg)
{
return StartWrite(msg, false).Task;
}
public Task WritesCompletedAsync()
{
WritesDone();
return halfcloseTcs.Task;
}
public Task WriteStatusAsync(Status status)
{
WriteStatus(status);
return halfcloseTcs.Task;
}
public Task<TRead> ReadAsync()
{
return StartRead().Task;
}
public Task<Status> Finished
{
get
{
return finishedTcs.Task;
}
}
/// <summary>
/// Initiates reading to given observer.
/// </summary>
public void StartReadingToStream(IObserver<TRead> readObserver) {
lock (myLock)
{
CheckStarted();
if (this.readObserver != null)
{
throw new InvalidOperationException("Already registered an observer.");
}
this.readObserver = readObserver;
StartRead();
}
}
public void Initialize(Channel channel, String methodName) {
lock (myLock)
{
this.call = CallSafeHandle.Create(channel.Handle, methodName, channel.Target, Timespec.InfFuture);
}
}
public void InitializeServer(CallSafeHandle call)
{
lock(myLock)
{
this.call = call;
}
}
// Client only
public void Start(bool buffered, CompletionQueueSafeHandle cq)
{
lock (myLock)
{
if (started)
{
throw new InvalidOperationException("Already started.");
}
call.Invoke(cq, buffered, callbackHandler, callbackHandler);
started = true;
}
}
// Server only
public void Accept(CompletionQueueSafeHandle cq)
{
lock (myLock)
{
if (started)
{
throw new InvalidOperationException("Already started.");
}
call.ServerAccept(cq, callbackHandler);
call.ServerEndInitialMetadata(0);
started = true;
}
}
public TaskCompletionSource<object> StartWrite(TWrite msg, bool buffered)
{
lock (myLock)
{
CheckStarted();
CheckNotFinished();
CheckNoError();
CheckCancelNotRequested();
if (halfcloseRequested || halfclosed)
{
throw new InvalidOperationException("Already halfclosed.");
}
if (writeTcs != null)
{
throw new InvalidOperationException("Only one write can be pending at a time");
}
// TODO: wrap serialization...
byte[] payload = serializer(msg);
call.StartWrite(payload, buffered, callbackHandler);
writeTcs = new TaskCompletionSource<object>();
return writeTcs;
}
}
// client only
public void WritesDone()
{
lock (myLock)
{
CheckStarted();
CheckNotFinished();
CheckNoError();
CheckCancelNotRequested();
if (halfcloseRequested || halfclosed)
{
throw new InvalidOperationException("Already halfclosed.");
}
call.WritesDone(callbackHandler);
halfcloseRequested = true;
}
}
// server only
public void WriteStatus(Status status)
{
lock (myLock)
{
CheckStarted();
CheckNotFinished();
CheckNoError();
CheckCancelNotRequested();
if (halfcloseRequested || halfclosed)
{
throw new InvalidOperationException("Already halfclosed.");
}
call.StartWriteStatus(status, callbackHandler);
halfcloseRequested = true;
}
}
public TaskCompletionSource<TRead> StartRead()
{
lock (myLock)
{
CheckStarted();
CheckNotFinished();
CheckNoError();
// TODO: add check for not cancelled?
if (doneWithReading)
{
throw new InvalidOperationException("Already read the last message.");
}
if (readTcs != null)
{
throw new InvalidOperationException("Only one read can be pending at a time");
}
call.StartRead(callbackHandler);
readTcs = new TaskCompletionSource<TRead>();
return readTcs;
}
}
public void Cancel()
{
lock (myLock)
{
CheckStarted();
CheckNotFinished();
cancelRequested = true;
}
// grpc_call_cancel is threadsafe
call.Cancel();
}
public void CancelWithStatus(Status status)
{
lock (myLock)
{
CheckStarted();
CheckNotFinished();
cancelRequested = true;
}
// grpc_call_cancel_with_status is threadsafe
call.CancelWithStatus(status);
}
public void OnClientMetadata()
{
// TODO: implement....
}
public void OnRead(byte[] payload)
{
TaskCompletionSource<TRead> oldTcs = null;
IObserver<TRead> observer = null;
lock (myLock)
{
oldTcs = readTcs;
readTcs = null;
if (payload == null)
{
doneWithReading = true;
}
observer = readObserver;
}
// TODO: wrap deserialization...
TRead msg = payload != null ? deserializer(payload) : default(TRead);
oldTcs.SetResult(msg);
// TODO: make sure we deliver reads in the right order.
if (observer != null)
{
if (payload != null)
{
// TODO: wrap to handle exceptions
observer.OnNext(msg);
// start a new read
StartRead();
}
else
{
// TODO: wrap to handle exceptions;
observer.OnCompleted();
}
}
}
public void OnWriteAccepted(GRPCOpError error)
{
TaskCompletionSource<object> oldTcs = null;
lock (myLock)
{
UpdateErrorOccured(error);
oldTcs = writeTcs;
writeTcs = null;
}
if (errorOccured)
{
// TODO: use the right type of exception...
oldTcs.SetException(new Exception("Write failed"));
}
else
{
// TODO: where does the continuation run?
oldTcs.SetResult(null);
}
}
public void OnFinishAccepted(GRPCOpError error)
{
lock (myLock)
{
UpdateErrorOccured(error);
halfclosed = true;
}
if (errorOccured)
{
halfcloseTcs.SetException(new Exception("Halfclose failed"));
}
else
{
halfcloseTcs.SetResult(null);
}
}
public void OnFinished(Status status)
{
lock (myLock)
{
finishedStatus = status;
DisposeResourcesIfNeeded();
}
finishedTcs.SetResult(status);
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (!disposed)
{
if (disposing)
{
if (call != null)
{
call.Dispose();
}
}
disposed = true;
}
}
private void UpdateErrorOccured(GRPCOpError error)
{
if (error == GRPCOpError.GRPC_OP_ERROR)
{
errorOccured = true;
}
}
private void CheckStarted()
{
if (!started)
{
throw new InvalidOperationException("Call not started");
}
}
private void CheckNoError()
{
if (errorOccured)
{
throw new InvalidOperationException("Error occured when processing call.");
}
}
private void CheckNotFinished()
{
if (finishedStatus.HasValue)
{
throw new InvalidOperationException("Already finished.");
}
}
private void CheckCancelNotRequested()
{
if (cancelRequested)
{
throw new InvalidOperationException("Cancel has been requested.");
}
}
private void DisposeResourcesIfNeeded()
{
if (call != null && started && finishedStatus.HasValue)
{
// TODO: should we also wait for all the pending events to finish?
call.Dispose();
}
}
private void HandleEvent(IntPtr eventPtr) {
try {
var ev = new EventSafeHandleNotOwned(eventPtr);
switch (ev.GetCompletionType())
{
case GRPCCompletionType.GRPC_CLIENT_METADATA_READ:
OnClientMetadata();
break;
case GRPCCompletionType.GRPC_READ:
byte[] payload = ev.GetReadData();
OnRead(payload);
break;
case GRPCCompletionType.GRPC_WRITE_ACCEPTED:
OnWriteAccepted(ev.GetWriteAccepted());
break;
case GRPCCompletionType.GRPC_FINISH_ACCEPTED:
OnFinishAccepted(ev.GetFinishAccepted());
break;
case GRPCCompletionType.GRPC_FINISHED:
OnFinished(ev.GetFinished());
break;
default:
throw new ArgumentException("Unexpected completion type");
}
} catch(Exception e) {
Console.WriteLine("Caught exception in a native handler: " + e);
}
}
}
}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment