Skip to content
Snippets Groups Projects
Commit 04eb89ca authored by Jan Tattermusch's avatar Jan Tattermusch
Browse files

no need to call GrpcEnvironment.Initialize() explicitly

parent aa91ad2c
No related branches found
No related tags found
No related merge requests found
Showing
with 202 additions and 162 deletions
......@@ -73,12 +73,6 @@ namespace Grpc.Core.Tests
Server server;
Channel channel;
[TestFixtureSetUp]
public void InitClass()
{
GrpcEnvironment.Initialize();
}
[SetUp]
public void Init()
{
......
......@@ -43,16 +43,17 @@ namespace Grpc.Core.Tests
[Test]
public void InitializeAndShutdownGrpcEnvironment()
{
GrpcEnvironment.Initialize();
Assert.IsNotNull(GrpcEnvironment.ThreadPool.CompletionQueue);
var env = GrpcEnvironment.GetInstance();
Assert.IsNotNull(env.CompletionQueue);
GrpcEnvironment.Shutdown();
}
[Test]
public void SubsequentInvocations()
{
GrpcEnvironment.Initialize();
GrpcEnvironment.Initialize();
var env1 = GrpcEnvironment.GetInstance();
var env2 = GrpcEnvironment.GetInstance();
Assert.IsTrue(object.ReferenceEquals(env1, env2));
GrpcEnvironment.Shutdown();
GrpcEnvironment.Shutdown();
}
......@@ -60,15 +61,13 @@ namespace Grpc.Core.Tests
[Test]
public void InitializeAfterShutdown()
{
GrpcEnvironment.Initialize();
var tp1 = GrpcEnvironment.ThreadPool;
var env1 = GrpcEnvironment.GetInstance();
GrpcEnvironment.Shutdown();
GrpcEnvironment.Initialize();
var tp2 = GrpcEnvironment.ThreadPool;
var env2 = GrpcEnvironment.GetInstance();
GrpcEnvironment.Shutdown();
Assert.IsFalse(object.ReferenceEquals(tp1, tp2));
Assert.IsFalse(object.ReferenceEquals(env1, env2));
}
}
}
......@@ -53,18 +53,6 @@ namespace Grpc.Core.Tests
[DllImport("grpc_csharp_ext.dll")]
static extern IntPtr grpcsharp_test_nop(IntPtr ptr);
[TestFixtureSetUp]
public void Init()
{
GrpcEnvironment.Initialize();
}
[TestFixtureTearDown]
public void Cleanup()
{
GrpcEnvironment.Shutdown();
}
/// <summary>
/// (~1.26us .NET Windows)
/// </summary>
......
......@@ -44,13 +44,10 @@ namespace Grpc.Core.Tests
[Test]
public void StartAndShutdownServer()
{
GrpcEnvironment.Initialize();
Server server = new Server();
server.AddListeningPort("localhost", Server.PickUnusedPort);
server.Start();
server.ShutdownAsync().Wait();
GrpcEnvironment.Shutdown();
}
}
......
......@@ -58,7 +58,7 @@ namespace Grpc.Core
where TResponse : class
{
var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.Name);
asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name);
var asyncResult = asyncCall.UnaryCallAsync(req, call.Headers);
RegisterCancellationCallback(asyncCall, token);
return await asyncResult;
......@@ -69,7 +69,7 @@ namespace Grpc.Core
where TResponse : class
{
var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.Name);
asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name);
asyncCall.StartServerStreamingCall(req, call.Headers);
RegisterCancellationCallback(asyncCall, token);
var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall);
......@@ -81,7 +81,7 @@ namespace Grpc.Core
where TResponse : class
{
var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.Name);
asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name);
var resultTask = asyncCall.ClientStreamingCallAsync(call.Headers);
RegisterCancellationCallback(asyncCall, token);
var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall);
......@@ -93,7 +93,7 @@ namespace Grpc.Core
where TResponse : class
{
var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.Name);
asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name);
asyncCall.StartDuplexStreamingCall(call.Headers);
RegisterCancellationCallback(asyncCall, token);
var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall);
......@@ -108,13 +108,5 @@ namespace Grpc.Core
token.Register(() => asyncCall.Cancel());
}
}
/// <summary>
/// Gets shared completion queue used for async calls.
/// </summary>
private static CompletionQueueSafeHandle GetCompletionQueue()
{
return GrpcEnvironment.ThreadPool.CompletionQueue;
}
}
}
......@@ -42,8 +42,10 @@ namespace Grpc.Core
/// </summary>
public class Channel : IDisposable
{
readonly GrpcEnvironment environment;
readonly ChannelSafeHandle handle;
readonly string target;
bool disposed;
/// <summary>
/// Creates a channel that connects to a specific host.
......@@ -54,6 +56,7 @@ namespace Grpc.Core
/// <param name="options">Channel options.</param>
public Channel(string host, Credentials credentials = null, IEnumerable<ChannelOption> options = null)
{
this.environment = GrpcEnvironment.GetInstance();
using (ChannelArgsSafeHandle nativeChannelArgs = ChannelOptions.CreateChannelArgs(options))
{
if (credentials != null)
......@@ -105,10 +108,35 @@ namespace Grpc.Core
}
}
internal CompletionQueueSafeHandle CompletionQueue
{
get
{
return this.environment.CompletionQueue;
}
}
internal CompletionRegistry CompletionRegistry
{
get
{
return this.environment.CompletionRegistry;
}
}
internal GrpcEnvironment Environment
{
get
{
return this.environment;
}
}
protected virtual void Dispose(bool disposing)
{
if (handle != null && !handle.IsInvalid)
if (disposing && handle != null && !disposed)
{
disposed = true;
handle.Dispose();
}
}
......
......@@ -33,7 +33,9 @@
using System;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
namespace Grpc.Core
{
......@@ -51,20 +53,18 @@ namespace Grpc.Core
static extern void grpcsharp_shutdown();
static object staticLock = new object();
static volatile GrpcEnvironment instance;
static GrpcEnvironment instance;
readonly GrpcThreadPool threadPool;
readonly CompletionRegistry completionRegistry;
readonly DebugStats debugStats = new DebugStats();
bool isClosed;
/// <summary>
/// Makes sure GRPC environment is initialized. Subsequent invocations don't have any
/// effect unless you call Shutdown first.
/// Although normal use cases assume you will call this just once in your application's
/// lifetime (and call Shutdown once you're done), for the sake of easier testing it's
/// allowed to initialize the environment again after it has been successfully shutdown.
/// Returns an instance of initialized gRPC environment.
/// Subsequent invocations return the same instance unless Shutdown has been called first.
/// </summary>
public static void Initialize()
internal static GrpcEnvironment GetInstance()
{
lock (staticLock)
{
......@@ -72,12 +72,13 @@ namespace Grpc.Core
{
instance = new GrpcEnvironment();
}
return instance;
}
}
/// <summary>
/// Shuts down the GRPC environment if it was initialized before.
/// Repeated invocations have no effect.
/// Shuts down the gRPC environment if it was initialized before.
/// Blocks until the environment has been fully shutdown.
/// </summary>
public static void Shutdown()
{
......@@ -87,50 +88,55 @@ namespace Grpc.Core
{
instance.Close();
instance = null;
CheckDebugStats();
}
}
}
internal static GrpcThreadPool ThreadPool
/// <summary>
/// Creates gRPC environment.
/// </summary>
private GrpcEnvironment()
{
GrpcLog.RedirectNativeLogs(Console.Error);
grpcsharp_init();
completionRegistry = new CompletionRegistry(this);
threadPool = new GrpcThreadPool(this, THREAD_POOL_SIZE);
threadPool.Start();
// TODO: use proper logging here
Console.WriteLine("GRPC initialized.");
}
/// <summary>
/// Gets the completion registry used by this gRPC environment.
/// </summary>
internal CompletionRegistry CompletionRegistry
{
get
{
var inst = instance;
if (inst == null)
{
throw new InvalidOperationException("GRPC environment not initialized");
}
return inst.threadPool;
return this.completionRegistry;
}
}
internal static CompletionRegistry CompletionRegistry
/// <summary>
/// Gets the completion queue used by this gRPC environment.
/// </summary>
internal CompletionQueueSafeHandle CompletionQueue
{
get
{
var inst = instance;
if (inst == null)
{
throw new InvalidOperationException("GRPC environment not initialized");
}
return inst.completionRegistry;
return this.threadPool.CompletionQueue;
}
}
/// <summary>
/// Creates gRPC environment.
/// Gets the completion queue used by this gRPC environment.
/// </summary>
private GrpcEnvironment()
internal DebugStats DebugStats
{
GrpcLog.RedirectNativeLogs(Console.Error);
grpcsharp_init();
completionRegistry = new CompletionRegistry();
threadPool = new GrpcThreadPool(THREAD_POOL_SIZE);
threadPool.Start();
// TODO: use proper logging here
Console.WriteLine("GRPC initialized.");
get
{
return this.debugStats;
}
}
/// <summary>
......@@ -146,32 +152,28 @@ namespace Grpc.Core
grpcsharp_shutdown();
isClosed = true;
debugStats.CheckOK();
// TODO: use proper logging here
Console.WriteLine("GRPC shutdown.");
}
private static void CheckDebugStats()
/// <summary>
/// Shuts down this environment asynchronously.
/// </summary>
private Task CloseAsync()
{
var remainingClientCalls = DebugStats.ActiveClientCalls.Count;
if (remainingClientCalls != 0)
{
DebugWarning(string.Format("Detected {0} client calls that weren't disposed properly.", remainingClientCalls));
}
var remainingServerCalls = DebugStats.ActiveServerCalls.Count;
if (remainingServerCalls != 0)
{
DebugWarning(string.Format("Detected {0} server calls that weren't disposed properly.", remainingServerCalls));
}
var pendingBatchCompletions = DebugStats.PendingBatchCompletions.Count;
if (pendingBatchCompletions != 0)
return Task.Run(() =>
{
DebugWarning(string.Format("Detected {0} pending batch completions.", pendingBatchCompletions));
}
}
private static void DebugWarning(string message)
{
throw new Exception("Shutdown check: " + message);
try
{
Close();
}
catch (Exception e)
{
Console.WriteLine("Error occured while shutting down GrpcEnvironment: " + e);
}
});
}
}
}
......@@ -47,6 +47,8 @@ namespace Grpc.Core.Internal
/// </summary>
internal class AsyncCall<TRequest, TResponse> : AsyncCallBase<TRequest, TResponse>
{
Channel channel;
// Completion of a pending unary response if not null.
TaskCompletionSource<TResponse> unaryResponseTcs;
......@@ -61,8 +63,9 @@ namespace Grpc.Core.Internal
public void Initialize(Channel channel, CompletionQueueSafeHandle cq, string methodName)
{
var call = CallSafeHandle.Create(channel.Handle, cq, methodName, channel.Target, Timespec.InfFuture);
DebugStats.ActiveClientCalls.Increment();
this.channel = channel;
var call = CallSafeHandle.Create(channel.Handle, channel.CompletionRegistry, cq, methodName, channel.Target, Timespec.InfFuture);
channel.Environment.DebugStats.ActiveClientCalls.Increment();
InitializeInternal(call);
}
......@@ -277,7 +280,7 @@ namespace Grpc.Core.Internal
protected override void OnReleaseResources()
{
DebugStats.ActiveClientCalls.Decrement();
channel.Environment.DebugStats.ActiveClientCalls.Decrement();
}
/// <summary>
......
......@@ -48,14 +48,17 @@ namespace Grpc.Core.Internal
internal class AsyncCallServer<TRequest, TResponse> : AsyncCallBase<TResponse, TRequest>
{
readonly TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>();
readonly GrpcEnvironment environment;
public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer) : base(serializer, deserializer)
public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, GrpcEnvironment environment) : base(serializer, deserializer)
{
this.environment = Preconditions.CheckNotNull(environment);
}
public void Initialize(CallSafeHandle call)
{
DebugStats.ActiveServerCalls.Increment();
call.SetCompletionRegistry(environment.CompletionRegistry);
environment.DebugStats.ActiveServerCalls.Increment();
InitializeInternal(call);
}
......@@ -114,7 +117,7 @@ namespace Grpc.Core.Internal
protected override void OnReleaseResources()
{
DebugStats.ActiveServerCalls.Decrement();
environment.DebugStats.ActiveServerCalls.Decrement();
}
/// <summary>
......
......@@ -43,6 +43,7 @@ namespace Grpc.Core.Internal
internal class CallSafeHandle : SafeHandleZeroIsInvalid
{
const uint GRPC_WRITE_BUFFER_HINT = 1;
CompletionRegistry completionRegistry;
[DllImport("grpc_csharp_ext.dll")]
static extern CallSafeHandle grpcsharp_channel_create_call(ChannelSafeHandle channel, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline);
......@@ -97,15 +98,22 @@ namespace Grpc.Core.Internal
{
}
public static CallSafeHandle Create(ChannelSafeHandle channel, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline)
public static CallSafeHandle Create(ChannelSafeHandle channel, CompletionRegistry registry, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline)
{
return grpcsharp_channel_create_call(channel, cq, method, host, deadline);
var result = grpcsharp_channel_create_call(channel, cq, method, host, deadline);
result.SetCompletionRegistry(registry);
return result;
}
public void SetCompletionRegistry(CompletionRegistry completionRegistry)
{
this.completionRegistry = completionRegistry;
}
public void StartUnary(byte[] payload, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
{
var ctx = BatchContextSafeHandle.Create();
GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
completionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray)
.CheckOk();
}
......@@ -119,56 +127,56 @@ namespace Grpc.Core.Internal
public void StartClientStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
{
var ctx = BatchContextSafeHandle.Create();
GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
completionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_start_client_streaming(this, ctx, metadataArray).CheckOk();
}
public void StartServerStreaming(byte[] payload, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
{
var ctx = BatchContextSafeHandle.Create();
GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
completionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray).CheckOk();
}
public void StartDuplexStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
{
var ctx = BatchContextSafeHandle.Create();
GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
completionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray).CheckOk();
}
public void StartSendMessage(byte[] payload, BatchCompletionDelegate callback)
{
var ctx = BatchContextSafeHandle.Create();
GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
completionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length)).CheckOk();
}
public void StartSendCloseFromClient(BatchCompletionDelegate callback)
{
var ctx = BatchContextSafeHandle.Create();
GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
completionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_send_close_from_client(this, ctx).CheckOk();
}
public void StartSendStatusFromServer(Status status, BatchCompletionDelegate callback)
{
var ctx = BatchContextSafeHandle.Create();
GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
completionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail).CheckOk();
}
public void StartReceiveMessage(BatchCompletionDelegate callback)
{
var ctx = BatchContextSafeHandle.Create();
GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
completionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_recv_message(this, ctx).CheckOk();
}
public void StartServerSide(BatchCompletionDelegate callback)
{
var ctx = BatchContextSafeHandle.Create();
GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
completionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_start_serverside(this, ctx).CheckOk();
}
......
......@@ -45,11 +45,17 @@ namespace Grpc.Core.Internal
internal class CompletionRegistry
{
readonly ConcurrentDictionary<IntPtr, OpCompletionDelegate> dict = new ConcurrentDictionary<IntPtr, OpCompletionDelegate>();
readonly GrpcEnvironment environment;
readonly ConcurrentDictionary<IntPtr, OpCompletionDelegate> dict = new ConcurrentDictionary<IntPtr, OpCompletionDelegate>();
public CompletionRegistry(GrpcEnvironment environment)
{
this.environment = environment;
}
public void Register(IntPtr key, OpCompletionDelegate callback)
{
DebugStats.PendingBatchCompletions.Increment();
environment.DebugStats.PendingBatchCompletions.Increment();
Preconditions.CheckState(dict.TryAdd(key, callback));
}
......@@ -63,7 +69,7 @@ namespace Grpc.Core.Internal
{
OpCompletionDelegate value;
Preconditions.CheckState(dict.TryRemove(key, out value));
DebugStats.PendingBatchCompletions.Decrement();
environment.DebugStats.PendingBatchCompletions.Decrement();
return value;
}
......
......@@ -36,12 +36,39 @@ using System.Threading;
namespace Grpc.Core.Internal
{
internal static class DebugStats
internal class DebugStats
{
public static readonly AtomicCounter ActiveClientCalls = new AtomicCounter();
public readonly AtomicCounter ActiveClientCalls = new AtomicCounter();
public static readonly AtomicCounter ActiveServerCalls = new AtomicCounter();
public readonly AtomicCounter ActiveServerCalls = new AtomicCounter();
public static readonly AtomicCounter PendingBatchCompletions = new AtomicCounter();
public readonly AtomicCounter PendingBatchCompletions = new AtomicCounter();
/// <summary>
/// Checks the debug stats and take action for any inconsistency found.
/// </summary>
public void CheckOK()
{
var remainingClientCalls = ActiveClientCalls.Count;
if (remainingClientCalls != 0)
{
DebugWarning(string.Format("Detected {0} client calls that weren't disposed properly.", remainingClientCalls));
}
var remainingServerCalls = ActiveServerCalls.Count;
if (remainingServerCalls != 0)
{
DebugWarning(string.Format("Detected {0} server calls that weren't disposed properly.", remainingServerCalls));
}
var pendingBatchCompletions = PendingBatchCompletions.Count;
if (pendingBatchCompletions != 0)
{
DebugWarning(string.Format("Detected {0} pending batch completions.", pendingBatchCompletions));
}
}
private void DebugWarning(string message)
{
throw new Exception("Shutdown check: " + message);
}
}
}
......@@ -45,14 +45,16 @@ namespace Grpc.Core.Internal
/// </summary>
internal class GrpcThreadPool
{
readonly GrpcEnvironment environment;
readonly object myLock = new object();
readonly List<Thread> threads = new List<Thread>();
readonly int poolSize;
CompletionQueueSafeHandle cq;
public GrpcThreadPool(int poolSize)
public GrpcThreadPool(GrpcEnvironment environment, int poolSize)
{
this.environment = environment;
this.poolSize = poolSize;
}
......@@ -80,7 +82,7 @@ namespace Grpc.Core.Internal
{
cq.Shutdown();
Console.WriteLine("Waiting for GPRC threads to finish.");
Console.WriteLine("Waiting for GRPC threads to finish.");
foreach (var thread in threads)
{
thread.Join();
......@@ -122,7 +124,7 @@ namespace Grpc.Core.Internal
IntPtr tag = ev.tag;
try
{
var callback = GrpcEnvironment.CompletionRegistry.Extract(tag);
var callback = environment.CompletionRegistry.Extract(tag);
callback(success);
}
catch (Exception e)
......
......@@ -42,7 +42,7 @@ namespace Grpc.Core.Internal
{
internal interface IServerCallHandler
{
Task HandleCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq);
Task HandleCall(string methodName, CallSafeHandle call, GrpcEnvironment environment);
}
internal class UnaryServerCallHandler<TRequest, TResponse> : IServerCallHandler
......@@ -58,11 +58,12 @@ namespace Grpc.Core.Internal
this.handler = handler;
}
public async Task HandleCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq)
public async Task HandleCall(string methodName, CallSafeHandle call, GrpcEnvironment environment)
{
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer);
method.RequestMarshaller.Deserializer,
environment);
asyncCall.Initialize(call);
var finishedTask = asyncCall.ServerSideCallAsync();
......@@ -110,11 +111,12 @@ namespace Grpc.Core.Internal
this.handler = handler;
}
public async Task HandleCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq)
public async Task HandleCall(string methodName, CallSafeHandle call, GrpcEnvironment environment)
{
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer);
method.RequestMarshaller.Deserializer,
environment);
asyncCall.Initialize(call);
var finishedTask = asyncCall.ServerSideCallAsync();
......@@ -163,11 +165,12 @@ namespace Grpc.Core.Internal
this.handler = handler;
}
public async Task HandleCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq)
public async Task HandleCall(string methodName, CallSafeHandle call, GrpcEnvironment environment)
{
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer);
method.RequestMarshaller.Deserializer,
environment);
asyncCall.Initialize(call);
var finishedTask = asyncCall.ServerSideCallAsync();
......@@ -219,11 +222,12 @@ namespace Grpc.Core.Internal
this.handler = handler;
}
public async Task HandleCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq)
public async Task HandleCall(string methodName, CallSafeHandle call, GrpcEnvironment environment)
{
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer);
method.RequestMarshaller.Deserializer,
environment);
asyncCall.Initialize(call);
var finishedTask = asyncCall.ServerSideCallAsync();
......@@ -255,11 +259,11 @@ namespace Grpc.Core.Internal
internal class NoSuchMethodCallHandler : IServerCallHandler
{
public async Task HandleCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq)
public async Task HandleCall(string methodName, CallSafeHandle call, GrpcEnvironment environment)
{
// We don't care about the payload type here.
var asyncCall = new AsyncCallServer<byte[], byte[]>(
(payload) => payload, (payload) => payload);
(payload) => payload, (payload) => payload, environment);
asyncCall.Initialize(call);
var finishedTask = asyncCall.ServerSideCallAsync();
......
......@@ -91,19 +91,19 @@ namespace Grpc.Core.Internal
{
grpcsharp_server_start(this);
}
public void ShutdownAndNotify(CompletionQueueSafeHandle cq, BatchCompletionDelegate callback)
public void ShutdownAndNotify(BatchCompletionDelegate callback, GrpcEnvironment environment)
{
var ctx = BatchContextSafeHandle.Create();
GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_server_shutdown_and_notify_callback(this, cq, ctx);
environment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_server_shutdown_and_notify_callback(this, environment.CompletionQueue, ctx);
}
public void RequestCall(CompletionQueueSafeHandle cq, BatchCompletionDelegate callback)
public void RequestCall(BatchCompletionDelegate callback, GrpcEnvironment environment)
{
var ctx = BatchContextSafeHandle.Create();
GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_server_request_call(this, cq, ctx).CheckOk();
environment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_server_request_call(this, environment.CompletionQueue, ctx).CheckOk();
}
protected override bool ReleaseHandle()
......
......@@ -52,6 +52,7 @@ namespace Grpc.Core
/// </summary>
public const int PickUnusedPort = 0;
readonly GrpcEnvironment environment;
readonly ServerSafeHandle handle;
readonly object myLock = new object();
......@@ -67,9 +68,10 @@ namespace Grpc.Core
/// <param name="options">Channel options.</param>
public Server(IEnumerable<ChannelOption> options = null)
{
this.environment = GrpcEnvironment.GetInstance();
using (var channelArgs = ChannelOptions.CreateChannelArgs(options))
{
this.handle = ServerSafeHandle.NewServer(GetCompletionQueue(), channelArgs);
this.handle = ServerSafeHandle.NewServer(environment.CompletionQueue, channelArgs);
}
}
......@@ -144,7 +146,7 @@ namespace Grpc.Core
shutdownRequested = true;
}
handle.ShutdownAndNotify(GetCompletionQueue(), HandleServerShutdown);
handle.ShutdownAndNotify(HandleServerShutdown, environment);
await shutdownTcs.Task;
handle.Dispose();
}
......@@ -173,7 +175,7 @@ namespace Grpc.Core
shutdownRequested = true;
}
handle.ShutdownAndNotify(GetCompletionQueue(), HandleServerShutdown);
handle.ShutdownAndNotify(HandleServerShutdown, environment);
handle.CancelAllCalls();
await shutdownTcs.Task;
handle.Dispose();
......@@ -208,7 +210,7 @@ namespace Grpc.Core
{
if (!shutdownRequested)
{
handle.RequestCall(GetCompletionQueue(), HandleNewServerRpc);
handle.RequestCall(HandleNewServerRpc, environment);
}
}
}
......@@ -225,7 +227,7 @@ namespace Grpc.Core
{
callHandler = new NoSuchMethodCallHandler();
}
await callHandler.HandleCall(method, call, GetCompletionQueue());
await callHandler.HandleCall(method, call, environment);
}
catch (Exception e)
{
......@@ -259,10 +261,5 @@ namespace Grpc.Core
{
shutdownTcs.SetResult(null);
}
private static CompletionQueueSafeHandle GetCompletionQueue()
{
return GrpcEnvironment.ThreadPool.CompletionQueue;
}
}
}
......@@ -39,8 +39,6 @@ namespace math
{
public static void Main(string[] args)
{
GrpcEnvironment.Initialize();
using (Channel channel = new Channel("127.0.0.1", 23456))
{
Math.IMathClient stub = new Math.MathClient(channel);
......
......@@ -42,8 +42,6 @@ namespace math
{
string host = "0.0.0.0";
GrpcEnvironment.Initialize();
Server server = new Server();
server.AddServiceDefinition(Math.BindService(new MathServiceImpl()));
int port = server.AddListeningPort(host, 23456);
......
......@@ -54,8 +54,6 @@ namespace math.Tests
[TestFixtureSetUp]
public void Init()
{
GrpcEnvironment.Initialize();
server = new Server();
server.AddServiceDefinition(Math.BindService(new MathServiceImpl()));
int port = server.AddListeningPort(host, Server.PickUnusedPort);
......@@ -75,7 +73,6 @@ namespace math.Tests
public void Cleanup()
{
channel.Dispose();
server.ShutdownAsync().Wait();
GrpcEnvironment.Shutdown();
}
......
......@@ -102,8 +102,6 @@ namespace Grpc.IntegrationTesting
private void Run()
{
GrpcEnvironment.Initialize();
Credentials credentials = null;
if (options.useTls)
{
......@@ -135,7 +133,6 @@ namespace Grpc.IntegrationTesting
TestService.ITestServiceClient client = new TestService.TestServiceClient(channel, stubConfig);
RunTestCase(options.testCase, client);
}
GrpcEnvironment.Shutdown();
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment