diff --git a/src/ruby/end2end/multiple_killed_watching_threads_driver.rb b/src/ruby/end2end/multiple_killed_watching_threads_driver.rb new file mode 100755 index 0000000000000000000000000000000000000000..206ec8e801f189512ed867b096336f536ad032d9 --- /dev/null +++ b/src/ruby/end2end/multiple_killed_watching_threads_driver.rb @@ -0,0 +1,63 @@ +#!/usr/bin/env ruby + +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +require_relative './end2end_common' + +Thread.abort_on_exception = true + +include GRPC::Core::ConnectivityStates + +def watch_state(ch) + thd = Thread.new do + state = ch.connectivity_state(false) + fail "non-idle state: #{state}" unless state == IDLE + ch.watch_connectivity_state(IDLE, Time.now + 360) + end + sleep 0.1 + thd.kill +end + +def main + channels = [] + 10.times do + ch = GRPC::Core::Channel.new('dummy_host', + nil, :this_channel_is_insecure) + watch_state(ch) + channels << ch + end + + # checking state should still be safe to call + channels.each do |c| + fail unless c.connectivity_state(false) == FATAL_FAILURE + end +end + +main diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c index e02dd0805d358bbd8d80b98811e5c1b2b0c7ca69..7aaa71eeafc86fa2b7bc33e864f995c2362029bf 100644 --- a/src/ruby/ext/grpc/rb_channel.c +++ b/src/ruby/ext/grpc/rb_channel.c @@ -107,6 +107,7 @@ static bg_watched_channel *bg_watched_channel_list_head = NULL; static void grpc_rb_channel_try_register_connection_polling( bg_watched_channel *bg); static void *wait_until_channel_polling_thread_started_no_gil(void *); +static void wait_until_channel_polling_thread_started_unblocking_func(void *); static void *channel_init_try_register_connection_polling_without_gil( void *arg); @@ -227,12 +228,15 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) { char *target_chars = NULL; grpc_channel_args args; channel_init_try_register_stack stack; + int stop_waiting_for_thread_start = 0; MEMZERO(&args, grpc_channel_args, 1); grpc_ruby_once_init(); - rb_thread_call_without_gvl(wait_until_channel_polling_thread_started_no_gil, - NULL, run_poll_channels_loop_unblocking_func, - NULL); + rb_thread_call_without_gvl( + wait_until_channel_polling_thread_started_no_gil, + &stop_waiting_for_thread_start, + wait_until_channel_polling_thread_started_unblocking_func, + &stop_waiting_for_thread_start); /* "3" == 3 mandatory args */ rb_scan_args(argc, argv, "3", &target, &channel_args, &credentials); @@ -273,7 +277,7 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) { } typedef struct get_state_stack { - grpc_channel *channel; + bg_watched_channel *bg; int try_to_connect; int out; } get_state_stack; @@ -283,14 +287,10 @@ static void *get_state_without_gil(void *arg) { gpr_mu_lock(&global_connection_polling_mu); GPR_ASSERT(abort_channel_polling || channel_polling_thread_started); - if (abort_channel_polling) { - // Assume that this channel has been destroyed by the - // background thread. - // The case in which the channel polling thread - // failed to start just always shows shutdown state. + if (stack->bg->channel_destroyed) { stack->out = GRPC_CHANNEL_SHUTDOWN; } else { - stack->out = grpc_channel_check_connectivity_state(stack->channel, + stack->out = grpc_channel_check_connectivity_state(stack->bg->channel, stack->try_to_connect); } gpr_mu_unlock(&global_connection_polling_mu); @@ -322,7 +322,7 @@ static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv, return Qnil; } - stack.channel = wrapper->bg_wrapped->channel; + stack.bg = wrapper->bg_wrapped; stack.try_to_connect = RTEST(try_to_connect_param) ? 1 : 0; rb_thread_call_without_gvl(get_state_without_gil, &stack, NULL, NULL); @@ -366,6 +366,16 @@ static void *wait_for_watch_state_op_complete_without_gvl(void *arg) { return success; } +static void wait_for_watch_state_op_complete_unblocking_func(void *arg) { + bg_watched_channel *bg = (bg_watched_channel *)arg; + gpr_mu_lock(&global_connection_polling_mu); + if (!bg->channel_destroyed) { + grpc_channel_destroy(bg->channel); + bg->channel_destroyed = 1; + } + gpr_mu_unlock(&global_connection_polling_mu); +} + /* Wait until the channel's connectivity state becomes different from * "last_state", or "deadline" expires. * Returns true if the the channel's connectivity state becomes @@ -400,7 +410,7 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self, op_success = rb_thread_call_without_gvl( wait_for_watch_state_op_complete_without_gvl, &stack, - run_poll_channels_loop_unblocking_func, NULL); + wait_for_watch_state_op_complete_unblocking_func, wrapper->bg_wrapped); return op_success ? Qtrue : Qfalse; } @@ -577,11 +587,7 @@ static void grpc_rb_channel_try_register_connection_polling( return; } GPR_ASSERT(bg->refcount == 1); - if (bg->channel_destroyed) { - GPR_ASSERT(abort_channel_polling); - return; - } - if (abort_channel_polling) { + if (bg->channel_destroyed || abort_channel_polling) { return; } @@ -697,10 +703,11 @@ static VALUE run_poll_channels_loop(VALUE arg) { } static void *wait_until_channel_polling_thread_started_no_gil(void *arg) { - (void)arg; + int *stop_waiting = (int *)arg; gpr_log(GPR_DEBUG, "GRPC_RUBY: wait for channel polling thread to start"); gpr_mu_lock(&global_connection_polling_mu); - while (!channel_polling_thread_started && !abort_channel_polling) { + while (!channel_polling_thread_started && !abort_channel_polling && + !*stop_waiting) { gpr_cv_wait(&global_connection_polling_cv, &global_connection_polling_mu, gpr_inf_future(GPR_CLOCK_REALTIME)); } @@ -709,6 +716,17 @@ static void *wait_until_channel_polling_thread_started_no_gil(void *arg) { return NULL; } +static void wait_until_channel_polling_thread_started_unblocking_func( + void *arg) { + int *stop_waiting = (int *)arg; + gpr_mu_lock(&global_connection_polling_mu); + gpr_log(GPR_DEBUG, + "GRPC_RUBY: interrupt wait for channel polling thread to start"); + *stop_waiting = 1; + gpr_cv_broadcast(&global_connection_polling_cv); + gpr_mu_unlock(&global_connection_polling_mu); +} + static void *set_abort_channel_polling_without_gil(void *arg) { (void)arg; gpr_mu_lock(&global_connection_polling_mu); diff --git a/tools/run_tests/helper_scripts/run_ruby_end2end_tests.sh b/tools/run_tests/helper_scripts/run_ruby_end2end_tests.sh index 6688025260ff0db6e3bcc2378495992b96786788..ab882d62bc7268bda1d22cc22f468c27582e2625 100755 --- a/tools/run_tests/helper_scripts/run_ruby_end2end_tests.sh +++ b/tools/run_tests/helper_scripts/run_ruby_end2end_tests.sh @@ -41,4 +41,5 @@ ruby src/ruby/end2end/sig_int_during_channel_watch_driver.rb || EXIT_CODE=1 ruby src/ruby/end2end/killed_client_thread_driver.rb || EXIT_CODE=1 ruby src/ruby/end2end/forking_client_driver.rb || EXIT_CODE=1 ruby src/ruby/end2end/grpc_class_init_driver.rb || EXIT_CODE=1 +ruby src/ruby/end2end/multiple_killed_watching_threads_driver.rb || EXIT_CODE=1 exit $EXIT_CODE