From ec1588ba87d665cf629a8893d6070688a73ea7ef Mon Sep 17 00:00:00 2001
From: murgatroid99 <mlumish@google.com>
Date: Mon, 6 Jun 2016 15:37:45 -0700
Subject: [PATCH] Ruby: Moved completion queue entirely into extension code

---
 src/ruby/ext/grpc/rb_call.c             | 100 ++++++++---------
 src/ruby/ext/grpc/rb_completion_queue.c |  83 ++------------
 src/ruby/ext/grpc/rb_completion_queue.h |   9 +-
 src/ruby/ext/grpc/rb_grpc.c             |   4 +-
 src/ruby/ext/grpc/rb_server.c           | 138 ++++++++++++------------
 5 files changed, 126 insertions(+), 208 deletions(-)

diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c
index b436057c16..f2c567c7da 100644
--- a/src/ruby/ext/grpc/rb_call.c
+++ b/src/ruby/ext/grpc/rb_call.c
@@ -63,23 +63,10 @@ static VALUE grpc_rb_sBatchResult;
  * grpc_metadata_array. */
 static VALUE grpc_rb_cMdAry;
 
-/* id_cq is the name of the hidden ivar that preserves a reference to a
- * completion queue */
-static ID id_cq;
-
-/* id_flags is the name of the hidden ivar that preserves the value of
- * the flags used to create metadata from a Hash */
-static ID id_flags;
-
 /* id_credentials is the name of the hidden ivar that preserves the value
  * of the credentials added to the call */
 static ID id_credentials;
 
-/* id_input_md is the name of the hidden ivar that preserves the hash used to
- * create metadata, so that references to the strings it contains last as long
- * as the call the metadata is added to. */
-static ID id_input_md;
-
 /* id_metadata is name of the attribute used to access the metadata hash
  * received by the call and subsequently saved on it. */
 static ID id_metadata;
@@ -101,14 +88,23 @@ static VALUE sym_message;
 static VALUE sym_status;
 static VALUE sym_cancelled;
 
+typedef struct grpc_rb_call {
+  grpc_call *wrapped;
+  grpc_completion_queue *queue;
+} grpc_rb_call;
+
+static void destroy_call(grpc_rb_call *call) {
+  call = (grpc_rb_call *)p;
+  grpc_call_destroy(call->wrapped);
+  grpc_rb_completion_queue_destroy(call->queue);
+}
+
 /* Destroys a Call. */
 static void grpc_rb_call_destroy(void *p) {
-  grpc_call* call = NULL;
   if (p == NULL) {
     return;
   }
-  call = (grpc_call *)p;
-  grpc_call_destroy(call);
+  destroy_call((grpc_rb_call*)p);
 }
 
 static size_t md_ary_datasize(const void *p) {
@@ -167,15 +163,15 @@ const char *grpc_call_error_detail_of(grpc_call_error err) {
 /* Called by clients to cancel an RPC on the server.
    Can be called multiple times, from any thread. */
 static VALUE grpc_rb_call_cancel(VALUE self) {
-  grpc_call *call = NULL;
+  grpc_rb_call *call = NULL;
   grpc_call_error err;
   if (RTYPEDDATA_DATA(self) == NULL) {
     //This call has been closed
     return Qnil;
   }
 
-  TypedData_Get_Struct(self, grpc_call, &grpc_call_data_type, call);
-  err = grpc_call_cancel(call, NULL);
+  TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
+  err = grpc_call_cancel(call->wrapped, NULL);
   if (err != GRPC_CALL_OK) {
     rb_raise(grpc_rb_eCallError, "cancel failed: %s (code=%d)",
              grpc_call_error_detail_of(err), err);
@@ -189,10 +185,10 @@ static VALUE grpc_rb_call_cancel(VALUE self) {
    processed.
 */
 static VALUE grpc_rb_call_close(VALUE self) {
-  grpc_call *call = NULL;
-  TypedData_Get_Struct(self, grpc_call, &grpc_call_data_type, call);
+  grpc_rb_call *call = NULL;
+  TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
   if(call != NULL) {
-    grpc_call_destroy(call);
+    destroy_call(call);
     RTYPEDDATA_DATA(self) = NULL;
   }
   return Qnil;
@@ -201,14 +197,14 @@ static VALUE grpc_rb_call_close(VALUE self) {
 /* Called to obtain the peer that this call is connected to. */
 static VALUE grpc_rb_call_get_peer(VALUE self) {
   VALUE res = Qnil;
-  grpc_call *call = NULL;
+  grpc_rb_call *call = NULL;
   char *peer = NULL;
   if (RTYPEDDATA_DATA(self) == NULL) {
     rb_raise(grpc_rb_eCallError, "Cannot get peer value on closed call");
     return Qnil;
   }
-  TypedData_Get_Struct(self, grpc_call, &grpc_call_data_type, call);
-  peer = grpc_call_get_peer(call);
+  TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
+  peer = grpc_call_get_peer(call->wrapped);
   res = rb_str_new2(peer);
   gpr_free(peer);
 
@@ -217,16 +213,16 @@ static VALUE grpc_rb_call_get_peer(VALUE self) {
 
 /* Called to obtain the x509 cert of an authenticated peer. */
 static VALUE grpc_rb_call_get_peer_cert(VALUE self) {
-  grpc_call *call = NULL;
+  grpc_rb_call *call = NULL;
   VALUE res = Qnil;
   grpc_auth_context *ctx = NULL;
   if (RTYPEDDATA_DATA(self) == NULL) {
     rb_raise(grpc_rb_eCallError, "Cannot get peer cert on closed call");
     return Qnil;
   }
-  TypedData_Get_Struct(self, grpc_call, &grpc_call_data_type, call);
+  TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
 
-  ctx = grpc_call_auth_context(call);
+  ctx = grpc_call_auth_context(call->wrapped);
 
   if (!ctx || !grpc_auth_context_peer_is_authenticated(ctx)) {
     return Qnil;
@@ -326,21 +322,23 @@ static VALUE grpc_rb_call_set_write_flag(VALUE self, VALUE write_flag) {
 
   Sets credentials on a call */
 static VALUE grpc_rb_call_set_credentials(VALUE self, VALUE credentials) {
-  grpc_call *call = NULL;
+  grpc_rb_call *call = NULL;
   grpc_call_credentials *creds;
   grpc_call_error err;
   if (RTYPEDDATA_DATA(self) == NULL) {
     rb_raise(grpc_rb_eCallError, "Cannot set credentials of closed call");
     return Qnil;
   }
-  TypedData_Get_Struct(self, grpc_call, &grpc_call_data_type, call);
+  TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
   creds = grpc_rb_get_wrapped_call_credentials(credentials);
-  err = grpc_call_set_credentials(call, creds);
+  err = grpc_call_set_credentials(call->wrapped, creds);
   if (err != GRPC_CALL_OK) {
     rb_raise(grpc_rb_eCallError,
              "grpc_call_set_credentials failed with %s (code=%d)",
              grpc_call_error_detail_of(err), err);
   }
+  /* We need the credentials to be alive for as long as the call is alive,
+     but we don't care about destruction order. */
   rb_ivar_set(self, id_credentials, credentials);
   return Qnil;
 }
@@ -733,7 +731,6 @@ static VALUE grpc_run_batch_stack_build_result(run_batch_stack *st) {
 }
 
 /* call-seq:
-   cq = CompletionQueue.new
    ops = {
      GRPC::Core::CallOps::SEND_INITIAL_METADATA => <op_value>,
      GRPC::Core::CallOps::SEND_MESSAGE => <op_value>,
@@ -741,7 +738,7 @@ static VALUE grpc_run_batch_stack_build_result(run_batch_stack *st) {
    }
    tag = Object.new
    timeout = 10
-   call.start_batch(cq, tag, timeout, ops)
+   call.start_batch(tag, timeout, ops)
 
    Start a batch of operations defined in the array ops; when complete, post a
    completion of type 'tag' to the completion queue bound to the call.
@@ -750,20 +747,20 @@ static VALUE grpc_run_batch_stack_build_result(run_batch_stack *st) {
    The order of ops specified in the batch has no significance.
    Only one operation of each type can be active at once in any given
    batch */
-static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag,
-                                    VALUE timeout, VALUE ops_hash) {
+static VALUE grpc_rb_call_run_batch(VALUE self, VALUE ops_hash) {
   run_batch_stack st;
-  grpc_call *call = NULL;
+  grpc_rb_call *call = NULL;
   grpc_event ev;
   grpc_call_error err;
   VALUE result = Qnil;
   VALUE rb_write_flag = rb_ivar_get(self, id_write_flag);
   unsigned write_flag = 0;
+  void *tag = (void*)&st;
   if (RTYPEDDATA_DATA(self) == NULL) {
     rb_raise(grpc_rb_eCallError, "Cannot run batch on closed call");
     return Qnil;
   }
-  TypedData_Get_Struct(self, grpc_call, &grpc_call_data_type, call);
+  TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
 
   /* Validate the ops args, adding them to a ruby array */
   if (TYPE(ops_hash) != T_HASH) {
@@ -778,7 +775,7 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag,
 
   /* call grpc_call_start_batch, then wait for it to complete using
    * pluck_event */
-  err = grpc_call_start_batch(call, st.ops, st.op_num, ROBJECT(tag), NULL);
+  err = grpc_call_start_batch(call->wrapped, st.ops, st.op_num, tag, NULL);
   if (err != GRPC_CALL_OK) {
     grpc_run_batch_stack_cleanup(&st);
     rb_raise(grpc_rb_eCallError,
@@ -786,12 +783,7 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag,
              grpc_call_error_detail_of(err), err);
     return Qnil;
   }
-  ev = grpc_rb_completion_queue_pluck_event(cqueue, tag, timeout);
-  if (ev.type == GRPC_QUEUE_TIMEOUT) {
-    grpc_run_batch_stack_cleanup(&st);
-    rb_raise(grpc_rb_eOutOfTime, "grpc_call_start_batch timed out");
-    return Qnil;
-  }
+  ev = grpc_rb_completion_queue_pluck(call->queue, tag, gpr_inf_future, NULL);
 
   /* Build and return the BatchResult struct result,
      if there is an error, it's reflected in the status */
@@ -900,7 +892,7 @@ void Init_grpc_call() {
                    1);
 
   /* Add ruby analogues of the Call methods. */
-  rb_define_method(grpc_rb_cCall, "run_batch", grpc_rb_call_run_batch, 4);
+  rb_define_method(grpc_rb_cCall, "run_batch", grpc_rb_call_run_batch, 1);
   rb_define_method(grpc_rb_cCall, "cancel", grpc_rb_call_cancel, 0);
   rb_define_method(grpc_rb_cCall, "close", grpc_rb_call_close, 0);
   rb_define_method(grpc_rb_cCall, "peer", grpc_rb_call_get_peer, 0);
@@ -921,9 +913,6 @@ void Init_grpc_call() {
   id_write_flag = rb_intern("write_flag");
 
   /* Ids used by the c wrapping internals. */
-  id_cq = rb_intern("__cq");
-  id_flags = rb_intern("__flags");
-  id_input_md = rb_intern("__input_md");
   id_credentials = rb_intern("__credentials");
 
   /* Ids used in constructing the batch result. */
@@ -947,15 +936,18 @@ void Init_grpc_call() {
 
 /* Gets the call from the ruby object */
 grpc_call *grpc_rb_get_wrapped_call(VALUE v) {
-  grpc_call *c = NULL;
-  TypedData_Get_Struct(v, grpc_call, &grpc_call_data_type, c);
-  return c;
+  grpc_rb_call *call = NULL;
+  TypedData_Get_Struct(v, grpc_rb_call, &grpc_call_data_type, call);
+  return call->wrapped;
 }
 
 /* Obtains the wrapped object for a given call */
-VALUE grpc_rb_wrap_call(grpc_call *c) {
-  if (c == NULL) {
+VALUE grpc_rb_wrap_call(grpc_call *c, grpc_completion_queue *q) {
+  if (c == NULL || q == NULL) {
     return Qnil;
   }
-  return TypedData_Wrap_Struct(grpc_rb_cCall, &grpc_call_data_type, c);
+  grpc_rb_call *wrapper = ALLOC(grpc_rb_call);
+  wrapper->wrapped = c;
+  wrapper->queue = q;
+  return TypedData_Wrap_Struct(grpc_rb_cCall, &grpc_call_data_type, wrapper);
 }
diff --git a/src/ruby/ext/grpc/rb_completion_queue.c b/src/ruby/ext/grpc/rb_completion_queue.c
index 9466402db0..1ac2ef2f33 100644
--- a/src/ruby/ext/grpc/rb_completion_queue.c
+++ b/src/ruby/ext/grpc/rb_completion_queue.c
@@ -42,10 +42,6 @@
 #include <grpc/support/time.h>
 #include "rb_grpc.h"
 
-/* grpc_rb_cCompletionQueue is the ruby class that proxies
- * grpc_completion_queue. */
-static VALUE grpc_rb_cCompletionQueue = Qnil;
-
 /* Used to allow grpc_completion_queue_next call to release the GIL */
 typedef struct next_call_stack {
   grpc_completion_queue *cq;
@@ -128,7 +124,7 @@ static void grpc_rb_completion_queue_shutdown_drain(grpc_completion_queue *cq) {
 }
 
 /* Helper function to free a completion queue. */
-static void grpc_rb_completion_queue_destroy(void *p) {
+void grpc_rb_completion_queue_destroy(grpc_completion_queue *cq) {
   grpc_completion_queue *cq = NULL;
   if (p == NULL) {
     return;
@@ -138,59 +134,22 @@ static void grpc_rb_completion_queue_destroy(void *p) {
   grpc_completion_queue_destroy(cq);
 }
 
-static rb_data_type_t grpc_rb_completion_queue_data_type = {
-    "grpc_completion_queue",
-    {GRPC_RB_GC_NOT_MARKED, grpc_rb_completion_queue_destroy,
-     GRPC_RB_MEMSIZE_UNAVAILABLE, {NULL, NULL}},
-    NULL, NULL,
-#ifdef RUBY_TYPED_FREE_IMMEDIATELY
-    /* cannot immediately free because grpc_rb_completion_queue_shutdown_drain
-     * calls rb_thread_call_without_gvl. */
-    0,
-#endif
-};
-
-/* Releases the c-level resources associated with a completion queue */
-static VALUE grpc_rb_completion_queue_close(VALUE self) {
-  grpc_completion_queue* cq = grpc_rb_get_wrapped_completion_queue(self);
-  grpc_rb_completion_queue_destroy(cq);
-  RTYPEDDATA_DATA(self) = NULL;
-  return Qnil;
-}
-
-/* Allocates a completion queue. */
-static VALUE grpc_rb_completion_queue_alloc(VALUE cls) {
-  grpc_completion_queue *cq = grpc_completion_queue_create(NULL);
-  if (cq == NULL) {
-    rb_raise(rb_eArgError, "could not create a completion queue: not sure why");
-  }
-  return TypedData_Wrap_Struct(cls, &grpc_rb_completion_queue_data_type, cq);
-}
-
 static void unblock_func(void *param) {
   next_call_stack *const next_call = (next_call_stack*)param;
   next_call->interrupted = 1;
 }
 
-/* Blocks until the next event for given tag is available, and returns the
- * event. */
-grpc_event grpc_rb_completion_queue_pluck_event(VALUE self, VALUE tag,
-                                                VALUE timeout) {
+/* Does the same thing as grpc_completion_queue_pluck, while properly releasing
+   the GVL and handling interrupts */
+grpc_event rb_completion_queue_pluck(grpc_completion_queue queue, void *tag,
+                                     gpr_timespec deadline, void *reserved) {
   next_call_stack next_call;
   MEMZERO(&next_call, next_call_stack, 1);
-  TypedData_Get_Struct(self, grpc_completion_queue,
-                       &grpc_rb_completion_queue_data_type, next_call.cq);
-  if (TYPE(timeout) == T_NIL) {
-    next_call.timeout = gpr_inf_future(GPR_CLOCK_REALTIME);
-  } else {
-    next_call.timeout = grpc_rb_time_timeval(timeout, /* absolute time*/ 0);
-  }
-  if (TYPE(tag) == T_NIL) {
-    next_call.tag = NULL;
-  } else {
-    next_call.tag = ROBJECT(tag);
-  }
+  next_call.cq = queue;
+  next_call.timeout = deadline;
+  next_call.tag = tag;
   next_call.event.type = GRPC_QUEUE_TIMEOUT;
+  (void)reserved;
   /* Loop until we finish a pluck without an interruption. The internal
      pluck function runs either until it is interrupted or it gets an
      event, or time runs out.
@@ -210,27 +169,3 @@ grpc_event grpc_rb_completion_queue_pluck_event(VALUE self, VALUE tag,
            next_call.event.type == GRPC_QUEUE_TIMEOUT);
   return next_call.event;
 }
-
-void Init_grpc_completion_queue() {
-  grpc_rb_cCompletionQueue =
-      rb_define_class_under(grpc_rb_mGrpcCore, "CompletionQueue", rb_cObject);
-
-  /* constructor: uses an alloc func without an initializer. Using a simple
-     alloc func works here as the grpc header does not specify any args for
-     this func, so no separate initialization step is necessary. */
-  rb_define_alloc_func(grpc_rb_cCompletionQueue,
-                       grpc_rb_completion_queue_alloc);
-
-  /* close: Provides a way to close the underlying file descriptor without
-     waiting for ruby garbage collection. */
-  rb_define_method(grpc_rb_cCompletionQueue, "close",
-                   grpc_rb_completion_queue_close, 0);
-}
-
-/* Gets the wrapped completion queue from the ruby wrapper */
-grpc_completion_queue *grpc_rb_get_wrapped_completion_queue(VALUE v) {
-  grpc_completion_queue *cq = NULL;
-  TypedData_Get_Struct(v, grpc_completion_queue,
-                       &grpc_rb_completion_queue_data_type, cq);
-  return cq;
-}
diff --git a/src/ruby/ext/grpc/rb_completion_queue.h b/src/ruby/ext/grpc/rb_completion_queue.h
index 42de43c3fb..3543e86275 100644
--- a/src/ruby/ext/grpc/rb_completion_queue.h
+++ b/src/ruby/ext/grpc/rb_completion_queue.h
@@ -41,15 +41,14 @@
 /* Gets the wrapped completion queue from the ruby wrapper */
 grpc_completion_queue *grpc_rb_get_wrapped_completion_queue(VALUE v);
 
+void grpc_rb_completion_queue_destroy(grpc_completion_queue *cq);
+
 /**
  * Makes the implementation of CompletionQueue#pluck available in other files
  *
  * This avoids having code that holds the GIL repeated at multiple sites.
  */
-grpc_event grpc_rb_completion_queue_pluck_event(VALUE self, VALUE tag,
-                                                VALUE timeout);
-
-/* Initializes the CompletionQueue class. */
-void Init_grpc_completion_queue();
+grpc_event rb_completion_queue_pluck(grpc_completion_queue queue, void *tag,
+                                     gpr_timespec deadline, void *reserved);
 
 #endif /* GRPC_RB_COMPLETION_QUEUE_H_ */
diff --git a/src/ruby/ext/grpc/rb_grpc.c b/src/ruby/ext/grpc/rb_grpc.c
index 8527da52d2..188a62475d 100644
--- a/src/ruby/ext/grpc/rb_grpc.c
+++ b/src/ruby/ext/grpc/rb_grpc.c
@@ -46,7 +46,6 @@
 #include "rb_call_credentials.h"
 #include "rb_channel.h"
 #include "rb_channel_credentials.h"
-#include "rb_completion_queue.h"
 #include "rb_loader.h"
 #include "rb_server.h"
 #include "rb_server_credentials.h"
@@ -318,7 +317,7 @@ void Init_grpc_c() {
   grpc_rb_mGrpcCore = rb_define_module_under(grpc_rb_mGRPC, "Core");
   grpc_rb_sNewServerRpc =
       rb_struct_define("NewServerRpc", "method", "host",
-                       "deadline", "metadata", "call", "cq", NULL);
+                       "deadline", "metadata", "call", NULL);
   grpc_rb_sStatus =
       rb_struct_define("Status", "code", "details", "metadata", NULL);
   sym_code = ID2SYM(rb_intern("code"));
@@ -326,7 +325,6 @@ void Init_grpc_c() {
   sym_metadata = ID2SYM(rb_intern("metadata"));
 
   Init_grpc_channel();
-  Init_grpc_completion_queue();
   Init_grpc_call();
   Init_grpc_call_credentials();
   Init_grpc_channel_credentials();
diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c
index c92059d12c..cf430495c8 100644
--- a/src/ruby/ext/grpc/rb_server.c
+++ b/src/ruby/ext/grpc/rb_server.c
@@ -63,6 +63,22 @@ typedef struct grpc_rb_server {
   grpc_completion_queue *queue;
 } grpc_rb_server;
 
+static void destroy_server(grpc_rb_server *server, gpr_timespec deadline) {
+  grpc_event ev;
+  if (server->wrapped != NULL) {
+    grpc_server_shutdown_and_notify(server->wrapped, server->queue, NULL);
+    ev = rb_completion_queue_pluck(server->queue, NULL, deadline, NULL);
+    if (ev.type == GRPC_QUEUE_TIMEOUT) {
+      grpc_server_cancel_all_calls(server->wrapped);
+      rb_completion_queue_pluck(server->queue, NULL, gpr_inf_future, NULL);
+    }
+    grpc_server_destroy(server->wrapped);
+    grpc_rb_completion_queue_destroy(server->queue);
+    server->wrapped = NULL;
+    server->queue = NULL;
+  }
+}
+
 /* Destroys server instances. */
 static void grpc_rb_server_free(void *p) {
   grpc_rb_server *svr = NULL;
@@ -71,16 +87,8 @@ static void grpc_rb_server_free(void *p) {
   };
   svr = (grpc_rb_server *)p;
 
-  /* Deletes the wrapped object if the mark object is Qnil, which indicates
-     that no other object is the actual owner. */
-  /* grpc_server_shutdown does not exist. Change this to something that does
-     or delete it */
-  if (svr->wrapped != NULL && svr->mark == Qnil) {
-    // grpc_server_shutdown(svr->wrapped);
-    // Aborting to indicate a bug
-    abort();
-    grpc_server_destroy(svr->wrapped);
-  }
+  // TODO(murgatroid99): Maybe don't wait forever for the server to shutdown
+  destroy_server(svr, gpr_inf_future);
 
   xfree(p);
 }
@@ -122,17 +130,15 @@ static VALUE grpc_rb_server_alloc(VALUE cls) {
 
 /*
   call-seq:
-    cq = CompletionQueue.new
-    server = Server.new(cq, {'arg1': 'value1'})
+    server = Server.new({'arg1': 'value1'})
 
   Initializes server instances. */
-static VALUE grpc_rb_server_init(VALUE self, VALUE cqueue, VALUE channel_args) {
-  grpc_completion_queue *cq = NULL;
+static VALUE grpc_rb_server_init(VALUE self, VALUE channel_args) {
+  grpc_completion_queue *cq = grpc_completion_queue_create(NULL);
   grpc_rb_server *wrapper = NULL;
   grpc_server *srv = NULL;
   grpc_channel_args args;
   MEMZERO(&args, grpc_channel_args, 1);
-  cq = grpc_rb_get_wrapped_completion_queue(cqueue);
   TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type,
                        wrapper);
   grpc_rb_hash_convert_to_channel_args(channel_args, &args);
@@ -179,65 +185,57 @@ static void grpc_request_call_stack_cleanup(request_call_stack* st) {
 }
 
 /* call-seq:
-   cq = CompletionQueue.new
-   tag = Object.new
-   timeout = 10
-   server.request_call(cqueue, tag, timeout)
+   server.request_call
 
    Requests notification of a new call on a server. */
-static VALUE grpc_rb_server_request_call(VALUE self, VALUE cqueue,
-                                         VALUE tag_new, VALUE timeout) {
+static VALUE grpc_rb_server_request_call(VALUE self) {
   grpc_rb_server *s = NULL;
   grpc_call *call = NULL;
   grpc_event ev;
   grpc_call_error err;
   request_call_stack st;
   VALUE result;
+  void *tag = (void*)&st;
+  grpc_completion_queue *call_queue = grpc_completion_queue_create(NULL);
   gpr_timespec deadline;
   TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s);
   if (s->wrapped == NULL) {
     rb_raise(rb_eRuntimeError, "destroyed!");
     return Qnil;
-  } else {
-    grpc_request_call_stack_init(&st);
-    /* call grpc_server_request_call, then wait for it to complete using
-     * pluck_event */
-    err = grpc_server_request_call(
-        s->wrapped, &call, &st.details, &st.md_ary,
-        grpc_rb_get_wrapped_completion_queue(cqueue),
-        grpc_rb_get_wrapped_completion_queue(s->mark),
-        ROBJECT(tag_new));
-    if (err != GRPC_CALL_OK) {
-      grpc_request_call_stack_cleanup(&st);
-      rb_raise(grpc_rb_eCallError,
-              "grpc_server_request_call failed: %s (code=%d)",
-               grpc_call_error_detail_of(err), err);
-      return Qnil;
-    }
-
-    ev = grpc_rb_completion_queue_pluck_event(s->mark, tag_new, timeout);
-    if (ev.type == GRPC_QUEUE_TIMEOUT) {
-      grpc_request_call_stack_cleanup(&st);
-      return Qnil;
-    }
-    if (!ev.success) {
-      grpc_request_call_stack_cleanup(&st);
-      rb_raise(grpc_rb_eCallError, "request_call completion failed");
-      return Qnil;
-    }
+  }
+  grpc_request_call_stack_init(&st);
+  /* call grpc_server_request_call, then wait for it to complete using
+   * pluck_event */
+  err = grpc_server_request_call(
+      s->wrapped, &call, &st.details, &st.md_ary,
+      call_queue, s->queue, tag);
+  if (err != GRPC_CALL_OK) {
+    grpc_request_call_stack_cleanup(&st);
+    rb_raise(grpc_rb_eCallError,
+             "grpc_server_request_call failed: %s (code=%d)",
+             grpc_call_error_detail_of(err), err);
+    return Qnil;
+  }
 
-    /* build the NewServerRpc struct result */
-    deadline = gpr_convert_clock_type(st.details.deadline, GPR_CLOCK_REALTIME);
-    result = rb_struct_new(
-        grpc_rb_sNewServerRpc, rb_str_new2(st.details.method),
-        rb_str_new2(st.details.host),
-        rb_funcall(rb_cTime, id_at, 2, INT2NUM(deadline.tv_sec),
-                   INT2NUM(deadline.tv_nsec)),
-        grpc_rb_md_ary_to_h(&st.md_ary), grpc_rb_wrap_call(call), cqueue, NULL);
+  ev = rb_completion_queue_pluck(s->queue, tag,
+                                 gpr_inf_future, NULL);
+  if (!ev.success) {
     grpc_request_call_stack_cleanup(&st);
-    return result;
+    rb_raise(grpc_rb_eCallError, "request_call completion failed");
+    return Qnil;
   }
-  return Qnil;
+
+  /* build the NewServerRpc struct result */
+  deadline = gpr_convert_clock_type(st.details.deadline, GPR_CLOCK_REALTIME);
+  result = rb_struct_new(
+      grpc_rb_sNewServerRpc, rb_str_new2(st.details.method),
+      rb_str_new2(st.details.host),
+      rb_funcall(rb_cTime, id_at, 2, INT2NUM(deadline.tv_sec),
+                 INT2NUM(deadline.tv_nsec)),
+      grpc_rb_md_ary_to_h(&st.md_ary), grpc_rb_wrap_call(call, call_queue),
+      NULL);
+  grpc_request_call_stack_cleanup(&st);
+  return result;
 }
 
 static VALUE grpc_rb_server_start(VALUE self) {
@@ -275,19 +273,15 @@ static VALUE grpc_rb_server_destroy(int argc, VALUE *argv, VALUE self) {
   rb_scan_args(argc, argv, "11", &cqueue, &timeout);
   cq = grpc_rb_get_wrapped_completion_queue(cqueue);
   TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s);
-
-  if (s->wrapped != NULL) {
-    grpc_server_shutdown_and_notify(s->wrapped, cq, NULL);
-    ev = grpc_rb_completion_queue_pluck_event(cqueue, Qnil, timeout);
-    if (!ev.success) {
-      rb_warn("server shutdown failed, cancelling the calls, objects may leak");
-      grpc_server_cancel_all_calls(s->wrapped);
-      return Qfalse;
-    }
-    grpc_server_destroy(s->wrapped);
-    s->wrapped = NULL;
+  if (TYPE(timeout) == T_NIL) {
+    deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
+  } else {
+    deadline = grpc_rb_time_timeval(timeout, /* absolute time*/ 0);
   }
-  return Qtrue;
+
+  destroy_server(s, deadline);
+
+  return Qnil;
 }
 
 /*
@@ -347,13 +341,13 @@ void Init_grpc_server() {
   rb_define_alloc_func(grpc_rb_cServer, grpc_rb_server_alloc);
 
   /* Provides a ruby constructor and support for dup/clone. */
-  rb_define_method(grpc_rb_cServer, "initialize", grpc_rb_server_init, 2);
+  rb_define_method(grpc_rb_cServer, "initialize", grpc_rb_server_init, 1);
   rb_define_method(grpc_rb_cServer, "initialize_copy",
                    grpc_rb_cannot_init_copy, 1);
 
   /* Add the server methods. */
   rb_define_method(grpc_rb_cServer, "request_call",
-                   grpc_rb_server_request_call, 3);
+                   grpc_rb_server_request_call, 0);
   rb_define_method(grpc_rb_cServer, "start", grpc_rb_server_start, 0);
   rb_define_method(grpc_rb_cServer, "destroy", grpc_rb_server_destroy, -1);
   rb_define_alias(grpc_rb_cServer, "close", "destroy");
-- 
GitLab