Skip to content
Snippets Groups Projects
Commit e6d1de60 authored by Jan Tattermusch's avatar Jan Tattermusch
Browse files

eliminate global completion registry

parent b089320d
No related branches found
No related tags found
No related merge requests found
......@@ -136,7 +136,7 @@ namespace Grpc.Core
tcs.SetCanceled();
}
});
handle.WatchConnectivityState(lastObservedState, deadlineTimespec, completionQueue, environment.CompletionRegistry, handler);
handle.WatchConnectivityState(lastObservedState, deadlineTimespec, completionQueue, handler);
return tcs.Task;
}
......
......@@ -58,7 +58,6 @@ namespace Grpc.Core
static ILogger logger = new ConsoleLogger();
readonly GrpcThreadPool threadPool;
readonly CompletionRegistry completionRegistry;
readonly DebugStats debugStats = new DebugStats();
readonly AtomicCounter cqPickerCounter = new AtomicCounter();
bool isClosed;
......@@ -167,24 +166,12 @@ namespace Grpc.Core
private GrpcEnvironment()
{
GrpcNativeInit();
completionRegistry = new CompletionRegistry(this);
var cqCount = customCompletionQueueCount ?? DefaultCompletionQueueCount;
threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault(), cqCount);
threadPool.Start();
}
/// <summary>
/// Gets the completion registry used by this gRPC environment.
/// </summary>
internal CompletionRegistry CompletionRegistry
{
get
{
return this.completionRegistry;
}
}
/// <summary>
/// Gets the completion queues used by this gRPC environment.
/// </summary>
......
......@@ -82,11 +82,10 @@ namespace Grpc.Core.Internal
return Native.grpcsharp_channel_check_connectivity_state(this, tryToConnect ? 1 : 0);
}
public void WatchConnectivityState(ChannelState lastObservedState, Timespec deadline, CompletionQueueSafeHandle cq,
CompletionRegistry completionRegistry, BatchCompletionDelegate callback)
public void WatchConnectivityState(ChannelState lastObservedState, Timespec deadline, CompletionQueueSafeHandle cq, BatchCompletionDelegate callback)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
cq.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
Native.grpcsharp_channel_watch_connectivity_state(this, lastObservedState, deadline, cq, ctx);
}
......
......@@ -140,7 +140,7 @@ namespace Grpc.Core.Internal
IntPtr tag = ev.tag;
try
{
var callback = environment.CompletionRegistry.Extract(tag);
var callback = cq.CompletionRegistry.Extract(tag);
callback(success);
}
catch (Exception e)
......
......@@ -72,17 +72,17 @@ namespace Grpc.Core.Internal
Native.grpcsharp_server_start(this);
}
public void ShutdownAndNotify(BatchCompletionDelegate callback, GrpcEnvironment environment)
public void ShutdownAndNotify(BatchCompletionDelegate callback, CompletionQueueSafeHandle completionQueue)
{
var ctx = BatchContextSafeHandle.Create();
environment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
Native.grpcsharp_server_shutdown_and_notify_callback(this, environment.PickCompletionQueue(), ctx);
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
Native.grpcsharp_server_shutdown_and_notify_callback(this, completionQueue, ctx);
}
public void RequestCall(BatchCompletionDelegate callback, GrpcEnvironment environment, CompletionQueueSafeHandle completionQueue)
public void RequestCall(BatchCompletionDelegate callback, CompletionQueueSafeHandle completionQueue)
{
var ctx = BatchContextSafeHandle.Create();
environment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
Native.grpcsharp_server_request_call(this, completionQueue, ctx).CheckOk();
}
......
......@@ -34,6 +34,7 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Grpc.Core.Internal;
using Grpc.Core.Logging;
......@@ -160,7 +161,8 @@ namespace Grpc.Core
shutdownRequested = true;
}
handle.ShutdownAndNotify(HandleServerShutdown, environment);
var cq = environment.CompletionQueues.First(); // any cq will do
handle.ShutdownAndNotify(HandleServerShutdown, cq);
await shutdownTcs.Task.ConfigureAwait(false);
DisposeHandle();
......@@ -180,7 +182,8 @@ namespace Grpc.Core
shutdownRequested = true;
}
handle.ShutdownAndNotify(HandleServerShutdown, environment);
var cq = environment.CompletionQueues.First(); // any cq will do
handle.ShutdownAndNotify(HandleServerShutdown, cq);
handle.CancelAllCalls();
await shutdownTcs.Task.ConfigureAwait(false);
DisposeHandle();
......@@ -254,7 +257,7 @@ namespace Grpc.Core
{
if (!shutdownRequested)
{
handle.RequestCall((success, ctx) => HandleNewServerRpc(success, ctx, cq), environment, cq);
handle.RequestCall((success, ctx) => HandleNewServerRpc(success, ctx, cq), cq);
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment