Skip to content
Snippets Groups Projects
Commit 694ef25a authored by Julien Boeuf's avatar Julien Boeuf
Browse files

Merge branch 'master' of github.com:grpc/grpc into security_connector_refactoring

parents 87047d7e b7e55a20
No related branches found
No related tags found
No related merge requests found
Showing
with 220 additions and 137 deletions
......@@ -41,12 +41,6 @@ namespace Grpc.Core.Tests
{
public class ChannelTest
{
[TestFixtureTearDown]
public void CleanupClass()
{
GrpcEnvironment.Shutdown();
}
[Test]
public void Constructor_RejectsInvalidParams()
{
......@@ -56,36 +50,33 @@ namespace Grpc.Core.Tests
[Test]
public void State_IdleAfterCreation()
{
using (var channel = new Channel("localhost", Credentials.Insecure))
{
var channel = new Channel("localhost", Credentials.Insecure);
Assert.AreEqual(ChannelState.Idle, channel.State);
}
channel.ShutdownAsync().Wait();
}
[Test]
public void WaitForStateChangedAsync_InvalidArgument()
{
using (var channel = new Channel("localhost", Credentials.Insecure))
{
var channel = new Channel("localhost", Credentials.Insecure);
Assert.Throws(typeof(ArgumentException), () => channel.WaitForStateChangedAsync(ChannelState.FatalFailure));
}
channel.ShutdownAsync().Wait();
}
[Test]
public void ResolvedTarget()
{
using (var channel = new Channel("127.0.0.1", Credentials.Insecure))
{
var channel = new Channel("127.0.0.1", Credentials.Insecure);
Assert.IsTrue(channel.ResolvedTarget.Contains("127.0.0.1"));
}
channel.ShutdownAsync().Wait();
}
[Test]
public void Dispose_IsIdempotent()
public void Shutdown_AllowedOnlyOnce()
{
var channel = new Channel("localhost", Credentials.Insecure);
channel.Dispose();
channel.Dispose();
channel.ShutdownAsync().Wait();
Assert.Throws(typeof(InvalidOperationException), () => channel.ShutdownAsync().GetAwaiter().GetResult());
}
}
}
......@@ -63,16 +63,10 @@ namespace Grpc.Core.Tests
[TearDown]
public void Cleanup()
{
channel.Dispose();
channel.ShutdownAsync().Wait();
server.ShutdownAsync().Wait();
}
[TestFixtureTearDown]
public void CleanupClass()
{
GrpcEnvironment.Shutdown();
}
[Test]
public async Task UnaryCall()
{
......@@ -207,13 +201,6 @@ namespace Grpc.Core.Tests
CollectionAssert.AreEqual(headers[1].ValueBytes, trailers[1].ValueBytes);
}
[Test]
public void UnaryCall_DisposedChannel()
{
channel.Dispose();
Assert.Throws(typeof(ObjectDisposedException), () => Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "ABC"));
}
[Test]
public void UnaryCallPerformance()
{
......
......@@ -62,16 +62,10 @@ namespace Grpc.Core.Tests
[TearDown]
public void Cleanup()
{
channel.Dispose();
channel.ShutdownAsync().Wait();
server.ShutdownAsync().Wait();
}
[TestFixtureTearDown]
public void CleanupClass()
{
GrpcEnvironment.Shutdown();
}
[Test]
public void WriteOptions_Unary()
{
......
......@@ -62,16 +62,10 @@ namespace Grpc.Core.Tests
[TearDown]
public void Cleanup()
{
channel.Dispose();
channel.ShutdownAsync().Wait();
server.ShutdownAsync().Wait();
}
[TestFixtureTearDown]
public void CleanupClass()
{
GrpcEnvironment.Shutdown();
}
[Test]
public async Task PropagateCancellation()
{
......
......@@ -64,6 +64,7 @@
<Link>Version.cs</Link>
</Compile>
<Compile Include="ClientBaseTest.cs" />
<Compile Include="ShutdownTest.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="ClientServerTest.cs" />
<Compile Include="ServerTest.cs" />
......
......@@ -43,33 +43,39 @@ namespace Grpc.Core.Tests
[Test]
public void InitializeAndShutdownGrpcEnvironment()
{
var env = GrpcEnvironment.GetInstance();
var env = GrpcEnvironment.AddRef();
Assert.IsNotNull(env.CompletionQueue);
GrpcEnvironment.Shutdown();
GrpcEnvironment.Release();
}
[Test]
public void SubsequentInvocations()
{
var env1 = GrpcEnvironment.GetInstance();
var env2 = GrpcEnvironment.GetInstance();
var env1 = GrpcEnvironment.AddRef();
var env2 = GrpcEnvironment.AddRef();
Assert.IsTrue(object.ReferenceEquals(env1, env2));
GrpcEnvironment.Shutdown();
GrpcEnvironment.Shutdown();
GrpcEnvironment.Release();
GrpcEnvironment.Release();
}
[Test]
public void InitializeAfterShutdown()
{
var env1 = GrpcEnvironment.GetInstance();
GrpcEnvironment.Shutdown();
var env1 = GrpcEnvironment.AddRef();
GrpcEnvironment.Release();
var env2 = GrpcEnvironment.GetInstance();
GrpcEnvironment.Shutdown();
var env2 = GrpcEnvironment.AddRef();
GrpcEnvironment.Release();
Assert.IsFalse(object.ReferenceEquals(env1, env2));
}
[Test]
public void ReleaseWithoutAddRef()
{
Assert.Throws(typeof(InvalidOperationException), () => GrpcEnvironment.Release());
}
[Test]
public void GetCoreVersionString()
{
......
......@@ -69,16 +69,10 @@ namespace Grpc.Core.Tests
[TearDown]
public void Cleanup()
{
channel.Dispose();
channel.ShutdownAsync().Wait();
server.ShutdownAsync().Wait();
}
[TestFixtureTearDown]
public void CleanupClass()
{
GrpcEnvironment.Shutdown();
}
[Test]
public void WriteResponseHeaders_NullNotAllowed()
{
......
......@@ -51,7 +51,6 @@ namespace Grpc.Core.Tests
};
server.Start();
server.ShutdownAsync().Wait();
GrpcEnvironment.Shutdown();
}
[Test]
......@@ -67,8 +66,7 @@ namespace Grpc.Core.Tests
Assert.Greater(boundPort.BoundPort, 0);
server.Start();
server.ShutdownAsync();
GrpcEnvironment.Shutdown();
server.ShutdownAsync().Wait();
}
[Test]
......@@ -83,7 +81,6 @@ namespace Grpc.Core.Tests
Assert.Throws(typeof(InvalidOperationException), () => server.Services.Add(ServerServiceDefinition.CreateBuilder("serviceName").Build()));
server.ShutdownAsync().Wait();
GrpcEnvironment.Shutdown();
}
}
}
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
using NUnit.Framework;
namespace Grpc.Core.Tests
{
public class ShutdownTest
{
const string Host = "127.0.0.1";
MockServiceHelper helper;
Server server;
Channel channel;
[SetUp]
public void Init()
{
helper = new MockServiceHelper(Host);
server = helper.GetServer();
server.Start();
channel = helper.GetChannel();
}
[Test]
public async Task AbandonedCall()
{
helper.DuplexStreamingHandler = new DuplexStreamingServerMethod<string, string>(async (requestStream, responseStream, context) =>
{
await requestStream.ToListAsync();
});
var call = Calls.AsyncDuplexStreamingCall(helper.CreateDuplexStreamingCall(new CallOptions(deadline: DateTime.UtcNow.AddMilliseconds(1))));
channel.ShutdownAsync().Wait();
server.ShutdownAsync().Wait();
}
}
}
......@@ -65,16 +65,10 @@ namespace Grpc.Core.Tests
[TearDown]
public void Cleanup()
{
channel.Dispose();
channel.ShutdownAsync().Wait();
server.ShutdownAsync().Wait();
}
[TestFixtureTearDown]
public void CleanupClass()
{
GrpcEnvironment.Shutdown();
}
[Test]
public void InfiniteDeadline()
{
......
......@@ -45,14 +45,19 @@ namespace Grpc.Core
/// <summary>
/// gRPC Channel
/// </summary>
public class Channel : IDisposable
public class Channel
{
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Channel>();
readonly object myLock = new object();
readonly AtomicCounter activeCallCounter = new AtomicCounter();
readonly string target;
readonly GrpcEnvironment environment;
readonly ChannelSafeHandle handle;
readonly List<ChannelOption> options;
bool shutdownRequested;
bool disposed;
/// <summary>
......@@ -65,7 +70,7 @@ namespace Grpc.Core
public Channel(string target, Credentials credentials, IEnumerable<ChannelOption> options = null)
{
this.target = Preconditions.CheckNotNull(target, "target");
this.environment = GrpcEnvironment.GetInstance();
this.environment = GrpcEnvironment.AddRef();
this.options = options != null ? new List<ChannelOption>(options) : new List<ChannelOption>();
EnsureUserAgentChannelOption(this.options);
......@@ -172,12 +177,26 @@ namespace Grpc.Core
}
/// <summary>
/// Destroys the underlying channel.
/// Waits until there are no more active calls for this channel and then cleans up
/// resources used by this channel.
/// </summary>
public void Dispose()
public async Task ShutdownAsync()
{
lock (myLock)
{
Preconditions.CheckState(!shutdownRequested);
shutdownRequested = true;
}
var activeCallCount = activeCallCounter.Count;
if (activeCallCount > 0)
{
Dispose(true);
GC.SuppressFinalize(this);
Logger.Warning("Channel shutdown was called but there are still {0} active calls for that channel.", activeCallCount);
}
handle.Dispose();
await Task.Run(() => GrpcEnvironment.Release());
}
internal ChannelSafeHandle Handle
......@@ -196,13 +215,20 @@ namespace Grpc.Core
}
}
protected virtual void Dispose(bool disposing)
internal void AddCallReference(object call)
{
if (disposing && handle != null && !disposed)
{
disposed = true;
handle.Dispose();
activeCallCounter.Increment();
bool success = false;
handle.DangerousAddRef(ref success);
Preconditions.CheckState(success);
}
internal void RemoveCallReference(object call)
{
handle.DangerousRelease();
activeCallCounter.Decrement();
}
private static void EnsureUserAgentChannelOption(List<ChannelOption> options)
......
......@@ -58,6 +58,7 @@ namespace Grpc.Core
static object staticLock = new object();
static GrpcEnvironment instance;
static int refCount;
static ILogger logger = new ConsoleLogger();
......@@ -67,13 +68,14 @@ namespace Grpc.Core
bool isClosed;
/// <summary>
/// Returns an instance of initialized gRPC environment.
/// Subsequent invocations return the same instance unless Shutdown has been called first.
/// Returns a reference-counted instance of initialized gRPC environment.
/// Subsequent invocations return the same instance unless reference count has dropped to zero previously.
/// </summary>
internal static GrpcEnvironment GetInstance()
internal static GrpcEnvironment AddRef()
{
lock (staticLock)
{
refCount++;
if (instance == null)
{
instance = new GrpcEnvironment();
......@@ -83,14 +85,16 @@ namespace Grpc.Core
}
/// <summary>
/// Shuts down the gRPC environment if it was initialized before.
/// Blocks until the environment has been fully shutdown.
/// Decrements the reference count for currently active environment and shuts down the gRPC environment if reference count drops to zero.
/// (and blocks until the environment has been fully shutdown).
/// </summary>
public static void Shutdown()
internal static void Release()
{
lock (staticLock)
{
if (instance != null)
Preconditions.CheckState(refCount > 0);
refCount--;
if (refCount == 0)
{
instance.Close();
instance = null;
......@@ -125,12 +129,10 @@ namespace Grpc.Core
private GrpcEnvironment()
{
NativeLogRedirector.Redirect();
grpcsharp_init();
GrpcNativeInit();
completionRegistry = new CompletionRegistry(this);
threadPool = new GrpcThreadPool(this, THREAD_POOL_SIZE);
threadPool.Start();
// TODO: use proper logging here
Logger.Info("gRPC initialized.");
}
/// <summary>
......@@ -175,6 +177,17 @@ namespace Grpc.Core
return Marshal.PtrToStringAnsi(ptr);
}
internal static void GrpcNativeInit()
{
grpcsharp_init();
}
internal static void GrpcNativeShutdown()
{
grpcsharp_shutdown();
}
/// <summary>
/// Shuts down this environment.
/// </summary>
......@@ -185,12 +198,10 @@ namespace Grpc.Core
throw new InvalidOperationException("Close has already been called");
}
threadPool.Stop();
grpcsharp_shutdown();
GrpcNativeShutdown();
isClosed = true;
debugStats.CheckOK();
Logger.Info("gRPC shutdown.");
}
}
}
......@@ -311,9 +311,9 @@ namespace Grpc.Core.Internal
}
}
protected override void OnReleaseResources()
protected override void OnAfterReleaseResources()
{
details.Channel.Environment.DebugStats.ActiveClientCalls.Decrement();
details.Channel.RemoveCallReference(this);
}
private void Initialize(CompletionQueueSafeHandle cq)
......@@ -323,7 +323,9 @@ namespace Grpc.Core.Internal
var call = details.Channel.Handle.CreateCall(details.Channel.Environment.CompletionRegistry,
parentCall, ContextPropagationToken.DefaultMask, cq,
details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline.Value));
details.Channel.Environment.DebugStats.ActiveClientCalls.Increment();
details.Channel.AddCallReference(this);
InitializeInternal(call);
RegisterCancellationCallback();
}
......
......@@ -189,15 +189,15 @@ namespace Grpc.Core.Internal
private void ReleaseResources()
{
OnReleaseResources();
if (call != null)
{
call.Dispose();
}
disposed = true;
OnAfterReleaseResources();
}
protected virtual void OnReleaseResources()
protected virtual void OnAfterReleaseResources()
{
}
......@@ -212,7 +212,7 @@ namespace Grpc.Core.Internal
Preconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time");
}
protected void CheckReadingAllowed()
protected virtual void CheckReadingAllowed()
{
Preconditions.CheckState(started);
Preconditions.CheckState(!disposed);
......
......@@ -50,16 +50,19 @@ namespace Grpc.Core.Internal
readonly TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>();
readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
readonly GrpcEnvironment environment;
readonly Server server;
public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, GrpcEnvironment environment) : base(serializer, deserializer)
public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, GrpcEnvironment environment, Server server) : base(serializer, deserializer)
{
this.environment = Preconditions.CheckNotNull(environment);
this.server = Preconditions.CheckNotNull(server);
}
public void Initialize(CallSafeHandle call)
{
call.SetCompletionRegistry(environment.CompletionRegistry);
environment.DebugStats.ActiveServerCalls.Increment();
server.AddCallReference(this);
InitializeInternal(call);
}
......@@ -168,9 +171,15 @@ namespace Grpc.Core.Internal
}
}
protected override void OnReleaseResources()
protected override void CheckReadingAllowed()
{
base.CheckReadingAllowed();
Preconditions.CheckArgument(!cancelRequested);
}
protected override void OnAfterReleaseResources()
{
environment.DebugStats.ActiveServerCalls.Decrement();
server.RemoveCallReference(this);
}
/// <summary>
......
......@@ -134,7 +134,7 @@ namespace Grpc.Core.Internal
}
// Gets data of server_rpc_new completion.
public ServerRpcNew GetServerRpcNew()
public ServerRpcNew GetServerRpcNew(Server server)
{
var call = grpcsharp_batch_context_server_rpc_new_call(this);
......@@ -145,7 +145,7 @@ namespace Grpc.Core.Internal
IntPtr metadataArrayPtr = grpcsharp_batch_context_server_rpc_new_request_metadata(this);
var metadata = MetadataArraySafeHandle.ReadMetadataFromPtrUnsafe(metadataArrayPtr);
return new ServerRpcNew(call, method, host, deadline, metadata);
return new ServerRpcNew(server, call, method, host, deadline, metadata);
}
// Gets data of receive_close_on_server completion.
......@@ -198,14 +198,16 @@ namespace Grpc.Core.Internal
/// </summary>
internal struct ServerRpcNew
{
readonly Server server;
readonly CallSafeHandle call;
readonly string method;
readonly string host;
readonly Timespec deadline;
readonly Metadata requestMetadata;
public ServerRpcNew(CallSafeHandle call, string method, string host, Timespec deadline, Metadata requestMetadata)
public ServerRpcNew(Server server, CallSafeHandle call, string method, string host, Timespec deadline, Metadata requestMetadata)
{
this.server = server;
this.call = call;
this.method = method;
this.host = host;
......@@ -213,6 +215,14 @@ namespace Grpc.Core.Internal
this.requestMetadata = requestMetadata;
}
public Server Server
{
get
{
return this.server;
}
}
public CallSafeHandle Call
{
get
......
......@@ -68,11 +68,17 @@ namespace Grpc.Core.Internal
public static ChannelSafeHandle CreateInsecure(string target, ChannelArgsSafeHandle channelArgs)
{
// Increment reference count for the native gRPC environment to make sure we don't do grpc_shutdown() before destroying the server handle.
// Doing so would make object finalizer crash if we end up abandoning the handle.
GrpcEnvironment.GrpcNativeInit();
return grpcsharp_insecure_channel_create(target, channelArgs);
}
public static ChannelSafeHandle CreateSecure(CredentialsSafeHandle credentials, string target, ChannelArgsSafeHandle channelArgs)
{
// Increment reference count for the native gRPC environment to make sure we don't do grpc_shutdown() before destroying the server handle.
// Doing so would make object finalizer crash if we end up abandoning the handle.
GrpcEnvironment.GrpcNativeInit();
return grpcsharp_secure_channel_create(credentials, target, channelArgs);
}
......@@ -107,6 +113,7 @@ namespace Grpc.Core.Internal
protected override bool ReleaseHandle()
{
grpcsharp_channel_destroy(handle);
GrpcEnvironment.GrpcNativeShutdown();
return true;
}
}
......
......@@ -38,10 +38,6 @@ namespace Grpc.Core.Internal
{
internal class DebugStats
{
public readonly AtomicCounter ActiveClientCalls = new AtomicCounter();
public readonly AtomicCounter ActiveServerCalls = new AtomicCounter();
public readonly AtomicCounter PendingBatchCompletions = new AtomicCounter();
/// <summary>
......@@ -49,16 +45,6 @@ namespace Grpc.Core.Internal
/// </summary>
public void CheckOK()
{
var remainingClientCalls = ActiveClientCalls.Count;
if (remainingClientCalls != 0)
{
DebugWarning(string.Format("Detected {0} client calls that weren't disposed properly.", remainingClientCalls));
}
var remainingServerCalls = ActiveServerCalls.Count;
if (remainingServerCalls != 0)
{
DebugWarning(string.Format("Detected {0} server calls that weren't disposed properly.", remainingServerCalls));
}
var pendingBatchCompletions = PendingBatchCompletions.Count;
if (pendingBatchCompletions != 0)
{
......
......@@ -83,8 +83,6 @@ namespace Grpc.Core.Internal
lock (myLock)
{
cq.Shutdown();
Logger.Info("Waiting for GRPC threads to finish.");
foreach (var thread in threads)
{
thread.Join();
......@@ -136,7 +134,6 @@ namespace Grpc.Core.Internal
}
}
while (ev.type != GRPCCompletionType.Shutdown);
Logger.Info("Completion queue has shutdown successfully, thread {0} exiting.", Thread.CurrentThread.Name);
}
}
}
......@@ -67,7 +67,7 @@ namespace Grpc.Core.Internal
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
environment);
environment, newRpc.Server);
asyncCall.Initialize(newRpc.Call);
var finishedTask = asyncCall.ServerSideCallAsync();
......@@ -123,7 +123,7 @@ namespace Grpc.Core.Internal
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
environment);
environment, newRpc.Server);
asyncCall.Initialize(newRpc.Call);
var finishedTask = asyncCall.ServerSideCallAsync();
......@@ -179,7 +179,7 @@ namespace Grpc.Core.Internal
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
environment);
environment, newRpc.Server);
asyncCall.Initialize(newRpc.Call);
var finishedTask = asyncCall.ServerSideCallAsync();
......@@ -239,7 +239,7 @@ namespace Grpc.Core.Internal
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
environment);
environment, newRpc.Server);
asyncCall.Initialize(newRpc.Call);
var finishedTask = asyncCall.ServerSideCallAsync();
......@@ -278,7 +278,7 @@ namespace Grpc.Core.Internal
{
// We don't care about the payload type here.
var asyncCall = new AsyncCallServer<byte[], byte[]>(
(payload) => payload, (payload) => payload, environment);
(payload) => payload, (payload) => payload, environment, newRpc.Server);
asyncCall.Initialize(newRpc.Call);
var finishedTask = asyncCall.ServerSideCallAsync();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment