Skip to content
Snippets Groups Projects
Commit fcad5799 authored by Alexander Polcyn's avatar Alexander Polcyn
Browse files

in the middle of fixing watch and get connectivity state to work with new changes

parent 427ec5e4
No related branches found
No related tags found
No related merge requests found
......@@ -76,8 +76,10 @@ typedef struct grpc_rb_channel {
grpc_completion_queue *queue;
int request_safe_destroy;
int safe_to_destroy;
gpr_mu safe_destroy_mu;
gpr_cv safe_destroy_cv;
grpc_connectivity_state current_connectivity_state;
gpr_mu channel_mu;
gpr_cv channel_cv;
} grpc_rb_channel;
/* Forward declarations of functions involved in temporary fix to
......@@ -180,12 +182,19 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
GPR_ASSERT(ch);
wrapper->wrapped = ch;
gpr_mu_init(&wrapper->safe_destroy_mu);
gpr_cv_init(&wrapper->safe_destroy_cv);
gpr_mu_lock(&wrapper->safe_destroy_mu);
gpr_mu_init(&wrapper->channel_mu);
gpr_cv_init(&wrapper->channel_cv);
gpr_mu_lock(&wrapper->channel_mu);
wrapper->current_connectivity_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0);
gpr_cv_signal(&wrapper->channel_cv);
gpr_mu_unlock(&wrapper->channel_mu);
gpr_mu_lock(&wrapper->channel_mu);
wrapper->safe_to_destroy = 0;
wrapper->request_safe_destroy = 0;
gpr_mu_unlock(&wrapper->safe_destroy_mu);
gpr_mu_unlock(&wrapper->channel_mu);
grpc_rb_channel_try_register_connection_polling(wrapper);
if (args.args != NULL) {
......@@ -232,43 +241,57 @@ static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv,
grpc_channel_check_connectivity_state(ch, grpc_try_to_connect));
}
/* Watch for a change in connectivity state.
Once the channel connectivity state is different from the last observed
state, tag will be enqueued on cq with success=1
If deadline expires BEFORE the state is changed, tag will be enqueued on
the completion queue with success=0 */
/* Wait until the channel's connectivity state becomes different from
* "last_state", or "deadline" expires.
* Returns true if the the channel's connectivity state becomes
* different from "last_state" within "deadline".
* Returns false if "deadline" expires before the channel's connectivity
* state changes from "last_state".
* */
static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self,
VALUE last_state,
VALUE deadline) {
grpc_rb_channel *wrapper = NULL;
grpc_channel *ch = NULL;
grpc_completion_queue *cq = NULL;
void *tag = wrapper;
grpc_event event;
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
ch = wrapper->wrapped;
cq = wrapper->queue;
if (ch == NULL) {
if (wrapper->wrapped == NULL) {
rb_raise(rb_eRuntimeError, "closed!");
return Qnil;
}
grpc_channel_watch_connectivity_state(
ch, (grpc_connectivity_state)NUM2LONG(last_state),
grpc_rb_time_timeval(deadline, /* absolute time */ 0), cq, tag);
event = rb_completion_queue_pluck(cq, tag, gpr_inf_future(GPR_CLOCK_REALTIME),
NULL);
if (!FIXNUM_P(last_state)) {
rb_raise(rb_eTypeError, "bad type for last_state. want a GRPC::Core::ChannelState constant");
return Qnil;
}
if (event.success) {
gpr_mu_lock(&wrapper->channel_mu);
if (wrapper->current_connectivity_state != NUM2LONG(last_state)) {
gpr_mu_unlock(&wrapper->channel_mu);
return Qtrue;
} else {
}
if (wrapper->request_safe_destroy) {
gpr_mu_unlock(&wrapper->channel_mu);
rb_raise(rb_eRuntimeError, "watch_connectivity_state called on closed channel");
return Qfalse;
}
if (wrapper->safe_to_destroy) {
gpr_mu_unlock(&wrapper->channel_mu);
gpr_log(GPR_DEBUG, "GRPC_RUBY_RB_CHANNEL: attempt to watch_connectivity_state on a non-state-polled channel");
return Qfalse;
}
gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, grpc_rb_time_timeval(deadline, /* absolute time */ 0));
if (wrapper->request_safe_destroy) {
gpr_mu_unlock(&wrapper->channel_mu);
rb_raise(rb_eRuntimeError, "channel closed during call to watch_connectivity_state");
return Qfalse;
}
if (wrapper->current_connectivity_state != NUM2LONG(last_state)) {
gpr_mu_unlock(&wrapper->channel_mu);
return Qtrue;
}
gpr_mu_unlock(&wrapper->channel_mu);
return Qfalse;
}
/* Create a call given a grpc_channel, in order to call method. The request
......@@ -378,40 +401,47 @@ static void grpc_rb_channel_try_register_connection_polling(
GPR_ASSERT(wrapper);
GPR_ASSERT(wrapper->wrapped);
gpr_mu_lock(&wrapper->safe_destroy_mu);
gpr_mu_lock(&wrapper->channel_mu);
if (wrapper->request_safe_destroy) {
wrapper->safe_to_destroy = 1;
gpr_cv_signal(&wrapper->safe_destroy_cv);
gpr_mu_unlock(&wrapper->safe_destroy_mu);
gpr_cv_signal(&wrapper->channel_cv);
gpr_mu_unlock(&wrapper->channel_mu);
return;
}
gpr_mu_lock(&channel_polling_mu);
gpr_mu_lock(&wrapper->channel_mu);
conn_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0);
if (conn_state != wrapper->current_connectivity_state) {
wrapper->current_connectivity_state = conn_state;
gpr_cv_signal(&wrapper->channel_cv);
}
// avoid posting work to the channel polling cq if it's been shutdown
if (!abort_channel_polling && conn_state != GRPC_CHANNEL_SHUTDOWN) {
grpc_channel_watch_connectivity_state(
wrapper->wrapped, conn_state, sleep_time, channel_polling_cq, wrapper);
} else {
wrapper->safe_to_destroy = 1;
gpr_cv_signal(&wrapper->safe_destroy_cv);
gpr_cv_signal(&wrapper->channel_cv);
}
gpr_mu_unlock(&wrapper->channel_mu);
gpr_mu_unlock(&channel_polling_mu);
gpr_mu_unlock(&wrapper->safe_destroy_mu);
gpr_mu_unlock(&wrapper->channel_mu);
}
// Note requires wrapper->wrapped, wrapper->safe_destroy_mu/cv initialized
// Note requires wrapper->wrapped, wrapper->channel_mu/cv initialized
static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper) {
gpr_mu_lock(&wrapper->safe_destroy_mu);
gpr_mu_lock(&wrapper->channel_mu);
while (!wrapper->safe_to_destroy) {
wrapper->request_safe_destroy = 1;
gpr_cv_wait(&wrapper->safe_destroy_cv, &wrapper->safe_destroy_mu,
gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu,
gpr_inf_future(GPR_CLOCK_REALTIME));
}
GPR_ASSERT(wrapper->safe_to_destroy);
gpr_mu_unlock(&wrapper->safe_destroy_mu);
gpr_mu_unlock(&wrapper->channel_mu);
gpr_mu_destroy(&wrapper->safe_destroy_mu);
gpr_cv_destroy(&wrapper->safe_destroy_cv);
gpr_mu_destroy(&wrapper->channel_mu);
gpr_cv_destroy(&wrapper->channel_cv);
grpc_channel_destroy(wrapper->wrapped);
}
......@@ -434,6 +464,7 @@ static void *run_poll_channels_loop_no_gil(void *arg) {
}
if (event.type == GRPC_OP_COMPLETE) {
wrapper = (grpc_rb_channel *)event.tag;
grpc_rb_channel_try_register_connection_polling(wrapper);
}
}
......@@ -524,7 +555,7 @@ void Init_grpc_channel() {
rb_define_method(grpc_rb_cChannel, "connectivity_state",
grpc_rb_channel_get_connectivity_state, -1);
rb_define_method(grpc_rb_cChannel, "watch_connectivity_state",
grpc_rb_channel_watch_connectivity_state, 4);
grpc_rb_channel_watch_connectivity_state, 2);
rb_define_method(grpc_rb_cChannel, "create_call", grpc_rb_channel_create_call,
5);
rb_define_method(grpc_rb_cChannel, "target", grpc_rb_channel_get_target, 0);
......
......@@ -90,4 +90,53 @@ describe 'channel connection behavior' do
expect(stub.an_rpc(req)).to be_a(EchoMsg)
stop_server
end
it 'observably connects and reconnects to transient server when using the channel state API', trial: true do
port = start_server
ch = GRPC::Core::Channel.new("localhost:#{port}", {}, :this_channel_is_insecure)
expect(ch.connectivity_state).to be(GRPC::Core::ConnectivityStates::IDLE)
state = ch.connectivity_state(true)
count = 0
while count < 20 and state != GRPC::Core::ConnectivityStates::READY do
STDERR.puts "first round of waiting for state to become READY"
ch.watch_connectivity_state(state, Time.now + 60)
state = ch.connectivity_state(true)
count += 1
end
expect(state).to be(GRPC::Core::ConnectivityStates::READY)
stop_server
state = ch.connectivity_state
count = 0
while count < 20 and state == GRPC::Core::ConnectivityStates::READY do
STDERR.puts "server shut down. waiting for state to not be READY"
ch.watch_connectivity_state(state, Time.now + 60)
state = ch.connectivity_state
count += 1
end
expect(state).to_not be(GRPC::Core::ConnectivityStates::READY)
start_server(port)
state = ch.connectivity_state(true)
count = 0
while count < 20 and state != GRPC::Core::ConnectivityStates::READY do
STDERR.puts "second round of waiting for state to become READY"
ch.watch_connectivity_state(state, Time.now + 60)
state = ch.connectivity_state(true)
count += 1
end
expect(state).to be(GRPC::Core::ConnectivityStates::READY)
stop_server
end
end
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment