From fcad5799b4c785d428ed30340d79581a5d97026c Mon Sep 17 00:00:00 2001
From: Alexander Polcyn <apolcyn@google.com>
Date: Tue, 14 Mar 2017 12:26:17 -0700
Subject: [PATCH] in the middle of fixing watch and get connectivity state to
 work with new changes

---
 src/ruby/ext/grpc/rb_channel.c           | 113 +++++++++++++++--------
 src/ruby/spec/channel_connection_spec.rb |  49 ++++++++++
 2 files changed, 121 insertions(+), 41 deletions(-)

diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c
index 2489ec2fef..8cd489345d 100644
--- a/src/ruby/ext/grpc/rb_channel.c
+++ b/src/ruby/ext/grpc/rb_channel.c
@@ -76,8 +76,10 @@ typedef struct grpc_rb_channel {
   grpc_completion_queue *queue;
   int request_safe_destroy;
   int safe_to_destroy;
-  gpr_mu safe_destroy_mu;
-  gpr_cv safe_destroy_cv;
+  grpc_connectivity_state current_connectivity_state;
+
+  gpr_mu channel_mu;
+  gpr_cv channel_cv;
 } grpc_rb_channel;
 
 /* Forward declarations of functions involved in temporary fix to
@@ -180,12 +182,19 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
   GPR_ASSERT(ch);
 
   wrapper->wrapped = ch;
-  gpr_mu_init(&wrapper->safe_destroy_mu);
-  gpr_cv_init(&wrapper->safe_destroy_cv);
-  gpr_mu_lock(&wrapper->safe_destroy_mu);
+
+  gpr_mu_init(&wrapper->channel_mu);
+  gpr_cv_init(&wrapper->channel_cv);
+
+  gpr_mu_lock(&wrapper->channel_mu);
+  wrapper->current_connectivity_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0);
+  gpr_cv_signal(&wrapper->channel_cv);
+  gpr_mu_unlock(&wrapper->channel_mu);
+
+  gpr_mu_lock(&wrapper->channel_mu);
   wrapper->safe_to_destroy = 0;
   wrapper->request_safe_destroy = 0;
-  gpr_mu_unlock(&wrapper->safe_destroy_mu);
+  gpr_mu_unlock(&wrapper->channel_mu);
   grpc_rb_channel_try_register_connection_polling(wrapper);
 
   if (args.args != NULL) {
@@ -232,43 +241,57 @@ static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv,
       grpc_channel_check_connectivity_state(ch, grpc_try_to_connect));
 }
 
-/* Watch for a change in connectivity state.
-
-   Once the channel connectivity state is different from the last observed
-   state, tag will be enqueued on cq with success=1
-
-   If deadline expires BEFORE the state is changed, tag will be enqueued on
-   the completion queue with success=0 */
+/* Wait until the channel's connectivity state becomes different from
+ * "last_state", or "deadline" expires.
+ * Returns true if the the channel's connectivity state becomes
+ * different from "last_state" within "deadline".
+ * Returns false if "deadline" expires before the channel's connectivity
+ * state changes from "last_state".
+ * */
 static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self,
                                                       VALUE last_state,
                                                       VALUE deadline) {
   grpc_rb_channel *wrapper = NULL;
-  grpc_channel *ch = NULL;
-  grpc_completion_queue *cq = NULL;
-
-  void *tag = wrapper;
-
-  grpc_event event;
 
   TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
-  ch = wrapper->wrapped;
-  cq = wrapper->queue;
-  if (ch == NULL) {
+
+  if (wrapper->wrapped == NULL) {
     rb_raise(rb_eRuntimeError, "closed!");
     return Qnil;
   }
-  grpc_channel_watch_connectivity_state(
-      ch, (grpc_connectivity_state)NUM2LONG(last_state),
-      grpc_rb_time_timeval(deadline, /* absolute time */ 0), cq, tag);
 
-  event = rb_completion_queue_pluck(cq, tag, gpr_inf_future(GPR_CLOCK_REALTIME),
-                                    NULL);
+  if (!FIXNUM_P(last_state)) {
+    rb_raise(rb_eTypeError, "bad type for last_state. want a GRPC::Core::ChannelState constant");
+    return Qnil;
+  }
 
-  if (event.success) {
+  gpr_mu_lock(&wrapper->channel_mu);
+  if (wrapper->current_connectivity_state != NUM2LONG(last_state)) {
+    gpr_mu_unlock(&wrapper->channel_mu);
     return Qtrue;
-  } else {
+  }
+  if (wrapper->request_safe_destroy) {
+    gpr_mu_unlock(&wrapper->channel_mu);
+    rb_raise(rb_eRuntimeError, "watch_connectivity_state called on closed channel");
+    return Qfalse;
+  }
+  if (wrapper->safe_to_destroy) {
+    gpr_mu_unlock(&wrapper->channel_mu);
+    gpr_log(GPR_DEBUG, "GRPC_RUBY_RB_CHANNEL: attempt to watch_connectivity_state on a non-state-polled channel");
+    return Qfalse;
+  }
+  gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, grpc_rb_time_timeval(deadline, /* absolute time */ 0));
+  if (wrapper->request_safe_destroy) {
+    gpr_mu_unlock(&wrapper->channel_mu);
+    rb_raise(rb_eRuntimeError, "channel closed during call to watch_connectivity_state");
     return Qfalse;
   }
+  if (wrapper->current_connectivity_state != NUM2LONG(last_state)) {
+    gpr_mu_unlock(&wrapper->channel_mu);
+    return Qtrue;
+  }
+  gpr_mu_unlock(&wrapper->channel_mu);
+  return Qfalse;
 }
 
 /* Create a call given a grpc_channel, in order to call method. The request
@@ -378,40 +401,47 @@ static void grpc_rb_channel_try_register_connection_polling(
 
   GPR_ASSERT(wrapper);
   GPR_ASSERT(wrapper->wrapped);
-  gpr_mu_lock(&wrapper->safe_destroy_mu);
+  gpr_mu_lock(&wrapper->channel_mu);
   if (wrapper->request_safe_destroy) {
     wrapper->safe_to_destroy = 1;
-    gpr_cv_signal(&wrapper->safe_destroy_cv);
-    gpr_mu_unlock(&wrapper->safe_destroy_mu);
+    gpr_cv_signal(&wrapper->channel_cv);
+    gpr_mu_unlock(&wrapper->channel_mu);
     return;
   }
   gpr_mu_lock(&channel_polling_mu);
+
+  gpr_mu_lock(&wrapper->channel_mu);
   conn_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0);
+  if (conn_state != wrapper->current_connectivity_state) {
+    wrapper->current_connectivity_state = conn_state;
+    gpr_cv_signal(&wrapper->channel_cv);
+  }
   // avoid posting work to the channel polling cq if it's been shutdown
   if (!abort_channel_polling && conn_state != GRPC_CHANNEL_SHUTDOWN) {
     grpc_channel_watch_connectivity_state(
         wrapper->wrapped, conn_state, sleep_time, channel_polling_cq, wrapper);
   } else {
     wrapper->safe_to_destroy = 1;
-    gpr_cv_signal(&wrapper->safe_destroy_cv);
+    gpr_cv_signal(&wrapper->channel_cv);
   }
+  gpr_mu_unlock(&wrapper->channel_mu);
   gpr_mu_unlock(&channel_polling_mu);
-  gpr_mu_unlock(&wrapper->safe_destroy_mu);
+  gpr_mu_unlock(&wrapper->channel_mu);
 }
 
-// Note requires wrapper->wrapped, wrapper->safe_destroy_mu/cv initialized
+// Note requires wrapper->wrapped, wrapper->channel_mu/cv initialized
 static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper) {
-  gpr_mu_lock(&wrapper->safe_destroy_mu);
+  gpr_mu_lock(&wrapper->channel_mu);
   while (!wrapper->safe_to_destroy) {
     wrapper->request_safe_destroy = 1;
-    gpr_cv_wait(&wrapper->safe_destroy_cv, &wrapper->safe_destroy_mu,
+    gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu,
                 gpr_inf_future(GPR_CLOCK_REALTIME));
   }
   GPR_ASSERT(wrapper->safe_to_destroy);
-  gpr_mu_unlock(&wrapper->safe_destroy_mu);
+  gpr_mu_unlock(&wrapper->channel_mu);
 
-  gpr_mu_destroy(&wrapper->safe_destroy_mu);
-  gpr_cv_destroy(&wrapper->safe_destroy_cv);
+  gpr_mu_destroy(&wrapper->channel_mu);
+  gpr_cv_destroy(&wrapper->channel_cv);
 
   grpc_channel_destroy(wrapper->wrapped);
 }
@@ -434,6 +464,7 @@ static void *run_poll_channels_loop_no_gil(void *arg) {
     }
     if (event.type == GRPC_OP_COMPLETE) {
       wrapper = (grpc_rb_channel *)event.tag;
+
       grpc_rb_channel_try_register_connection_polling(wrapper);
     }
   }
@@ -524,7 +555,7 @@ void Init_grpc_channel() {
   rb_define_method(grpc_rb_cChannel, "connectivity_state",
                    grpc_rb_channel_get_connectivity_state, -1);
   rb_define_method(grpc_rb_cChannel, "watch_connectivity_state",
-                   grpc_rb_channel_watch_connectivity_state, 4);
+                   grpc_rb_channel_watch_connectivity_state, 2);
   rb_define_method(grpc_rb_cChannel, "create_call", grpc_rb_channel_create_call,
                    5);
   rb_define_method(grpc_rb_cChannel, "target", grpc_rb_channel_get_target, 0);
diff --git a/src/ruby/spec/channel_connection_spec.rb b/src/ruby/spec/channel_connection_spec.rb
index 58ab37d7bc..d8e10f7b76 100644
--- a/src/ruby/spec/channel_connection_spec.rb
+++ b/src/ruby/spec/channel_connection_spec.rb
@@ -90,4 +90,53 @@ describe 'channel connection behavior' do
     expect(stub.an_rpc(req)).to be_a(EchoMsg)
     stop_server
   end
+
+  it 'observably connects and reconnects to transient server when using the channel state API', trial: true do
+    port = start_server
+    ch = GRPC::Core::Channel.new("localhost:#{port}", {}, :this_channel_is_insecure)
+
+    expect(ch.connectivity_state).to be(GRPC::Core::ConnectivityStates::IDLE)
+
+    state = ch.connectivity_state(true)
+
+    count = 0
+    while count < 20 and state != GRPC::Core::ConnectivityStates::READY do
+      STDERR.puts "first round of waiting for state to become READY"
+      ch.watch_connectivity_state(state, Time.now + 60)
+      state = ch.connectivity_state(true)
+      count += 1
+    end
+
+    expect(state).to be(GRPC::Core::ConnectivityStates::READY)
+
+    stop_server
+
+    state = ch.connectivity_state
+
+    count = 0
+    while count < 20 and state == GRPC::Core::ConnectivityStates::READY do
+      STDERR.puts "server shut down. waiting for state to not be READY"
+      ch.watch_connectivity_state(state, Time.now + 60)
+      state = ch.connectivity_state
+      count += 1
+    end
+
+    expect(state).to_not be(GRPC::Core::ConnectivityStates::READY)
+
+    start_server(port)
+
+    state = ch.connectivity_state(true)
+
+    count = 0
+    while count < 20 and state != GRPC::Core::ConnectivityStates::READY do
+      STDERR.puts "second round of waiting for state to become READY"
+      ch.watch_connectivity_state(state, Time.now + 60)
+      state = ch.connectivity_state(true)
+      count += 1
+    end
+
+    expect(state).to be(GRPC::Core::ConnectivityStates::READY)
+
+    stop_server
+  end
 end
-- 
GitLab