From b2c0b7bc7411c0914e2f65d56096ecde1a207b53 Mon Sep 17 00:00:00 2001
From: Alexander Polcyn <apolcyn@google.com>
Date: Thu, 27 Apr 2017 00:26:25 -0700
Subject: [PATCH] constant state watch without timeouts
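
Replace the per-channel connectivity-watch threads, which re-registered
watches with 20ms timeouts, with a single background thread that watches
all channels on one shared completion queue using infinite deadlines.

Channels register themselves in a global "bg_watched_channel" list
protected by global_connection_polling_mu. Both the background watcher
and the user-facing watch_connectivity_state API post watch_state_op
tags to the shared cq; API callers block on a global condition variable
until the polling thread reports their op complete. Completion queues
are now drained before destruction, and the end2end drivers check the
child processes' exit codes.

The public Ruby API is unchanged. A minimal sketch (host and deadline
are illustrative):

    ch = GRPC::Core::Channel.new('localhost:50051', {},
                                 :this_channel_is_insecure)
    state = ch.connectivity_state(true)  # true: try to connect if idle
    # Blocks until the state changes from `state` or the deadline passes;
    # returns true on a state change, false if the deadline expires.
    ch.watch_connectivity_state(state, Time.now + 60)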

---
 src/ruby/end2end/channel_closing_driver.rb    |   5 +
 src/ruby/end2end/channel_state_driver.rb      |   3 +
 src/ruby/end2end/grpc_class_init_client.rb    |  65 ++-
 src/ruby/end2end/grpc_class_init_driver.rb    |  51 +-
 .../sig_int_during_channel_watch_client.rb    |   2 +
 .../sig_int_during_channel_watch_driver.rb    |   5 +
 src/ruby/ext/grpc/rb_call.c                   |   2 +-
 src/ruby/ext/grpc/rb_channel.c                | 494 ++++++++++++------
 src/ruby/ext/grpc/rb_completion_queue.c       |  14 +-
 src/ruby/ext/grpc/rb_completion_queue.h       |   2 +-
 src/ruby/ext/grpc/rb_event_thread.c           |  12 +-
 src/ruby/ext/grpc/rb_server.c                 |   2 +-
 src/ruby/spec/channel_connection_spec.rb      |  35 +-
 13 files changed, 472 insertions(+), 220 deletions(-)

diff --git a/src/ruby/end2end/channel_closing_driver.rb b/src/ruby/end2end/channel_closing_driver.rb
index d3e5373b0b..bed8c43405 100755
--- a/src/ruby/end2end/channel_closing_driver.rb
+++ b/src/ruby/end2end/channel_closing_driver.rb
@@ -61,6 +61,11 @@ def main
       'channel is closed while connectivity is watched'
   end
 
+  client_exit_code = $CHILD_STATUS
+  if client_exit_code != 0
+    fail "channel closing client failed, exit code #{client_exit_code}"
+  end
+
   server_runner.stop
 end
 
diff --git a/src/ruby/end2end/channel_state_driver.rb b/src/ruby/end2end/channel_state_driver.rb
index 80fb62899e..9910076dba 100755
--- a/src/ruby/end2end/channel_state_driver.rb
+++ b/src/ruby/end2end/channel_state_driver.rb
@@ -58,6 +58,9 @@ def main
            'It likely hangs when ended abruptly'
   end
 
+  # The interrupt in the child process should cause it to
+  # exit with a non-zero status, so don't check it here.
+  # This test mainly tries to catch deadlocks.
   server_runner.stop
 end
 
diff --git a/src/ruby/end2end/grpc_class_init_client.rb b/src/ruby/end2end/grpc_class_init_client.rb
index ee79292119..8e46907368 100755
--- a/src/ruby/end2end/grpc_class_init_client.rb
+++ b/src/ruby/end2end/grpc_class_init_client.rb
@@ -34,44 +34,81 @@
 
 require_relative './end2end_common'
 
-def main
-  grpc_class = ''
-  OptionParser.new do |opts|
-    opts.on('--grpc_class=P', String) do |p|
-      grpc_class = p
+def construct_many(test_proc)
+  thds = []
+  4.times do
+    thds << Thread.new do
+      20.times do
+        test_proc.call
+      end
     end
-  end.parse!
+  end
+  20.times do
+    test_proc.call
+  end
+  thds.each(&:join)
+end
+
+def run_gc_stress_test(test_proc)
+  GC.disable
+  construct_many(test_proc)
 
-  test_proc = nil
+  GC.enable
+  construct_many(test_proc)
 
+  GC.start(full_mark: true, immediate_sweep: true)
+  construct_many(test_proc)
+end
+
+def get_test_proc(grpc_class)
   case grpc_class
   when 'channel'
-    test_proc = proc do
+    return proc do
       GRPC::Core::Channel.new('dummy_host', nil, :this_channel_is_insecure)
     end
   when 'server'
-    test_proc = proc do
+    return proc do
       GRPC::Core::Server.new({})
     end
   when 'channel_credentials'
-    test_proc = proc do
+    return proc do
       GRPC::Core::ChannelCredentials.new
     end
   when 'call_credentials'
-    test_proc = proc do
+    return proc do
       GRPC::Core::CallCredentials.new(proc { |noop| noop })
     end
   when 'compression_options'
-    test_proc = proc do
+    return proc do
       GRPC::Core::CompressionOptions.new
     end
   else
     fail "bad --grpc_class=#{grpc_class} param"
   end
+end
+
+def main
+  grpc_class = ''
+  gc_stress = false
+  OptionParser.new do |opts|
+    opts.on('--grpc_class=P', String) do |p|
+      grpc_class = p
+    end
+    opts.on('--gc_stress=P') do |p|
+      gc_stress = p
+    end
+  end.parse!
+
+  test_proc = get_test_proc(grpc_class)
+
+  if gc_stress == 'true'
+    run_gc_stress_test(test_proc)
+    return
+  end
 
-  th = Thread.new { test_proc.call }
+  thd = Thread.new { test_proc.call }
   test_proc.call
-  th.join
+  thd.join
 end
 
 main
diff --git a/src/ruby/end2end/grpc_class_init_driver.rb b/src/ruby/end2end/grpc_class_init_driver.rb
index 764d029f14..0e330a493f 100755
--- a/src/ruby/end2end/grpc_class_init_driver.rb
+++ b/src/ruby/end2end/grpc_class_init_driver.rb
@@ -38,29 +38,38 @@ def main
                             call_credentials
                             compression_options )
 
-  native_grpc_classes.each do |grpc_class|
-    STDERR.puts 'start client'
-    this_dir = File.expand_path(File.dirname(__FILE__))
-    client_path = File.join(this_dir, 'grpc_class_init_client.rb')
-    client_pid = Process.spawn(RbConfig.ruby,
-                               client_path,
-                               "--grpc_class=#{grpc_class}")
-    begin
-      Timeout.timeout(10) do
-        Process.wait(client_pid)
+  # There is room for false positives in this test,
+  # so do 10 runs for each config to reduce them.
+  [true, false].each do |gc_stress|
+    10.times do
+      native_grpc_classes.each do |grpc_class|
+        STDERR.puts 'start client'
+        this_dir = File.expand_path(File.dirname(__FILE__))
+        client_path = File.join(this_dir, 'grpc_class_init_client.rb')
+        client_pid = Process.spawn(RbConfig.ruby,
+                                   client_path,
+                                   "--grpc_class=#{grpc_class}",
+                                   "--gc_stress=#{gc_stress}")
+        begin
+          Timeout.timeout(10) do
+            Process.wait(client_pid)
+          end
+        rescue Timeout::Error
+          STDERR.puts "timeout waiting for client pid #{client_pid}"
+          Process.kill('SIGKILL', client_pid)
+          Process.wait(client_pid)
+          STDERR.puts 'killed client child'
+          raise 'Timed out waiting for client process. ' \
+            'It likely hangs when the first constructed gRPC object has ' \
+            "type: #{grpc_class}"
+        end
+
+        client_exit_code = $CHILD_STATUS
+        if client_exit_code != 0
+          fail "client failed, exit code #{client_exit_code}"
+        end
       end
-    rescue Timeout::Error
-      STDERR.puts "timeout waiting for client pid #{client_pid}"
-      Process.kill('SIGKILL', client_pid)
-      Process.wait(client_pid)
-      STDERR.puts 'killed client child'
-      raise 'Timed out waiting for client process. ' \
-        'It likely hangs when the first constructed gRPC object has ' \
-        "type: #{grpc_class}"
     end
-
-    client_exit_code = $CHILD_STATUS
-    fail "client failed, exit code #{client_exit_code}" if client_exit_code != 0
   end
 end
 
diff --git a/src/ruby/end2end/sig_int_during_channel_watch_client.rb b/src/ruby/end2end/sig_int_during_channel_watch_client.rb
index 389fc5ba33..0c6a374925 100755
--- a/src/ruby/end2end/sig_int_during_channel_watch_client.rb
+++ b/src/ruby/end2end/sig_int_during_channel_watch_client.rb
@@ -46,6 +46,8 @@ def main
     end
   end.parse!
 
+  trap('SIGINT') { exit 0 }
+
   thd = Thread.new do
     child_thread_channel = GRPC::Core::Channel.new("localhost:#{server_port}",
                                                    {},
diff --git a/src/ruby/end2end/sig_int_during_channel_watch_driver.rb b/src/ruby/end2end/sig_int_during_channel_watch_driver.rb
index 670cda0919..79a8c133fa 100755
--- a/src/ruby/end2end/sig_int_during_channel_watch_driver.rb
+++ b/src/ruby/end2end/sig_int_during_channel_watch_driver.rb
@@ -63,6 +63,11 @@ def main
       'SIGINT is sent while there is an active connectivity_state call'
   end
 
+  client_exit_code = $CHILD_STATUS
+  if client_exit_code != 0
+    fail "sig_int_during_channel_watch_client failed: #{client_exit_code}"
+  end
+
   server_runner.stop
 end
 
diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c
index 344cb941ff..6cb71870f5 100644
--- a/src/ruby/ext/grpc/rb_call.c
+++ b/src/ruby/ext/grpc/rb_call.c
@@ -103,7 +103,7 @@ static void destroy_call(grpc_rb_call *call) {
   if (call->wrapped != NULL) {
     grpc_call_destroy(call->wrapped);
     call->wrapped = NULL;
-    grpc_rb_completion_queue_destroy(call->queue);
+    grpc_rb_completion_queue_safe_destroy(call->queue);
     call->queue = NULL;
   }
 }
diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c
index fb610f548e..973a45adf5 100644
--- a/src/ruby/ext/grpc/rb_channel.c
+++ b/src/ruby/ext/grpc/rb_channel.c
@@ -52,75 +52,131 @@
 
 /* id_channel is the name of the hidden ivar that preserves a reference to the
  * channel on a call, so that calls are not GCed before their channel.  */
-static ID id_channel;
+ID id_channel;
 
 /* id_target is the name of the hidden ivar that preserves a reference to the
  * target string used to create the call, preserved so that it does not get
  * GCed before the channel */
-static ID id_target;
+ID id_target;
 
 /* id_insecure_channel is used to indicate that a channel is insecure */
-static VALUE id_insecure_channel;
+VALUE id_insecure_channel;
 
 /* grpc_rb_cChannel is the ruby class that proxies grpc_channel. */
-static VALUE grpc_rb_cChannel = Qnil;
+VALUE grpc_rb_cChannel = Qnil;
 
 /* Used during the conversion of a hash to channel args during channel setup */
-static VALUE grpc_rb_cChannelArgs;
+VALUE grpc_rb_cChannelArgs;
+
+typedef struct bg_watched_channel {
+  grpc_channel *channel;
+  struct bg_watched_channel *next;
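+  // set once the wrapped grpc_channel has been destroyed (via close, GC, or polling shutdown)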
+  int channel_destroyed;
+  int refcount; // must only be accessed under global_connection_polling_mu
+} bg_watched_channel;
 
 /* grpc_rb_channel wraps a grpc_channel. */
 typedef struct grpc_rb_channel {
   VALUE credentials;
 
-  /* The actual channel */
-  grpc_channel *wrapped;
-  int request_safe_destroy;
-  int safe_to_destroy;
-  grpc_connectivity_state current_connectivity_state;
-
-  int mu_init_done;
-  int abort_watch_connectivity_state;
-  gpr_mu channel_mu;
-  gpr_cv channel_cv;
+  /* The actual channel, wrapped so we can tell when it's safe to destroy it */
+  bg_watched_channel *bg_wrapped;
 } grpc_rb_channel;
 
-/* Forward declarations of functions involved in temporary fix to
- * https://github.com/grpc/grpc/issues/9941 */
-static void grpc_rb_channel_try_register_connection_polling(
-    grpc_rb_channel *wrapper);
-static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper);
-static void *wait_until_channel_polling_thread_started_no_gil(void*);
-static void wait_until_channel_polling_thread_started_unblocking_func(void*);
+typedef enum {
+  CONTINUOUS_WATCH,
+  WATCH_STATE_API
+} watch_state_op_type;
+
+typedef struct watch_state_op {
+  watch_state_op_type op_type;
+  union {
+    struct {
+      // set from event.success when the op completes
+      int success;
+      // set once this op has been called back from a cq next call
+      int called_back;
+    } api_callback_args;
+    struct {
+      bg_watched_channel *bg;
+    } continuous_watch_callback_args;
+  } op;
+} watch_state_op;
+
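+// global list of all channels under watch; protected by global_connection_polling_mu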
+bg_watched_channel *bg_watched_channel_list_head = NULL;
+
+void grpc_rb_channel_try_register_connection_polling(bg_watched_channel *bg);
+void *wait_until_channel_polling_thread_started_no_gil(void*);
+void wait_until_channel_polling_thread_started_unblocking_func(void*);
+void *channel_init_try_register_connection_polling_without_gil(void *arg);
+
+typedef struct channel_init_try_register_stack {
+  grpc_channel *channel;
+  grpc_rb_channel *wrapper;
+} channel_init_try_register_stack;
+
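+// shared completion queue that all connectivity-watch ops are posted to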
+grpc_completion_queue *channel_polling_cq;
+gpr_mu global_connection_polling_mu;
+gpr_cv global_connection_polling_cv;
+int abort_channel_polling = 0;
+int channel_polling_thread_started = 0;
+
+int bg_watched_channel_list_lookup(bg_watched_channel *bg);
+bg_watched_channel *bg_watched_channel_list_create_and_add(grpc_channel *channel);
+void bg_watched_channel_list_free_and_remove(bg_watched_channel *bg);
+void run_poll_channels_loop_unblocking_func(void* arg);
+
+// Needs to be called under global_connection_polling_mu
+void grpc_rb_channel_watch_connection_state_op_complete(watch_state_op* op, int success) {
+  GPR_ASSERT(!op->op.api_callback_args.called_back);
+  op->op.api_callback_args.called_back = 1;
+  op->op.api_callback_args.success = success;
+  // wake up the watch API call that's waiting on this op
+  gpr_cv_broadcast(&global_connection_polling_cv);
+}
+
+/* Drops a ref to bg; destroys the wrapped channel at most once and frees the list node when the last ref is gone. */
+void grpc_rb_channel_safe_destroy(bg_watched_channel *bg) {
+  gpr_mu_lock(&global_connection_polling_mu);
+  GPR_ASSERT(bg_watched_channel_list_lookup(bg));
+  if (!bg->channel_destroyed) {
+    grpc_channel_destroy(bg->channel);
+    bg->channel_destroyed = 1;
+  }
+  bg->refcount--;
+  if (bg->refcount == 0) {
+    bg_watched_channel_list_free_and_remove(bg);
+  }
+  gpr_mu_unlock(&global_connection_polling_mu);
+}
 
-static grpc_completion_queue *channel_polling_cq;
-static gpr_mu global_connection_polling_mu;
-static gpr_cv global_connection_polling_cv;
-static int abort_channel_polling = 0;
-static int channel_polling_thread_started = 0;
+void *channel_safe_destroy_without_gil(void *arg) {
+  grpc_rb_channel_safe_destroy((bg_watched_channel*)arg);
+  return NULL;
+}
 
 /* Destroys Channel instances. */
-static void grpc_rb_channel_free(void *p) {
+void grpc_rb_channel_free(void *p) {
   grpc_rb_channel *ch = NULL;
   if (p == NULL) {
     return;
   };
+  gpr_log(GPR_DEBUG, "channel GC function called!");
   ch = (grpc_rb_channel *)p;
 
-  if (ch->wrapped != NULL) {
-    grpc_rb_channel_safe_destroy(ch);
-    ch->wrapped = NULL;
-  }
-
-  if (ch->mu_init_done) {
-    gpr_mu_destroy(&ch->channel_mu);
-    gpr_cv_destroy(&ch->channel_cv);
+  if (ch->bg_wrapped != NULL) {
+    /* assumption made here: it's ok to directly gpr_mu_lock the global
+     * connection polling mutex because we're in a finalizer,
+     * and we can count on this thread to not be interrupted. */
+    grpc_rb_channel_safe_destroy(ch->bg_wrapped);
+    ch->bg_wrapped = NULL;
   }
 
   xfree(p);
 }
 
 /* Protects the mark object from GC */
-static void grpc_rb_channel_mark(void *p) {
+void grpc_rb_channel_mark(void *p) {
   grpc_rb_channel *channel = NULL;
   if (p == NULL) {
     return;
@@ -131,7 +187,7 @@ static void grpc_rb_channel_mark(void *p) {
   }
 }
 
-static rb_data_type_t grpc_channel_data_type = {"grpc_channel",
+rb_data_type_t grpc_channel_data_type = {"grpc_channel",
                                                 {grpc_rb_channel_mark,
                                                  grpc_rb_channel_free,
                                                  GRPC_RB_MEMSIZE_UNAVAILABLE,
@@ -144,9 +200,9 @@ static rb_data_type_t grpc_channel_data_type = {"grpc_channel",
 };
 
 /* Allocates grpc_rb_channel instances. */
-static VALUE grpc_rb_channel_alloc(VALUE cls) {
+VALUE grpc_rb_channel_alloc(VALUE cls) {
   grpc_rb_channel *wrapper = ALLOC(grpc_rb_channel);
-  wrapper->wrapped = NULL;
+  wrapper->bg_wrapped = NULL;
   wrapper->credentials = Qnil;
   return TypedData_Wrap_Struct(cls, &grpc_channel_data_type, wrapper);
 }
@@ -159,7 +215,7 @@ static VALUE grpc_rb_channel_alloc(VALUE cls) {
     secure_channel = Channel:new("myhost:443", {'arg1': 'value1'}, creds)
 
   Creates channel instances. */
-static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
+VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
   VALUE channel_args = Qnil;
   VALUE credentials = Qnil;
   VALUE target = Qnil;
@@ -168,6 +224,7 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
   grpc_channel_credentials *creds = NULL;
   char *target_chars = NULL;
   grpc_channel_args args;
+  channel_init_try_register_stack stack;
   MEMZERO(&args, grpc_channel_args, 1);
 
   grpc_ruby_once_init();
@@ -178,7 +235,6 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
   rb_scan_args(argc, argv, "3", &target, &channel_args, &credentials);
 
   TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
-  wrapper->mu_init_done = 0;
   target_chars = StringValueCStr(target);
   grpc_rb_hash_convert_to_channel_args(channel_args, &args);
   if (TYPE(credentials) == T_SYMBOL) {
@@ -195,24 +251,10 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
   }
 
   GPR_ASSERT(ch);
-
-  wrapper->wrapped = ch;
-
-  gpr_mu_init(&wrapper->channel_mu);
-  gpr_cv_init(&wrapper->channel_cv);
-  wrapper->mu_init_done = 1;
-
-  gpr_mu_lock(&wrapper->channel_mu);
-  wrapper->abort_watch_connectivity_state = 0;
-  wrapper->current_connectivity_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0);
-  wrapper->safe_to_destroy = 0;
-  wrapper->request_safe_destroy = 0;
-
-  gpr_cv_broadcast(&wrapper->channel_cv);
-  gpr_mu_unlock(&wrapper->channel_mu);
-
-
-  grpc_rb_channel_try_register_connection_polling(wrapper);
+  stack.channel = ch;
+  stack.wrapper = wrapper;
+  rb_thread_call_without_gvl(
+      channel_init_try_register_connection_polling_without_gil, &stack, NULL, NULL);
 
   if (args.args != NULL) {
     xfree(args.args); /* Allocated by grpc_rb_hash_convert_to_channel_args */
@@ -223,10 +265,32 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
     return Qnil;
   }
   rb_ivar_set(self, id_target, target);
-  wrapper->wrapped = ch;
   return self;
 }
 
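+/* arguments and result for grpc_channel_check_connectivity_state, run without the GVL */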
+typedef struct get_state_stack {
+  grpc_channel *channel;
+  int try_to_connect;
+  int out;
+} get_state_stack;
+
+void *get_state_without_gil(void *arg) {
+  get_state_stack *stack = (get_state_stack*)arg;
+
+  gpr_mu_lock(&global_connection_polling_mu);
+  GPR_ASSERT(abort_channel_polling || channel_polling_thread_started);
+  if (abort_channel_polling) {
+    // if the channel polling thread failed to start,
+    // just always report the shutdown state.
+    stack->out = GRPC_CHANNEL_SHUTDOWN;
+  } else {
+    stack->out = grpc_channel_check_connectivity_state(stack->channel, stack->try_to_connect);
+  }
+  gpr_mu_unlock(&global_connection_polling_mu);
+
+  return NULL;
+}
+
 /*
   call-seq:
     ch.connectivity_state       -> state
@@ -236,62 +300,66 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
   constants defined in GRPC::Core::ConnectivityStates.
 
   It also tries to connect if the channel is idle in the second form. */
-static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv,
+VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv,
                                                     VALUE self) {
   VALUE try_to_connect_param = Qfalse;
   int grpc_try_to_connect = 0;
   grpc_rb_channel *wrapper = NULL;
-  grpc_channel *ch = NULL;
+  get_state_stack stack;
 
   /* "01" == 0 mandatory args, 1 (try_to_connect) is optional */
   rb_scan_args(argc, argv, "01", &try_to_connect_param);
   grpc_try_to_connect = RTEST(try_to_connect_param) ? 1 : 0;
 
   TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
-  ch = wrapper->wrapped;
-  if (ch == NULL) {
+  if (wrapper->bg_wrapped == NULL) {
     rb_raise(rb_eRuntimeError, "closed!");
     return Qnil;
   }
-  return LONG2NUM(grpc_channel_check_connectivity_state(wrapper->wrapped, grpc_try_to_connect));
+
+  stack.channel = wrapper->bg_wrapped->channel;
+  stack.try_to_connect = grpc_try_to_connect;
+  rb_thread_call_without_gvl(get_state_without_gil, &stack, NULL, NULL);
+
+  return LONG2NUM(stack.out);
 }
 
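+/* arguments for a watch_connectivity_state op, passed to the no-GVL call */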
 typedef struct watch_state_stack {
-  grpc_rb_channel *wrapper;
+  grpc_channel *channel;
   gpr_timespec deadline;
   int last_state;
 } watch_state_stack;
 
-static void *watch_channel_state_without_gvl(void *arg) {
+void *wait_for_watch_state_op_complete_without_gvl(void *arg) {
   watch_state_stack *stack = (watch_state_stack*)arg;
-  gpr_timespec deadline = stack->deadline;
-  grpc_rb_channel *wrapper = stack->wrapper;
-  int last_state = stack->last_state;
-  void *return_value = (void*)0;
+  watch_state_op *op = NULL;
+  void *success = (void*)0;
 
-  gpr_mu_lock(&wrapper->channel_mu);
-  while(wrapper->current_connectivity_state == last_state &&
-        !wrapper->request_safe_destroy &&
-        !wrapper->safe_to_destroy &&
-        !wrapper->abort_watch_connectivity_state &&
-        gpr_time_cmp(deadline, gpr_now(GPR_CLOCK_REALTIME)) > 0) {
-    gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, deadline);
+  gpr_mu_lock(&global_connection_polling_mu);
+  // it's unsafe to do a "watch" after "channel polling abort" because the cq
+  // has been shut down.
+  if (abort_channel_polling) {
+    gpr_mu_unlock(&global_connection_polling_mu);
+    return (void*)0;
+  }
+  op = gpr_zalloc(sizeof(watch_state_op));
+  op->op_type = WATCH_STATE_API;
+  // the op is handed to the polling thread as the cq tag; this thread frees it after completion
+  grpc_channel_watch_connectivity_state(
+      stack->channel, stack->last_state, stack->deadline, channel_polling_cq, op);
+
+  while (!op->op.api_callback_args.called_back) {
+    gpr_cv_wait(&global_connection_polling_cv,
+                &global_connection_polling_mu,
+                gpr_inf_future(GPR_CLOCK_REALTIME));
   }
-  if (wrapper->current_connectivity_state != last_state) {
-    return_value = (void*)1;
+  if (op->op.api_callback_args.success) {
+    success = (void*)1;
   }
-  gpr_mu_unlock(&wrapper->channel_mu);
-
-  return return_value;
-}
+  gpr_free(op);
+  gpr_mu_unlock(&global_connection_polling_mu);
 
-static void watch_channel_state_unblocking_func(void *arg) {
-  grpc_rb_channel *wrapper = (grpc_rb_channel*)arg;
-  gpr_log(GPR_DEBUG, "GRPC_RUBY: watch channel state unblocking func called");
-  gpr_mu_lock(&wrapper->channel_mu);
-  wrapper->abort_watch_connectivity_state = 1;
-  gpr_cv_broadcast(&wrapper->channel_cv);
-  gpr_mu_unlock(&wrapper->channel_mu);
+  return success;
 }
 
 /* Wait until the channel's connectivity state becomes different from
@@ -301,16 +369,16 @@ static void watch_channel_state_unblocking_func(void *arg) {
  * Returns false if "deadline" expires before the channel's connectivity
  * state changes from "last_state".
  * */
-static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self,
+VALUE grpc_rb_channel_watch_connectivity_state(VALUE self,
                                                       VALUE last_state,
                                                       VALUE deadline) {
   grpc_rb_channel *wrapper = NULL;
   watch_state_stack stack;
-  void* out;
+  void* op_success = 0;
 
   TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
 
-  if (wrapper->wrapped == NULL) {
+  if (wrapper->bg_wrapped == NULL) {
     rb_raise(rb_eRuntimeError, "closed!");
     return Qnil;
   }
@@ -320,26 +388,25 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self,
     return Qnil;
   }
 
-  stack.wrapper = wrapper;
-  stack.deadline = grpc_rb_time_timeval(deadline, 0);
+  stack.channel = wrapper->bg_wrapped->channel;
+  stack.deadline = grpc_rb_time_timeval(deadline, 0);
   stack.last_state = NUM2LONG(last_state);
-  out = rb_thread_call_without_gvl(watch_channel_state_without_gvl, &stack, watch_channel_state_unblocking_func, wrapper);
-  if (out) {
-    return Qtrue;
-  }
-  return Qfalse;
+
+  op_success = rb_thread_call_without_gvl(
+      wait_for_watch_state_op_complete_without_gvl, &stack, run_poll_channels_loop_unblocking_func, NULL);
+
+  return op_success ? Qtrue : Qfalse;
 }
 
 /* Create a call given a grpc_channel, in order to call method. The request
    is not sent until grpc_call_invoke is called. */
-static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, VALUE mask,
+VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, VALUE mask,
                                          VALUE method, VALUE host,
                                          VALUE deadline) {
   VALUE res = Qnil;
   grpc_rb_channel *wrapper = NULL;
   grpc_call *call = NULL;
   grpc_call *parent_call = NULL;
-  grpc_channel *ch = NULL;
   grpc_completion_queue *cq = NULL;
   int flags = GRPC_PROPAGATE_DEFAULTS;
   grpc_slice method_slice;
@@ -361,8 +428,7 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, VALUE mask,
 
   cq = grpc_completion_queue_create(NULL);
   TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
-  ch = wrapper->wrapped;
-  if (ch == NULL) {
+  if (wrapper->bg_wrapped == NULL) {
     rb_raise(rb_eRuntimeError, "closed!");
     return Qnil;
   }
@@ -370,7 +436,7 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, VALUE mask,
   method_slice =
       grpc_slice_from_copied_buffer(RSTRING_PTR(method), RSTRING_LEN(method));
 
-  call = grpc_channel_create_call(ch, parent_call, flags, cq, method_slice,
+  call = grpc_channel_create_call(wrapper->bg_wrapped->channel, parent_call, flags, cq, method_slice,
                                   host_slice_ptr,
                                   grpc_rb_time_timeval(deadline,
                                                        /* absolute time */ 0),
@@ -396,85 +462,132 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, VALUE mask,
 }
 
 /* Closes the channel, calling its destroy method */
-static VALUE grpc_rb_channel_destroy(VALUE self) {
+/* Note this is an API-level call; a wrapped channel's finalizer doesn't call
+ * this */
+VALUE grpc_rb_channel_destroy(VALUE self) {
   grpc_rb_channel *wrapper = NULL;
-  grpc_channel *ch = NULL;
 
   TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
-  ch = wrapper->wrapped;
-  if (ch != NULL) {
-    grpc_rb_channel_safe_destroy(wrapper);
-    wrapper->wrapped = NULL;
+  if (wrapper->bg_wrapped != NULL) {
+    rb_thread_call_without_gvl(
+        channel_safe_destroy_without_gil, wrapper->bg_wrapped, NULL, NULL);
+    wrapper->bg_wrapped = NULL;
   }
 
   return Qnil;
 }
 
 /* Called to obtain the target that this channel accesses. */
-static VALUE grpc_rb_channel_get_target(VALUE self) {
+VALUE grpc_rb_channel_get_target(VALUE self) {
   grpc_rb_channel *wrapper = NULL;
   VALUE res = Qnil;
   char *target = NULL;
 
   TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
-  target = grpc_channel_get_target(wrapper->wrapped);
+  target = grpc_channel_get_target(wrapper->bg_wrapped->channel);
   res = rb_str_new2(target);
   gpr_free(target);
 
   return res;
 }
 
-// Either start polling channel connection state or signal that it's free to
-// destroy.
-// Not safe to call while a channel's connection state is polled.
-static void grpc_rb_channel_try_register_connection_polling(
-  grpc_rb_channel *wrapper) {
-  grpc_connectivity_state conn_state;
-  gpr_timespec sleep_time = gpr_time_add(
-      gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(20, GPR_TIMESPAN));
-
-  GPR_ASSERT(wrapper);
-  GPR_ASSERT(wrapper->wrapped);
-  gpr_mu_lock(&wrapper->channel_mu);
-  if (wrapper->request_safe_destroy) {
-    wrapper->safe_to_destroy = 1;
-    gpr_cv_broadcast(&wrapper->channel_cv);
-    gpr_mu_unlock(&wrapper->channel_mu);
-    return;
+/* Needs to be called under global_connection_polling_mu */
+int bg_watched_channel_list_lookup(bg_watched_channel *target) {
+  bg_watched_channel *cur = bg_watched_channel_list_head;
+
+  gpr_log(GPR_DEBUG, "check contains");
+  while (cur != NULL) {
+    if (cur == target) {
+      return 1;
+    }
+    cur = cur->next;
   }
-  gpr_mu_lock(&global_connection_polling_mu);
 
-  GPR_ASSERT(channel_polling_thread_started || abort_channel_polling);
-  conn_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0);
-  if (conn_state != wrapper->current_connectivity_state) {
-    wrapper->current_connectivity_state = conn_state;
-    gpr_cv_broadcast(&wrapper->channel_cv);
-  }
-  // avoid posting work to the channel polling cq if it's been shutdown
-  if (!abort_channel_polling && conn_state != GRPC_CHANNEL_SHUTDOWN) {
-    grpc_channel_watch_connectivity_state(
-        wrapper->wrapped, conn_state, sleep_time, channel_polling_cq, wrapper);
-  } else {
-    wrapper->safe_to_destroy = 1;
-    gpr_cv_broadcast(&wrapper->channel_cv);
+  return 0;
+}
+
+/* Needs to be called under global_connection_polling_mu */
+bg_watched_channel *bg_watched_channel_list_create_and_add(grpc_channel *channel) {
+  bg_watched_channel *watched = gpr_zalloc(sizeof(bg_watched_channel));
+
+  gpr_log(GPR_DEBUG, "add bg");
+  watched->channel = channel;
+  watched->next = bg_watched_channel_list_head;
+  watched->refcount = 1;
+  bg_watched_channel_list_head = watched;
+  return watched;
+}
+
+/* Needs to be called under global_connection_polling_mu */
+void bg_watched_channel_list_free_and_remove(bg_watched_channel *target) {
+  bg_watched_channel *bg = NULL;
+
+  gpr_log(GPR_DEBUG, "remove bg");
+  GPR_ASSERT(bg_watched_channel_list_lookup(target));
+  GPR_ASSERT(target->channel_destroyed && target->refcount == 0);
+  if (bg_watched_channel_list_head == target) {
+    bg_watched_channel_list_head = target->next;
+    gpr_free(target);
+    return;
+  }
+  bg = bg_watched_channel_list_head;
+  while (bg != NULL && bg->next != NULL) {
+    if (bg->next == target) {
+      bg->next = bg->next->next;
+      gpr_free(target);
+      return;
+    }
+    bg = bg->next;
   }
+  GPR_ASSERT(0);
+}
+
+/* Initialize a grpc_rb_channel's "protected grpc_channel" and try to push
+ * it onto the background thread for constant watches. */
+void *channel_init_try_register_connection_polling_without_gil(void *arg) {
+  channel_init_try_register_stack *stack = (channel_init_try_register_stack*)arg;
+
+  gpr_mu_lock(&global_connection_polling_mu);
+  stack->wrapper->bg_wrapped = bg_watched_channel_list_create_and_add(stack->channel);
+  grpc_rb_channel_try_register_connection_polling(stack->wrapper->bg_wrapped);
   gpr_mu_unlock(&global_connection_polling_mu);
-  gpr_mu_unlock(&wrapper->channel_mu);
+  return NULL;
 }
 
-// Note requires wrapper->wrapped, wrapper->channel_mu/cv initialized
-static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper) {
-  gpr_mu_lock(&wrapper->channel_mu);
-  wrapper->request_safe_destroy = 1;
+// Needs to be called under global_connection_polling_mu
+void grpc_rb_channel_try_register_connection_polling(bg_watched_channel *bg) {
+  grpc_connectivity_state conn_state;
+  watch_state_op *op = NULL;
 
-  while (!wrapper->safe_to_destroy) {
-    gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu,
-                gpr_inf_future(GPR_CLOCK_REALTIME));
+  GPR_ASSERT(channel_polling_thread_started || abort_channel_polling);
+
+  if (bg->refcount == 0) {
+    GPR_ASSERT(bg->channel_destroyed);
+    bg_watched_channel_list_free_and_remove(bg);
+    return;
+  }
+  GPR_ASSERT(bg->refcount == 1);
+  if (bg->channel_destroyed) {
+    GPR_ASSERT(abort_channel_polling);
+    return;
+  }
+  if (abort_channel_polling) {
+    return;
   }
-  GPR_ASSERT(wrapper->safe_to_destroy);
-  gpr_mu_unlock(&wrapper->channel_mu);
 
-  grpc_channel_destroy(wrapper->wrapped);
+  conn_state = grpc_channel_check_connectivity_state(bg->channel, 0);
+  if (conn_state == GRPC_CHANNEL_SHUTDOWN) {
+    return;
+  }
+  GPR_ASSERT(bg_watched_channel_list_lookup(bg));
+  // prevent bg from being freed by GC while the background thread is watching it
+  bg->refcount++;
+
+  op = gpr_zalloc(sizeof(watch_state_op));
+  op->op_type = CONTINUOUS_WATCH;
+  op->op.continuous_watch_callback_args.bg = bg;
+  grpc_channel_watch_connectivity_state(
+      bg->channel, conn_state, gpr_inf_future(GPR_CLOCK_REALTIME), channel_polling_cq, op);
 }
 
 // Note this loop breaks out with a single call of
@@ -483,8 +596,10 @@ static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper) {
 // indicates process shutdown.
 // In the worst case, this stops polling channel connectivity
 // early and falls back to current behavior.
-static void *run_poll_channels_loop_no_gil(void *arg) {
+void *run_poll_channels_loop_no_gil(void *arg) {
   grpc_event event;
+  watch_state_op *op = NULL;
+  bg_watched_channel *bg = NULL;
   (void)arg;
   gpr_log(GPR_DEBUG, "GRPC_RUBY: run_poll_channels_loop_no_gil - begin");
 
@@ -500,9 +615,21 @@ static void *run_poll_channels_loop_no_gil(void *arg) {
     if (event.type == GRPC_QUEUE_SHUTDOWN) {
       break;
     }
+    gpr_mu_lock(&global_connection_polling_mu);
     if (event.type == GRPC_OP_COMPLETE) {
-      grpc_rb_channel_try_register_connection_polling((grpc_rb_channel *)event.tag);
+      op = (watch_state_op*)event.tag;
+      if (op->op_type == CONTINUOUS_WATCH) {
+        bg = (bg_watched_channel*)op->op.continuous_watch_callback_args.bg;
+        bg->refcount--;
+        grpc_rb_channel_try_register_connection_polling(bg);
+        gpr_free(op);
+      } else if (op->op_type == WATCH_STATE_API) {
+        grpc_rb_channel_watch_connection_state_op_complete((watch_state_op*)event.tag, event.success);
+      } else {
+        GPR_ASSERT(0);
+      }
     }
+    gpr_mu_unlock(&global_connection_polling_mu);
   }
   grpc_completion_queue_destroy(channel_polling_cq);
   gpr_log(GPR_DEBUG, "GRPC_RUBY: run_poll_channels_loop_no_gil - exit connection polling loop");
@@ -510,17 +637,37 @@ static void *run_poll_channels_loop_no_gil(void *arg) {
 }
 
 // Notify the channel polling loop to clean up and shut down.
-static void run_poll_channels_loop_unblocking_func(void *arg) {
+void run_poll_channels_loop_unblocking_func(void *arg) {
+  bg_watched_channel *bg = NULL;
   (void)arg;
+
   gpr_mu_lock(&global_connection_polling_mu);
   gpr_log(GPR_DEBUG, "GRPC_RUBY: run_poll_channels_loop_unblocking_func - begin aborting connection polling");
+  // early out after first time through
+  if (abort_channel_polling) {
+    gpr_mu_unlock(&global_connection_polling_mu);
+    return;
+  }
   abort_channel_polling = 1;
+
+  // force pending watches to end by switching to shutdown state
+  bg = bg_watched_channel_list_head;
+  while (bg != NULL) {
+    if (!bg->channel_destroyed) {
+      grpc_channel_destroy(bg->channel);
+      bg->channel_destroyed = 1;
+    }
+    bg = bg->next;
+  }
+
   grpc_completion_queue_shutdown(channel_polling_cq);
+  gpr_cv_broadcast(&global_connection_polling_cv);
   gpr_mu_unlock(&global_connection_polling_mu);
+  gpr_log(GPR_DEBUG, "GRPC_RUBY: run_poll_channels_loop_unblocking_func - begin aborting connection polling 22222");
 }
 
 // Poll channel connectivity states in background thread without the GIL.
-static VALUE run_poll_channels_loop(VALUE arg) {
+VALUE run_poll_channels_loop(VALUE arg) {
   (void)arg;
   gpr_log(GPR_DEBUG, "GRPC_RUBY: run_poll_channels_loop - create connection polling thread");
   rb_thread_call_without_gvl(run_poll_channels_loop_no_gil, NULL,
@@ -529,7 +676,7 @@ static VALUE run_poll_channels_loop(VALUE arg) {
   return Qnil;
 }
 
-static void *wait_until_channel_polling_thread_started_no_gil(void *arg) {
+void *wait_until_channel_polling_thread_started_no_gil(void *arg) {
   (void)arg;
   gpr_log(GPR_DEBUG, "GRPC_RUBY: wait for channel polling thread to start");
   gpr_mu_lock(&global_connection_polling_mu);
@@ -542,7 +689,7 @@ static void *wait_until_channel_polling_thread_started_no_gil(void *arg) {
   return NULL;
 }
 
-static void wait_until_channel_polling_thread_started_unblocking_func(void* arg) {
+void wait_until_channel_polling_thread_started_unblocking_func(void* arg) {
   (void)arg;
   gpr_mu_lock(&global_connection_polling_mu);
   gpr_log(GPR_DEBUG, "GRPC_RUBY: wait_until_channel_polling_thread_started_unblocking_func - begin aborting connection polling");
@@ -551,6 +698,16 @@ static void wait_until_channel_polling_thread_started_unblocking_func(void* arg)
   gpr_mu_unlock(&global_connection_polling_mu);
 }
 
+static void *set_abort_channel_polling_without_gil(void *arg) {
+  (void)arg;
+  gpr_mu_lock(&global_connection_polling_mu);
+  abort_channel_polling = 1;
+  gpr_cv_broadcast(&global_connection_polling_cv);
+  gpr_mu_unlock(&global_connection_polling_mu);
+  return NULL;
+}
+
 /* Temporary fix for
  * https://github.com/GoogleCloudPlatform/google-cloud-ruby/issues/899.
  * Transports in idle channels can get destroyed. Normally c-core re-connects,
@@ -576,14 +733,11 @@ void grpc_rb_channel_polling_thread_start() {
 
   if (!RTEST(background_thread)) {
     gpr_log(GPR_DEBUG, "GRPC_RUBY: failed to spawn channel polling thread");
-    gpr_mu_lock(&global_connection_polling_mu);
-    abort_channel_polling = 1;
-    gpr_cv_broadcast(&global_connection_polling_cv);
-    gpr_mu_unlock(&global_connection_polling_mu);
+    rb_thread_call_without_gvl(set_abort_channel_polling_without_gil, NULL, NULL, NULL);
   }
 }
 
-static void Init_grpc_propagate_masks() {
+void Init_grpc_propagate_masks() {
   /* Constants representing call propagation masks in grpc.h */
   VALUE grpc_rb_mPropagateMasks =
       rb_define_module_under(grpc_rb_mGrpcCore, "PropagateMasks");
@@ -599,7 +753,7 @@ static void Init_grpc_propagate_masks() {
                   UINT2NUM(GRPC_PROPAGATE_DEFAULTS));
 }
 
-static void Init_grpc_connectivity_states() {
+void Init_grpc_connectivity_states() {
   /* Constants representing call propagation masks in grpc.h */
   VALUE grpc_rb_mConnectivityStates =
       rb_define_module_under(grpc_rb_mGrpcCore, "ConnectivityStates");
@@ -658,5 +812,5 @@ void Init_grpc_channel() {
 grpc_channel *grpc_rb_get_wrapped_channel(VALUE v) {
   grpc_rb_channel *wrapper = NULL;
   TypedData_Get_Struct(v, grpc_rb_channel, &grpc_channel_data_type, wrapper);
-  return wrapper->wrapped;
+  return wrapper->bg_wrapped->channel;
 }
diff --git a/src/ruby/ext/grpc/rb_completion_queue.c b/src/ruby/ext/grpc/rb_completion_queue.c
index fd75d2f691..9f3a81b1a8 100644
--- a/src/ruby/ext/grpc/rb_completion_queue.c
+++ b/src/ruby/ext/grpc/rb_completion_queue.c
@@ -71,12 +71,16 @@ static void *grpc_rb_completion_queue_pluck_no_gil(void *param) {
 }
 
 /* Helper function to free a completion queue. */
-void grpc_rb_completion_queue_destroy(grpc_completion_queue *cq) {
-  /* Every function that adds an event to a queue also synchronously plucks
-     that event from the queue, and holds a reference to the Ruby object that
-     holds the queue, so we only get to this point if all of those functions
-     have completed, and the queue is empty */
+void grpc_rb_completion_queue_safe_destroy(grpc_completion_queue *cq) {
+  grpc_event ev;
+
   grpc_completion_queue_shutdown(cq);
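+  // drain all pending events; the cq must be empty before it can be destroyed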
+  for (;;) {
+    ev = grpc_completion_queue_pluck(cq, NULL, gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
+    if (ev.type == GRPC_QUEUE_SHUTDOWN) {
+      break;
+    }
+  }
   grpc_completion_queue_destroy(cq);
 }
 
diff --git a/src/ruby/ext/grpc/rb_completion_queue.h b/src/ruby/ext/grpc/rb_completion_queue.h
index aa9dc6416a..eb041b28df 100644
--- a/src/ruby/ext/grpc/rb_completion_queue.h
+++ b/src/ruby/ext/grpc/rb_completion_queue.h
@@ -38,7 +38,7 @@
 
 #include <grpc/grpc.h>
 
-void grpc_rb_completion_queue_destroy(grpc_completion_queue *cq);
+void grpc_rb_completion_queue_safe_destroy(grpc_completion_queue *cq);
 
 /**
  * Makes the implementation of CompletionQueue#pluck available in other files
diff --git a/src/ruby/ext/grpc/rb_event_thread.c b/src/ruby/ext/grpc/rb_event_thread.c
index 9e85bbcfbf..f1a08a7b23 100644
--- a/src/ruby/ext/grpc/rb_event_thread.c
+++ b/src/ruby/ext/grpc/rb_event_thread.c
@@ -106,17 +106,17 @@ static void *grpc_rb_wait_for_event_no_gil(void *param) {
   grpc_rb_event *event = NULL;
   (void)param;
   gpr_mu_lock(&event_queue.mu);
-  while ((event = grpc_rb_event_queue_dequeue()) == NULL) {
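+  // return the next queued event, or NULL once an abort has been requested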
+  while (!event_queue.abort) {
+    if ((event = grpc_rb_event_queue_dequeue()) != NULL) {
+      gpr_mu_unlock(&event_queue.mu);
+      return event;
+    }
     gpr_cv_wait(&event_queue.cv,
                 &event_queue.mu,
                 gpr_inf_future(GPR_CLOCK_REALTIME));
-    if (event_queue.abort) {
-      gpr_mu_unlock(&event_queue.mu);
-      return NULL;
-    }
   }
   gpr_mu_unlock(&event_queue.mu);
-  return event;
+  return NULL;
 }
 
 static void grpc_rb_event_unblocking_func(void *arg) {
diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c
index 2286a99f24..2b0858c247 100644
--- a/src/ruby/ext/grpc/rb_server.c
+++ b/src/ruby/ext/grpc/rb_server.c
@@ -77,7 +77,7 @@ static void destroy_server(grpc_rb_server *server, gpr_timespec deadline) {
                                   gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
       }
       grpc_server_destroy(server->wrapped);
-      grpc_rb_completion_queue_destroy(server->queue);
+      grpc_rb_completion_queue_safe_destroy(server->queue);
       server->wrapped = NULL;
       server->queue = NULL;
     }
diff --git a/src/ruby/spec/channel_connection_spec.rb b/src/ruby/spec/channel_connection_spec.rb
index 940d68b9b0..b3edec8f93 100644
--- a/src/ruby/spec/channel_connection_spec.rb
+++ b/src/ruby/spec/channel_connection_spec.rb
@@ -28,6 +28,10 @@
 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
 require 'grpc'
+require 'timeout'
+
+include Timeout
+include GRPC::Core
 
 # A test message
 class EchoMsg
@@ -62,7 +66,7 @@ end
 EchoStub = EchoService.rpc_stub_class
 
 def start_server(port = 0)
-  @srv = GRPC::RpcServer.new
+  @srv = GRPC::RpcServer.new(pool_size: 1)
   server_port = @srv.add_http2_port("localhost:#{port}", :this_port_is_insecure)
   @srv.handle(EchoService)
   @server_thd = Thread.new { @srv.run }
@@ -138,4 +142,33 @@ describe 'channel connection behavior' do
 
     stop_server
   end
+
+  it 'observably connects and reconnects to a transient server' \
+    ' when using the channel state API' do
+    timeout(180) do
+      port = start_server
+      ch = GRPC::Core::Channel.new("localhost:#{port}", {},
+                                   :this_channel_is_insecure)
+      stop_server
+
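+      # spawn many concurrent watchers to exercise the shared polling thread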
+      thds = []
+      50.times do
+        thds << Thread.new do
+          while ch.connectivity_state(true) != ConnectivityStates::READY
+            ch.watch_connectivity_state(
+              ConnectivityStates::READY, Time.now + 60)
+            break
+          end
+        end
+      end
+
+      sleep 0.01
+
+      start_server(port)
+
+      thds.each(&:join)
+
+      stop_server
+    end
+  end
 end
-- 
GitLab