Skip to content
Snippets Groups Projects
Commit 42d858a9 authored by apolcyn's avatar apolcyn Committed by GitHub
Browse files

Merge pull request #11054 from apolcyn/ruby_dont_use_timeouts

fixes to connectivity state api and add in constant state watches
parents 0bb3986f 032f3985
No related branches found
No related tags found
No related merge requests found
...@@ -61,6 +61,11 @@ def main ...@@ -61,6 +61,11 @@ def main
'channel is closed while connectivity is watched' 'channel is closed while connectivity is watched'
end end
client_exit_code = $CHILD_STATUS
if client_exit_code != 0
fail "channel closing client failed, exit code #{client_exit_code}"
end
server_runner.stop server_runner.stop
end end
......
...@@ -58,6 +58,9 @@ def main ...@@ -58,6 +58,9 @@ def main
'It likely hangs when ended abruptly' 'It likely hangs when ended abruptly'
end end
# The interrupt in the child process should cause it to
# exit a non-zero status, so don't check it here.
# This test mainly tries to catch deadlock.
server_runner.stop server_runner.stop
end end
......
...@@ -34,44 +34,110 @@ ...@@ -34,44 +34,110 @@
require_relative './end2end_common' require_relative './end2end_common'
def main def construct_many(test_proc)
grpc_class = '' thds = []
OptionParser.new do |opts| 4.times do
opts.on('--grpc_class=P', String) do |p| thds << Thread.new do
grpc_class = p 20.times do
test_proc.call
end
end end
end.parse! end
20.times do
test_proc.call
end
thds.each(&:join)
end
def run_gc_stress_test(test_proc)
GC.disable
construct_many(test_proc)
test_proc = nil GC.enable
construct_many(test_proc)
GC.start(full_mark: true, immediate_sweep: true)
construct_many(test_proc)
end
def run_concurrency_stress_test(test_proc)
100.times do
Thread.new do
test_proc.call
end
end
test_proc.call
fail 'exception thrown while child thread initing class'
end
# default (no gc_stress and no concurrency_stress)
def run_default_test(test_proc)
thd = Thread.new do
test_proc.call
end
test_proc.call
thd.join
end
def get_test_proc(grpc_class)
case grpc_class case grpc_class
when 'channel' when 'channel'
test_proc = proc do return proc do
GRPC::Core::Channel.new('dummy_host', nil, :this_channel_is_insecure) GRPC::Core::Channel.new('dummy_host', nil, :this_channel_is_insecure)
end end
when 'server' when 'server'
test_proc = proc do return proc do
GRPC::Core::Server.new({}) GRPC::Core::Server.new({})
end end
when 'channel_credentials' when 'channel_credentials'
test_proc = proc do return proc do
GRPC::Core::ChannelCredentials.new GRPC::Core::ChannelCredentials.new
end end
when 'call_credentials' when 'call_credentials'
test_proc = proc do return proc do
GRPC::Core::CallCredentials.new(proc { |noop| noop }) GRPC::Core::CallCredentials.new(proc { |noop| noop })
end end
when 'compression_options' when 'compression_options'
test_proc = proc do return proc do
GRPC::Core::CompressionOptions.new GRPC::Core::CompressionOptions.new
end end
else else
fail "bad --grpc_class=#{grpc_class} param" fail "bad --grpc_class=#{grpc_class} param"
end end
end
th = Thread.new { test_proc.call } def main
test_proc.call grpc_class = ''
th.join stress_test = ''
OptionParser.new do |opts|
opts.on('--grpc_class=P', String) do |p|
grpc_class = p
end
opts.on('--stress_test=P') do |p|
stress_test = p
end
end.parse!
test_proc = get_test_proc(grpc_class)
# the different test configs need to be ran
# in separate processes, since each one tests
# clean shutdown in a different way
case stress_test
when 'gc'
p 'run gc stress'
run_gc_stress_test(test_proc)
when 'concurrency'
p 'run concurrency stress'
run_concurrency_stress_test(test_proc)
when ''
p 'run default'
run_default_test(test_proc)
else
fail "bad --stress_test=#{stress_test} param"
end
end end
main main
...@@ -38,29 +38,40 @@ def main ...@@ -38,29 +38,40 @@ def main
call_credentials call_credentials
compression_options ) compression_options )
native_grpc_classes.each do |grpc_class| # there is room for false positives in this test,
STDERR.puts 'start client' # do a few runs for each config
this_dir = File.expand_path(File.dirname(__FILE__)) 4.times do
client_path = File.join(this_dir, 'grpc_class_init_client.rb') native_grpc_classes.each do |grpc_class|
client_pid = Process.spawn(RbConfig.ruby, ['', 'gc', 'concurrency'].each do |stress_test_type|
client_path, STDERR.puts 'start client'
"--grpc_class=#{grpc_class}") this_dir = File.expand_path(File.dirname(__FILE__))
begin client_path = File.join(this_dir, 'grpc_class_init_client.rb')
Timeout.timeout(10) do client_pid = Process.spawn(RbConfig.ruby,
Process.wait(client_pid) client_path,
"--grpc_class=#{grpc_class}",
"--stress_test=#{stress_test_type}")
begin
Timeout.timeout(10) do
Process.wait(client_pid)
end
rescue Timeout::Error
STDERR.puts "timeout waiting for client pid #{client_pid}"
Process.kill('SIGKILL', client_pid)
Process.wait(client_pid)
STDERR.puts 'killed client child'
raise 'Timed out waiting for client process. ' \
'It likely hangs when the first constructed gRPC object has ' \
"type: #{grpc_class}"
end
client_exit_code = $CHILD_STATUS
# concurrency stress test type is expected to exit with a
# non-zero status due to an exception being raised
if client_exit_code != 0 && stress_test_type != 'concurrency'
fail "client failed, exit code #{client_exit_code}"
end
end end
rescue Timeout::Error
STDERR.puts "timeout waiting for client pid #{client_pid}"
Process.kill('SIGKILL', client_pid)
Process.wait(client_pid)
STDERR.puts 'killed client child'
raise 'Timed out waiting for client process. ' \
'It likely hangs when the first constructed gRPC object has ' \
"type: #{grpc_class}"
end end
client_exit_code = $CHILD_STATUS
fail "client failed, exit code #{client_exit_code}" if client_exit_code != 0
end end
end end
......
...@@ -46,6 +46,8 @@ def main ...@@ -46,6 +46,8 @@ def main
end end
end.parse! end.parse!
trap('SIGINT') { exit 0 }
thd = Thread.new do thd = Thread.new do
child_thread_channel = GRPC::Core::Channel.new("localhost:#{server_port}", child_thread_channel = GRPC::Core::Channel.new("localhost:#{server_port}",
{}, {},
......
...@@ -63,6 +63,11 @@ def main ...@@ -63,6 +63,11 @@ def main
'SIGINT is sent while there is an active connectivity_state call' 'SIGINT is sent while there is an active connectivity_state call'
end end
client_exit_code = $CHILD_STATUS
if client_exit_code != 0
fail "sig_int_during_channel_watch_client failed: #{client_exit_code}"
end
server_runner.stop server_runner.stop
end end
......
This diff is collapsed.
...@@ -106,17 +106,17 @@ static void *grpc_rb_wait_for_event_no_gil(void *param) { ...@@ -106,17 +106,17 @@ static void *grpc_rb_wait_for_event_no_gil(void *param) {
grpc_rb_event *event = NULL; grpc_rb_event *event = NULL;
(void)param; (void)param;
gpr_mu_lock(&event_queue.mu); gpr_mu_lock(&event_queue.mu);
while ((event = grpc_rb_event_queue_dequeue()) == NULL) { while (!event_queue.abort) {
if ((event = grpc_rb_event_queue_dequeue()) != NULL) {
gpr_mu_unlock(&event_queue.mu);
return event;
}
gpr_cv_wait(&event_queue.cv, gpr_cv_wait(&event_queue.cv,
&event_queue.mu, &event_queue.mu,
gpr_inf_future(GPR_CLOCK_REALTIME)); gpr_inf_future(GPR_CLOCK_REALTIME));
if (event_queue.abort) {
gpr_mu_unlock(&event_queue.mu);
return NULL;
}
} }
gpr_mu_unlock(&event_queue.mu); gpr_mu_unlock(&event_queue.mu);
return event; return NULL;
} }
static void grpc_rb_event_unblocking_func(void *arg) { static void grpc_rb_event_unblocking_func(void *arg) {
......
...@@ -295,11 +295,12 @@ static gpr_once g_once_init = GPR_ONCE_INIT; ...@@ -295,11 +295,12 @@ static gpr_once g_once_init = GPR_ONCE_INIT;
static void grpc_ruby_once_init_internal() { static void grpc_ruby_once_init_internal() {
grpc_init(); grpc_init();
grpc_rb_event_queue_thread_start();
grpc_rb_channel_polling_thread_start();
atexit(grpc_rb_shutdown); atexit(grpc_rb_shutdown);
} }
static VALUE bg_thread_init_rb_mu = Qundef;
static int bg_thread_init_done = 0;
void grpc_ruby_once_init() { void grpc_ruby_once_init() {
/* ruby_vm_at_exit doesn't seem to be working. It would crash once every /* ruby_vm_at_exit doesn't seem to be working. It would crash once every
* blue moon, and some users are getting it repeatedly. See the discussions * blue moon, and some users are getting it repeatedly. See the discussions
...@@ -312,6 +313,18 @@ void grpc_ruby_once_init() { ...@@ -312,6 +313,18 @@ void grpc_ruby_once_init() {
* schedule our initialization and destruction only once. * schedule our initialization and destruction only once.
*/ */
gpr_once_init(&g_once_init, grpc_ruby_once_init_internal); gpr_once_init(&g_once_init, grpc_ruby_once_init_internal);
// Avoid calling calling into ruby library (when creating threads here)
// in gpr_once_init. In general, it appears to be unsafe to call
// into the ruby library while holding a non-ruby mutex, because a gil yield
// could end up trying to lock onto that same mutex and deadlocking.
rb_mutex_lock(bg_thread_init_rb_mu);
if (!bg_thread_init_done) {
grpc_rb_event_queue_thread_start();
grpc_rb_channel_polling_thread_start();
bg_thread_init_done = 1;
}
rb_mutex_unlock(bg_thread_init_rb_mu);
} }
void Init_grpc_c() { void Init_grpc_c() {
...@@ -320,6 +333,9 @@ void Init_grpc_c() { ...@@ -320,6 +333,9 @@ void Init_grpc_c() {
return; return;
} }
bg_thread_init_rb_mu = rb_mutex_new();
rb_global_variable(&bg_thread_init_rb_mu);
grpc_rb_mGRPC = rb_define_module("GRPC"); grpc_rb_mGRPC = rb_define_module("GRPC");
grpc_rb_mGrpcCore = rb_define_module_under(grpc_rb_mGRPC, "Core"); grpc_rb_mGrpcCore = rb_define_module_under(grpc_rb_mGRPC, "Core");
grpc_rb_sNewServerRpc = grpc_rb_sNewServerRpc =
......
...@@ -28,6 +28,10 @@ ...@@ -28,6 +28,10 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
require 'grpc' require 'grpc'
require 'timeout'
include Timeout
include GRPC::Core
# A test message # A test message
class EchoMsg class EchoMsg
...@@ -62,7 +66,7 @@ end ...@@ -62,7 +66,7 @@ end
EchoStub = EchoService.rpc_stub_class EchoStub = EchoService.rpc_stub_class
def start_server(port = 0) def start_server(port = 0)
@srv = GRPC::RpcServer.new @srv = GRPC::RpcServer.new(pool_size: 1)
server_port = @srv.add_http2_port("localhost:#{port}", :this_port_is_insecure) server_port = @srv.add_http2_port("localhost:#{port}", :this_port_is_insecure)
@srv.handle(EchoService) @srv.handle(EchoService)
@server_thd = Thread.new { @srv.run } @server_thd = Thread.new { @srv.run }
...@@ -138,4 +142,32 @@ describe 'channel connection behavior' do ...@@ -138,4 +142,32 @@ describe 'channel connection behavior' do
stop_server stop_server
end end
it 'concurrent watches on the same channel' do
timeout(180) do
port = start_server
ch = GRPC::Core::Channel.new("localhost:#{port}", {},
:this_channel_is_insecure)
stop_server
thds = []
50.times do
thds << Thread.new do
while ch.connectivity_state(true) != ConnectivityStates::READY
ch.watch_connectivity_state(
ConnectivityStates::READY, Time.now + 60)
break
end
end
end
sleep 0.01
start_server(port)
thds.each(&:join)
stop_server
end
end
end end
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment