Skip to content
Snippets Groups Projects
Commit be30114a authored by Alexander Polcyn's avatar Alexander Polcyn
Browse files

fix up tests and remove two unlocks in a row bug

parent fcad5799
No related branches found
No related tags found
No related merge requests found
...@@ -40,11 +40,39 @@ $LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir) ...@@ -40,11 +40,39 @@ $LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir)
require 'grpc' require 'grpc'
require 'helloworld_services_pb' require 'helloworld_services_pb'
$int_count = 0
def shut_down_term
puts "term sig"
$int_count += 1
if $int_count > 4
exit
end
end
def shut_down_kill
puts "kill sig"
$int_count += 1
if $int_count > 4
exit
end
end
def main def main
stub = Helloworld::Greeter::Stub.new('localhost:50051', :this_channel_is_insecure) stub = Helloworld::Greeter::Stub.new('localhost:50051', :this_channel_is_insecure)
user = ARGV.size > 0 ? ARGV[0] : 'world' user = ARGV.size > 0 ? ARGV[0] : 'world'
message = stub.say_hello(Helloworld::HelloRequest.new(name: user)).message Signal.trap("TERM") do
p "Greeting: #{message}" shut_down_term
end
Signal.trap("INT") do
shut_down_kill
end
loop do
message = stub.say_hello(Helloworld::HelloRequest.new(name: user)).message
p "Greeting: #{message}"
sleep 4
end
end end
main main
...@@ -89,7 +89,7 @@ static void grpc_rb_channel_try_register_connection_polling( ...@@ -89,7 +89,7 @@ static void grpc_rb_channel_try_register_connection_polling(
static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper); static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper);
static grpc_completion_queue *channel_polling_cq; static grpc_completion_queue *channel_polling_cq;
static gpr_mu channel_polling_mu; static gpr_mu global_connection_polling_mu;
static int abort_channel_polling = 0; static int abort_channel_polling = 0;
/* Destroys Channel instances. */ /* Destroys Channel instances. */
...@@ -188,13 +188,13 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) { ...@@ -188,13 +188,13 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
gpr_mu_lock(&wrapper->channel_mu); gpr_mu_lock(&wrapper->channel_mu);
wrapper->current_connectivity_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0); wrapper->current_connectivity_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0);
gpr_cv_signal(&wrapper->channel_cv);
gpr_mu_unlock(&wrapper->channel_mu);
gpr_mu_lock(&wrapper->channel_mu);
wrapper->safe_to_destroy = 0; wrapper->safe_to_destroy = 0;
wrapper->request_safe_destroy = 0; wrapper->request_safe_destroy = 0;
gpr_cv_signal(&wrapper->channel_cv);
gpr_mu_unlock(&wrapper->channel_mu); gpr_mu_unlock(&wrapper->channel_mu);
grpc_rb_channel_try_register_connection_polling(wrapper); grpc_rb_channel_try_register_connection_polling(wrapper);
if (args.args != NULL) { if (args.args != NULL) {
...@@ -277,7 +277,6 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self, ...@@ -277,7 +277,6 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self,
} }
if (wrapper->safe_to_destroy) { if (wrapper->safe_to_destroy) {
gpr_mu_unlock(&wrapper->channel_mu); gpr_mu_unlock(&wrapper->channel_mu);
gpr_log(GPR_DEBUG, "GRPC_RUBY_RB_CHANNEL: attempt to watch_connectivity_state on a non-state-polled channel");
return Qfalse; return Qfalse;
} }
gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, grpc_rb_time_timeval(deadline, /* absolute time */ 0)); gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, grpc_rb_time_timeval(deadline, /* absolute time */ 0));
...@@ -394,7 +393,7 @@ static VALUE grpc_rb_channel_get_target(VALUE self) { ...@@ -394,7 +393,7 @@ static VALUE grpc_rb_channel_get_target(VALUE self) {
// destroy. // destroy.
// Not safe to call while a channel's connection state is polled. // Not safe to call while a channel's connection state is polled.
static void grpc_rb_channel_try_register_connection_polling( static void grpc_rb_channel_try_register_connection_polling(
grpc_rb_channel *wrapper) { grpc_rb_channel *wrapper) {
grpc_connectivity_state conn_state; grpc_connectivity_state conn_state;
gpr_timespec sleep_time = gpr_time_add( gpr_timespec sleep_time = gpr_time_add(
gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(20, GPR_TIMESPAN)); gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(20, GPR_TIMESPAN));
...@@ -408,9 +407,8 @@ static void grpc_rb_channel_try_register_connection_polling( ...@@ -408,9 +407,8 @@ static void grpc_rb_channel_try_register_connection_polling(
gpr_mu_unlock(&wrapper->channel_mu); gpr_mu_unlock(&wrapper->channel_mu);
return; return;
} }
gpr_mu_lock(&channel_polling_mu); gpr_mu_lock(&global_connection_polling_mu);
gpr_mu_lock(&wrapper->channel_mu);
conn_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0); conn_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0);
if (conn_state != wrapper->current_connectivity_state) { if (conn_state != wrapper->current_connectivity_state) {
wrapper->current_connectivity_state = conn_state; wrapper->current_connectivity_state = conn_state;
...@@ -424,8 +422,7 @@ static void grpc_rb_channel_try_register_connection_polling( ...@@ -424,8 +422,7 @@ static void grpc_rb_channel_try_register_connection_polling(
wrapper->safe_to_destroy = 1; wrapper->safe_to_destroy = 1;
gpr_cv_signal(&wrapper->channel_cv); gpr_cv_signal(&wrapper->channel_cv);
} }
gpr_mu_unlock(&wrapper->channel_mu); gpr_mu_unlock(&global_connection_polling_mu);
gpr_mu_unlock(&channel_polling_mu);
gpr_mu_unlock(&wrapper->channel_mu); gpr_mu_unlock(&wrapper->channel_mu);
} }
...@@ -454,7 +451,6 @@ static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper) { ...@@ -454,7 +451,6 @@ static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper) {
// early and falls back to current behavior. // early and falls back to current behavior.
static void *run_poll_channels_loop_no_gil(void *arg) { static void *run_poll_channels_loop_no_gil(void *arg) {
grpc_event event; grpc_event event;
grpc_rb_channel *wrapper;
(void)arg; (void)arg;
for (;;) { for (;;) {
event = grpc_completion_queue_next( event = grpc_completion_queue_next(
...@@ -463,27 +459,28 @@ static void *run_poll_channels_loop_no_gil(void *arg) { ...@@ -463,27 +459,28 @@ static void *run_poll_channels_loop_no_gil(void *arg) {
break; break;
} }
if (event.type == GRPC_OP_COMPLETE) { if (event.type == GRPC_OP_COMPLETE) {
wrapper = (grpc_rb_channel *)event.tag; grpc_rb_channel_try_register_connection_polling((grpc_rb_channel *)event.tag);
grpc_rb_channel_try_register_connection_polling(wrapper);
} }
} }
grpc_completion_queue_destroy(channel_polling_cq); grpc_completion_queue_destroy(channel_polling_cq);
gpr_log(GPR_DEBUG, "GRPC_RUBY: run_poll_channels_loop_no_gil - exit connection polling loop");
return NULL; return NULL;
} }
// Notify the channel polling loop to cleanup and shutdown. // Notify the channel polling loop to cleanup and shutdown.
static void grpc_rb_event_unblocking_func(void *arg) { static void grpc_rb_event_unblocking_func(void *arg) {
(void)arg; (void)arg;
gpr_mu_lock(&channel_polling_mu); gpr_mu_lock(&global_connection_polling_mu);
gpr_log(GPR_DEBUG, "GRPC_RUBY: grpc_rb_event_unblocking_func - begin aborting connection polling");
abort_channel_polling = 1; abort_channel_polling = 1;
grpc_completion_queue_shutdown(channel_polling_cq); grpc_completion_queue_shutdown(channel_polling_cq);
gpr_mu_unlock(&channel_polling_mu); gpr_mu_unlock(&global_connection_polling_mu);
} }
// Poll channel connectivity states in background thread without the GIL. // Poll channel connectivity states in background thread without the GIL.
static VALUE run_poll_channels_loop(VALUE arg) { static VALUE run_poll_channels_loop(VALUE arg) {
(void)arg; (void)arg;
gpr_log(GPR_DEBUG, "GRPC_RUBY: run_poll_channels_loop - create connection polling thread");
rb_thread_call_without_gvl(run_poll_channels_loop_no_gil, NULL, rb_thread_call_without_gvl(run_poll_channels_loop_no_gil, NULL,
grpc_rb_event_unblocking_func, NULL); grpc_rb_event_unblocking_func, NULL);
return Qnil; return Qnil;
...@@ -501,7 +498,7 @@ static VALUE run_poll_channels_loop(VALUE arg) { ...@@ -501,7 +498,7 @@ static VALUE run_poll_channels_loop(VALUE arg) {
*/ */
static void start_poll_channels_loop() { static void start_poll_channels_loop() {
channel_polling_cq = grpc_completion_queue_create(NULL); channel_polling_cq = grpc_completion_queue_create(NULL);
gpr_mu_init(&channel_polling_mu); gpr_mu_init(&global_connection_polling_mu);
abort_channel_polling = 0; abort_channel_polling = 0;
rb_thread_create(run_poll_channels_loop, NULL); rb_thread_create(run_poll_channels_loop, NULL);
} }
......
...@@ -63,7 +63,7 @@ EchoStub = EchoService.rpc_stub_class ...@@ -63,7 +63,7 @@ EchoStub = EchoService.rpc_stub_class
def start_server(port = 0) def start_server(port = 0)
@srv = GRPC::RpcServer.new @srv = GRPC::RpcServer.new
server_port = @srv.add_http2_port("0.0.0.0:#{port}", :this_port_is_insecure) server_port = @srv.add_http2_port("localhost:#{port}", :this_port_is_insecure)
@srv.handle(EchoService) @srv.handle(EchoService)
@server_thd = Thread.new { @srv.run } @server_thd = Thread.new { @srv.run }
@srv.wait_till_running @srv.wait_till_running
...@@ -84,24 +84,25 @@ describe 'channel connection behavior' do ...@@ -84,24 +84,25 @@ describe 'channel connection behavior' do
req = EchoMsg.new req = EchoMsg.new
expect(stub.an_rpc(req)).to be_a(EchoMsg) expect(stub.an_rpc(req)).to be_a(EchoMsg)
stop_server stop_server
expect { stub.an_rpc(req) }.to raise_error(GRPC::Unavailable) sleep 1
# TODO(apolcyn) grabbing the same port might fail, is this stable enough? # TODO(apolcyn) grabbing the same port might fail, is this stable enough?
start_server(port) start_server(port)
expect(stub.an_rpc(req)).to be_a(EchoMsg) expect(stub.an_rpc(req)).to be_a(EchoMsg)
stop_server stop_server
end end
it 'observably connects and reconnects to transient server when using the channel state API', trial: true do it 'observably connects and reconnects to transient server' \
'when using the channel state API' do
port = start_server port = start_server
ch = GRPC::Core::Channel.new("localhost:#{port}", {}, :this_channel_is_insecure) ch = GRPC::Core::Channel.new("localhost:#{port}", {},
:this_channel_is_insecure)
expect(ch.connectivity_state).to be(GRPC::Core::ConnectivityStates::IDLE) expect(ch.connectivity_state).to be(GRPC::Core::ConnectivityStates::IDLE)
state = ch.connectivity_state(true) state = ch.connectivity_state(true)
count = 0 count = 0
while count < 20 and state != GRPC::Core::ConnectivityStates::READY do while count < 20 && state != GRPC::Core::ConnectivityStates::READY
STDERR.puts "first round of waiting for state to become READY"
ch.watch_connectivity_state(state, Time.now + 60) ch.watch_connectivity_state(state, Time.now + 60)
state = ch.connectivity_state(true) state = ch.connectivity_state(true)
count += 1 count += 1
...@@ -114,8 +115,7 @@ describe 'channel connection behavior' do ...@@ -114,8 +115,7 @@ describe 'channel connection behavior' do
state = ch.connectivity_state state = ch.connectivity_state
count = 0 count = 0
while count < 20 and state == GRPC::Core::ConnectivityStates::READY do while count < 20 && state == GRPC::Core::ConnectivityStates::READY
STDERR.puts "server shut down. waiting for state to not be READY"
ch.watch_connectivity_state(state, Time.now + 60) ch.watch_connectivity_state(state, Time.now + 60)
state = ch.connectivity_state state = ch.connectivity_state
count += 1 count += 1
...@@ -128,8 +128,7 @@ describe 'channel connection behavior' do ...@@ -128,8 +128,7 @@ describe 'channel connection behavior' do
state = ch.connectivity_state(true) state = ch.connectivity_state(true)
count = 0 count = 0
while count < 20 and state != GRPC::Core::ConnectivityStates::READY do while count < 20 && state != GRPC::Core::ConnectivityStates::READY
STDERR.puts "second round of waiting for state to become READY"
ch.watch_connectivity_state(state, Time.now + 60) ch.watch_connectivity_state(state, Time.now + 60)
state = ch.connectivity_state(true) state = ch.connectivity_state(true)
count += 1 count += 1
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment