Skip to content
Snippets Groups Projects
Commit 633434ae authored by Jan Tattermusch's avatar Jan Tattermusch
Browse files

dont run user handlers on grpc threadpool threads

parent 2c12d506
No related branches found
No related tags found
No related merge requests found
#region Copyright notice and license
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#endregion
using System;
using NUnit.Framework;
using System.Threading;
using System.Threading.Tasks;
namespace Grpc.Core.Tests
{
public class ThreadingModelTest
{
const string Host = "127.0.0.1";
MockServiceHelper helper;
Server server;
Channel channel;
[SetUp]
public void Init()
{
helper = new MockServiceHelper(Host);
server = helper.GetServer();
server.Start();
channel = helper.GetChannel();
}
[TearDown]
public void Cleanup()
{
channel.ShutdownAsync().Wait();
server.ShutdownAsync().Wait();
}
[Test]
public void BlockingCallInServerHandlerDoesNotDeadlock()
{
helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
{
int recursionDepth = int.Parse(request);
if (recursionDepth <= 0) {
return "SUCCESS";
}
return Calls.BlockingUnaryCall(helper.CreateUnaryCall(), (recursionDepth - 1).ToString());
});
int maxRecursionDepth = Environment.ProcessorCount * 2; // make sure we have more pending blocking calls than threads in GrpcThreadPool
Assert.AreEqual("SUCCESS", Calls.BlockingUnaryCall(helper.CreateUnaryCall(), maxRecursionDepth.ToString()));
}
[Test]
public void HandlerDoesNotRunOnGrpcThread()
{
helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
{
if (IsRunningOnGrpcThreadPool()) {
return "Server handler should not run on gRPC threadpool thread.";
}
return request;
});
Assert.AreEqual("ABC", Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "ABC"));
}
[Test]
public async Task ContinuationDoesNotRunOnGrpcThread()
{
helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
{
return request;
});
await Calls.AsyncUnaryCall(helper.CreateUnaryCall(), "ABC");
Assert.IsFalse(IsRunningOnGrpcThreadPool());
}
private static bool IsRunningOnGrpcThreadPool()
{
var threadName = Thread.CurrentThread.Name ?? "";
return threadName.Contains("grpc");
}
}
}
...@@ -64,6 +64,7 @@ ...@@ -64,6 +64,7 @@
<ItemGroup Condition=" '$(TargetFramework)' == 'netstandard1.5' "> <ItemGroup Condition=" '$(TargetFramework)' == 'netstandard1.5' ">
<PackageReference Include="System.Runtime.Loader" Version="4.0.0" /> <PackageReference Include="System.Runtime.Loader" Version="4.0.0" />
<PackageReference Include="System.Threading.Thread" Version="4.0.0" /> <PackageReference Include="System.Threading.Thread" Version="4.0.0" />
<PackageReference Include="System.Threading.ThreadPool" Version="4.0.0" />
</ItemGroup> </ItemGroup>
<Import Project="NativeDeps.csproj.include" /> <Import Project="NativeDeps.csproj.include" />
......
...@@ -33,6 +33,8 @@ namespace Grpc.Core.Internal ...@@ -33,6 +33,8 @@ namespace Grpc.Core.Internal
internal class GrpcThreadPool internal class GrpcThreadPool
{ {
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<GrpcThreadPool>(); static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<GrpcThreadPool>();
static readonly WaitCallback RunCompletionQueueEventCallbackSuccess = new WaitCallback((callback) => RunCompletionQueueEventCallback((OpCompletionDelegate) callback, true));
static readonly WaitCallback RunCompletionQueueEventCallbackFailure = new WaitCallback((callback) => RunCompletionQueueEventCallback((OpCompletionDelegate) callback, false));
readonly GrpcEnvironment environment; readonly GrpcEnvironment environment;
readonly object myLock = new object(); readonly object myLock = new object();
...@@ -165,11 +167,12 @@ namespace Grpc.Core.Internal ...@@ -165,11 +167,12 @@ namespace Grpc.Core.Internal
try try
{ {
var callback = cq.CompletionRegistry.Extract(tag); var callback = cq.CompletionRegistry.Extract(tag);
callback(success); // Use cached delegates to avoid unnecessary allocations
ThreadPool.QueueUserWorkItem(success ? RunCompletionQueueEventCallbackSuccess : RunCompletionQueueEventCallbackFailure, callback);
} }
catch (Exception e) catch (Exception e)
{ {
Logger.Error(e, "Exception occured while invoking completion delegate"); Logger.Error(e, "Exception occured while extracting event from completion registry.");
} }
} }
} }
...@@ -186,5 +189,17 @@ namespace Grpc.Core.Internal ...@@ -186,5 +189,17 @@ namespace Grpc.Core.Internal
} }
return list.AsReadOnly(); return list.AsReadOnly();
} }
private static void RunCompletionQueueEventCallback(OpCompletionDelegate callback, bool success)
{
try
{
callback(success);
}
catch (Exception e)
{
Logger.Error(e, "Exception occured while invoking completion delegate");
}
}
} }
} }
...@@ -31,6 +31,7 @@ ...@@ -31,6 +31,7 @@
"Grpc.Core.Tests.ShutdownHookPendingCallTest", "Grpc.Core.Tests.ShutdownHookPendingCallTest",
"Grpc.Core.Tests.ShutdownHookServerTest", "Grpc.Core.Tests.ShutdownHookServerTest",
"Grpc.Core.Tests.ShutdownTest", "Grpc.Core.Tests.ShutdownTest",
"Grpc.Core.Tests.ThreadingModelTest",
"Grpc.Core.Tests.TimeoutsTest", "Grpc.Core.Tests.TimeoutsTest",
"Grpc.Core.Tests.UserAgentStringTest" "Grpc.Core.Tests.UserAgentStringTest"
], ],
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment