Skip to content
Snippets Groups Projects
Commit b2c0b7bc authored by Alexander Polcyn's avatar Alexander Polcyn
Browse files

constant state watch without timeouts

parent 6be93970
No related branches found
No related tags found
No related merge requests found
Showing with 472 additions and 220 deletions
......@@ -61,6 +61,11 @@ def main
'channel is closed while connectivity is watched'
end
client_exit_code = $CHILD_STATUS
if client_exit_code != 0
fail "channel closing client failed, exit code #{client_exit_code}"
end
server_runner.stop
end
......
......@@ -58,6 +58,9 @@ def main
'It likely hangs when ended abruptly'
end
# The interrupt in the child process should cause it to
# exit a non-zero status, so don't check it here.
# This test mainly tries to catch deadlock.
server_runner.stop
end
......
......@@ -34,44 +34,81 @@
require_relative './end2end_common'
def main
grpc_class = ''
OptionParser.new do |opts|
opts.on('--grpc_class=P', String) do |p|
grpc_class = p
def construct_many(test_proc)
thds = []
4.times do
thds << Thread.new do
20.times do
test_proc.call
end
end
end.parse!
end
20.times do
test_proc.call
end
thds.each(&:join)
end
def run_gc_stress_test(test_proc)
GC.disable
construct_many(test_proc)
test_proc = nil
GC.enable
construct_many(test_proc)
GC.start(full_mark: true, immediate_sweep: true)
construct_many(test_proc)
end
def get_test_proc(grpc_class)
case grpc_class
when 'channel'
test_proc = proc do
return proc do
GRPC::Core::Channel.new('dummy_host', nil, :this_channel_is_insecure)
end
when 'server'
test_proc = proc do
return proc do
GRPC::Core::Server.new({})
end
when 'channel_credentials'
test_proc = proc do
return proc do
GRPC::Core::ChannelCredentials.new
end
when 'call_credentials'
test_proc = proc do
return proc do
GRPC::Core::CallCredentials.new(proc { |noop| noop })
end
when 'compression_options'
test_proc = proc do
return proc do
GRPC::Core::CompressionOptions.new
end
else
fail "bad --grpc_class=#{grpc_class} param"
end
end
def main
grpc_class = ''
gc_stress = false
OptionParser.new do |opts|
opts.on('--grpc_class=P', String) do |p|
grpc_class = p
end
opts.on('--gc_stress=P') do |p|
gc_stress = p
end
end.parse!
test_proc = get_test_proc(grpc_class)
if gc_stress == 'true'
run_gc_stress_test(test_proc)
return
end
th = Thread.new { test_proc.call }
thd = Thread.new { test_proc.call }
test_proc.call
th.join
thd.join
end
main
......@@ -38,29 +38,38 @@ def main
call_credentials
compression_options )
native_grpc_classes.each do |grpc_class|
STDERR.puts 'start client'
this_dir = File.expand_path(File.dirname(__FILE__))
client_path = File.join(this_dir, 'grpc_class_init_client.rb')
client_pid = Process.spawn(RbConfig.ruby,
client_path,
"--grpc_class=#{grpc_class}")
begin
Timeout.timeout(10) do
Process.wait(client_pid)
# there is room for false positives in this test,
# do 10 runs for each config to reduce these.
[true, false].each do |gc_stress|
10.times do
native_grpc_classes.each do |grpc_class|
STDERR.puts 'start client'
this_dir = File.expand_path(File.dirname(__FILE__))
client_path = File.join(this_dir, 'grpc_class_init_client.rb')
client_pid = Process.spawn(RbConfig.ruby,
client_path,
"--grpc_class=#{grpc_class}",
"--gc_stress=#{gc_stress}")
begin
Timeout.timeout(10) do
Process.wait(client_pid)
end
rescue Timeout::Error
STDERR.puts "timeout waiting for client pid #{client_pid}"
Process.kill('SIGKILL', client_pid)
Process.wait(client_pid)
STDERR.puts 'killed client child'
raise 'Timed out waiting for client process. ' \
'It likely hangs when the first constructed gRPC object has ' \
"type: #{grpc_class}"
end
client_exit_code = $CHILD_STATUS
if client_exit_code != 0
fail "client failed, exit code #{client_exit_code}"
end
end
rescue Timeout::Error
STDERR.puts "timeout waiting for client pid #{client_pid}"
Process.kill('SIGKILL', client_pid)
Process.wait(client_pid)
STDERR.puts 'killed client child'
raise 'Timed out waiting for client process. ' \
'It likely hangs when the first constructed gRPC object has ' \
"type: #{grpc_class}"
end
client_exit_code = $CHILD_STATUS
fail "client failed, exit code #{client_exit_code}" if client_exit_code != 0
end
end
......
......@@ -46,6 +46,8 @@ def main
end
end.parse!
trap('SIGINT') { exit 0 }
thd = Thread.new do
child_thread_channel = GRPC::Core::Channel.new("localhost:#{server_port}",
{},
......
......@@ -63,6 +63,11 @@ def main
'SIGINT is sent while there is an active connectivity_state call'
end
client_exit_code = $CHILD_STATUS
if client_exit_code != 0
fail "sig_int_during_channel_watch_client failed: #{client_exit_code}"
end
server_runner.stop
end
......
......@@ -103,7 +103,7 @@ static void destroy_call(grpc_rb_call *call) {
if (call->wrapped != NULL) {
grpc_call_destroy(call->wrapped);
call->wrapped = NULL;
grpc_rb_completion_queue_destroy(call->queue);
grpc_rb_completion_queue_safe_destroy(call->queue);
call->queue = NULL;
}
}
......
This diff is collapsed.
......@@ -71,12 +71,16 @@ static void *grpc_rb_completion_queue_pluck_no_gil(void *param) {
}
/* Helper function to free a completion queue. */
void grpc_rb_completion_queue_destroy(grpc_completion_queue *cq) {
/* Every function that adds an event to a queue also synchronously plucks
that event from the queue, and holds a reference to the Ruby object that
holds the queue, so we only get to this point if all of those functions
have completed, and the queue is empty */
void grpc_rb_completion_queue_safe_destroy(grpc_completion_queue *cq) {
grpc_event ev;
grpc_completion_queue_shutdown(cq);
for(;;) {
ev = grpc_completion_queue_pluck(cq, NULL, gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
if (ev.type == GRPC_QUEUE_SHUTDOWN) {
break;
}
}
grpc_completion_queue_destroy(cq);
}
......
......@@ -38,7 +38,7 @@
#include <grpc/grpc.h>
void grpc_rb_completion_queue_destroy(grpc_completion_queue *cq);
void grpc_rb_completion_queue_safe_destroy(grpc_completion_queue *cq);
/**
* Makes the implementation of CompletionQueue#pluck available in other files
......
......@@ -106,17 +106,17 @@ static void *grpc_rb_wait_for_event_no_gil(void *param) {
grpc_rb_event *event = NULL;
(void)param;
gpr_mu_lock(&event_queue.mu);
while ((event = grpc_rb_event_queue_dequeue()) == NULL) {
while (!event_queue.abort) {
if ((event = grpc_rb_event_queue_dequeue()) != NULL) {
gpr_mu_unlock(&event_queue.mu);
return event;
}
gpr_cv_wait(&event_queue.cv,
&event_queue.mu,
gpr_inf_future(GPR_CLOCK_REALTIME));
if (event_queue.abort) {
gpr_mu_unlock(&event_queue.mu);
return NULL;
}
}
gpr_mu_unlock(&event_queue.mu);
return event;
return NULL;
}
static void grpc_rb_event_unblocking_func(void *arg) {
......
......@@ -77,7 +77,7 @@ static void destroy_server(grpc_rb_server *server, gpr_timespec deadline) {
gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
}
grpc_server_destroy(server->wrapped);
grpc_rb_completion_queue_destroy(server->queue);
grpc_rb_completion_queue_safe_destroy(server->queue);
server->wrapped = NULL;
server->queue = NULL;
}
......
......@@ -28,6 +28,10 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
require 'grpc'
require 'timeout'
include Timeout
include GRPC::Core
# A test message
class EchoMsg
......@@ -62,7 +66,7 @@ end
EchoStub = EchoService.rpc_stub_class
def start_server(port = 0)
@srv = GRPC::RpcServer.new
@srv = GRPC::RpcServer.new(pool_size: 1)
server_port = @srv.add_http2_port("localhost:#{port}", :this_port_is_insecure)
@srv.handle(EchoService)
@server_thd = Thread.new { @srv.run }
......@@ -138,4 +142,33 @@ describe 'channel connection behavior' do
stop_server
end
it 'observably connects and reconnects to transient server' \
' when using the channel state API' do
timeout(180) do
port = start_server
ch = GRPC::Core::Channel.new("localhost:#{port}", {},
:this_channel_is_insecure)
stop_server
thds = []
50.times do
thds << Thread.new do
while ch.connectivity_state(true) != ConnectivityStates::READY
ch.watch_connectivity_state(
ConnectivityStates::READY, Time.now + 60)
break
end
end
end
sleep 0.01
start_server(port)
thds.each(&:join)
stop_server
end
end
end
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment