diff --git a/src/csharp/Grpc.Core/Channel.cs b/src/csharp/Grpc.Core/Channel.cs index f2211111f6c0895aadf25aec3f4b68d144bf0d25..886adfec334f8c8711ebedb98346190337a7a6be 100644 --- a/src/csharp/Grpc.Core/Channel.cs +++ b/src/csharp/Grpc.Core/Channel.cs @@ -136,7 +136,7 @@ namespace Grpc.Core tcs.SetCanceled(); } }); - handle.WatchConnectivityState(lastObservedState, deadlineTimespec, completionQueue, environment.CompletionRegistry, handler); + handle.WatchConnectivityState(lastObservedState, deadlineTimespec, completionQueue, handler); return tcs.Task; } diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs index acc7b214369f44a7d487cf1c3a7892313adf6514..7fa06bf672ea380ef352cbd80e5cce45e11beaeb 100644 --- a/src/csharp/Grpc.Core/GrpcEnvironment.cs +++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs @@ -58,7 +58,6 @@ namespace Grpc.Core static ILogger logger = new ConsoleLogger(); readonly GrpcThreadPool threadPool; - readonly CompletionRegistry completionRegistry; readonly DebugStats debugStats = new DebugStats(); readonly AtomicCounter cqPickerCounter = new AtomicCounter(); bool isClosed; @@ -167,24 +166,12 @@ namespace Grpc.Core private GrpcEnvironment() { GrpcNativeInit(); - completionRegistry = new CompletionRegistry(this); var cqCount = customCompletionQueueCount ?? DefaultCompletionQueueCount; threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault(), cqCount); threadPool.Start(); } - /// <summary> - /// Gets the completion registry used by this gRPC environment. - /// </summary> - internal CompletionRegistry CompletionRegistry - { - get - { - return this.completionRegistry; - } - } - /// <summary> /// Gets the completion queues used by this gRPC environment. /// </summary> diff --git a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs index a83deaa26f1b98f1c263e8ffa61858e9dcca5268..62864dff0cb88cc774471aed241cfa449b0599ad 100644 --- a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs @@ -82,11 +82,10 @@ namespace Grpc.Core.Internal return Native.grpcsharp_channel_check_connectivity_state(this, tryToConnect ? 1 : 0); } - public void WatchConnectivityState(ChannelState lastObservedState, Timespec deadline, CompletionQueueSafeHandle cq, - CompletionRegistry completionRegistry, BatchCompletionDelegate callback) + public void WatchConnectivityState(ChannelState lastObservedState, Timespec deadline, CompletionQueueSafeHandle cq, BatchCompletionDelegate callback) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, callback); + cq.CompletionRegistry.RegisterBatchCompletion(ctx, callback); Native.grpcsharp_channel_watch_connectivity_state(this, lastObservedState, deadline, cq, ctx); } diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs index 4059247e30722ea79499f8cf22a2d32184c2a342..ab2cb3836a70867fe8a345274a06efc1152be6cc 100644 --- a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs +++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs @@ -140,7 +140,7 @@ namespace Grpc.Core.Internal IntPtr tag = ev.tag; try { - var callback = environment.CompletionRegistry.Extract(tag); + var callback = cq.CompletionRegistry.Extract(tag); callback(success); } catch (Exception e) diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs index 191b412669e019861ec3adfab5ff958af09f76ad..85813027064f47359be46b25bdb3bf8cf5e0736e 100644 --- a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs @@ -72,17 +72,17 @@ namespace Grpc.Core.Internal Native.grpcsharp_server_start(this); } - public void ShutdownAndNotify(BatchCompletionDelegate callback, GrpcEnvironment environment) + public void ShutdownAndNotify(BatchCompletionDelegate callback, CompletionQueueSafeHandle completionQueue) { var ctx = BatchContextSafeHandle.Create(); - environment.CompletionRegistry.RegisterBatchCompletion(ctx, callback); - Native.grpcsharp_server_shutdown_and_notify_callback(this, environment.PickCompletionQueue(), ctx); + completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, callback); + Native.grpcsharp_server_shutdown_and_notify_callback(this, completionQueue, ctx); } - public void RequestCall(BatchCompletionDelegate callback, GrpcEnvironment environment, CompletionQueueSafeHandle completionQueue) + public void RequestCall(BatchCompletionDelegate callback, CompletionQueueSafeHandle completionQueue) { var ctx = BatchContextSafeHandle.Create(); - environment.CompletionRegistry.RegisterBatchCompletion(ctx, callback); + completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, callback); Native.grpcsharp_server_request_call(this, completionQueue, ctx).CheckOk(); } diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs index b66de8178924dd613094f4b152953df1c4c9392f..069185e13af9b6da2af5fee9ecaea1174d951d3f 100644 --- a/src/csharp/Grpc.Core/Server.cs +++ b/src/csharp/Grpc.Core/Server.cs @@ -34,6 +34,7 @@ using System; using System.Collections; using System.Collections.Generic; +using System.Linq; using System.Threading.Tasks; using Grpc.Core.Internal; using Grpc.Core.Logging; @@ -160,7 +161,8 @@ namespace Grpc.Core shutdownRequested = true; } - handle.ShutdownAndNotify(HandleServerShutdown, environment); + var cq = environment.CompletionQueues.First(); // any cq will do + handle.ShutdownAndNotify(HandleServerShutdown, cq); await shutdownTcs.Task.ConfigureAwait(false); DisposeHandle(); @@ -180,7 +182,8 @@ namespace Grpc.Core shutdownRequested = true; } - handle.ShutdownAndNotify(HandleServerShutdown, environment); + var cq = environment.CompletionQueues.First(); // any cq will do + handle.ShutdownAndNotify(HandleServerShutdown, cq); handle.CancelAllCalls(); await shutdownTcs.Task.ConfigureAwait(false); DisposeHandle(); @@ -254,7 +257,7 @@ namespace Grpc.Core { if (!shutdownRequested) { - handle.RequestCall((success, ctx) => HandleNewServerRpc(success, ctx, cq), environment, cq); + handle.RequestCall((success, ctx) => HandleNewServerRpc(success, ctx, cq), cq); } }