From c0ecedba8379beee9480722ca1202e1cc44c124c Mon Sep 17 00:00:00 2001
From: murgatroid99 <mlumish@google.com>
Date: Mon, 16 May 2016 16:14:52 -0700
Subject: [PATCH] Made signal handling properly handle non-killing signals

---
 src/ruby/ext/grpc/rb_call.c             |  8 +---
 src/ruby/ext/grpc/rb_completion_queue.c | 62 +++++++++++++++++++++----
 src/ruby/ext/grpc/rb_completion_queue.h |  4 +-
 src/ruby/ext/grpc/rb_server.c           | 12 +----
 4 files changed, 57 insertions(+), 29 deletions(-)

diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c
index b43ad08eba..1b06273af9 100644
--- a/src/ruby/ext/grpc/rb_call.c
+++ b/src/ruby/ext/grpc/rb_call.c
@@ -722,10 +722,6 @@ static VALUE grpc_run_batch_stack_build_result(run_batch_stack *st) {
   return result;
 }
 
-static void run_batch_unblock_func(void *call) {
-  grpc_call_cancel((grpc_call*)call, NULL);
-}
-
 /* call-seq:
    cq = CompletionQueue.new
    ops = {
@@ -776,9 +772,7 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag,
              grpc_call_error_detail_of(err), err);
     return Qnil;
   }
-  ev = grpc_rb_completion_queue_pluck_event(cqueue, tag, timeout,
-                                            run_batch_unblock_func,
-                                            (void*)call);
+  ev = grpc_rb_completion_queue_pluck_event(cqueue, tag, timeout);
   if (ev.type == GRPC_QUEUE_TIMEOUT) {
     grpc_run_batch_stack_cleanup(&st);
     rb_raise(grpc_rb_eOutOfTime, "grpc_call_start_batch timed out");
diff --git a/src/ruby/ext/grpc/rb_completion_queue.c b/src/ruby/ext/grpc/rb_completion_queue.c
index 4f671807eb..605c7408b4 100644
--- a/src/ruby/ext/grpc/rb_completion_queue.c
+++ b/src/ruby/ext/grpc/rb_completion_queue.c
@@ -52,21 +52,47 @@ typedef struct next_call_stack {
   grpc_event event;
   gpr_timespec timeout;
   void *tag;
+  volatile int interrupted;
 } next_call_stack;
 
 /* Calls grpc_completion_queue_next without holding the ruby GIL */
 static void *grpc_rb_completion_queue_next_no_gil(void *param) {
   next_call_stack *const next_call = (next_call_stack*)param;
-  next_call->event =
-      grpc_completion_queue_next(next_call->cq, next_call->timeout, NULL);
+  gpr_timespec increment = gpr_time_from_millis(20, GPR_TIMESPAN);
+  gpr_timespec deadline;
+  do {
+    deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), increment);
+    if (gpr_time_cmp(deadline, next_call->timeout) > 0) {
+      // Then we have run out of time
+      break;
+    }
+    next_call->event = grpc_completion_queue_next(next_call->cq,
+                                                  deadline, NULL);
+    if (next_call->event.success) {
+      break;
+    }
+  } while (!next_call->interrupted);
   return NULL;
 }
 
 /* Calls grpc_completion_queue_pluck without holding the ruby GIL */
 static void *grpc_rb_completion_queue_pluck_no_gil(void *param) {
   next_call_stack *const next_call = (next_call_stack*)param;
-  next_call->event = grpc_completion_queue_pluck(next_call->cq, next_call->tag,
-                                                 next_call->timeout, NULL);
+  gpr_timespec increment = gpr_time_from_millis(20, GPR_TIMESPAN);
+  gpr_timespec deadline;
+  do {
+    deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), increment);
+    if (gpr_time_cmp(deadline, next_call->timeout) > 0) {
+      // Then we have run out of time
+      break;
+    }
+    next_call->event = grpc_completion_queue_pluck(next_call->cq,
+                                                   next_call->tag,
+                                                   deadline, NULL);
+    if (next_call->event.type != GRPC_QUEUE_TIMEOUT) {
+      break;
+    }
+  } while (!next_call->interrupted);
   return NULL;
 }
 
@@ -139,12 +165,15 @@ static VALUE grpc_rb_completion_queue_alloc(VALUE cls) {
   return TypedData_Wrap_Struct(cls, &grpc_rb_completion_queue_data_type, cq);
 }
 
+static void unblock_func(void *param) {
+  next_call_stack *const next_call = (next_call_stack*)param;
+  next_call->interrupted = 1;
+}
+
 /* Blocks until the next event for given tag is available, and returns the
  * event. */
 grpc_event grpc_rb_completion_queue_pluck_event(VALUE self, VALUE tag,
-                                                VALUE timeout,
-                                                rb_unblock_function_t *ubf,
-                                                void *unblock_arg) {
+                                                VALUE timeout) {
   next_call_stack next_call;
   MEMZERO(&next_call, next_call_stack, 1);
   TypedData_Get_Struct(self, grpc_completion_queue,
@@ -160,8 +189,23 @@ grpc_event grpc_rb_completion_queue_pluck_event(VALUE self, VALUE tag,
     next_call.tag = ROBJECT(tag);
   }
   next_call.event.type = GRPC_QUEUE_TIMEOUT;
-  rb_thread_call_without_gvl(grpc_rb_completion_queue_pluck_no_gil,
-                             (void *)&next_call, ubf, unblock_arg);
+  /* Loop until we finish a pluck without an interruption. The internal
+     pluck function runs either until it is interrupted or it gets an
+     event, or time runs out.
+
+     The basic reason we need this relatively complicated construction is that
+     we need to re-acquire the GVL when an interrupt comes in, so that the ruby
+     interpeter can do what it needs to do with the interrupt. But we also need
+     to get back to plucking when */
+  do {
+    next_call.interrupted = 0;
+    rb_thread_call_without_gvl(grpc_rb_completion_queue_pluck_no_gil,
+                               (void *)&next_call, unblock_func,
+                               (void *)&next_call);
+    /* If an interrupt prevented pluck from returning useful information, then
+       any plucks that did complete must have timed out */
+  } while (next_call.interrupted &&
+           next_call.event.type == GRPC_QUEUE_TIMEOUT);
   return next_call.event;
 }
 
diff --git a/src/ruby/ext/grpc/rb_completion_queue.h b/src/ruby/ext/grpc/rb_completion_queue.h
index 4bd5739869..42de43c3fb 100644
--- a/src/ruby/ext/grpc/rb_completion_queue.h
+++ b/src/ruby/ext/grpc/rb_completion_queue.h
@@ -47,9 +47,7 @@ grpc_completion_queue *grpc_rb_get_wrapped_completion_queue(VALUE v);
  * This avoids having code that holds the GIL repeated at multiple sites.
  */
 grpc_event grpc_rb_completion_queue_pluck_event(VALUE self, VALUE tag,
-                                                VALUE timeout,
-                                                rb_unblock_function_t *ubf,
-                                                void *unblock_arg);
+                                                VALUE timeout);
 
 /* Initializes the CompletionQueue class. */
 void Init_grpc_completion_queue();
diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c
index aa7fa0af8e..0899feb685 100644
--- a/src/ruby/ext/grpc/rb_server.c
+++ b/src/ruby/ext/grpc/rb_server.c
@@ -207,11 +207,6 @@ static void grpc_request_call_stack_cleanup(request_call_stack* st) {
   grpc_call_details_destroy(&st->details);
 }
 
-static void request_call_unblock_func(void *ptr) {
-  grpc_rb_server *rb_srv = (grpc_rb_server*)ptr;
-  grpc_server_shutdown_and_notify(rb_srv->wrapped, rb_srv->queue, rb_srv);
-}
-
 /* call-seq:
    cq = CompletionQueue.new
    tag = Object.new
@@ -249,9 +244,7 @@ static VALUE grpc_rb_server_request_call(VALUE self, VALUE cqueue,
       return Qnil;
     }
 
-    ev = grpc_rb_completion_queue_pluck_event(cqueue, tag_new, timeout,
-                                              request_call_unblock_func,
-                                              (void*)s);
+    ev = grpc_rb_completion_queue_pluck_event(cqueue, tag_new, timeout);
     if (ev.type == GRPC_QUEUE_TIMEOUT) {
       grpc_request_call_stack_cleanup(&st);
       return Qnil;
@@ -314,8 +307,7 @@ static VALUE grpc_rb_server_destroy(int argc, VALUE *argv, VALUE self) {
 
   if (s->wrapped != NULL) {
     grpc_server_shutdown_and_notify(s->wrapped, cq, NULL);
-    ev = grpc_rb_completion_queue_pluck_event(cqueue, Qnil, timeout,
-                                              NULL, NULL);
+    ev = grpc_rb_completion_queue_pluck_event(cqueue, Qnil, timeout);
     if (!ev.success) {
       rb_warn("server shutdown failed, cancelling the calls, objects may leak");
       grpc_server_cancel_all_calls(s->wrapped);
-- 
GitLab