Skip to content
Snippets Groups Projects
Commit a02d8c88 authored by Jan Tattermusch's avatar Jan Tattermusch Committed by GitHub
Browse files

Merge pull request #11930 from jtattermusch/csharp_safe_threadpool

C#: offload work from GrpcThreadPool by default.
parents 02f3b860 75e9eaed
No related branches found
No related tags found
No related merge requests found
#region Copyright notice and license
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#endregion
using System;
using NUnit.Framework;
using System.Threading;
using System.Threading.Tasks;
namespace Grpc.Core.Tests
{
public class ThreadingModelTest
{
const string Host = "127.0.0.1";
MockServiceHelper helper;
Server server;
Channel channel;
[SetUp]
public void Init()
{
helper = new MockServiceHelper(Host);
server = helper.GetServer();
server.Start();
channel = helper.GetChannel();
}
[TearDown]
public void Cleanup()
{
channel.ShutdownAsync().Wait();
server.ShutdownAsync().Wait();
}
[Test]
public void BlockingCallInServerHandlerDoesNotDeadlock()
{
helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
{
int recursionDepth = int.Parse(request);
if (recursionDepth <= 0) {
return "SUCCESS";
}
return Calls.BlockingUnaryCall(helper.CreateUnaryCall(), (recursionDepth - 1).ToString());
});
int maxRecursionDepth = Environment.ProcessorCount * 2; // make sure we have more pending blocking calls than threads in GrpcThreadPool
Assert.AreEqual("SUCCESS", Calls.BlockingUnaryCall(helper.CreateUnaryCall(), maxRecursionDepth.ToString()));
}
[Test]
public void HandlerDoesNotRunOnGrpcThread()
{
helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
{
if (IsRunningOnGrpcThreadPool()) {
return "Server handler should not run on gRPC threadpool thread.";
}
return request;
});
Assert.AreEqual("ABC", Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "ABC"));
}
[Test]
public async Task ContinuationDoesNotRunOnGrpcThread()
{
helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
{
return request;
});
await Calls.AsyncUnaryCall(helper.CreateUnaryCall(), "ABC");
Assert.IsFalse(IsRunningOnGrpcThreadPool());
}
private static bool IsRunningOnGrpcThreadPool()
{
var threadName = Thread.CurrentThread.Name ?? "";
return threadName.Contains("grpc");
}
}
}
...@@ -64,6 +64,7 @@ ...@@ -64,6 +64,7 @@
<ItemGroup Condition=" '$(TargetFramework)' == 'netstandard1.5' "> <ItemGroup Condition=" '$(TargetFramework)' == 'netstandard1.5' ">
<PackageReference Include="System.Runtime.Loader" Version="4.0.0" /> <PackageReference Include="System.Runtime.Loader" Version="4.0.0" />
<PackageReference Include="System.Threading.Thread" Version="4.0.0" /> <PackageReference Include="System.Threading.Thread" Version="4.0.0" />
<PackageReference Include="System.Threading.ThreadPool" Version="4.0.0" />
</ItemGroup> </ItemGroup>
<Import Project="NativeDeps.csproj.include" /> <Import Project="NativeDeps.csproj.include" />
......
...@@ -39,6 +39,7 @@ namespace Grpc.Core ...@@ -39,6 +39,7 @@ namespace Grpc.Core
static int refCount; static int refCount;
static int? customThreadPoolSize; static int? customThreadPoolSize;
static int? customCompletionQueueCount; static int? customCompletionQueueCount;
static bool inlineHandlers;
static readonly HashSet<Channel> registeredChannels = new HashSet<Channel>(); static readonly HashSet<Channel> registeredChannels = new HashSet<Channel>();
static readonly HashSet<Server> registeredServers = new HashSet<Server>(); static readonly HashSet<Server> registeredServers = new HashSet<Server>();
...@@ -217,13 +218,32 @@ namespace Grpc.Core ...@@ -217,13 +218,32 @@ namespace Grpc.Core
} }
} }
/// <summary>
/// By default, gRPC's internal event handlers get offloaded to .NET default thread pool thread (<c>inlineHandlers=false</c>).
/// Setting <c>inlineHandlers</c> to <c>true</c> will allow scheduling the event handlers directly to
/// <c>GrpcThreadPool</c> internal threads. That can lead to significant performance gains in some situations,
/// but requires user to never block in async code (incorrectly written code can easily lead to deadlocks).
/// Inlining handlers is an advanced setting and you should only use it if you know what you are doing.
/// Most users should rely on the default value provided by gRPC library.
/// Note: this method is part of an experimental API that can change or be removed without any prior notice.
/// Note: <c>inlineHandlers=true</c> was the default in gRPC C# v1.4.x and earlier.
/// </summary>
public static void SetHandlerInlining(bool inlineHandlers)
{
lock (staticLock)
{
GrpcPreconditions.CheckState(instance == null, "Can only be set before GrpcEnvironment is initialized");
GrpcEnvironment.inlineHandlers = inlineHandlers;
}
}
/// <summary> /// <summary>
/// Creates gRPC environment. /// Creates gRPC environment.
/// </summary> /// </summary>
private GrpcEnvironment() private GrpcEnvironment()
{ {
GrpcNativeInit(); GrpcNativeInit();
threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault(), GetCompletionQueueCountOrDefault()); threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault(), GetCompletionQueueCountOrDefault(), inlineHandlers);
threadPool.Start(); threadPool.Start();
} }
......
...@@ -33,12 +33,15 @@ namespace Grpc.Core.Internal ...@@ -33,12 +33,15 @@ namespace Grpc.Core.Internal
internal class GrpcThreadPool internal class GrpcThreadPool
{ {
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<GrpcThreadPool>(); static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<GrpcThreadPool>();
static readonly WaitCallback RunCompletionQueueEventCallbackSuccess = new WaitCallback((callback) => RunCompletionQueueEventCallback((OpCompletionDelegate) callback, true));
static readonly WaitCallback RunCompletionQueueEventCallbackFailure = new WaitCallback((callback) => RunCompletionQueueEventCallback((OpCompletionDelegate) callback, false));
readonly GrpcEnvironment environment; readonly GrpcEnvironment environment;
readonly object myLock = new object(); readonly object myLock = new object();
readonly List<Thread> threads = new List<Thread>(); readonly List<Thread> threads = new List<Thread>();
readonly int poolSize; readonly int poolSize;
readonly int completionQueueCount; readonly int completionQueueCount;
readonly bool inlineHandlers;
readonly List<BasicProfiler> threadProfilers = new List<BasicProfiler>(); // profilers assigned to threadpool threads readonly List<BasicProfiler> threadProfilers = new List<BasicProfiler>(); // profilers assigned to threadpool threads
...@@ -52,11 +55,13 @@ namespace Grpc.Core.Internal ...@@ -52,11 +55,13 @@ namespace Grpc.Core.Internal
/// <param name="environment">Environment.</param> /// <param name="environment">Environment.</param>
/// <param name="poolSize">Pool size.</param> /// <param name="poolSize">Pool size.</param>
/// <param name="completionQueueCount">Completion queue count.</param> /// <param name="completionQueueCount">Completion queue count.</param>
public GrpcThreadPool(GrpcEnvironment environment, int poolSize, int completionQueueCount) /// <param name="inlineHandlers">Handler inlining.</param>
public GrpcThreadPool(GrpcEnvironment environment, int poolSize, int completionQueueCount, bool inlineHandlers)
{ {
this.environment = environment; this.environment = environment;
this.poolSize = poolSize; this.poolSize = poolSize;
this.completionQueueCount = completionQueueCount; this.completionQueueCount = completionQueueCount;
this.inlineHandlers = inlineHandlers;
GrpcPreconditions.CheckArgument(poolSize >= completionQueueCount, GrpcPreconditions.CheckArgument(poolSize >= completionQueueCount,
"Thread pool size cannot be smaller than the number of completion queues used."); "Thread pool size cannot be smaller than the number of completion queues used.");
} }
...@@ -165,11 +170,19 @@ namespace Grpc.Core.Internal ...@@ -165,11 +170,19 @@ namespace Grpc.Core.Internal
try try
{ {
var callback = cq.CompletionRegistry.Extract(tag); var callback = cq.CompletionRegistry.Extract(tag);
callback(success); // Use cached delegates to avoid unnecessary allocations
if (!inlineHandlers)
{
ThreadPool.QueueUserWorkItem(success ? RunCompletionQueueEventCallbackSuccess : RunCompletionQueueEventCallbackFailure, callback);
}
else
{
RunCompletionQueueEventCallback(callback, success);
}
} }
catch (Exception e) catch (Exception e)
{ {
Logger.Error(e, "Exception occured while invoking completion delegate"); Logger.Error(e, "Exception occured while extracting event from completion registry.");
} }
} }
} }
...@@ -186,5 +199,17 @@ namespace Grpc.Core.Internal ...@@ -186,5 +199,17 @@ namespace Grpc.Core.Internal
} }
return list.AsReadOnly(); return list.AsReadOnly();
} }
private static void RunCompletionQueueEventCallback(OpCompletionDelegate callback, bool success)
{
try
{
callback(success);
}
catch (Exception e)
{
Logger.Error(e, "Exception occured while invoking completion delegate");
}
}
} }
} }
...@@ -63,11 +63,6 @@ namespace Grpc.IntegrationTesting ...@@ -63,11 +63,6 @@ namespace Grpc.IntegrationTesting
private async Task RunAsync() private async Task RunAsync()
{ {
// (ThreadPoolSize == ProcessorCount) gives best throughput in benchmarks
// and doesn't seem to harm performance even when server and client
// are running on the same machine.
GrpcEnvironment.SetThreadPoolSize(Environment.ProcessorCount);
string host = "0.0.0.0"; string host = "0.0.0.0";
int port = options.DriverPort; int port = options.DriverPort;
......
...@@ -31,6 +31,7 @@ ...@@ -31,6 +31,7 @@
"Grpc.Core.Tests.ShutdownHookPendingCallTest", "Grpc.Core.Tests.ShutdownHookPendingCallTest",
"Grpc.Core.Tests.ShutdownHookServerTest", "Grpc.Core.Tests.ShutdownHookServerTest",
"Grpc.Core.Tests.ShutdownTest", "Grpc.Core.Tests.ShutdownTest",
"Grpc.Core.Tests.ThreadingModelTest",
"Grpc.Core.Tests.TimeoutsTest", "Grpc.Core.Tests.TimeoutsTest",
"Grpc.Core.Tests.UserAgentStringTest" "Grpc.Core.Tests.UserAgentStringTest"
], ],
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment