diff --git a/src/csharp/Grpc.Core.Tests/ThreadingModelTest.cs b/src/csharp/Grpc.Core.Tests/ThreadingModelTest.cs new file mode 100644 index 0000000000000000000000000000000000000000..fb181989455dd97f238f425b4cee5b010eb2a49c --- /dev/null +++ b/src/csharp/Grpc.Core.Tests/ThreadingModelTest.cs @@ -0,0 +1,98 @@ +#region Copyright notice and license + +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using NUnit.Framework; +using System.Threading; +using System.Threading.Tasks; + +namespace Grpc.Core.Tests +{ + public class ThreadingModelTest + { + const string Host = "127.0.0.1"; + + MockServiceHelper helper; + Server server; + Channel channel; + + [SetUp] + public void Init() + { + helper = new MockServiceHelper(Host); + server = helper.GetServer(); + server.Start(); + channel = helper.GetChannel(); + } + + [TearDown] + public void Cleanup() + { + channel.ShutdownAsync().Wait(); + server.ShutdownAsync().Wait(); + } + + [Test] + public void BlockingCallInServerHandlerDoesNotDeadlock() + { + helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) => + { + int recursionDepth = int.Parse(request); + if (recursionDepth <= 0) { + return "SUCCESS"; + } + return Calls.BlockingUnaryCall(helper.CreateUnaryCall(), (recursionDepth - 1).ToString()); + }); + + int maxRecursionDepth = Environment.ProcessorCount * 2; // make sure we have more pending blocking calls than threads in GrpcThreadPool + Assert.AreEqual("SUCCESS", Calls.BlockingUnaryCall(helper.CreateUnaryCall(), maxRecursionDepth.ToString())); + } + + [Test] + public void HandlerDoesNotRunOnGrpcThread() + { + helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) => + { + if (IsRunningOnGrpcThreadPool()) { + return "Server handler should not run on gRPC threadpool thread."; + } + return request; + }); + + Assert.AreEqual("ABC", Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "ABC")); + } + + [Test] + public async Task ContinuationDoesNotRunOnGrpcThread() + { + helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) => + { + return request; + }); + + await Calls.AsyncUnaryCall(helper.CreateUnaryCall(), "ABC"); + Assert.IsFalse(IsRunningOnGrpcThreadPool()); + } + + private static bool IsRunningOnGrpcThreadPool() + { + var threadName = Thread.CurrentThread.Name ?? ""; + return threadName.Contains("grpc"); + } + } +} diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj index 50358298f48ae90fa8e5dedef03e9b70113903be..e32711c5204c8eb4f9d3585126cbfa03eb9f40fa 100755 --- a/src/csharp/Grpc.Core/Grpc.Core.csproj +++ b/src/csharp/Grpc.Core/Grpc.Core.csproj @@ -64,6 +64,7 @@ <ItemGroup Condition=" '$(TargetFramework)' == 'netstandard1.5' "> <PackageReference Include="System.Runtime.Loader" Version="4.0.0" /> <PackageReference Include="System.Threading.Thread" Version="4.0.0" /> + <PackageReference Include="System.Threading.ThreadPool" Version="4.0.0" /> </ItemGroup> <Import Project="NativeDeps.csproj.include" /> diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs index 8d0c66aa5bf86a3098665cb44d20201f2a0326dc..0663ee92150093105016ee864fad852e89c3766a 100644 --- a/src/csharp/Grpc.Core/GrpcEnvironment.cs +++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs @@ -39,6 +39,7 @@ namespace Grpc.Core static int refCount; static int? customThreadPoolSize; static int? customCompletionQueueCount; + static bool inlineHandlers; static readonly HashSet<Channel> registeredChannels = new HashSet<Channel>(); static readonly HashSet<Server> registeredServers = new HashSet<Server>(); @@ -217,13 +218,32 @@ namespace Grpc.Core } } + /// <summary> + /// By default, gRPC's internal event handlers get offloaded to .NET default thread pool thread (<c>inlineHandlers=false</c>). + /// Setting <c>inlineHandlers</c> to <c>true</c> will allow scheduling the event handlers directly to + /// <c>GrpcThreadPool</c> internal threads. That can lead to significant performance gains in some situations, + /// but requires user to never block in async code (incorrectly written code can easily lead to deadlocks). + /// Inlining handlers is an advanced setting and you should only use it if you know what you are doing. + /// Most users should rely on the default value provided by gRPC library. + /// Note: this method is part of an experimental API that can change or be removed without any prior notice. + /// Note: <c>inlineHandlers=true</c> was the default in gRPC C# v1.4.x and earlier. + /// </summary> + public static void SetHandlerInlining(bool inlineHandlers) + { + lock (staticLock) + { + GrpcPreconditions.CheckState(instance == null, "Can only be set before GrpcEnvironment is initialized"); + GrpcEnvironment.inlineHandlers = inlineHandlers; + } + } + /// <summary> /// Creates gRPC environment. /// </summary> private GrpcEnvironment() { GrpcNativeInit(); - threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault(), GetCompletionQueueCountOrDefault()); + threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault(), GetCompletionQueueCountOrDefault(), inlineHandlers); threadPool.Start(); } diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs index f9ae77c74e9e2378eb7af8d66d39924b42b0771b..19b44c26189dbe1b79683629cd74e3da170d0a81 100644 --- a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs +++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs @@ -33,12 +33,15 @@ namespace Grpc.Core.Internal internal class GrpcThreadPool { static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<GrpcThreadPool>(); + static readonly WaitCallback RunCompletionQueueEventCallbackSuccess = new WaitCallback((callback) => RunCompletionQueueEventCallback((OpCompletionDelegate) callback, true)); + static readonly WaitCallback RunCompletionQueueEventCallbackFailure = new WaitCallback((callback) => RunCompletionQueueEventCallback((OpCompletionDelegate) callback, false)); readonly GrpcEnvironment environment; readonly object myLock = new object(); readonly List<Thread> threads = new List<Thread>(); readonly int poolSize; readonly int completionQueueCount; + readonly bool inlineHandlers; readonly List<BasicProfiler> threadProfilers = new List<BasicProfiler>(); // profilers assigned to threadpool threads @@ -52,11 +55,13 @@ namespace Grpc.Core.Internal /// <param name="environment">Environment.</param> /// <param name="poolSize">Pool size.</param> /// <param name="completionQueueCount">Completion queue count.</param> - public GrpcThreadPool(GrpcEnvironment environment, int poolSize, int completionQueueCount) + /// <param name="inlineHandlers">Handler inlining.</param> + public GrpcThreadPool(GrpcEnvironment environment, int poolSize, int completionQueueCount, bool inlineHandlers) { this.environment = environment; this.poolSize = poolSize; this.completionQueueCount = completionQueueCount; + this.inlineHandlers = inlineHandlers; GrpcPreconditions.CheckArgument(poolSize >= completionQueueCount, "Thread pool size cannot be smaller than the number of completion queues used."); } @@ -165,11 +170,19 @@ namespace Grpc.Core.Internal try { var callback = cq.CompletionRegistry.Extract(tag); - callback(success); + // Use cached delegates to avoid unnecessary allocations + if (!inlineHandlers) + { + ThreadPool.QueueUserWorkItem(success ? RunCompletionQueueEventCallbackSuccess : RunCompletionQueueEventCallbackFailure, callback); + } + else + { + RunCompletionQueueEventCallback(callback, success); + } } catch (Exception e) { - Logger.Error(e, "Exception occured while invoking completion delegate"); + Logger.Error(e, "Exception occured while extracting event from completion registry."); } } } @@ -186,5 +199,17 @@ namespace Grpc.Core.Internal } return list.AsReadOnly(); } + + private static void RunCompletionQueueEventCallback(OpCompletionDelegate callback, bool success) + { + try + { + callback(success); + } + catch (Exception e) + { + Logger.Error(e, "Exception occured while invoking completion delegate"); + } + } } } diff --git a/src/csharp/Grpc.IntegrationTesting/QpsWorker.cs b/src/csharp/Grpc.IntegrationTesting/QpsWorker.cs index 7009a93b1868563268bf04aa104e8bcb20899b05..a579fb80406d4eccaafb78cae258e0a347c89c95 100644 --- a/src/csharp/Grpc.IntegrationTesting/QpsWorker.cs +++ b/src/csharp/Grpc.IntegrationTesting/QpsWorker.cs @@ -63,11 +63,6 @@ namespace Grpc.IntegrationTesting private async Task RunAsync() { - // (ThreadPoolSize == ProcessorCount) gives best throughput in benchmarks - // and doesn't seem to harm performance even when server and client - // are running on the same machine. - GrpcEnvironment.SetThreadPoolSize(Environment.ProcessorCount); - string host = "0.0.0.0"; int port = options.DriverPort; diff --git a/src/csharp/tests.json b/src/csharp/tests.json index bc6adbbfe8b05de29398914b007bce1fe4911d7a..784105105267968ce7b66119166def8440f1a0e2 100644 --- a/src/csharp/tests.json +++ b/src/csharp/tests.json @@ -31,6 +31,7 @@ "Grpc.Core.Tests.ShutdownHookPendingCallTest", "Grpc.Core.Tests.ShutdownHookServerTest", "Grpc.Core.Tests.ShutdownTest", + "Grpc.Core.Tests.ThreadingModelTest", "Grpc.Core.Tests.TimeoutsTest", "Grpc.Core.Tests.UserAgentStringTest" ],