Skip to content
Snippets Groups Projects
Commit 5b881460 authored by Alexander Polcyn's avatar Alexander Polcyn
Browse files

make fewer lock/unlock calls and loop on cv_wait in watch conn state

parent ea282e9c
No related branches found
No related tags found
No related merge requests found
...@@ -78,6 +78,7 @@ typedef struct grpc_rb_channel { ...@@ -78,6 +78,7 @@ typedef struct grpc_rb_channel {
grpc_connectivity_state current_connectivity_state; grpc_connectivity_state current_connectivity_state;
int mu_init_done; int mu_init_done;
int abort_watch_connectivity_state;
gpr_mu channel_mu; gpr_mu channel_mu;
gpr_cv channel_cv; gpr_cv channel_cv;
} grpc_rb_channel; } grpc_rb_channel;
...@@ -193,6 +194,7 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) { ...@@ -193,6 +194,7 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
wrapper->mu_init_done = 1; wrapper->mu_init_done = 1;
gpr_mu_lock(&wrapper->channel_mu); gpr_mu_lock(&wrapper->channel_mu);
wrapper->abort_watch_connectivity_state = 0;
wrapper->current_connectivity_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0); wrapper->current_connectivity_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0);
wrapper->safe_to_destroy = 0; wrapper->safe_to_destroy = 0;
wrapper->request_safe_destroy = 0; wrapper->request_safe_destroy = 0;
...@@ -242,8 +244,7 @@ static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv, ...@@ -242,8 +244,7 @@ static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv,
rb_raise(rb_eRuntimeError, "closed!"); rb_raise(rb_eRuntimeError, "closed!");
return Qnil; return Qnil;
} }
return LONG2NUM( return LONG2NUM(grpc_channel_check_connectivity_state(wrapper->wrapped, grpc_try_to_connect));
grpc_channel_check_connectivity_state(ch, grpc_try_to_connect));
} }
typedef struct watch_state_stack { typedef struct watch_state_stack {
...@@ -254,39 +255,35 @@ typedef struct watch_state_stack { ...@@ -254,39 +255,35 @@ typedef struct watch_state_stack {
static void *watch_channel_state_without_gvl(void *arg) { static void *watch_channel_state_without_gvl(void *arg) {
watch_state_stack *stack = (watch_state_stack*)arg; watch_state_stack *stack = (watch_state_stack*)arg;
gpr_timespec deadline = stack->deadline; gpr_timespec deadline = stack->deadline;
grpc_rb_channel *wrapper = stack->wrapper; grpc_rb_channel *wrapper = stack->wrapper;
int last_state = stack->last_state; int last_state = stack->last_state;
void *return_value = (void*)0;
gpr_timespec time_check_increment = gpr_time_add(
gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(20, GPR_TIMESPAN));
gpr_mu_lock(&wrapper->channel_mu); gpr_mu_lock(&wrapper->channel_mu);
if (wrapper->current_connectivity_state != last_state) { while(wrapper->current_connectivity_state == last_state &&
gpr_mu_unlock(&wrapper->channel_mu); !wrapper->request_safe_destroy &&
return (void*)0; !wrapper->safe_to_destroy &&
} !wrapper->abort_watch_connectivity_state &&
if (wrapper->request_safe_destroy) { gpr_time_cmp(deadline, gpr_now(GPR_CLOCK_REALTIME)) > 0) {
gpr_mu_unlock(&wrapper->channel_mu); gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, time_check_increment);
return (void*)0;
} }
if (wrapper->safe_to_destroy) {
gpr_mu_unlock(&wrapper->channel_mu);
return (void*)0;
}
gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, deadline);
if (wrapper->current_connectivity_state != last_state) { if (wrapper->current_connectivity_state != last_state) {
gpr_mu_unlock(&wrapper->channel_mu); return_value = (void*)1;
return (void*)1;
} }
gpr_mu_unlock(&wrapper->channel_mu); gpr_mu_unlock(&wrapper->channel_mu);
return (void*)0;
return return_value;
} }
static void watch_channel_state_unblocking_func(void *arg) { static void watch_channel_state_unblocking_func(void *arg) {
grpc_rb_channel *wrapper = (grpc_rb_channel*)arg; grpc_rb_channel *wrapper = (grpc_rb_channel*)arg;
gpr_log(GPR_DEBUG, "GRPC_RUBY: watch channel state unblocking func called"); gpr_log(GPR_DEBUG, "GRPC_RUBY: watch channel state unblocking func called");
gpr_mu_lock(&wrapper->channel_mu); gpr_mu_lock(&wrapper->channel_mu);
wrapper->abort_watch_connectivity_state = 1;
gpr_cv_broadcast(&wrapper->channel_cv); gpr_cv_broadcast(&wrapper->channel_cv);
gpr_mu_unlock(&wrapper->channel_mu); gpr_mu_unlock(&wrapper->channel_mu);
} }
...@@ -461,8 +458,9 @@ static void grpc_rb_channel_try_register_connection_polling( ...@@ -461,8 +458,9 @@ static void grpc_rb_channel_try_register_connection_polling(
// Note requires wrapper->wrapped, wrapper->channel_mu/cv initialized // Note requires wrapper->wrapped, wrapper->channel_mu/cv initialized
static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper) { static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper) {
gpr_mu_lock(&wrapper->channel_mu); gpr_mu_lock(&wrapper->channel_mu);
wrapper->request_safe_destroy = 1;
while (!wrapper->safe_to_destroy) { while (!wrapper->safe_to_destroy) {
wrapper->request_safe_destroy = 1;
gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu,
gpr_inf_future(GPR_CLOCK_REALTIME)); gpr_inf_future(GPR_CLOCK_REALTIME));
} }
......
...@@ -92,7 +92,7 @@ describe 'channel connection behavior' do ...@@ -92,7 +92,7 @@ describe 'channel connection behavior' do
end end
it 'observably connects and reconnects to transient server' \ it 'observably connects and reconnects to transient server' \
'when using the channel state API' do ' when using the channel state API' do
port = start_server port = start_server
ch = GRPC::Core::Channel.new("localhost:#{port}", {}, ch = GRPC::Core::Channel.new("localhost:#{port}", {},
:this_channel_is_insecure) :this_channel_is_insecure)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment