Skip to content
Snippets Groups Projects
Commit 37f3184b authored by Jan Tattermusch's avatar Jan Tattermusch
Browse files

Merge pull request #6611 from murgatroid99/ruby_signal_handling_improvement

Handle signals properly when dropping GVL
parents b9f11c3f 746ea97a
No related branches found
No related tags found
No related merge requests found
......@@ -52,21 +52,41 @@ typedef struct next_call_stack {
grpc_event event;
gpr_timespec timeout;
void *tag;
volatile int interrupted;
} next_call_stack;
/* Calls grpc_completion_queue_next without holding the ruby GIL */
static void *grpc_rb_completion_queue_next_no_gil(void *param) {
next_call_stack *const next_call = (next_call_stack*)param;
next_call->event =
grpc_completion_queue_next(next_call->cq, next_call->timeout, NULL);
gpr_timespec increment = gpr_time_from_millis(20, GPR_TIMESPAN);
gpr_timespec deadline;
do {
deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), increment);
next_call->event = grpc_completion_queue_next(next_call->cq,
deadline, NULL);
if (next_call->event.type != GRPC_QUEUE_TIMEOUT ||
gpr_time_cmp(deadline, next_call->timeout) > 0) {
break;
}
} while (!next_call->interrupted);
return NULL;
}
/* Calls grpc_completion_queue_pluck without holding the ruby GIL */
static void *grpc_rb_completion_queue_pluck_no_gil(void *param) {
next_call_stack *const next_call = (next_call_stack*)param;
next_call->event = grpc_completion_queue_pluck(next_call->cq, next_call->tag,
next_call->timeout, NULL);
gpr_timespec increment = gpr_time_from_millis(20, GPR_TIMESPAN);
gpr_timespec deadline;
do {
deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), increment);
next_call->event = grpc_completion_queue_pluck(next_call->cq,
next_call->tag,
deadline, NULL);
if (next_call->event.type != GRPC_QUEUE_TIMEOUT ||
gpr_time_cmp(deadline, next_call->timeout) > 0) {
break;
}
} while (!next_call->interrupted);
return NULL;
}
......@@ -139,6 +159,11 @@ static VALUE grpc_rb_completion_queue_alloc(VALUE cls) {
return TypedData_Wrap_Struct(cls, &grpc_rb_completion_queue_data_type, cq);
}
static void unblock_func(void *param) {
next_call_stack *const next_call = (next_call_stack*)param;
next_call->interrupted = 1;
}
/* Blocks until the next event for given tag is available, and returns the
* event. */
grpc_event grpc_rb_completion_queue_pluck_event(VALUE self, VALUE tag,
......@@ -158,8 +183,23 @@ grpc_event grpc_rb_completion_queue_pluck_event(VALUE self, VALUE tag,
next_call.tag = ROBJECT(tag);
}
next_call.event.type = GRPC_QUEUE_TIMEOUT;
rb_thread_call_without_gvl(grpc_rb_completion_queue_pluck_no_gil,
(void *)&next_call, NULL, NULL);
/* Loop until we finish a pluck without an interruption. The internal
pluck function runs either until it is interrupted or it gets an
event, or time runs out.
The basic reason we need this relatively complicated construction is that
we need to re-acquire the GVL when an interrupt comes in, so that the ruby
interpreter can do what it needs to do with the interrupt. But we also need
to get back to plucking when the interrupt has been handled. */
do {
next_call.interrupted = 0;
rb_thread_call_without_gvl(grpc_rb_completion_queue_pluck_no_gil,
(void *)&next_call, unblock_func,
(void *)&next_call);
/* If an interrupt prevented pluck from returning useful information, then
any plucks that did complete must have timed out */
} while (next_call.interrupted &&
next_call.event.type == GRPC_QUEUE_TIMEOUT);
return next_call.event;
}
......
......@@ -46,7 +46,7 @@ grpc_completion_queue *grpc_rb_get_wrapped_completion_queue(VALUE v);
*
* This avoids having code that holds the GIL repeated at multiple sites.
*/
grpc_event grpc_rb_completion_queue_pluck_event(VALUE cqueue, VALUE tag,
grpc_event grpc_rb_completion_queue_pluck_event(VALUE self, VALUE tag,
VALUE timeout);
/* Initializes the CompletionQueue class. */
......
......@@ -50,7 +50,6 @@
#include "rb_loader.h"
#include "rb_server.h"
#include "rb_server_credentials.h"
#include "rb_signal.h"
static VALUE grpc_rb_cTimeVal = Qnil;
......@@ -333,7 +332,6 @@ void Init_grpc_c() {
Init_grpc_channel_credentials();
Init_grpc_server();
Init_grpc_server_credentials();
Init_grpc_signals();
Init_grpc_status_codes();
Init_grpc_time_consts();
}
......@@ -60,6 +60,7 @@ typedef struct grpc_rb_server {
VALUE mark;
/* The actual server */
grpc_server *wrapped;
grpc_completion_queue *queue;
} grpc_rb_server;
/* Destroys server instances. */
......@@ -145,6 +146,7 @@ static VALUE grpc_rb_server_init(VALUE self, VALUE cqueue, VALUE channel_args) {
}
grpc_server_register_completion_queue(srv, cq, NULL);
wrapper->wrapped = srv;
wrapper->queue = cq;
/* Add the cq as the server's mark object. This ensures the ruby cq can't be
GCed before the server */
......
/*
*
* Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include <ruby/ruby.h>
#include <signal.h>
#include <stdbool.h>
#include <grpc/support/log.h>
#include "rb_grpc.h"
static void (*old_sigint_handler)(int);
static void (*old_sigterm_handler)(int);
static volatile bool signal_received = false;
/* This has to be handled at the C level instead of Ruby, because Ruby signal
* handlers are constrained to run in the main interpreter thread. If that main
* thread is blocked on grpc_completion_queue_pluck, the signal handlers will
* never run */
static void handle_signal(int signum) {
signal_received = true;
if (signum == SIGINT) {
old_sigint_handler(signum);
} else if (signum == SIGTERM) {
old_sigterm_handler(signum);
}
}
static VALUE grpc_rb_signal_received(VALUE self) {
(void)self;
return signal_received ? Qtrue : Qfalse;
}
void Init_grpc_signals() {
old_sigint_handler = signal(SIGINT, handle_signal);
old_sigterm_handler = signal(SIGTERM, handle_signal);
rb_define_singleton_method(grpc_rb_mGrpcCore, "signal_received?",
grpc_rb_signal_received, 0);
}
/*
*
* Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef GRPC_RB_SIGNAL_H_
#define GRPC_RB_SIGNAL_H_
void Init_grpc_signals();
#endif /* GRPC_RB_SIGNAL_H_ */
......@@ -33,7 +33,6 @@ require_relative 'grpc/errors'
require_relative 'grpc/grpc'
require_relative 'grpc/logconfig'
require_relative 'grpc/notifier'
require_relative 'grpc/signals'
require_relative 'grpc/version'
require_relative 'grpc/core/time_consts'
require_relative 'grpc/generic/active_call'
......@@ -48,5 +47,3 @@ begin
ensure
file.close
end
GRPC::Signals.wait_for_signals
......@@ -30,7 +30,6 @@
require 'forwardable'
require 'weakref'
require_relative 'bidi_call'
require_relative '../signals'
class Struct
# BatchResult is the struct returned by calls to call#start_batch.
......@@ -123,10 +122,6 @@ module GRPC
@unmarshal = unmarshal
@metadata_tag = metadata_tag
@op_notifier = nil
weak_self = WeakRef.new(self)
remove_handler = GRPC::Signals.register_handler(&weak_self
.method(:cancel))
ObjectSpace.define_finalizer(self, remove_handler)
end
# output_metadata are provides access to hash that can be used to
......
......@@ -28,7 +28,6 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
require_relative '../grpc'
require_relative '../signals'
require_relative 'active_call'
require_relative 'service'
require 'thread'
......@@ -353,10 +352,7 @@ module GRPC
transition_running_state(:running)
@run_cond.broadcast
end
remove_signal_handler = GRPC::Signals.register_handler { stop }
loop_handle_server_calls
# Remove signal handler when server stops
remove_signal_handler.call
end
alias_method :run_till_terminated, :run
......
# Copyright 2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
require 'thread'
require_relative 'grpc'
# GRPC contains the General RPC module.
module GRPC
# Signals contains gRPC functions related to signal handling
module Signals
@interpreter_exiting = false
@signal_handlers = []
@handlers_mutex = Mutex.new
def register_handler(&handler)
@handlers_mutex.synchronize do
@signal_handlers.push(handler)
handler.call if @exit_signal_received
end
# Returns a function to remove the handler
lambda do
@handlers_mutex.synchronize { @signal_handlers.delete(handler) }
end
end
module_function :register_handler
def wait_for_signals
t = Thread.new do
sleep 0.1 until GRPC::Core.signal_received? || @interpreter_exiting
unless @interpreter_exiting
@handlers_mutex.synchronize do
@signal_handlers.each(&:call)
end
end
end
at_exit do
@interpreter_exiting = true
t.join
end
end
module_function :wait_for_signals
end
end
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment