From 633434aed2b7db1e206c884257b198e04f2bf60e Mon Sep 17 00:00:00 2001 From: Jan Tattermusch <jtattermusch@google.com> Date: Mon, 24 Jul 2017 17:11:35 +0200 Subject: [PATCH] dont run user handlers on grpc threadpool threads --- .../Grpc.Core.Tests/ThreadingModelTest.cs | 98 +++++++++++++++++++ src/csharp/Grpc.Core/Grpc.Core.csproj | 1 + .../Grpc.Core/Internal/GrpcThreadPool.cs | 19 +++- src/csharp/tests.json | 1 + 4 files changed, 117 insertions(+), 2 deletions(-) create mode 100644 src/csharp/Grpc.Core.Tests/ThreadingModelTest.cs diff --git a/src/csharp/Grpc.Core.Tests/ThreadingModelTest.cs b/src/csharp/Grpc.Core.Tests/ThreadingModelTest.cs new file mode 100644 index 0000000000..fb18198945 --- /dev/null +++ b/src/csharp/Grpc.Core.Tests/ThreadingModelTest.cs @@ -0,0 +1,98 @@ +#region Copyright notice and license + +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using NUnit.Framework; +using System.Threading; +using System.Threading.Tasks; + +namespace Grpc.Core.Tests +{ + public class ThreadingModelTest + { + const string Host = "127.0.0.1"; + + MockServiceHelper helper; + Server server; + Channel channel; + + [SetUp] + public void Init() + { + helper = new MockServiceHelper(Host); + server = helper.GetServer(); + server.Start(); + channel = helper.GetChannel(); + } + + [TearDown] + public void Cleanup() + { + channel.ShutdownAsync().Wait(); + server.ShutdownAsync().Wait(); + } + + [Test] + public void BlockingCallInServerHandlerDoesNotDeadlock() + { + helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) => + { + int recursionDepth = int.Parse(request); + if (recursionDepth <= 0) { + return "SUCCESS"; + } + return Calls.BlockingUnaryCall(helper.CreateUnaryCall(), (recursionDepth - 1).ToString()); + }); + + int maxRecursionDepth = Environment.ProcessorCount * 2; // make sure we have more pending blocking calls than threads in GrpcThreadPool + Assert.AreEqual("SUCCESS", Calls.BlockingUnaryCall(helper.CreateUnaryCall(), maxRecursionDepth.ToString())); + } + + [Test] + public void HandlerDoesNotRunOnGrpcThread() + { + helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) => + { + if (IsRunningOnGrpcThreadPool()) { + return "Server handler should not run on gRPC threadpool thread."; + } + return request; + }); + + Assert.AreEqual("ABC", Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "ABC")); + } + + [Test] + public async Task ContinuationDoesNotRunOnGrpcThread() + { + helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) => + { + return request; + }); + + await Calls.AsyncUnaryCall(helper.CreateUnaryCall(), "ABC"); + Assert.IsFalse(IsRunningOnGrpcThreadPool()); + } + + private static bool IsRunningOnGrpcThreadPool() + { + var threadName = Thread.CurrentThread.Name ?? ""; + return threadName.Contains("grpc"); + } + } +} diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj index 50358298f4..e32711c520 100755 --- a/src/csharp/Grpc.Core/Grpc.Core.csproj +++ b/src/csharp/Grpc.Core/Grpc.Core.csproj @@ -64,6 +64,7 @@ <ItemGroup Condition=" '$(TargetFramework)' == 'netstandard1.5' "> <PackageReference Include="System.Runtime.Loader" Version="4.0.0" /> <PackageReference Include="System.Threading.Thread" Version="4.0.0" /> + <PackageReference Include="System.Threading.ThreadPool" Version="4.0.0" /> </ItemGroup> <Import Project="NativeDeps.csproj.include" /> diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs index f9ae77c74e..8640058b0c 100644 --- a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs +++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs @@ -33,6 +33,8 @@ namespace Grpc.Core.Internal internal class GrpcThreadPool { static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<GrpcThreadPool>(); + static readonly WaitCallback RunCompletionQueueEventCallbackSuccess = new WaitCallback((callback) => RunCompletionQueueEventCallback((OpCompletionDelegate) callback, true)); + static readonly WaitCallback RunCompletionQueueEventCallbackFailure = new WaitCallback((callback) => RunCompletionQueueEventCallback((OpCompletionDelegate) callback, false)); readonly GrpcEnvironment environment; readonly object myLock = new object(); @@ -165,11 +167,12 @@ namespace Grpc.Core.Internal try { var callback = cq.CompletionRegistry.Extract(tag); - callback(success); + // Use cached delegates to avoid unnecessary allocations + ThreadPool.QueueUserWorkItem(success ? RunCompletionQueueEventCallbackSuccess : RunCompletionQueueEventCallbackFailure, callback); } catch (Exception e) { - Logger.Error(e, "Exception occured while invoking completion delegate"); + Logger.Error(e, "Exception occured while extracting event from completion registry."); } } } @@ -186,5 +189,17 @@ namespace Grpc.Core.Internal } return list.AsReadOnly(); } + + private static void RunCompletionQueueEventCallback(OpCompletionDelegate callback, bool success) + { + try + { + callback(success); + } + catch (Exception e) + { + Logger.Error(e, "Exception occured while invoking completion delegate"); + } + } } } diff --git a/src/csharp/tests.json b/src/csharp/tests.json index bc6adbbfe8..7841051052 100644 --- a/src/csharp/tests.json +++ b/src/csharp/tests.json @@ -31,6 +31,7 @@ "Grpc.Core.Tests.ShutdownHookPendingCallTest", "Grpc.Core.Tests.ShutdownHookServerTest", "Grpc.Core.Tests.ShutdownTest", + "Grpc.Core.Tests.ThreadingModelTest", "Grpc.Core.Tests.TimeoutsTest", "Grpc.Core.Tests.UserAgentStringTest" ], -- GitLab