Skip to content
Snippets Groups Projects
Commit 5858441a authored by Jan Tattermusch's avatar Jan Tattermusch
Browse files

make environment shutdown asynchronous

parent 85030e31
No related branches found
No related tags found
No related merge requests found
...@@ -49,7 +49,7 @@ namespace Grpc.Core.Tests ...@@ -49,7 +49,7 @@ namespace Grpc.Core.Tests
{ {
Assert.IsNotNull(env.CompletionQueues.ElementAt(i)); Assert.IsNotNull(env.CompletionQueues.ElementAt(i));
} }
GrpcEnvironment.Release(); GrpcEnvironment.ReleaseAsync().Wait();
} }
[Test] [Test]
...@@ -58,8 +58,8 @@ namespace Grpc.Core.Tests ...@@ -58,8 +58,8 @@ namespace Grpc.Core.Tests
var env1 = GrpcEnvironment.AddRef(); var env1 = GrpcEnvironment.AddRef();
var env2 = GrpcEnvironment.AddRef(); var env2 = GrpcEnvironment.AddRef();
Assert.AreSame(env1, env2); Assert.AreSame(env1, env2);
GrpcEnvironment.Release(); GrpcEnvironment.ReleaseAsync().Wait();
GrpcEnvironment.Release(); GrpcEnvironment.ReleaseAsync().Wait();
} }
[Test] [Test]
...@@ -68,10 +68,10 @@ namespace Grpc.Core.Tests ...@@ -68,10 +68,10 @@ namespace Grpc.Core.Tests
Assert.AreEqual(0, GrpcEnvironment.GetRefCount()); Assert.AreEqual(0, GrpcEnvironment.GetRefCount());
var env1 = GrpcEnvironment.AddRef(); var env1 = GrpcEnvironment.AddRef();
GrpcEnvironment.Release(); GrpcEnvironment.ReleaseAsync().Wait();
var env2 = GrpcEnvironment.AddRef(); var env2 = GrpcEnvironment.AddRef();
GrpcEnvironment.Release(); GrpcEnvironment.ReleaseAsync().Wait();
Assert.AreNotSame(env1, env2); Assert.AreNotSame(env1, env2);
} }
...@@ -80,7 +80,7 @@ namespace Grpc.Core.Tests ...@@ -80,7 +80,7 @@ namespace Grpc.Core.Tests
public void ReleaseWithoutAddRef() public void ReleaseWithoutAddRef()
{ {
Assert.AreEqual(0, GrpcEnvironment.GetRefCount()); Assert.AreEqual(0, GrpcEnvironment.GetRefCount());
Assert.Throws(typeof(InvalidOperationException), () => GrpcEnvironment.Release()); Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await GrpcEnvironment.ReleaseAsync());
} }
[Test] [Test]
......
...@@ -48,7 +48,7 @@ namespace Grpc.Core.Internal.Tests ...@@ -48,7 +48,7 @@ namespace Grpc.Core.Internal.Tests
GrpcEnvironment.AddRef(); GrpcEnvironment.AddRef();
var cq = CompletionQueueSafeHandle.Create(); var cq = CompletionQueueSafeHandle.Create();
cq.Dispose(); cq.Dispose();
GrpcEnvironment.Release(); GrpcEnvironment.ReleaseAsync().Wait();
} }
[Test] [Test]
...@@ -59,7 +59,7 @@ namespace Grpc.Core.Internal.Tests ...@@ -59,7 +59,7 @@ namespace Grpc.Core.Internal.Tests
cq.Shutdown(); cq.Shutdown();
var ev = cq.Next(); var ev = cq.Next();
cq.Dispose(); cq.Dispose();
GrpcEnvironment.Release(); GrpcEnvironment.ReleaseAsync().Wait();
Assert.AreEqual(CompletionQueueEvent.CompletionType.Shutdown, ev.type); Assert.AreEqual(CompletionQueueEvent.CompletionType.Shutdown, ev.type);
Assert.AreNotEqual(IntPtr.Zero, ev.success); Assert.AreNotEqual(IntPtr.Zero, ev.success);
Assert.AreEqual(IntPtr.Zero, ev.tag); Assert.AreEqual(IntPtr.Zero, ev.tag);
......
...@@ -65,7 +65,7 @@ namespace Grpc.Core.Tests ...@@ -65,7 +65,7 @@ namespace Grpc.Core.Tests
cq.Dispose(); cq.Dispose();
}); });
GrpcEnvironment.Release(); GrpcEnvironment.ReleaseAsync().Wait();
} }
/// <summary> /// <summary>
......
...@@ -220,7 +220,7 @@ namespace Grpc.Core ...@@ -220,7 +220,7 @@ namespace Grpc.Core
handle.Dispose(); handle.Dispose();
await Task.Run(() => GrpcEnvironment.Release()).ConfigureAwait(false); await GrpcEnvironment.ReleaseAsync().ConfigureAwait(false);
} }
internal ChannelSafeHandle Handle internal ChannelSafeHandle Handle
......
...@@ -35,6 +35,7 @@ using System; ...@@ -35,6 +35,7 @@ using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Runtime.InteropServices; using System.Runtime.InteropServices;
using System.Threading.Tasks;
using Grpc.Core.Internal; using Grpc.Core.Internal;
using Grpc.Core.Logging; using Grpc.Core.Logging;
using Grpc.Core.Utils; using Grpc.Core.Utils;
...@@ -79,21 +80,26 @@ namespace Grpc.Core ...@@ -79,21 +80,26 @@ namespace Grpc.Core
} }
/// <summary> /// <summary>
/// Decrements the reference count for currently active environment and shuts down the gRPC environment if reference count drops to zero. /// Decrements the reference count for currently active environment and asynchronously shuts down the gRPC environment if reference count drops to zero.
/// (and blocks until the environment has been fully shutdown).
/// </summary> /// </summary>
internal static void Release() internal static async Task ReleaseAsync()
{ {
GrpcEnvironment instanceToShutdown = null;
lock (staticLock) lock (staticLock)
{ {
GrpcPreconditions.CheckState(refCount > 0); GrpcPreconditions.CheckState(refCount > 0);
refCount--; refCount--;
if (refCount == 0) if (refCount == 0)
{ {
instance.Close(); instanceToShutdown = instance;
instance = null; instance = null;
} }
} }
if (instanceToShutdown != null)
{
await instanceToShutdown.ShutdownAsync();
}
} }
internal static int GetRefCount() internal static int GetRefCount()
...@@ -223,13 +229,13 @@ namespace Grpc.Core ...@@ -223,13 +229,13 @@ namespace Grpc.Core
/// <summary> /// <summary>
/// Shuts down this environment. /// Shuts down this environment.
/// </summary> /// </summary>
private void Close() private async Task ShutdownAsync()
{ {
if (isClosed) if (isClosed)
{ {
throw new InvalidOperationException("Close has already been called"); throw new InvalidOperationException("Close has already been called");
} }
threadPool.Stop(); await threadPool.StopAsync().ConfigureAwait(false);
GrpcNativeShutdown(); GrpcNativeShutdown();
isClosed = true; isClosed = true;
......
...@@ -35,6 +35,7 @@ using System; ...@@ -35,6 +35,7 @@ using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Threading; using System.Threading;
using System.Threading.Tasks;
using Grpc.Core.Logging; using Grpc.Core.Logging;
using Grpc.Core.Utils; using Grpc.Core.Utils;
...@@ -53,6 +54,8 @@ namespace Grpc.Core.Internal ...@@ -53,6 +54,8 @@ namespace Grpc.Core.Internal
readonly int poolSize; readonly int poolSize;
readonly int completionQueueCount; readonly int completionQueueCount;
bool stopRequested;
IReadOnlyCollection<CompletionQueueSafeHandle> completionQueues; IReadOnlyCollection<CompletionQueueSafeHandle> completionQueues;
/// <summary> /// <summary>
...@@ -84,15 +87,21 @@ namespace Grpc.Core.Internal ...@@ -84,15 +87,21 @@ namespace Grpc.Core.Internal
} }
} }
public void Stop() public Task StopAsync()
{ {
lock (myLock) lock (myLock)
{ {
GrpcPreconditions.CheckState(!stopRequested, "Stop already requested.");
stopRequested = true;
foreach (var cq in completionQueues) foreach (var cq in completionQueues)
{ {
cq.Shutdown(); cq.Shutdown();
} }
}
return Task.Run(() =>
{
foreach (var thread in threads) foreach (var thread in threads)
{ {
thread.Join(); thread.Join();
...@@ -102,7 +111,7 @@ namespace Grpc.Core.Internal ...@@ -102,7 +111,7 @@ namespace Grpc.Core.Internal
{ {
cq.Dispose(); cq.Dispose();
} }
} });
} }
internal IReadOnlyCollection<CompletionQueueSafeHandle> CompletionQueues internal IReadOnlyCollection<CompletionQueueSafeHandle> CompletionQueues
......
...@@ -169,7 +169,7 @@ namespace Grpc.Core ...@@ -169,7 +169,7 @@ namespace Grpc.Core
await shutdownTcs.Task.ConfigureAwait(false); await shutdownTcs.Task.ConfigureAwait(false);
DisposeHandle(); DisposeHandle();
await Task.Run(() => GrpcEnvironment.Release()).ConfigureAwait(false); await GrpcEnvironment.ReleaseAsync().ConfigureAwait(false);
} }
/// <summary> /// <summary>
...@@ -194,7 +194,7 @@ namespace Grpc.Core ...@@ -194,7 +194,7 @@ namespace Grpc.Core
await shutdownTcs.Task.ConfigureAwait(false); await shutdownTcs.Task.ConfigureAwait(false);
DisposeHandle(); DisposeHandle();
await Task.Run(() => GrpcEnvironment.Release()).ConfigureAwait(false); await GrpcEnvironment.ReleaseAsync().ConfigureAwait(false);
} }
internal void AddCallReference(object call) internal void AddCallReference(object call)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment