diff --git a/BUILD b/BUILD index e2860fb0532ebc8402dba5f89301e4aa445a7e2e..e862898eaa95c61f31e234c675f2a9ede09979f6 100644 --- a/BUILD +++ b/BUILD @@ -620,6 +620,7 @@ cc_library( "include/grpc++/generic_stub.h", "include/grpc++/impl/call.h", "include/grpc++/impl/client_unary_call.h", + "include/grpc++/impl/grpc_library.h", "include/grpc++/impl/internal_stub.h", "include/grpc++/impl/rpc_method.h", "include/grpc++/impl/rpc_service_method.h", @@ -698,6 +699,7 @@ cc_library( "include/grpc++/generic_stub.h", "include/grpc++/impl/call.h", "include/grpc++/impl/client_unary_call.h", + "include/grpc++/impl/grpc_library.h", "include/grpc++/impl/internal_stub.h", "include/grpc++/impl/rpc_method.h", "include/grpc++/impl/rpc_service_method.h", diff --git a/Makefile b/Makefile index f30624fbd47b42919459f0f3100e8e03c1006294..64d092867dd9daafb7a8216120ce73a0db839215 100644 --- a/Makefile +++ b/Makefile @@ -3854,6 +3854,7 @@ PUBLIC_HEADERS_CXX += \ include/grpc++/generic_stub.h \ include/grpc++/impl/call.h \ include/grpc++/impl/client_unary_call.h \ + include/grpc++/impl/grpc_library.h \ include/grpc++/impl/internal_stub.h \ include/grpc++/impl/rpc_method.h \ include/grpc++/impl/rpc_service_method.h \ @@ -4118,6 +4119,7 @@ PUBLIC_HEADERS_CXX += \ include/grpc++/generic_stub.h \ include/grpc++/impl/call.h \ include/grpc++/impl/client_unary_call.h \ + include/grpc++/impl/grpc_library.h \ include/grpc++/impl/internal_stub.h \ include/grpc++/impl/rpc_method.h \ include/grpc++/impl/rpc_service_method.h \ diff --git a/build.json b/build.json index 9574df5006745c7dce6dda36ebc7ee5eb26b1a03..8888979160a98cd90b1bbc00783da72684213a95 100644 --- a/build.json +++ b/build.json @@ -28,6 +28,7 @@ "include/grpc++/generic_stub.h", "include/grpc++/impl/call.h", "include/grpc++/impl/client_unary_call.h", + "include/grpc++/impl/grpc_library.h", "include/grpc++/impl/internal_stub.h", "include/grpc++/impl/rpc_method.h", "include/grpc++/impl/rpc_service_method.h", diff --git a/examples/pubsub/main.cc b/examples/pubsub/main.cc index cf0f21eace9e2e3ab186b902d4e73e3f7c8f6c4d..68620e64c5c280f63632fa68c6f5bdd56b439a49 100644 --- a/examples/pubsub/main.cc +++ b/examples/pubsub/main.cc @@ -64,7 +64,6 @@ const char kMessageData[] = "Test Data"; } // namespace int main(int argc, char** argv) { - grpc_init(); grpc::testing::InitTest(&argc, &argv, true); gpr_log(GPR_INFO, "Start PUBSUB client"); @@ -145,7 +144,5 @@ int main(int argc, char** argv) { subscriber.Shutdown(); publisher.Shutdown(); - channel.reset(); - grpc_shutdown(); return 0; } diff --git a/examples/pubsub/publisher_test.cc b/examples/pubsub/publisher_test.cc index ac4921283f4bdfd20fc412ab90a7099d65f5823b..6b9dcacc499a8ee228cc8cae7019a2804c796535 100644 --- a/examples/pubsub/publisher_test.cc +++ b/examples/pubsub/publisher_test.cc @@ -148,10 +148,8 @@ TEST_F(PublisherTest, TestPublisher) { int main(int argc, char** argv) { grpc_test_init(argc, argv); - grpc_init(); ::testing::InitGoogleTest(&argc, argv); gpr_log(GPR_INFO, "Start test ..."); int result = RUN_ALL_TESTS(); - grpc_shutdown(); return result; } diff --git a/examples/pubsub/subscriber_test.cc b/examples/pubsub/subscriber_test.cc index 9ab60ed6a760eabf34c946da8cafb74f623b0f71..b0e7fc034b8e46d8a3db5e821c9ec0cc9519a632 100644 --- a/examples/pubsub/subscriber_test.cc +++ b/examples/pubsub/subscriber_test.cc @@ -147,10 +147,8 @@ TEST_F(SubscriberTest, TestSubscriber) { int main(int argc, char** argv) { grpc_test_init(argc, argv); - grpc_init(); ::testing::InitGoogleTest(&argc, argv); gpr_log(GPR_INFO, "Start test ..."); int result = RUN_ALL_TESTS(); - grpc_shutdown(); return result; } diff --git a/include/grpc++/completion_queue.h b/include/grpc++/completion_queue.h index ba390c96e9d6e0949b94b62a7e179704ae102c43..5c2b1cce93d1557f9c139598be6d930834d268df 100644 --- a/include/grpc++/completion_queue.h +++ b/include/grpc++/completion_queue.h @@ -36,6 +36,7 @@ #include <grpc/support/time.h> #include <grpc++/impl/client_unary_call.h> +#include <grpc++/impl/grpc_library.h> #include <grpc++/time.h> struct grpc_completion_queue; @@ -71,11 +72,11 @@ class CompletionQueueTag { }; // grpc_completion_queue wrapper class -class CompletionQueue { +class CompletionQueue : public GrpcLibrary { public: CompletionQueue(); explicit CompletionQueue(grpc_completion_queue* take); - ~CompletionQueue(); + ~CompletionQueue() GRPC_OVERRIDE; // Tri-state return for AsyncNext: SHUTDOWN, GOT_EVENT, TIMEOUT enum NextStatus { SHUTDOWN, GOT_EVENT, TIMEOUT }; diff --git a/include/grpc++/credentials.h b/include/grpc++/credentials.h index a193bba2c748d5242c07a187aeb585dde936dfbc..61c40946910301cd35f205c58d17addfd69d1775 100644 --- a/include/grpc++/credentials.h +++ b/include/grpc++/credentials.h @@ -37,15 +37,16 @@ #include <memory> #include <grpc++/config.h> +#include <grpc++/impl/grpc_library.h> namespace grpc { class ChannelArguments; class ChannelInterface; class SecureCredentials; -class Credentials { +class Credentials : public GrpcLibrary { public: - virtual ~Credentials(); + ~Credentials() GRPC_OVERRIDE; protected: friend std::unique_ptr<Credentials> CompositeCredentials( diff --git a/include/grpc++/impl/grpc_library.h b/include/grpc++/impl/grpc_library.h new file mode 100644 index 0000000000000000000000000000000000000000..f9fa677901b799cca3f39bce162e9d18625f646b --- /dev/null +++ b/include/grpc++/impl/grpc_library.h @@ -0,0 +1,50 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPCXX_IMPL_GRPC_LIBRARY_H +#define GRPCXX_IMPL_GRPC_LIBRARY_H + +#include <grpc/grpc.h> + +namespace grpc { + +class GrpcLibrary { + public: + GrpcLibrary() { grpc_init(); } + virtual ~GrpcLibrary() { grpc_shutdown(); } +}; + +} // namespace grpc + + +#endif // GRPCXX_IMPL_GRPC_LIBRARY_H diff --git a/include/grpc++/server.h b/include/grpc++/server.h index eb5061157357f9dfffaac0e77fab68ee41226f29..0ae27e9e9f8d3ccb2277779673a9412962ee3964 100644 --- a/include/grpc++/server.h +++ b/include/grpc++/server.h @@ -40,6 +40,7 @@ #include <grpc++/completion_queue.h> #include <grpc++/config.h> #include <grpc++/impl/call.h> +#include <grpc++/impl/grpc_library.h> #include <grpc++/impl/service_type.h> #include <grpc++/impl/sync.h> #include <grpc++/status.h> @@ -56,7 +57,8 @@ class ServerCredentials; class ThreadPoolInterface; // Currently it only supports handling rpcs in a single thread. -class Server GRPC_FINAL : private CallHook, +class Server GRPC_FINAL : public GrpcLibrary, + private CallHook, private AsynchronousService::DispatchImpl { public: ~Server(); diff --git a/src/cpp/client/channel.h b/src/cpp/client/channel.h index aaf4dbe10dcb1ff3490a9d48d01cac4a3c5b8f24..cd239247c8270c9518f39d27a068515c5dd31353 100644 --- a/src/cpp/client/channel.h +++ b/src/cpp/client/channel.h @@ -38,6 +38,7 @@ #include <grpc++/channel_interface.h> #include <grpc++/config.h> +#include <grpc++/impl/grpc_library.h> struct grpc_channel; @@ -49,7 +50,8 @@ class CompletionQueue; class Credentials; class StreamContextInterface; -class Channel GRPC_FINAL : public ChannelInterface { +class Channel GRPC_FINAL : public GrpcLibrary, + public ChannelInterface { public: Channel(const grpc::string& target, grpc_channel* c_channel); ~Channel() GRPC_OVERRIDE; diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs index 14add60c7289a4b80e2fee9d72819bebd5b2d5a8..c97a3bc2b16901f091dede5b901372fd189ef898 100644 --- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs @@ -33,6 +33,7 @@ using System; using System.Diagnostics; using System.Runtime.InteropServices; using Grpc.Core; +using Grpc.Core.Utils; namespace Grpc.Core.Internal { @@ -180,7 +181,7 @@ namespace Grpc.Core.Internal private static void AssertCallOk(GRPCCallError callError) { - Trace.Assert(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK"); + Preconditions.CheckState(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK"); } private static uint GetFlags(bool buffered) diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs index a59da0982215178791f698a96c66d65b0a16fa6a..8080643d8c18eb6b7cd008d0e77071d9491e9e82 100644 --- a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs @@ -35,6 +35,7 @@ using System; using System.Collections.Concurrent; using System.Diagnostics; using System.Runtime.InteropServices; +using Grpc.Core.Utils; namespace Grpc.Core.Internal { @@ -105,9 +106,9 @@ namespace Grpc.Core.Internal grpcsharp_server_shutdown_and_notify_CALLBACK(this, callback); } - public GRPCCallError RequestCall(CompletionQueueSafeHandle cq, CompletionCallbackDelegate callback) + public void RequestCall(CompletionQueueSafeHandle cq, CompletionCallbackDelegate callback) { - return grpcsharp_server_request_call(this, cq, callback); + AssertCallOk(grpcsharp_server_request_call(this, cq, callback)); } protected override bool ReleaseHandle() @@ -115,5 +116,10 @@ namespace Grpc.Core.Internal grpcsharp_server_destroy(handle); return true; } + + private static void AssertCallOk(GRPCCallError callError) + { + Preconditions.CheckState(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK"); + } } } diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs index f086fa8beb957124d6ef22066eaf80103a2150d0..e686cdddef782c6bbfd16175f5778d6753d970ba 100644 --- a/src/csharp/Grpc.Core/Server.cs +++ b/src/csharp/Grpc.Core/Server.cs @@ -38,27 +38,29 @@ using System.Diagnostics; using System.Runtime.InteropServices; using System.Threading.Tasks; using Grpc.Core.Internal; +using Grpc.Core.Utils; namespace Grpc.Core { /// <summary> - /// Server is implemented only to be able to do - /// in-process testing. + /// A gRPC server. /// </summary> public class Server { - // TODO: make sure the delegate doesn't get garbage collected while + // TODO(jtattermusch) : make sure the delegate doesn't get garbage collected while // native callbacks are in the completion queue. readonly ServerShutdownCallbackDelegate serverShutdownHandler; readonly CompletionCallbackDelegate newServerRpcHandler; - readonly BlockingCollection<NewRpcInfo> newRpcQueue = new BlockingCollection<NewRpcInfo>(); readonly ServerSafeHandle handle; + readonly object myLock = new object(); readonly Dictionary<string, IServerCallHandler> callHandlers = new Dictionary<string, IServerCallHandler>(); - readonly TaskCompletionSource<object> shutdownTcs = new TaskCompletionSource<object>(); + bool startRequested; + bool shutdownRequested; + public Server() { this.handle = ServerSafeHandle.NewServer(GetCompletionQueue(), IntPtr.Zero); @@ -66,71 +68,81 @@ namespace Grpc.Core this.serverShutdownHandler = HandleServerShutdown; } - // only call this before Start() + /// <summary> + /// Adds a service definition to the server. This is how you register + /// handlers for a service with the server. + /// Only call this before Start(). + /// </summary> public void AddServiceDefinition(ServerServiceDefinition serviceDefinition) { - foreach (var entry in serviceDefinition.CallHandlers) + lock (myLock) { - callHandlers.Add(entry.Key, entry.Value); + Preconditions.CheckState(!startRequested); + foreach (var entry in serviceDefinition.CallHandlers) + { + callHandlers.Add(entry.Key, entry.Value); + } } } - // only call before Start() + /// <summary> + /// Add a non-secure port on which server should listen. + /// Only call this before Start(). + /// </summary> public int AddListeningPort(string addr) { - return handle.AddListeningPort(addr); - } - - // only call before Start() - public int AddListeningPort(string addr, ServerCredentials credentials) - { - using (var nativeCredentials = credentials.ToNativeCredentials()) + lock (myLock) { - return handle.AddListeningPort(addr, nativeCredentials); + Preconditions.CheckState(!startRequested); + return handle.AddListeningPort(addr); } } - public void Start() - { - handle.Start(); - - // TODO: this basically means the server is single threaded.... - StartHandlingRpcs(); - } - /// <summary> - /// Requests and handles single RPC call. + /// Add a secure port on which server should listen. + /// Only call this before Start(). /// </summary> - internal void RunRpc() + public int AddListeningPort(string addr, ServerCredentials credentials) { - AllowOneRpc(); - - try + lock (myLock) { - var rpcInfo = newRpcQueue.Take(); - - // Console.WriteLine("Server received RPC " + rpcInfo.Method); - - IServerCallHandler callHandler; - if (!callHandlers.TryGetValue(rpcInfo.Method, out callHandler)) + Preconditions.CheckState(!startRequested); + using (var nativeCredentials = credentials.ToNativeCredentials()) { - callHandler = new NoSuchMethodCallHandler(); + return handle.AddListeningPort(addr, nativeCredentials); } - callHandler.StartCall(rpcInfo.Method, rpcInfo.Call, GetCompletionQueue()); } - catch (Exception e) + } + + /// <summary> + /// Starts the server. + /// </summary> + public void Start() + { + lock (myLock) { - Console.WriteLine("Exception while handling RPC: " + e); + Preconditions.CheckState(!startRequested); + startRequested = true; + + handle.Start(); + AllowOneRpc(); } } /// <summary> /// Requests server shutdown and when there are no more calls being serviced, - /// cleans up used resources. + /// cleans up used resources. The returned task finishes when shutdown procedure + /// is complete. /// </summary> - /// <returns>The async.</returns> public async Task ShutdownAsync() { + lock (myLock) + { + Preconditions.CheckState(startRequested); + Preconditions.CheckState(!shutdownRequested); + shutdownRequested = true; + } + handle.ShutdownAndNotify(serverShutdownHandler); await shutdownTcs.Task; handle.Dispose(); @@ -152,19 +164,43 @@ namespace Grpc.Core handle.Dispose(); } - private async Task StartHandlingRpcs() + /// <summary> + /// Allows one new RPC call to be received by server. + /// </summary> + private void AllowOneRpc() { - while (true) + lock (myLock) { - await Task.Factory.StartNew(RunRpc); + if (!shutdownRequested) + { + handle.RequestCall(GetCompletionQueue(), newServerRpcHandler); + } } } - private void AllowOneRpc() + /// <summary> + /// Selects corresponding handler for given call and handles the call. + /// </summary> + private void InvokeCallHandler(CallSafeHandle call, string method) { - AssertCallOk(handle.RequestCall(GetCompletionQueue(), newServerRpcHandler)); + try + { + IServerCallHandler callHandler; + if (!callHandlers.TryGetValue(method, out callHandler)) + { + callHandler = new NoSuchMethodCallHandler(); + } + callHandler.StartCall(method, call, GetCompletionQueue()); + } + catch (Exception e) + { + Console.WriteLine("Exception while handling RPC: " + e); + } } + /// <summary> + /// Handles the native callback. + /// </summary> private void HandleNewServerRpc(GRPCOpError error, IntPtr batchContextPtr) { try @@ -176,13 +212,16 @@ namespace Grpc.Core // TODO: handle error } - var rpcInfo = new NewRpcInfo(ctx.GetServerRpcNewCall(), ctx.GetServerRpcNewMethod()); + CallSafeHandle call = ctx.GetServerRpcNewCall(); + string method = ctx.GetServerRpcNewMethod(); // after server shutdown, the callback returns with null call - if (!rpcInfo.Call.IsInvalid) + if (!call.IsInvalid) { - newRpcQueue.Add(rpcInfo); + Task.Run(() => InvokeCallHandler(call, method)); } + + AllowOneRpc(); } catch (Exception e) { @@ -190,6 +229,10 @@ namespace Grpc.Core } } + /// <summary> + /// Handles native callback. + /// </summary> + /// <param name="eventPtr"></param> private void HandleServerShutdown(IntPtr eventPtr) { try @@ -202,42 +245,9 @@ namespace Grpc.Core } } - private static void AssertCallOk(GRPCCallError callError) - { - Trace.Assert(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK"); - } - private static CompletionQueueSafeHandle GetCompletionQueue() { return GrpcEnvironment.ThreadPool.CompletionQueue; } - - private struct NewRpcInfo - { - private CallSafeHandle call; - private string method; - - public NewRpcInfo(CallSafeHandle call, string method) - { - this.call = call; - this.method = method; - } - - public CallSafeHandle Call - { - get - { - return this.call; - } - } - - public string Method - { - get - { - return this.method; - } - } - } } } diff --git a/src/csharp/Grpc.Examples.MathServer/MathServer.cs b/src/csharp/Grpc.Examples.MathServer/MathServer.cs index f7429fb43f87b8d46095fca058cdb9a9890814ee..abc7ef05e4b28d57a072eff4e101665728eea6c3 100644 --- a/src/csharp/Grpc.Examples.MathServer/MathServer.cs +++ b/src/csharp/Grpc.Examples.MathServer/MathServer.cs @@ -40,7 +40,7 @@ namespace math { public static void Main(string[] args) { - String host = "0.0.0.0"; + string host = "0.0.0.0"; GrpcEnvironment.Initialize(); diff --git a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj index 584bf1068de88e688acab3b3a8cf93d2606d2e51..6ae8041fb7cdddd9ccfc0d430fd14daad9310c08 100644 --- a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj +++ b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj @@ -80,5 +80,7 @@ <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory> </None> </ItemGroup> - <ItemGroup /> + <ItemGroup> + <Service Include="{82A7F48D-3B50-4B1E-B82E-3ADA8210C358}" /> + </ItemGroup> </Project> \ No newline at end of file diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c index 6da7d3c830532e8113ddebb90f73250c5f5b1eea..f4ae6fab8467f434894867ed01b50169b76d0376 100644 --- a/src/ruby/ext/grpc/rb_call.c +++ b/src/ruby/ext/grpc/rb_call.c @@ -607,19 +607,19 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag, rb_raise(grpc_rb_eCallError, "grpc_call_start_batch failed with %s (code=%d)", grpc_call_error_detail_of(err), err); - return; + return Qnil; } ev = grpc_rb_completion_queue_pluck_event(cqueue, tag, timeout); if (ev == NULL) { grpc_run_batch_stack_cleanup(&st); rb_raise(grpc_rb_eOutOfTime, "grpc_call_start_batch timed out"); - return; + return Qnil; } if (ev->data.op_complete != GRPC_OP_OK) { grpc_run_batch_stack_cleanup(&st); rb_raise(grpc_rb_eCallError, "start_batch completion failed, (code=%d)", ev->data.op_complete); - return; + return Qnil; } /* Build and return the BatchResult struct result */ diff --git a/src/ruby/grpc.gemspec b/src/ruby/grpc.gemspec index c633579102531d5f43c73b56ffb628a917000f31..19b3e21cb6d1f170008a6bce8a02c85e2005fbfd 100755 --- a/src/ruby/grpc.gemspec +++ b/src/ruby/grpc.gemspec @@ -26,7 +26,7 @@ Gem::Specification.new do |s| s.add_dependency 'google-protobuf', '~> 3.0.0alpha.1.1' s.add_dependency 'googleauth', '~> 0.4' # reqd for interop tests - s.add_dependency 'logging', '~> 1.8' + s.add_dependency 'logging', '~> 2.0' s.add_dependency 'minitest', '~> 5.4' # reqd for interop tests s.add_development_dependency 'simplecov', '~> 0.9' diff --git a/src/ruby/lib/grpc.rb b/src/ruby/lib/grpc.rb index b0f68035cd6d8dae0653333ac51c98ddaf8c9ee9..80b5743e9145b638dd0f33d9d55e1efdb990a83d 100644 --- a/src/ruby/lib/grpc.rb +++ b/src/ruby/lib/grpc.rb @@ -30,6 +30,7 @@ require 'grpc/errors' require 'grpc/grpc' require 'grpc/logconfig' +require 'grpc/notifier' require 'grpc/version' require 'grpc/core/time_consts' require 'grpc/generic/active_call' diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index 43ba549905986c5f175a5d7b80ca0ad73f9a5486..947c39cd2265685ef9c9ebd937fd0360a9eca441 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -188,7 +188,7 @@ module GRPC # @param marshalled [false, true] indicates if the object is already # marshalled. def remote_send(req, marshalled = false) - logger.debug("sending #{req.inspect}, marshalled? #{marshalled}") + logger.debug("sending #{req}, marshalled? #{marshalled}") if marshalled payload = req else diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb index b813ab5b542420df20986b40513c5cc81e1003bb..4ca3004d6f0755052b3764d4dc0c9140f5690a6e 100644 --- a/src/ruby/lib/grpc/generic/bidi_call.rb +++ b/src/ruby/lib/grpc/generic/bidi_call.rb @@ -123,8 +123,7 @@ module GRPC break if req.equal?(END_OF_READS) yield req end - @loop_th.join - @enq_th.join + @enq_th.join if @enq_th.alive? end # during bidi-streaming, read the requests to send from a separate thread diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index 88c24aa92bbdc18e14e13694e025c69ae3eaa1d0..3375fcf20ac2be27ad021d071a47d321d5d920c5 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -54,6 +54,18 @@ module GRPC end module_function :handle_signals + # Sets up a signal handler that adds signals to the signal handling global. + # + # Signal handlers should do as little as humanly possible. + # Here, they just add themselves to $grpc_signals + # + # RpcServer (and later other parts of gRPC) monitors the signals + # $grpc_signals in its own non-signal context. + def trap_signals + %w(INT TERM).each { |sig| trap(sig) { $grpc_signals << sig } } + end + module_function :trap_signals + # Pool is a simple thread pool. class Pool # Default keep alive period is 1s @@ -172,17 +184,6 @@ module GRPC # Signal check period is 0.25s SIGNAL_CHECK_PERIOD = 0.25 - # Sets up a signal handler that adds signals to the signal handling global. - # - # Signal handlers should do as little as humanly possible. - # Here, they just add themselves to $grpc_signals - # - # RpcServer (and later other parts of gRPC) monitors the signals - # $grpc_signals in its own non-signal context. - def self.trap_signals - %w(INT TERM).each { |sig| trap(sig) { $grpc_signals << sig } } - end - # setup_cq is used by #initialize to constuct a Core::CompletionQueue from # its arguments. def self.setup_cq(alt_cq) @@ -299,12 +300,12 @@ module GRPC # Runs the server in its own thread, then waits for signal INT or TERM on # the current thread to terminate it. def run_till_terminated - self.class.trap_signals + GRPC.trap_signals t = Thread.new { run } wait_till_running loop do sleep SIGNAL_CHECK_PERIOD - break unless handle_signals + break unless GRPC.handle_signals end stop t.join diff --git a/src/ruby/lib/grpc/notifier.rb b/src/ruby/lib/grpc/notifier.rb new file mode 100644 index 0000000000000000000000000000000000000000..caa18bbed6de4f30fa899ba6011e2e9e95d28317 --- /dev/null +++ b/src/ruby/lib/grpc/notifier.rb @@ -0,0 +1,60 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +# GRPC contains the General RPC module. +module GRPC + # Notifier is useful high-level synchronization primitive. + class Notifier + attr_reader :payload, :notified + alias_method :notified?, :notified + + def initialize + @mutex = Mutex.new + @cvar = ConditionVariable.new + @notified = false + @payload = nil + end + + def wait + @mutex.synchronize do + @cvar.wait(@mutex) until notified? + end + end + + def notify(payload) + @mutex.synchronize do + return Error.new('already notified') if notified? + @payload = payload + @notified = true + @cvar.signal + return nil + end + end + end +end diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb index 88c6b44c229aa99f2afaf20c1d0e9e585f1269c7..98d68ccfbb89d775f8304700d3d806bc00fdc6cd 100644 --- a/src/ruby/spec/generic/client_stub_spec.rb +++ b/src/ruby/spec/generic/client_stub_spec.rb @@ -29,37 +29,8 @@ require 'grpc' -# Notifier is useful high-level synchronization primitive. -class Notifier - attr_reader :payload, :notified - alias_method :notified?, :notified - - def initialize - @mutex = Mutex.new - @cvar = ConditionVariable.new - @notified = false - @payload = nil - end - - def wait - @mutex.synchronize do - @cvar.wait(@mutex) until notified? - end - end - - def notify(payload) - @mutex.synchronize do - return Error.new('already notified') if notified? - @payload = payload - @notified = true - @cvar.signal - return nil - end - end -end - def wakey_thread(&blk) - n = Notifier.new + n = GRPC::Notifier.new t = Thread.new do blk.call(n) end diff --git a/test/cpp/client/credentials_test.cc b/test/cpp/client/credentials_test.cc index d7ea09a5f1fe85a0a99c47fa2ace7fae3dec4bde..6840418989cb1bad0bf524bbfba8b895e7a8eeab 100644 --- a/test/cpp/client/credentials_test.cc +++ b/test/cpp/client/credentials_test.cc @@ -56,8 +56,6 @@ TEST_F(CredentialsTest, InvalidServiceAccountCreds) { int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); - grpc_init(); int ret = RUN_ALL_TESTS(); - grpc_shutdown(); return ret; } diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index c5d6840bcc8d779d1fd0270a474eee77ecc19948..6c0dfadbb912b644487e98e7bc459227eff8b9db 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -594,9 +594,6 @@ TEST_F(AsyncEnd2endTest, MetadataRpc) { int main(int argc, char** argv) { grpc_test_init(argc, argv); - grpc_init(); ::testing::InitGoogleTest(&argc, argv); - int result = RUN_ALL_TESTS(); - grpc_shutdown(); - return result; + return RUN_ALL_TESTS(); } diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index 07b53193cebcb03b790621dbacedd70b990e07ae..1ad467aa23b87dd1acde6dba48768d2230296cf9 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -561,9 +561,6 @@ TEST_F(End2endTest, ClientCancelsBidi) { int main(int argc, char** argv) { grpc_test_init(argc, argv); - grpc_init(); ::testing::InitGoogleTest(&argc, argv); - int result = RUN_ALL_TESTS(); - grpc_shutdown(); - return result; + return RUN_ALL_TESTS(); } diff --git a/test/cpp/end2end/generic_end2end_test.cc b/test/cpp/end2end/generic_end2end_test.cc index ef3d0addd691102b60dda38b62b59a1464feec0c..103f613f70efa23b1b7edab8fe733a812cc9d071 100644 --- a/test/cpp/end2end/generic_end2end_test.cc +++ b/test/cpp/end2end/generic_end2end_test.cc @@ -279,9 +279,6 @@ TEST_F(GenericEnd2endTest, SimpleBidiStreaming) { int main(int argc, char** argv) { grpc_test_init(argc, argv); - grpc_init(); ::testing::InitGoogleTest(&argc, argv); - int result = RUN_ALL_TESTS(); - grpc_shutdown(); - return result; + return RUN_ALL_TESTS(); } diff --git a/test/cpp/interop/client.cc b/test/cpp/interop/client.cc index 42d16f2f85efa8a603bee063dafc324d6dfb4204..072968f7cdc6a6572ae37a838de0f7363b44aafa 100644 --- a/test/cpp/interop/client.cc +++ b/test/cpp/interop/client.cc @@ -76,8 +76,6 @@ using grpc::testing::CreateChannelForTestCase; using grpc::testing::GetServiceAccountJsonKey; int main(int argc, char** argv) { - grpc_init(); - grpc::testing::InitTest(&argc, &argv, true); int ret = 0; @@ -129,8 +127,6 @@ int main(int argc, char** argv) { FLAGS_test_case.c_str()); ret = 1; } - client.Reset(nullptr); - grpc_shutdown(); return ret; } diff --git a/test/cpp/interop/server.cc b/test/cpp/interop/server.cc index 7888102837a393de4a1b6f55371b998a2518c6cc..22b8910a249d8fc39310ee1ce760224fce2cba5b 100644 --- a/test/cpp/interop/server.cc +++ b/test/cpp/interop/server.cc @@ -218,13 +218,11 @@ void RunServer() { static void sigint_handler(int x) { got_sigint = true; } int main(int argc, char** argv) { - grpc_init(); grpc::testing::InitTest(&argc, &argv, true); signal(SIGINT, sigint_handler); GPR_ASSERT(FLAGS_port != 0); RunServer(); - grpc_shutdown(); return 0; } diff --git a/test/cpp/qps/qps_driver.cc b/test/cpp/qps/qps_driver.cc index 0669ccf808bf9781cd4ce57f91aad03398edba6c..93b1247d73911831e51d53d64f326268d50795d9 100644 --- a/test/cpp/qps/qps_driver.cc +++ b/test/cpp/qps/qps_driver.cc @@ -69,7 +69,6 @@ using grpc::testing::RpcType; using grpc::testing::ResourceUsage; int main(int argc, char** argv) { - grpc_init(); grpc::testing::InitTest(&argc, &argv, true); RpcType rpc_type; @@ -104,6 +103,5 @@ int main(int argc, char** argv) { ReportLatency(result); ReportTimes(result); - grpc_shutdown(); return 0; } diff --git a/test/cpp/qps/smoke_test.cc b/test/cpp/qps/smoke_test.cc index 9531913b00ee0791efa97c9aebf523682770f9f1..e390730832555a81403947d59e0bea2a48e3724e 100644 --- a/test/cpp/qps/smoke_test.cc +++ b/test/cpp/qps/smoke_test.cc @@ -136,14 +136,11 @@ static void RunQPS() { } // namespace grpc int main(int argc, char** argv) { - grpc_init(); - using namespace grpc::testing; RunSynchronousStreamingPingPong(); RunSynchronousUnaryPingPong(); RunAsyncUnaryPingPong(); RunQPS(); - grpc_shutdown(); return 0; } diff --git a/test/cpp/qps/worker.cc b/test/cpp/qps/worker.cc index 896a85cc593cc6165e272a7789dde6092ebaf116..281c617382a662dcbf24a42d5d76dd2509606b10 100644 --- a/test/cpp/qps/worker.cc +++ b/test/cpp/qps/worker.cc @@ -64,13 +64,11 @@ static void RunServer() { } // namespace grpc int main(int argc, char** argv) { - grpc_init(); grpc::testing::InitTest(&argc, &argv, true); signal(SIGINT, sigint_handler); grpc::testing::RunServer(); - - grpc_shutdown(); + return 0; } diff --git a/test/cpp/util/cli_call_test.cc b/test/cpp/util/cli_call_test.cc index 32ef392cc4c0368b97562d64fb65d6d82a056c27..457a5e77de8d1b6d5f6263a5bb8b27f96936e433 100644 --- a/test/cpp/util/cli_call_test.cc +++ b/test/cpp/util/cli_call_test.cc @@ -123,9 +123,6 @@ TEST_F(CliCallTest, SimpleRpc) { int main(int argc, char** argv) { grpc_test_init(argc, argv); - grpc_init(); ::testing::InitGoogleTest(&argc, argv); - int result = RUN_ALL_TESTS(); - grpc_shutdown(); - return result; + return RUN_ALL_TESTS(); } diff --git a/test/cpp/util/grpc_cli.cc b/test/cpp/util/grpc_cli.cc index ee9f2752737bcdba92fcaa02cce7509ddd652f57..d71a7a0b778dc1400cc2706d7dcf94b1d595530a 100644 --- a/test/cpp/util/grpc_cli.cc +++ b/test/cpp/util/grpc_cli.cc @@ -79,8 +79,6 @@ DEFINE_string(output_binary_file, "output.bin", "Path to output file to write serialized response."); int main(int argc, char** argv) { - grpc_init(); - grpc::testing::InitTest(&argc, &argv, true); if (argc < 4 || grpc::string(argv[1]) != "call") { @@ -127,7 +125,5 @@ int main(int argc, char** argv) { output_file << response; } - channel.reset(); - grpc_shutdown(); return 0; } diff --git a/vsprojects/grpc++/grpc++.vcxproj b/vsprojects/grpc++/grpc++.vcxproj index 50f3061393d5b206e1480d9d23d1736e1d8a4395..ac3191a8b2d0b1bca169b3a11657f22194f1b266 100644 --- a/vsprojects/grpc++/grpc++.vcxproj +++ b/vsprojects/grpc++/grpc++.vcxproj @@ -96,6 +96,7 @@ <ClInclude Include="..\..\include\grpc++\generic_stub.h" /> <ClInclude Include="..\..\include\grpc++\impl\call.h" /> <ClInclude Include="..\..\include\grpc++\impl\client_unary_call.h" /> + <ClInclude Include="..\..\include\grpc++\impl\grpc_library.h" /> <ClInclude Include="..\..\include\grpc++\impl\internal_stub.h" /> <ClInclude Include="..\..\include\grpc++\impl\rpc_method.h" /> <ClInclude Include="..\..\include\grpc++\impl\rpc_service_method.h" /> diff --git a/vsprojects/grpc++/grpc++.vcxproj.filters b/vsprojects/grpc++/grpc++.vcxproj.filters index 698f1cecc8a58ed77674c5954f7f9be014f82de4..d5eeb7179074badce1ff38f0f4c2cfb0a1eea5ad 100644 --- a/vsprojects/grpc++/grpc++.vcxproj.filters +++ b/vsprojects/grpc++/grpc++.vcxproj.filters @@ -120,6 +120,9 @@ <ClInclude Include="..\..\include\grpc++\impl\client_unary_call.h"> <Filter>include\grpc++\impl</Filter> </ClInclude> + <ClInclude Include="..\..\include\grpc++\impl\grpc_library.h"> + <Filter>include\grpc++\impl</Filter> + </ClInclude> <ClInclude Include="..\..\include\grpc++\impl\internal_stub.h"> <Filter>include\grpc++\impl</Filter> </ClInclude>