From 5ea4a99d8ccb7f1bc19b78e8cd86b770f1ddcc8e Mon Sep 17 00:00:00 2001
From: murgatroid99 <mlumish@google.com>
Date: Mon, 13 Jun 2016 10:36:41 -0700
Subject: [PATCH] Finished removing CompletionQueue from Ruby API, made some
 changes for clarity

---
 src/ruby/ext/grpc/rb_call.c               |  11 +-
 src/ruby/ext/grpc/rb_call.h               |   2 +-
 src/ruby/ext/grpc/rb_channel.c            |  23 +---
 src/ruby/ext/grpc/rb_completion_queue.c   |   7 +-
 src/ruby/ext/grpc/rb_completion_queue.h   |   2 +-
 src/ruby/ext/grpc/rb_server.c             |  32 +++--
 src/ruby/lib/grpc/generic/active_call.rb  |  70 ++++------
 src/ruby/lib/grpc/generic/bidi_call.rb    |  34 ++---
 src/ruby/lib/grpc/generic/client_stub.rb  |  11 +-
 src/ruby/lib/grpc/generic/rpc_server.rb   |  40 ++----
 src/ruby/lib/grpc/generic/service.rb      |   2 +-
 src/ruby/spec/call_spec.rb                |   3 +-
 src/ruby/spec/channel_spec.rb             |   5 +-
 src/ruby/spec/client_server_spec.rb       |  90 +++++--------
 src/ruby/spec/completion_queue_spec.rb    |  42 ------
 src/ruby/spec/generic/active_call_spec.rb | 148 ++++++++++------------
 src/ruby/spec/generic/client_stub_spec.rb |  75 ++++-------
 src/ruby/spec/generic/rpc_server_spec.rb  |  34 +----
 src/ruby/spec/pb/health/checker_spec.rb   |   2 -
 src/ruby/spec/server_spec.rb              |  44 +++----
 20 files changed, 235 insertions(+), 442 deletions(-)
 delete mode 100644 src/ruby/spec/completion_queue_spec.rb

diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c
index f2c567c7da..c1f5075b52 100644
--- a/src/ruby/ext/grpc/rb_call.c
+++ b/src/ruby/ext/grpc/rb_call.c
@@ -94,7 +94,6 @@ typedef struct grpc_rb_call {
 } grpc_rb_call;
 
 static void destroy_call(grpc_rb_call *call) {
-  call = (grpc_rb_call *)p;
   grpc_call_destroy(call->wrapped);
   grpc_rb_completion_queue_destroy(call->queue);
 }
@@ -783,8 +782,11 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE ops_hash) {
              grpc_call_error_detail_of(err), err);
     return Qnil;
   }
-  ev = grpc_rb_completion_queue_pluck(call->queue, tag, gpr_inf_future, NULL);
-
+  ev = rb_completion_queue_pluck(call->queue, tag,
+                                 gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
+  if (!ev.success) {
+    rb_raise(grpc_rb_eCallError, "call#run_batch failed somehow");
+  }
   /* Build and return the BatchResult struct result,
      if there is an error, it's reflected in the status */
   result = grpc_run_batch_stack_build_result(&st);
@@ -943,10 +945,11 @@ grpc_call *grpc_rb_get_wrapped_call(VALUE v) {
 
 /* Obtains the wrapped object for a given call */
 VALUE grpc_rb_wrap_call(grpc_call *c, grpc_completion_queue *q) {
+  grpc_rb_call *wrapper;
   if (c == NULL || q == NULL) {
     return Qnil;
   }
-  grpc_rb_call *wrapper = ALLOC(grpc_rb_call);
+  wrapper = ALLOC(grpc_rb_call);
   wrapper->wrapped = c;
   wrapper->queue = q;
   return TypedData_Wrap_Struct(grpc_rb_cCall, &grpc_call_data_type, wrapper);
diff --git a/src/ruby/ext/grpc/rb_call.h b/src/ruby/ext/grpc/rb_call.h
index 24adb3477b..56becdc5a4 100644
--- a/src/ruby/ext/grpc/rb_call.h
+++ b/src/ruby/ext/grpc/rb_call.h
@@ -42,7 +42,7 @@
 grpc_call* grpc_rb_get_wrapped_call(VALUE v);
 
 /* Gets the VALUE corresponding to given grpc_call. */
-VALUE grpc_rb_wrap_call(grpc_call* c);
+VALUE grpc_rb_wrap_call(grpc_call *c, grpc_completion_queue *q);
 
 /* Provides the details of an call error */
 const char* grpc_call_error_detail_of(grpc_call_error err);
diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c
index 9d29b96a35..f5190b85f7 100644
--- a/src/ruby/ext/grpc/rb_channel.c
+++ b/src/ruby/ext/grpc/rb_channel.c
@@ -55,11 +55,6 @@ static ID id_channel;
  * GCed before the channel */
 static ID id_target;
 
-/* id_cqueue is the name of the hidden ivar that preserves a reference to the
- * completion queue used to create the call, preserved so that it does not get
- * GCed before the channel */
-static ID id_cqueue;
-
 /* id_insecure_channel is used to indicate that a channel is insecure */
 static VALUE id_insecure_channel;
 
@@ -233,10 +228,9 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self,
 
 /* Create a call given a grpc_channel, in order to call method. The request
    is not sent until grpc_call_invoke is called. */
-static VALUE grpc_rb_channel_create_call(VALUE self, VALUE cqueue,
-                                         VALUE parent, VALUE mask,
-                                         VALUE method, VALUE host,
-                                         VALUE deadline) {
+static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent,
+                                         VALUE mask, VALUE method,
+                                         VALUE host, VALUE deadline) {
   VALUE res = Qnil;
   grpc_rb_channel *wrapper = NULL;
   grpc_call *call = NULL;
@@ -256,7 +250,7 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE cqueue,
     parent_call = grpc_rb_get_wrapped_call(parent);
   }
 
-  cq = grpc_rb_get_wrapped_completion_queue(cqueue);
+  cq = grpc_completion_queue_create(NULL);
   TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
   ch = wrapper->wrapped;
   if (ch == NULL) {
@@ -273,15 +267,11 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE cqueue,
              method_chars);
     return Qnil;
   }
-  res = grpc_rb_wrap_call(call);
+  res = grpc_rb_wrap_call(call, cq);
 
   /* Make this channel an instance attribute of the call so that it is not GCed
    * before the call. */
   rb_ivar_set(res, id_channel, self);
-
-  /* Make the completion queue an instance attribute of the call so that it is
-   * not GCed before the call. */
-  rb_ivar_set(res, id_cqueue, cqueue);
   return res;
 }
 
@@ -368,13 +358,12 @@ void Init_grpc_channel() {
   rb_define_method(grpc_rb_cChannel, "watch_connectivity_state",
                    grpc_rb_channel_watch_connectivity_state, 4);
   rb_define_method(grpc_rb_cChannel, "create_call",
-                   grpc_rb_channel_create_call, 6);
+                   grpc_rb_channel_create_call, 5);
   rb_define_method(grpc_rb_cChannel, "target", grpc_rb_channel_get_target, 0);
   rb_define_method(grpc_rb_cChannel, "destroy", grpc_rb_channel_destroy, 0);
   rb_define_alias(grpc_rb_cChannel, "close", "destroy");
 
   id_channel = rb_intern("__channel");
-  id_cqueue = rb_intern("__cqueue");
   id_target = rb_intern("__target");
   rb_define_const(grpc_rb_cChannel, "SSL_TARGET",
                   ID2SYM(rb_intern(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG)));
diff --git a/src/ruby/ext/grpc/rb_completion_queue.c b/src/ruby/ext/grpc/rb_completion_queue.c
index 1ac2ef2f33..1a69a175c4 100644
--- a/src/ruby/ext/grpc/rb_completion_queue.c
+++ b/src/ruby/ext/grpc/rb_completion_queue.c
@@ -125,11 +125,6 @@ static void grpc_rb_completion_queue_shutdown_drain(grpc_completion_queue *cq) {
 
 /* Helper function to free a completion queue. */
 void grpc_rb_completion_queue_destroy(grpc_completion_queue *cq) {
-  grpc_completion_queue *cq = NULL;
-  if (p == NULL) {
-    return;
-  }
-  cq = (grpc_completion_queue *)p;
   grpc_rb_completion_queue_shutdown_drain(cq);
   grpc_completion_queue_destroy(cq);
 }
@@ -141,7 +136,7 @@ static void unblock_func(void *param) {
 
 /* Does the same thing as grpc_completion_queue_pluck, while properly releasing
    the GVL and handling interrupts */
-grpc_event rb_completion_queue_pluck(grpc_completion_queue queue, void *tag,
+grpc_event rb_completion_queue_pluck(grpc_completion_queue *queue, void *tag,
                                      gpr_timespec deadline, void *reserved) {
   next_call_stack next_call;
   MEMZERO(&next_call, next_call_stack, 1);
diff --git a/src/ruby/ext/grpc/rb_completion_queue.h b/src/ruby/ext/grpc/rb_completion_queue.h
index 3543e86275..9f8f6aa5ff 100644
--- a/src/ruby/ext/grpc/rb_completion_queue.h
+++ b/src/ruby/ext/grpc/rb_completion_queue.h
@@ -48,7 +48,7 @@ void grpc_rb_completion_queue_destroy(grpc_completion_queue *cq);
  *
  * This avoids having code that holds the GIL repeated at multiple sites.
  */
-grpc_event rb_completion_queue_pluck(grpc_completion_queue queue, void *tag,
+grpc_event rb_completion_queue_pluck(grpc_completion_queue *queue, void *tag,
                                      gpr_timespec deadline, void *reserved);
 
 #endif /* GRPC_RB_COMPLETION_QUEUE_H_ */
diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c
index cf430495c8..1ed2c35da3 100644
--- a/src/ruby/ext/grpc/rb_server.c
+++ b/src/ruby/ext/grpc/rb_server.c
@@ -70,7 +70,8 @@ static void destroy_server(grpc_rb_server *server, gpr_timespec deadline) {
     ev = rb_completion_queue_pluck(server->queue, NULL, deadline, NULL);
     if (ev.type == GRPC_QUEUE_TIMEOUT) {
       grpc_server_cancel_all_calls(server->wrapped);
-      rb_completion_queue_pluck(server->queue, NULL, gpr_inf_future, NULL);
+      rb_completion_queue_pluck(server->queue, NULL,
+                                gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
     }
     grpc_server_destroy(server->wrapped);
     grpc_rb_completion_queue_destroy(server->queue);
@@ -82,13 +83,17 @@ static void destroy_server(grpc_rb_server *server, gpr_timespec deadline) {
 /* Destroys server instances. */
 static void grpc_rb_server_free(void *p) {
   grpc_rb_server *svr = NULL;
+  gpr_timespec deadline;
   if (p == NULL) {
     return;
   };
   svr = (grpc_rb_server *)p;
 
-  // TODO(murgatroid99): Maybe don't wait forever for the server to shutdown
-  destroy_server(svr, gpr_inf_future);
+  deadline = gpr_time_add(
+      gpr_now(GPR_CLOCK_REALTIME),
+      gpr_time_from_seconds(2, GPR_TIMESPAN));
+
+  destroy_server(svr, deadline);
 
   xfree(p);
 }
@@ -154,9 +159,6 @@ static VALUE grpc_rb_server_init(VALUE self, VALUE channel_args) {
   wrapper->wrapped = srv;
   wrapper->queue = cq;
 
-  /* Add the cq as the server's mark object. This ensures the ruby cq can't be
-     GCed before the server */
-  wrapper->mark = cqueue;
   return self;
 }
 
@@ -218,7 +220,7 @@ static VALUE grpc_rb_server_request_call(VALUE self) {
   }
 
   ev = rb_completion_queue_pluck(s->queue, tag,
-                                 gpr_inf_future, NULL);
+                                 gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
   if (!ev.success) {
     grpc_request_call_stack_cleanup(&st);
     rb_raise(grpc_rb_eCallError, "request_call completion failed");
@@ -251,27 +253,23 @@ static VALUE grpc_rb_server_start(VALUE self) {
 
 /*
   call-seq:
-    cq = CompletionQueue.new
-    server = Server.new(cq, {'arg1': 'value1'})
+    server = Server.new({'arg1': 'value1'})
     ... // do stuff with server
     ...
     ... // to shutdown the server
-    server.destroy(cq)
+    server.destroy()
 
     ... // to shutdown the server with a timeout
-    server.destroy(cq, timeout)
+    server.destroy(timeout)
 
   Destroys server instances. */
 static VALUE grpc_rb_server_destroy(int argc, VALUE *argv, VALUE self) {
-  VALUE cqueue = Qnil;
   VALUE timeout = Qnil;
-  grpc_completion_queue *cq = NULL;
-  grpc_event ev;
+  gpr_timespec deadline;
   grpc_rb_server *s = NULL;
 
-  /* "11" == 1 mandatory args, 1 (timeout) is optional */
-  rb_scan_args(argc, argv, "11", &cqueue, &timeout);
-  cq = grpc_rb_get_wrapped_completion_queue(cqueue);
+  /* "01" == 0 mandatory args, 1 (timeout) is optional */
+  rb_scan_args(argc, argv, "01", &timeout);
   TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s);
   if (TYPE(timeout) == T_NIL) {
     deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb
index b03ddbc193..18126c4432 100644
--- a/src/ruby/lib/grpc/generic/active_call.rb
+++ b/src/ruby/lib/grpc/generic/active_call.rb
@@ -75,17 +75,10 @@ module GRPC
     # if a keyword value is a list, multiple metadata for it's key are sent
     #
     # @param call [Call] a call on which to start and invocation
-    # @param q [CompletionQueue] the completion queue
     # @param metadata [Hash] the metadata
-    def self.client_invoke(call, q, metadata = {})
+    def self.client_invoke(call, metadata = {})
       fail(TypeError, '!Core::Call') unless call.is_a? Core::Call
-      unless q.is_a? Core::CompletionQueue
-        fail(TypeError, '!Core::CompletionQueue')
-      end
-      metadata_tag = Object.new
-      call.run_batch(q, metadata_tag, INFINITE_FUTURE,
-                     SEND_INITIAL_METADATA => metadata)
-      metadata_tag
+      call.run_batch(SEND_INITIAL_METADATA => metadata)
     end
 
     # Creates an ActiveCall.
@@ -102,26 +95,21 @@ module GRPC
     # deadline is the absolute deadline for the call.
     #
     # @param call [Call] the call used by the ActiveCall
-    # @param q [CompletionQueue] the completion queue used to accept
-    #          the call.  This queue will be closed on call completion.
     # @param marshal [Function] f(obj)->string that marshal requests
     # @param unmarshal [Function] f(string)->obj that unmarshals responses
     # @param deadline [Fixnum] the deadline for the call to complete
-    # @param metadata_tag [Object] the object use obtain metadata for clients
-    # @param started [true|false] indicates if the call has begun
-    def initialize(call, q, marshal, unmarshal, deadline, started: true,
-                   metadata_tag: nil)
+    # @param started [true|false] indicates that metadata was sent
+    # @param metadata_received [true|false] indicates if metadata has already
+    #     been received. Should always be true for server calls
+    def initialize(call, marshal, unmarshal, deadline, started: true,
+                   metadata_received: false)
       fail(TypeError, '!Core::Call') unless call.is_a? Core::Call
-      unless q.is_a? Core::CompletionQueue
-        fail(TypeError, '!Core::CompletionQueue')
-      end
       @call = call
-      @cq = q
       @deadline = deadline
       @marshal = marshal
-      @started = started
       @unmarshal = unmarshal
-      @metadata_tag = metadata_tag
+      @metadata_received = metadata_received
+      @metadata_sent = started
       @op_notifier = nil
     end
 
@@ -168,7 +156,7 @@ module GRPC
         SEND_CLOSE_FROM_CLIENT => nil
       }
       ops[RECV_STATUS_ON_CLIENT] = nil if assert_finished
-      batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, ops)
+      batch_result = @call.run_batch(ops)
       return unless assert_finished
       @call.status = batch_result.status
       op_is_done
@@ -179,8 +167,7 @@ module GRPC
     #
     # It blocks until the remote endpoint acknowledges by sending a status.
     def finished
-      batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE,
-                                     RECV_STATUS_ON_CLIENT => nil)
+      batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil)
       unless batch_result.status.nil?
         if @call.metadata.nil?
           @call.metadata = batch_result.status.metadata
@@ -192,7 +179,6 @@ module GRPC
       op_is_done
       batch_result.check_status
       @call.close
-      @cq.close
     end
 
     # remote_send sends a request to the remote endpoint.
@@ -203,9 +189,10 @@ module GRPC
     # @param marshalled [false, true] indicates if the object is already
     # marshalled.
     def remote_send(req, marshalled = false)
+      # TODO(murgatroid99): ensure metadata was sent
       GRPC.logger.debug("sending #{req}, marshalled? #{marshalled}")
       payload = marshalled ? req : @marshal.call(req)
-      @call.run_batch(@cq, self, INFINITE_FUTURE, SEND_MESSAGE => payload)
+      @call.run_batch(SEND_MESSAGE => payload)
     end
 
     # send_status sends a status to the remote endpoint.
@@ -222,7 +209,7 @@ module GRPC
         SEND_STATUS_FROM_SERVER => Struct::Status.new(code, details, metadata)
       }
       ops[RECV_CLOSE_ON_SERVER] = nil if assert_finished
-      @call.run_batch(@cq, self, INFINITE_FUTURE, ops)
+      @call.run_batch(ops)
       nil
     end
 
@@ -234,11 +221,11 @@ module GRPC
     # raising BadStatus
     def remote_read
       ops = { RECV_MESSAGE => nil }
-      ops[RECV_INITIAL_METADATA] = nil unless @metadata_tag.nil?
-      batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, ops)
-      unless @metadata_tag.nil?
+      ops[RECV_INITIAL_METADATA] = nil unless @metadata_received
+      batch_result = @call.run_batch(ops)
+      unless @metadata_received
         @call.metadata = batch_result.metadata
-        @metadata_tag = nil
+        @metadata_received = true
       end
       GRPC.logger.debug("received req: #{batch_result}")
       unless batch_result.nil? || batch_result.message.nil?
@@ -318,7 +305,7 @@ module GRPC
     # a list, multiple metadata for its key are sent
     # @return [Object] the response received from the server
     def request_response(req, metadata: {})
-      start_call(metadata) unless @started
+      start_call(metadata)
       remote_send(req)
       writes_done(false)
       response = remote_read
@@ -342,7 +329,7 @@ module GRPC
     # a list, multiple metadata for its key are sent
     # @return [Object] the response received from the server
     def client_streamer(requests, metadata: {})
-      start_call(metadata) unless @started
+      start_call(metadata)
       requests.each { |r| remote_send(r) }
       writes_done(false)
       response = remote_read
@@ -368,7 +355,7 @@ module GRPC
     # a list, multiple metadata for its key are sent
     # @return [Enumerator|nil] a response Enumerator
     def server_streamer(req, metadata: {})
-      start_call(metadata) unless @started
+      start_call(metadata)
       remote_send(req)
       writes_done(false)
       replies = enum_for(:each_remote_read_then_finish)
@@ -407,10 +394,9 @@ module GRPC
     # a list, multiple metadata for its key are sent
     # @return [Enumerator, nil] a response Enumerator
     def bidi_streamer(requests, metadata: {}, &blk)
-      start_call(metadata) unless @started
-      bd = BidiCall.new(@call, @cq, @marshal, @unmarshal,
-                        metadata_tag: @metadata_tag)
-      @metadata_tag = nil  # run_on_client ensures metadata is read
+      start_call(metadata)
+      bd = BidiCall.new(@call, @marshal, @unmarshal,
+                        metadata_received: @metadata_received)
       bd.run_on_client(requests, @op_notifier, &blk)
     end
 
@@ -426,7 +412,7 @@ module GRPC
     #
     # @param gen_each_reply [Proc] generates the BiDi stream replies
     def run_server_bidi(gen_each_reply)
-      bd = BidiCall.new(@call, @cq, @marshal, @unmarshal)
+      bd = BidiCall.new(@call, @marshal, @unmarshal)
       bd.run_on_server(gen_each_reply)
     end
 
@@ -449,9 +435,9 @@ module GRPC
     # @param metadata [Hash] metadata to be sent to the server. If a value is
     # a list, multiple metadata for its key are sent
     def start_call(metadata = {})
-      return if @started
-      @metadata_tag = ActiveCall.client_invoke(@call, @cq, metadata)
-      @started = true
+      return if @metadata_sent
+      @metadata_tag = ActiveCall.client_invoke(@call, metadata)
+      @metadata_sent = true
     end
 
     def self.view_class(*visible_methods)
diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb
index 238f409a1d..f4f9d1b3dd 100644
--- a/src/ruby/lib/grpc/generic/bidi_call.rb
+++ b/src/ruby/lib/grpc/generic/bidi_call.rb
@@ -52,23 +52,18 @@ module GRPC
     # deadline is the absolute deadline for the call.
     #
     # @param call [Call] the call used by the ActiveCall
-    # @param q [CompletionQueue] the completion queue used to accept
-    #          the call
     # @param marshal [Function] f(obj)->string that marshal requests
     # @param unmarshal [Function] f(string)->obj that unmarshals responses
-    # @param metadata_tag [Object] tag object used to collect metadata
-    def initialize(call, q, marshal, unmarshal, metadata_tag: nil)
+    # @param metadata_received [true|false] indicates if metadata has already
+    #     been received. Should always be true for server calls
+    def initialize(call, marshal, unmarshal, metadata_received: false)
       fail(ArgumentError, 'not a call') unless call.is_a? Core::Call
-      unless q.is_a? Core::CompletionQueue
-        fail(ArgumentError, 'not a CompletionQueue')
-      end
       @call = call
-      @cq = q
       @marshal = marshal
       @op_notifier = nil  # signals completion on clients
       @readq = Queue.new
       @unmarshal = unmarshal
-      @metadata_tag = metadata_tag
+      @metadata_received = metadata_received
       @reads_complete = false
       @writes_complete = false
       @complete = false
@@ -124,7 +119,6 @@ module GRPC
       @done_mutex.synchronize do
         return unless @reads_complete && @writes_complete && !@complete
         @call.close
-        @cq.close
         @complete = true
       end
     end
@@ -132,11 +126,11 @@ module GRPC
     # performs a read using @call.run_batch, ensures metadata is set up
     def read_using_run_batch
       ops = { RECV_MESSAGE => nil }
-      ops[RECV_INITIAL_METADATA] = nil unless @metadata_tag.nil?
-      batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, ops)
-      unless @metadata_tag.nil?
+      ops[RECV_INITIAL_METADATA] = nil unless @metadata_received
+      batch_result = @call.run_batch(ops)
+      unless @metadata_received
         @call.metadata = batch_result.metadata
-        @metadata_tag = nil
+        @metadata_received = true
       end
       batch_result
     end
@@ -161,20 +155,18 @@ module GRPC
 
     def write_loop(requests, is_client: true)
       GRPC.logger.debug('bidi-write-loop: starting')
-      write_tag = Object.new
       count = 0
       requests.each do |req|
         GRPC.logger.debug("bidi-write-loop: #{count}")
         count += 1
         payload = @marshal.call(req)
-        @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
-                        SEND_MESSAGE => payload)
+        # Fails if status already received
+        @call.run_batch(SEND_MESSAGE => payload)
       end
       GRPC.logger.debug("bidi-write-loop: #{count} writes done")
       if is_client
         GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting")
-        @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
-                        SEND_CLOSE_FROM_CLIENT => nil)
+        @call.run_batch(SEND_CLOSE_FROM_CLIENT => nil)
         GRPC.logger.debug('bidi-write-loop: done')
         notify_done
         @writes_complete = true
@@ -195,7 +187,6 @@ module GRPC
       Thread.new do
         GRPC.logger.debug('bidi-read-loop: starting')
         begin
-          read_tag = Object.new
           count = 0
           # queue the initial read before beginning the loop
           loop do
@@ -208,8 +199,7 @@ module GRPC
               GRPC.logger.debug("bidi-read-loop: null batch #{batch_result}")
 
               if is_client
-                batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE,
-                                               RECV_STATUS_ON_CLIENT => nil)
+                batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil)
                 @call.status = batch_result.status
                 batch_result.check_status
                 GRPC.logger.debug("bidi-read-loop: done status #{@call.status}")
diff --git a/src/ruby/lib/grpc/generic/client_stub.rb b/src/ruby/lib/grpc/generic/client_stub.rb
index cddca13d17..9d6bd3bf59 100644
--- a/src/ruby/lib/grpc/generic/client_stub.rb
+++ b/src/ruby/lib/grpc/generic/client_stub.rb
@@ -90,19 +90,16 @@ module GRPC
     # when present, this is the default timeout used for calls
     #
     # @param host [String] the host the stub connects to
-    # @param q [Core::CompletionQueue] used to wait for events - now deprecated
-    #        since each new active call gets its own separately
     # @param creds [Core::ChannelCredentials|Symbol] the channel credentials, or
     #     :this_channel_is_insecure
     # @param channel_override [Core::Channel] a pre-created channel
     # @param timeout [Number] the default timeout to use in requests
     # @param channel_args [Hash] the channel arguments
-    def initialize(host, q, creds,
+    def initialize(host, creds,
                    channel_override: nil,
                    timeout: nil,
                    propagate_mask: nil,
                    channel_args: {})
-      fail(TypeError, '!CompletionQueue') unless q.is_a?(Core::CompletionQueue)
       @ch = ClientStub.setup_channel(channel_override, host, creds,
                                      channel_args)
       alt_host = channel_args[Core::Channel::SSL_TARGET]
@@ -441,15 +438,13 @@ module GRPC
 
       deadline = from_relative_time(@timeout) if deadline.nil?
       # Provide each new client call with its own completion queue
-      call_queue = Core::CompletionQueue.new
-      call = @ch.create_call(call_queue,
-                             parent, # parent call
+      call = @ch.create_call(parent, # parent call
                              @propagate_mask, # propagation options
                              method,
                              nil, # host use nil,
                              deadline)
       call.set_credentials! credentials unless credentials.nil?
-      ActiveCall.new(call, call_queue, marshal, unmarshal, deadline,
+      ActiveCall.new(call, marshal, unmarshal, deadline,
                      started: false)
     end
   end
diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb
index ab7333d133..c92a532a50 100644
--- a/src/ruby/lib/grpc/generic/rpc_server.rb
+++ b/src/ruby/lib/grpc/generic/rpc_server.rb
@@ -159,16 +159,6 @@ module GRPC
     # Signal check period is 0.25s
     SIGNAL_CHECK_PERIOD = 0.25
 
-    # setup_cq is used by #initialize to constuct a Core::CompletionQueue from
-    # its arguments.
-    def self.setup_cq(alt_cq)
-      return Core::CompletionQueue.new if alt_cq.nil?
-      unless alt_cq.is_a? Core::CompletionQueue
-        fail(TypeError, '!CompletionQueue')
-      end
-      alt_cq
-    end
-
     # setup_connect_md_proc is used by #initialize to validate the
     # connect_md_proc.
     def self.setup_connect_md_proc(a_proc)
@@ -191,10 +181,6 @@ module GRPC
     # * pool_size: the size of the thread pool the server uses to run its
     # threads
     #
-    # * completion_queue_override: when supplied, this will be used as the
-    # completion_queue that the server uses to receive network events,
-    # otherwise its creates a new instance itself
-    #
     # * creds: [GRPC::Core::ServerCredentials]
     # the credentials used to secure the server
     #
@@ -212,11 +198,9 @@ module GRPC
     def initialize(pool_size:DEFAULT_POOL_SIZE,
                    max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS,
                    poll_period:DEFAULT_POLL_PERIOD,
-                   completion_queue_override:nil,
                    connect_md_proc:nil,
                    server_args:{})
       @connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc)
-      @cq = RpcServer.setup_cq(completion_queue_override)
       @max_waiting_requests = max_waiting_requests
       @poll_period = poll_period
       @pool_size = pool_size
@@ -226,7 +210,7 @@ module GRPC
       # running_state can take 4 values: :not_started, :running, :stopping, and
       # :stopped. State transitions can only proceed in that order.
       @running_state = :not_started
-      @server = Core::Server.new(@cq, server_args)
+      @server = Core::Server.new(server_args)
     end
 
     # stops a running server
@@ -240,7 +224,7 @@ module GRPC
         transition_running_state(:stopping)
       end
       deadline = from_relative_time(@poll_period)
-      @server.close(@cq, deadline)
+      @server.close(deadline)
       @pool.stop
     end
 
@@ -355,7 +339,8 @@ module GRPC
       return an_rpc if @pool.jobs_waiting <= @max_waiting_requests
       GRPC.logger.warn("NOT AVAILABLE: too many jobs_waiting: #{an_rpc}")
       noop = proc { |x| x }
-      c = ActiveCall.new(an_rpc.call, an_rpc.cq, noop, noop, an_rpc.deadline)
+      c = ActiveCall.new(an_rpc.call, noop, noop, an_rpc.deadline,
+                         metadata_received: true)
       c.send_status(GRPC::Core::StatusCodes::RESOURCE_EXHAUSTED, '')
       nil
     end
@@ -366,7 +351,8 @@ module GRPC
       return an_rpc if rpc_descs.key?(mth)
       GRPC.logger.warn("UNIMPLEMENTED: #{an_rpc}")
       noop = proc { |x| x }
-      c = ActiveCall.new(an_rpc.call, an_rpc.cq, noop, noop, an_rpc.deadline)
+      c = ActiveCall.new(an_rpc.call, noop, noop, an_rpc.deadline,
+                         metadata_received: true)
       c.send_status(GRPC::Core::StatusCodes::UNIMPLEMENTED, '')
       nil
     end
@@ -374,11 +360,9 @@ module GRPC
     # handles calls to the server
     def loop_handle_server_calls
       fail 'not started' if running_state == :not_started
-      loop_tag = Object.new
       while running_state == :running
         begin
-          comp_queue = Core::CompletionQueue.new
-          an_rpc = @server.request_call(comp_queue, loop_tag, INFINITE_FUTURE)
+          an_rpc = @server.request_call
           break if (!an_rpc.nil?) && an_rpc.call.nil?
           active_call = new_active_server_call(an_rpc)
           unless active_call.nil?
@@ -410,15 +394,13 @@ module GRPC
       return nil if an_rpc.nil? || an_rpc.call.nil?
 
       # allow the metadata to be accessed from the call
-      handle_call_tag = Object.new
       an_rpc.call.metadata = an_rpc.metadata  # attaches md to call for handlers
       GRPC.logger.debug("call md is #{an_rpc.metadata}")
       connect_md = nil
       unless @connect_md_proc.nil?
         connect_md = @connect_md_proc.call(an_rpc.method, an_rpc.metadata)
       end
-      an_rpc.call.run_batch(an_rpc.cq, handle_call_tag, INFINITE_FUTURE,
-                            SEND_INITIAL_METADATA => connect_md)
+      an_rpc.call.run_batch(SEND_INITIAL_METADATA => connect_md)
 
       return nil unless available?(an_rpc)
       return nil unless implemented?(an_rpc)
@@ -426,9 +408,9 @@ module GRPC
       # Create the ActiveCall
       GRPC.logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})")
       rpc_desc = rpc_descs[an_rpc.method.to_sym]
-      c = ActiveCall.new(an_rpc.call, an_rpc.cq,
-                         rpc_desc.marshal_proc, rpc_desc.unmarshal_proc(:input),
-                         an_rpc.deadline)
+      c = ActiveCall.new(an_rpc.call, rpc_desc.marshal_proc,
+                         rpc_desc.unmarshal_proc(:input), an_rpc.deadline,
+                         metadata_received: true)
       mth = an_rpc.method.to_sym
       [c, mth]
     end
diff --git a/src/ruby/lib/grpc/generic/service.rb b/src/ruby/lib/grpc/generic/service.rb
index f30242ee80..7cb9f1cc99 100644
--- a/src/ruby/lib/grpc/generic/service.rb
+++ b/src/ruby/lib/grpc/generic/service.rb
@@ -168,7 +168,7 @@ module GRPC
           # @param kw [KeywordArgs] the channel arguments, plus any optional
           #                         args for configuring the client's channel
           def initialize(host, creds, **kw)
-            super(host, Core::CompletionQueue.new, creds, **kw)
+            super(host, creds, **kw)
           end
 
           # Used define_method to add a method for each rpc_desc.  Each method
diff --git a/src/ruby/spec/call_spec.rb b/src/ruby/spec/call_spec.rb
index ae3ce0748a..1c44b333de 100644
--- a/src/ruby/spec/call_spec.rb
+++ b/src/ruby/spec/call_spec.rb
@@ -96,7 +96,6 @@ describe GRPC::Core::CallOps do
 end
 
 describe GRPC::Core::Call do
-  let(:client_queue) { GRPC::Core::CompletionQueue.new }
   let(:test_tag)  { Object.new }
   let(:fake_host) { 'localhost:10101' }
 
@@ -154,7 +153,7 @@ describe GRPC::Core::Call do
   end
 
   def make_test_call
-    @ch.create_call(client_queue, nil, nil, 'dummy_method', nil, deadline)
+    @ch.create_call(nil, nil, 'dummy_method', nil, deadline)
   end
 
   def deadline
diff --git a/src/ruby/spec/channel_spec.rb b/src/ruby/spec/channel_spec.rb
index 355f95c9d7..740eac631a 100644
--- a/src/ruby/spec/channel_spec.rb
+++ b/src/ruby/spec/channel_spec.rb
@@ -37,7 +37,6 @@ end
 
 describe GRPC::Core::Channel do
   let(:fake_host) { 'localhost:0' }
-  let(:cq) { GRPC::Core::CompletionQueue.new }
 
   def create_test_cert
     GRPC::Core::ChannelCredentials.new(load_test_certs[0])
@@ -122,7 +121,7 @@ describe GRPC::Core::Channel do
       deadline = Time.now + 5
 
       blk = proc do
-        ch.create_call(cq, nil, nil, 'dummy_method', nil, deadline)
+        ch.create_call(nil, nil, 'dummy_method', nil, deadline)
       end
       expect(&blk).to_not raise_error
     end
@@ -133,7 +132,7 @@ describe GRPC::Core::Channel do
 
       deadline = Time.now + 5
       blk = proc do
-        ch.create_call(cq, nil, nil, 'dummy_method', nil, deadline)
+        ch.create_call(nil, nil, 'dummy_method', nil, deadline)
       end
       expect(&blk).to raise_error(RuntimeError)
     end
diff --git a/src/ruby/spec/client_server_spec.rb b/src/ruby/spec/client_server_spec.rb
index aedeca272d..5440e6d023 100644
--- a/src/ruby/spec/client_server_spec.rb
+++ b/src/ruby/spec/client_server_spec.rb
@@ -34,27 +34,23 @@ include GRPC::Core
 shared_context 'setup: tags' do
   let(:sent_message) { 'sent message' }
   let(:reply_text) { 'the reply' }
-  before(:example) do
-    @client_tag = Object.new
-    @server_tag = Object.new
-  end
 
   def deadline
     Time.now + 5
   end
 
   def server_allows_client_to_proceed
-    recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline)
+    recvd_rpc = @server.request_call
     expect(recvd_rpc).to_not eq nil
     server_call = recvd_rpc.call
     ops = { CallOps::SEND_INITIAL_METADATA => {} }
-    svr_batch = server_call.run_batch(@server_queue, @server_tag, deadline, ops)
+    svr_batch = server_call.run_batch(ops)
     expect(svr_batch.send_metadata).to be true
     server_call
   end
 
   def new_client_call
-    @ch.create_call(@client_queue, nil, nil, '/method', nil, deadline)
+    @ch.create_call(nil, nil, '/method', nil, deadline)
   end
 end
 
@@ -91,8 +87,7 @@ shared_examples 'basic GRPC message delivery is OK' do
       CallOps::SEND_INITIAL_METADATA => {},
       CallOps::SEND_MESSAGE => sent_message
     }
-    batch_result = call.run_batch(@client_queue, @client_tag, deadline,
-                                  client_ops)
+    batch_result = call.run_batch(client_ops)
     expect(batch_result.send_metadata).to be true
     expect(batch_result.send_message).to be true
 
@@ -101,8 +96,7 @@ shared_examples 'basic GRPC message delivery is OK' do
     server_ops = {
       CallOps::RECV_MESSAGE => nil
     }
-    svr_batch = server_call.run_batch(@server_queue, @server_tag, deadline,
-                                      server_ops)
+    svr_batch = server_call.run_batch(server_ops)
     expect(svr_batch.message).to eq(sent_message)
   end
 
@@ -118,8 +112,7 @@ shared_examples 'basic GRPC message delivery is OK' do
       CallOps::SEND_INITIAL_METADATA => {},
       CallOps::SEND_MESSAGE => sent_message
     }
-    batch_result = call.run_batch(@client_queue, @client_tag, deadline,
-                                  client_ops)
+    batch_result = call.run_batch(client_ops)
     expect(batch_result.send_metadata).to be true
     expect(batch_result.send_message).to be true
 
@@ -129,8 +122,7 @@ shared_examples 'basic GRPC message delivery is OK' do
       CallOps::RECV_MESSAGE => nil,
       CallOps::SEND_MESSAGE => reply_text
     }
-    svr_batch = server_call.run_batch(@server_queue, @server_tag, deadline,
-                                      server_ops)
+    svr_batch = server_call.run_batch(server_ops)
     expect(svr_batch.message).to eq(sent_message)
     expect(svr_batch.send_message).to be true
   end
@@ -147,8 +139,7 @@ shared_examples 'basic GRPC message delivery is OK' do
       CallOps::SEND_INITIAL_METADATA => {},
       CallOps::SEND_MESSAGE => sent_message
     }
-    batch_result = call.run_batch(@client_queue, @client_tag, deadline,
-                                  client_ops)
+    batch_result = call.run_batch(client_ops)
     expect(batch_result.send_metadata).to be true
     expect(batch_result.send_message).to be true
 
@@ -158,8 +149,7 @@ shared_examples 'basic GRPC message delivery is OK' do
     server_ops = {
       CallOps::SEND_STATUS_FROM_SERVER => the_status
     }
-    svr_batch = server_call.run_batch(@server_queue, @server_tag, deadline,
-                                      server_ops)
+    svr_batch = server_call.run_batch(server_ops)
     expect(svr_batch.message).to eq nil
     expect(svr_batch.send_status).to be true
   end
@@ -176,8 +166,7 @@ shared_examples 'basic GRPC message delivery is OK' do
       CallOps::SEND_INITIAL_METADATA => {},
       CallOps::SEND_MESSAGE => sent_message
     }
-    batch_result = call.run_batch(@client_queue, @client_tag, deadline,
-                                  client_ops)
+    batch_result = call.run_batch(client_ops)
     expect(batch_result.send_metadata).to be true
     expect(batch_result.send_message).to be true
 
@@ -189,8 +178,7 @@ shared_examples 'basic GRPC message delivery is OK' do
       CallOps::SEND_MESSAGE => reply_text,
       CallOps::SEND_STATUS_FROM_SERVER => the_status
     }
-    svr_batch = server_call.run_batch(@server_queue, @server_tag, deadline,
-                                      server_ops)
+    svr_batch = server_call.run_batch(server_ops)
     expect(svr_batch.message).to eq sent_message
     expect(svr_batch.send_status).to be true
     expect(svr_batch.send_message).to be true
@@ -202,8 +190,7 @@ shared_examples 'basic GRPC message delivery is OK' do
       CallOps::RECV_MESSAGE => nil,
       CallOps::RECV_STATUS_ON_CLIENT => nil
     }
-    batch_result = call.run_batch(@client_queue, @client_tag, deadline,
-                                  client_ops)
+    batch_result = call.run_batch(client_ops)
     expect(batch_result.send_close).to be true
     expect(batch_result.message).to eq reply_text
     expect(batch_result.status).to eq the_status
@@ -212,8 +199,7 @@ shared_examples 'basic GRPC message delivery is OK' do
     server_ops = {
       CallOps::RECV_CLOSE_ON_SERVER => nil
     }
-    svr_batch = server_call.run_batch(@server_queue, @server_tag, deadline,
-                                      server_ops)
+    svr_batch = server_call.run_batch(server_ops)
     expect(svr_batch.send_close).to be true
   end
 end
@@ -244,8 +230,7 @@ shared_examples 'GRPC metadata delivery works OK' do
           CallOps::SEND_INITIAL_METADATA => md
         }
         blk = proc do
-          call.run_batch(@client_queue, @client_tag, deadline,
-                         client_ops)
+          call.run_batch(client_ops)
         end
         expect(&blk).to raise_error
       end
@@ -255,15 +240,14 @@ shared_examples 'GRPC metadata delivery works OK' do
       @valid_metadata.each do |md|
         recvd_rpc = nil
         rcv_thread = Thread.new do
-          recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline)
+          recvd_rpc = @server.request_call
         end
 
         call = new_client_call
         client_ops = {
           CallOps::SEND_INITIAL_METADATA => md
         }
-        batch_result = call.run_batch(@client_queue, @client_tag, deadline,
-                                      client_ops)
+        batch_result = call.run_batch(client_ops)
         expect(batch_result.send_metadata).to be true
 
         # confirm the server can receive the client metadata
@@ -296,7 +280,7 @@ shared_examples 'GRPC metadata delivery works OK' do
       @bad_keys.each do |md|
         recvd_rpc = nil
         rcv_thread = Thread.new do
-          recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline)
+          recvd_rpc = @server.request_call
         end
 
         call = new_client_call
@@ -305,7 +289,7 @@ shared_examples 'GRPC metadata delivery works OK' do
         client_ops = {
           CallOps::SEND_INITIAL_METADATA => nil
         }
-        call.run_batch(@client_queue, @client_tag, deadline, client_ops)
+        call.run_batch(client_ops)
 
         # server gets the invocation
         rcv_thread.join
@@ -314,8 +298,7 @@ shared_examples 'GRPC metadata delivery works OK' do
           CallOps::SEND_INITIAL_METADATA => md
         }
         blk = proc do
-          recvd_rpc.call.run_batch(@server_queue, @server_tag, deadline,
-                                   server_ops)
+          recvd_rpc.call.run_batch(server_ops)
         end
         expect(&blk).to raise_error
       end
@@ -324,7 +307,7 @@ shared_examples 'GRPC metadata delivery works OK' do
     it 'sends an empty hash if no metadata is added' do
       recvd_rpc = nil
       rcv_thread = Thread.new do
-        recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline)
+        recvd_rpc = @server.request_call
       end
 
       call = new_client_call
@@ -333,7 +316,7 @@ shared_examples 'GRPC metadata delivery works OK' do
       client_ops = {
         CallOps::SEND_INITIAL_METADATA => nil
       }
-      call.run_batch(@client_queue, @client_tag, deadline, client_ops)
+      call.run_batch(client_ops)
 
       # server gets the invocation but sends no metadata back
       rcv_thread.join
@@ -342,14 +325,13 @@ shared_examples 'GRPC metadata delivery works OK' do
       server_ops = {
         CallOps::SEND_INITIAL_METADATA => nil
       }
-      server_call.run_batch(@server_queue, @server_tag, deadline, server_ops)
+      server_call.run_batch(server_ops)
 
       # client receives nothing as expected
       client_ops = {
         CallOps::RECV_INITIAL_METADATA => nil
       }
-      batch_result = call.run_batch(@client_queue, @client_tag, deadline,
-                                    client_ops)
+      batch_result = call.run_batch(client_ops)
       expect(batch_result.metadata).to eq({})
     end
 
@@ -357,7 +339,7 @@ shared_examples 'GRPC metadata delivery works OK' do
       @valid_metadata.each do |md|
         recvd_rpc = nil
         rcv_thread = Thread.new do
-          recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline)
+          recvd_rpc = @server.request_call
         end
 
         call = new_client_call
@@ -366,7 +348,7 @@ shared_examples 'GRPC metadata delivery works OK' do
         client_ops = {
           CallOps::SEND_INITIAL_METADATA => nil
         }
-        call.run_batch(@client_queue, @client_tag, deadline, client_ops)
+        call.run_batch(client_ops)
 
         # server gets the invocation but sends no metadata back
         rcv_thread.join
@@ -375,14 +357,13 @@ shared_examples 'GRPC metadata delivery works OK' do
         server_ops = {
           CallOps::SEND_INITIAL_METADATA => md
         }
-        server_call.run_batch(@server_queue, @server_tag, deadline, server_ops)
+        server_call.run_batch(server_ops)
 
         # client receives nothing as expected
         client_ops = {
           CallOps::RECV_INITIAL_METADATA => nil
         }
-        batch_result = call.run_batch(@client_queue, @client_tag, deadline,
-                                      client_ops)
+        batch_result = call.run_batch(client_ops)
         replace_symbols = Hash[md.each_pair.collect { |x, y| [x.to_s, y] }]
         expect(batch_result.metadata).to eq(replace_symbols)
       end
@@ -393,9 +374,7 @@ end
 describe 'the http client/server' do
   before(:example) do
     server_host = '0.0.0.0:0'
-    @client_queue = GRPC::Core::CompletionQueue.new
-    @server_queue = GRPC::Core::CompletionQueue.new
-    @server = GRPC::Core::Server.new(@server_queue, nil)
+    @server = GRPC::Core::Server.new(nil)
     server_port = @server.add_http2_port(server_host, :this_port_is_insecure)
     @server.start
     @ch = Channel.new("0.0.0.0:#{server_port}", nil, :this_channel_is_insecure)
@@ -403,7 +382,7 @@ describe 'the http client/server' do
 
   after(:example) do
     @ch.close
-    @server.close(@server_queue, deadline)
+    @server.close(deadline)
   end
 
   it_behaves_like 'basic GRPC message delivery is OK' do
@@ -425,11 +404,9 @@ describe 'the secure http client/server' do
   before(:example) do
     certs = load_test_certs
     server_host = '0.0.0.0:0'
-    @client_queue = GRPC::Core::CompletionQueue.new
-    @server_queue = GRPC::Core::CompletionQueue.new
     server_creds = GRPC::Core::ServerCredentials.new(
       nil, [{ private_key: certs[1], cert_chain: certs[2] }], false)
-    @server = GRPC::Core::Server.new(@server_queue, nil)
+    @server = GRPC::Core::Server.new(nil)
     server_port = @server.add_http2_port(server_host, server_creds)
     @server.start
     args = { Channel::SSL_TARGET => 'foo.test.google.fr' }
@@ -438,7 +415,7 @@ describe 'the secure http client/server' do
   end
 
   after(:example) do
-    @server.close(@server_queue, deadline)
+    @server.close(deadline)
   end
 
   it_behaves_like 'basic GRPC message delivery is OK' do
@@ -454,7 +431,7 @@ describe 'the secure http client/server' do
     expected_md = { 'k1' => 'updated-v1', 'k2' => 'v2' }
     recvd_rpc = nil
     rcv_thread = Thread.new do
-      recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline)
+      recvd_rpc = @server.request_call
     end
 
     call = new_client_call
@@ -462,8 +439,7 @@ describe 'the secure http client/server' do
     client_ops = {
       CallOps::SEND_INITIAL_METADATA => md
     }
-    batch_result = call.run_batch(@client_queue, @client_tag, deadline,
-                                  client_ops)
+    batch_result = call.run_batch(client_ops)
     expect(batch_result.send_metadata).to be true
 
     # confirm the server can receive the client metadata
diff --git a/src/ruby/spec/completion_queue_spec.rb b/src/ruby/spec/completion_queue_spec.rb
deleted file mode 100644
index 886a7f263b..0000000000
--- a/src/ruby/spec/completion_queue_spec.rb
+++ /dev/null
@@ -1,42 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-#     * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-#     * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-#     * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-require 'grpc'
-
-describe GRPC::Core::CompletionQueue do
-  before(:example) do
-    @cq = GRPC::Core::CompletionQueue.new
-  end
-
-  describe '#new' do
-    it 'is constructed successufully' do
-      expect { GRPC::Core::CompletionQueue.new }.not_to raise_error
-    end
-  end
-end
diff --git a/src/ruby/spec/generic/active_call_spec.rb b/src/ruby/spec/generic/active_call_spec.rb
index d9c9780c93..b4e6f9ee02 100644
--- a/src/ruby/spec/generic/active_call_spec.rb
+++ b/src/ruby/spec/generic/active_call_spec.rb
@@ -39,13 +39,8 @@ describe GRPC::ActiveCall do
 
   before(:each) do
     @pass_through = proc { |x| x }
-    @server_tag = Object.new
-    @tag = Object.new
-
-    @client_queue = GRPC::Core::CompletionQueue.new
-    @server_queue = GRPC::Core::CompletionQueue.new
     host = '0.0.0.0:0'
-    @server = GRPC::Core::Server.new(@server_queue, nil)
+    @server = GRPC::Core::Server.new(nil)
     server_port = @server.add_http2_port(host, :this_port_is_insecure)
     @server.start
     @ch = GRPC::Core::Channel.new("0.0.0.0:#{server_port}", nil,
@@ -53,16 +48,15 @@ describe GRPC::ActiveCall do
   end
 
   after(:each) do
-    @server.close(@server_queue, deadline)
+    @server.close(deadline)
   end
 
   describe 'restricted view methods' do
     before(:each) do
       call = make_test_call
-      md_tag = ActiveCall.client_invoke(call, @client_queue)
-      @client_call = ActiveCall.new(call, @client_queue, @pass_through,
-                                    @pass_through, deadline,
-                                    metadata_tag: md_tag)
+      ActiveCall.client_invoke(call)
+      @client_call = ActiveCall.new(call, @pass_through,
+                                    @pass_through, deadline)
     end
 
     describe '#multi_req_view' do
@@ -89,46 +83,42 @@ describe GRPC::ActiveCall do
   describe '#remote_send' do
     it 'allows a client to send a payload to the server' do
       call = make_test_call
-      md_tag = ActiveCall.client_invoke(call, @client_queue)
-      @client_call = ActiveCall.new(call, @client_queue, @pass_through,
-                                    @pass_through, deadline,
-                                    metadata_tag: md_tag)
+      ActiveCall.client_invoke(call)
+      @client_call = ActiveCall.new(call, @pass_through,
+                                    @pass_through, deadline)
       msg = 'message is a string'
       @client_call.remote_send(msg)
 
       # check that server rpc new was received
-      recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline)
+      recvd_rpc = @server.request_call
       expect(recvd_rpc).to_not eq nil
       recvd_call = recvd_rpc.call
 
       # Accept the call, and verify that the server reads the response ok.
-      server_ops = {
-        CallOps::SEND_INITIAL_METADATA => {}
-      }
-      recvd_call.run_batch(@server_queue, @server_tag, deadline, server_ops)
-      server_call = ActiveCall.new(recvd_call, @server_queue, @pass_through,
-                                   @pass_through, deadline)
+      server_call = ActiveCall.new(recvd_call, @pass_through,
+                                   @pass_through, deadline,
+                                   metadata_received: true)
       expect(server_call.remote_read).to eq(msg)
     end
 
     it 'marshals the payload using the marshal func' do
       call = make_test_call
-      ActiveCall.client_invoke(call, @client_queue)
+      ActiveCall.client_invoke(call)
       marshal = proc { |x| 'marshalled:' + x }
-      client_call = ActiveCall.new(call, @client_queue, marshal,
-                                   @pass_through, deadline)
+      client_call = ActiveCall.new(call, marshal, @pass_through, deadline)
       msg = 'message is a string'
       client_call.remote_send(msg)
 
       # confirm that the message was marshalled
-      recvd_rpc =  @server.request_call(@server_queue, @server_tag, deadline)
+      recvd_rpc =  @server.request_call
       recvd_call = recvd_rpc.call
       server_ops = {
         CallOps::SEND_INITIAL_METADATA => nil
       }
-      recvd_call.run_batch(@server_queue, @server_tag, deadline, server_ops)
-      server_call = ActiveCall.new(recvd_call, @server_queue, @pass_through,
-                                   @pass_through, deadline)
+      recvd_call.run_batch(server_ops)
+      server_call = ActiveCall.new(recvd_call, @pass_through,
+                                   @pass_through, deadline,
+                                   metadata_received: true)
       expect(server_call.remote_read).to eq('marshalled:' + msg)
     end
 
@@ -136,23 +126,24 @@ describe GRPC::ActiveCall do
     TEST_WRITE_FLAGS.each do |f|
       it "successfully makes calls with write_flag set to #{f}" do
         call = make_test_call
-        ActiveCall.client_invoke(call, @client_queue)
+        ActiveCall.client_invoke(call)
         marshal = proc { |x| 'marshalled:' + x }
-        client_call = ActiveCall.new(call, @client_queue, marshal,
+        client_call = ActiveCall.new(call, marshal,
                                      @pass_through, deadline)
         msg = 'message is a string'
         client_call.write_flag = f
         client_call.remote_send(msg)
 
         # confirm that the message was marshalled
-        recvd_rpc =  @server.request_call(@server_queue, @server_tag, deadline)
+        recvd_rpc =  @server.request_call
         recvd_call = recvd_rpc.call
         server_ops = {
           CallOps::SEND_INITIAL_METADATA => nil
         }
-        recvd_call.run_batch(@server_queue, @server_tag, deadline, server_ops)
-        server_call = ActiveCall.new(recvd_call, @server_queue, @pass_through,
-                                     @pass_through, deadline)
+        recvd_call.run_batch(server_ops)
+        server_call = ActiveCall.new(recvd_call, @pass_through,
+                                     @pass_through, deadline,
+                                     metadata_received: true)
         expect(server_call.remote_read).to eq('marshalled:' + msg)
       end
     end
@@ -162,8 +153,8 @@ describe GRPC::ActiveCall do
     it 'sends metadata to the server when present' do
       call = make_test_call
       metadata = { k1: 'v1', k2: 'v2' }
-      ActiveCall.client_invoke(call, @client_queue, metadata)
-      recvd_rpc =  @server.request_call(@server_queue, @server_tag, deadline)
+      ActiveCall.client_invoke(call, metadata)
+      recvd_rpc =  @server.request_call
       recvd_call = recvd_rpc.call
       expect(recvd_call).to_not be_nil
       expect(recvd_rpc.metadata).to_not be_nil
@@ -175,10 +166,9 @@ describe GRPC::ActiveCall do
   describe '#remote_read' do
     it 'reads the response sent by a server' do
       call = make_test_call
-      md_tag = ActiveCall.client_invoke(call, @client_queue)
-      client_call = ActiveCall.new(call, @client_queue, @pass_through,
-                                   @pass_through, deadline,
-                                   metadata_tag: md_tag)
+      ActiveCall.client_invoke(call)
+      client_call = ActiveCall.new(call, @pass_through,
+                                   @pass_through, deadline)
       msg = 'message is a string'
       client_call.remote_send(msg)
       server_call = expect_server_to_receive(msg)
@@ -188,10 +178,9 @@ describe GRPC::ActiveCall do
 
     it 'saves no metadata when the server adds no metadata' do
       call = make_test_call
-      md_tag = ActiveCall.client_invoke(call, @client_queue)
-      client_call = ActiveCall.new(call, @client_queue, @pass_through,
-                                   @pass_through, deadline,
-                                   metadata_tag: md_tag)
+      ActiveCall.client_invoke(call)
+      client_call = ActiveCall.new(call, @pass_through,
+                                   @pass_through, deadline)
       msg = 'message is a string'
       client_call.remote_send(msg)
       server_call = expect_server_to_receive(msg)
@@ -203,10 +192,9 @@ describe GRPC::ActiveCall do
 
     it 'saves metadata add by the server' do
       call = make_test_call
-      md_tag = ActiveCall.client_invoke(call, @client_queue)
-      client_call = ActiveCall.new(call, @client_queue, @pass_through,
-                                   @pass_through, deadline,
-                                   metadata_tag: md_tag)
+      ActiveCall.client_invoke(call)
+      client_call = ActiveCall.new(call, @pass_through,
+                                   @pass_through, deadline)
       msg = 'message is a string'
       client_call.remote_send(msg)
       server_call = expect_server_to_receive(msg, k1: 'v1', k2: 'v2')
@@ -219,10 +207,9 @@ describe GRPC::ActiveCall do
 
     it 'get a nil msg before a status when an OK status is sent' do
       call = make_test_call
-      md_tag = ActiveCall.client_invoke(call, @client_queue)
-      client_call = ActiveCall.new(call, @client_queue, @pass_through,
-                                   @pass_through, deadline,
-                                   metadata_tag: md_tag)
+      ActiveCall.client_invoke(call)
+      client_call = ActiveCall.new(call, @pass_through,
+                                   @pass_through, deadline)
       msg = 'message is a string'
       client_call.remote_send(msg)
       client_call.writes_done(false)
@@ -236,11 +223,10 @@ describe GRPC::ActiveCall do
 
     it 'unmarshals the response using the unmarshal func' do
       call = make_test_call
-      md_tag = ActiveCall.client_invoke(call, @client_queue)
+      ActiveCall.client_invoke(call)
       unmarshal = proc { |x| 'unmarshalled:' + x }
-      client_call = ActiveCall.new(call, @client_queue, @pass_through,
-                                   unmarshal, deadline,
-                                   metadata_tag: md_tag)
+      client_call = ActiveCall.new(call, @pass_through,
+                                   unmarshal, deadline)
 
       # confirm the client receives the unmarshalled message
       msg = 'message is a string'
@@ -254,17 +240,16 @@ describe GRPC::ActiveCall do
   describe '#each_remote_read' do
     it 'creates an Enumerator' do
       call = make_test_call
-      client_call = ActiveCall.new(call, @client_queue, @pass_through,
+      client_call = ActiveCall.new(call, @pass_through,
                                    @pass_through, deadline)
       expect(client_call.each_remote_read).to be_a(Enumerator)
     end
 
     it 'the returns an enumerator that can read n responses' do
       call = make_test_call
-      md_tag = ActiveCall.client_invoke(call, @client_queue)
-      client_call = ActiveCall.new(call, @client_queue, @pass_through,
-                                   @pass_through, deadline,
-                                   metadata_tag: md_tag)
+      ActiveCall.client_invoke(call)
+      client_call = ActiveCall.new(call, @pass_through,
+                                   @pass_through, deadline)
       msg = 'message is a string'
       reply = 'server_response'
       client_call.remote_send(msg)
@@ -279,10 +264,9 @@ describe GRPC::ActiveCall do
 
     it 'the returns an enumerator that stops after an OK Status' do
       call = make_test_call
-      md_tag = ActiveCall.client_invoke(call, @client_queue)
-      client_call = ActiveCall.new(call, @client_queue, @pass_through,
-                                   @pass_through, deadline,
-                                   metadata_tag: md_tag)
+      ActiveCall.client_invoke(call)
+      client_call = ActiveCall.new(call, @pass_through,
+                                   @pass_through, deadline)
       msg = 'message is a string'
       reply = 'server_response'
       client_call.remote_send(msg)
@@ -302,10 +286,9 @@ describe GRPC::ActiveCall do
   describe '#writes_done' do
     it 'finishes ok if the server sends a status response' do
       call = make_test_call
-      md_tag = ActiveCall.client_invoke(call, @client_queue)
-      client_call = ActiveCall.new(call, @client_queue, @pass_through,
-                                   @pass_through, deadline,
-                                   metadata_tag: md_tag)
+      ActiveCall.client_invoke(call)
+      client_call = ActiveCall.new(call, @pass_through,
+                                   @pass_through, deadline)
       msg = 'message is a string'
       client_call.remote_send(msg)
       expect { client_call.writes_done(false) }.to_not raise_error
@@ -318,10 +301,9 @@ describe GRPC::ActiveCall do
 
     it 'finishes ok if the server sends an early status response' do
       call = make_test_call
-      md_tag = ActiveCall.client_invoke(call, @client_queue)
-      client_call = ActiveCall.new(call, @client_queue, @pass_through,
-                                   @pass_through, deadline,
-                                   metadata_tag: md_tag)
+      ActiveCall.client_invoke(call)
+      client_call = ActiveCall.new(call, @pass_through,
+                                   @pass_through, deadline)
       msg = 'message is a string'
       client_call.remote_send(msg)
       server_call = expect_server_to_receive(msg)
@@ -334,10 +316,9 @@ describe GRPC::ActiveCall do
 
     it 'finishes ok if writes_done is true' do
       call = make_test_call
-      md_tag = ActiveCall.client_invoke(call, @client_queue)
-      client_call = ActiveCall.new(call, @client_queue, @pass_through,
-                                   @pass_through, deadline,
-                                   metadata_tag: md_tag)
+      ActiveCall.client_invoke(call)
+      client_call = ActiveCall.new(call, @pass_through,
+                                   @pass_through, deadline)
       msg = 'message is a string'
       client_call.remote_send(msg)
       server_call = expect_server_to_receive(msg)
@@ -355,17 +336,16 @@ describe GRPC::ActiveCall do
   end
 
   def expect_server_to_be_invoked(**kw)
-    recvd_rpc =  @server.request_call(@server_queue, @server_tag, deadline)
+    recvd_rpc =  @server.request_call
     expect(recvd_rpc).to_not eq nil
     recvd_call = recvd_rpc.call
-    recvd_call.run_batch(@server_queue, @server_tag, deadline,
-                         CallOps::SEND_INITIAL_METADATA => kw)
-    ActiveCall.new(recvd_call, @server_queue, @pass_through,
-                   @pass_through, deadline)
+    recvd_call.run_batch(CallOps::SEND_INITIAL_METADATA => kw)
+    ActiveCall.new(recvd_call, @pass_through, @pass_through, deadline,
+                   metadata_received: true, started: true)
   end
 
   def make_test_call
-    @ch.create_call(@client_queue, nil, nil, '/method', nil, deadline)
+    @ch.create_call(nil, nil, '/method', nil, deadline)
   end
 
   def deadline
diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb
index 168e7fb791..6034b5419c 100644
--- a/src/ruby/spec/generic/client_stub_spec.rb
+++ b/src/ruby/spec/generic/client_stub_spec.rb
@@ -29,11 +29,14 @@
 
 require 'grpc'
 
+Thread.abort_on_exception = true
+
 def wakey_thread(&blk)
   n = GRPC::Notifier.new
   t = Thread.new do
     blk.call(n)
   end
+  t.abort_on_exception = true
   n.wait
   t
 end
@@ -54,15 +57,13 @@ describe 'ClientStub' do
   before(:each) do
     Thread.abort_on_exception = true
     @server = nil
-    @server_queue = nil
     @method = 'an_rpc_method'
     @pass = OK
     @fail = INTERNAL
-    @cq = GRPC::Core::CompletionQueue.new
   end
 
   after(:each) do
-    @server.close(@server_queue) unless @server_queue.nil?
+    @server.close(from_relative_time(2)) unless @server.nil?
   end
 
   describe '#new' do
@@ -70,7 +71,7 @@ describe 'ClientStub' do
     it 'can be created from a host and args' do
       opts = { channel_args: { a_channel_arg: 'an_arg' } }
       blk = proc do
-        GRPC::ClientStub.new(fake_host, @cq, :this_channel_is_insecure, **opts)
+        GRPC::ClientStub.new(fake_host, :this_channel_is_insecure, **opts)
       end
       expect(&blk).not_to raise_error
     end
@@ -81,7 +82,7 @@ describe 'ClientStub' do
         channel_override: @ch
       }
       blk = proc do
-        GRPC::ClientStub.new(fake_host, @cq, :this_channel_is_insecure, **opts)
+        GRPC::ClientStub.new(fake_host, :this_channel_is_insecure, **opts)
       end
       expect(&blk).not_to raise_error
     end
@@ -92,7 +93,7 @@ describe 'ClientStub' do
           channel_args: { a_channel_arg: 'an_arg' },
           channel_override: Object.new
         }
-        GRPC::ClientStub.new(fake_host, @cq, :this_channel_is_insecure, **opts)
+        GRPC::ClientStub.new(fake_host, :this_channel_is_insecure, **opts)
       end
       expect(&blk).to raise_error
     end
@@ -100,7 +101,7 @@ describe 'ClientStub' do
     it 'cannot be created with bad credentials' do
       blk = proc do
         opts = { channel_args: { a_channel_arg: 'an_arg' } }
-        GRPC::ClientStub.new(fake_host, @cq, Object.new, **opts)
+        GRPC::ClientStub.new(fake_host, Object.new, **opts)
       end
       expect(&blk).to raise_error
     end
@@ -115,7 +116,7 @@ describe 'ClientStub' do
           }
         }
         creds = GRPC::Core::ChannelCredentials.new(certs[0], nil, nil)
-        GRPC::ClientStub.new(fake_host, @cq, creds,  **opts)
+        GRPC::ClientStub.new(fake_host, creds,  **opts)
       end
       expect(&blk).to_not raise_error
     end
@@ -130,7 +131,7 @@ describe 'ClientStub' do
       it 'should send a request to/receive a reply from a server' do
         server_port = create_test_server
         th = run_request_response(@sent_msg, @resp, @pass)
-        stub = GRPC::ClientStub.new("localhost:#{server_port}", @cq,
+        stub = GRPC::ClientStub.new("localhost:#{server_port}",
                                     :this_channel_is_insecure)
         expect(get_response(stub)).to eq(@resp)
         th.join
@@ -141,7 +142,7 @@ describe 'ClientStub' do
         host = "localhost:#{server_port}"
         th = run_request_response(@sent_msg, @resp, @pass,
                                   k1: 'v1', k2: 'v2')
-        stub = GRPC::ClientStub.new(host, @cq, :this_channel_is_insecure)
+        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
         expect(get_response(stub)).to eq(@resp)
         th.join
       end
@@ -151,7 +152,7 @@ describe 'ClientStub' do
         alt_host = "localhost:#{server_port}"
         th = run_request_response(@sent_msg, @resp, @pass)
         ch = GRPC::Core::Channel.new(alt_host, nil, :this_channel_is_insecure)
-        stub = GRPC::ClientStub.new('ignored-host', @cq,
+        stub = GRPC::ClientStub.new('ignored-host',
                                     :this_channel_is_insecure,
                                     channel_override: ch)
         expect(get_response(stub)).to eq(@resp)
@@ -162,7 +163,7 @@ describe 'ClientStub' do
         server_port = create_test_server
         host = "localhost:#{server_port}"
         th = run_request_response(@sent_msg, @resp, @fail)
-        stub = GRPC::ClientStub.new(host, @cq, :this_channel_is_insecure)
+        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
         blk = proc { get_response(stub) }
         expect(&blk).to raise_error(GRPC::BadStatus)
         th.join
@@ -182,7 +183,8 @@ describe 'ClientStub' do
       def get_response(stub)
         op = stub.request_response(@method, @sent_msg, noop, noop,
                                    return_op: true,
-                                   metadata: { k1: 'v1', k2: 'v2' })
+                                   metadata: { k1: 'v1', k2: 'v2' },
+                                   deadline: from_relative_time(2))
         expect(op).to be_a(GRPC::ActiveCall::Operation)
         op.execute
       end
@@ -196,7 +198,7 @@ describe 'ClientStub' do
       before(:each) do
         server_port = create_test_server
         host = "localhost:#{server_port}"
-        @stub = GRPC::ClientStub.new(host, @cq, :this_channel_is_insecure)
+        @stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
         @metadata = { k1: 'v1', k2: 'v2' }
         @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s }
         @resp = 'a_reply'
@@ -262,7 +264,7 @@ describe 'ClientStub' do
         server_port = create_test_server
         host = "localhost:#{server_port}"
         th = run_server_streamer(@sent_msg, @replys, @pass)
-        stub = GRPC::ClientStub.new(host, @cq, :this_channel_is_insecure)
+        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
         expect(get_responses(stub).collect { |r| r }).to eq(@replys)
         th.join
       end
@@ -271,7 +273,7 @@ describe 'ClientStub' do
         server_port = create_test_server
         host = "localhost:#{server_port}"
         th = run_server_streamer(@sent_msg, @replys, @fail)
-        stub = GRPC::ClientStub.new(host, @cq, :this_channel_is_insecure)
+        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
         e = get_responses(stub)
         expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
         th.join
@@ -282,7 +284,7 @@ describe 'ClientStub' do
         host = "localhost:#{server_port}"
         th = run_server_streamer(@sent_msg, @replys, @fail,
                                  k1: 'v1', k2: 'v2')
-        stub = GRPC::ClientStub.new(host, @cq, :this_channel_is_insecure)
+        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
         e = get_responses(stub)
         expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
         th.join
@@ -327,7 +329,7 @@ describe 'ClientStub' do
       it 'supports sending all the requests first', bidi: true do
         th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys,
                                                    @pass)
-        stub = GRPC::ClientStub.new(@host, @cq, :this_channel_is_insecure)
+        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
         e = get_responses(stub)
         expect(e.collect { |r| r }).to eq(@replys)
         th.join
@@ -335,7 +337,7 @@ describe 'ClientStub' do
 
       it 'supports client-initiated ping pong', bidi: true do
         th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true)
-        stub = GRPC::ClientStub.new(@host, @cq, :this_channel_is_insecure)
+        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
         e = get_responses(stub)
         expect(e.collect { |r| r }).to eq(@sent_msgs)
         th.join
@@ -343,7 +345,7 @@ describe 'ClientStub' do
 
       it 'supports a server-initiated ping pong', bidi: true do
         th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, false)
-        stub = GRPC::ClientStub.new(@host, @cq, :this_channel_is_insecure)
+        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
         e = get_responses(stub)
         expect(e.collect { |r| r }).to eq(@sent_msgs)
         th.join
@@ -372,26 +374,6 @@ describe 'ClientStub' do
 
       it_behaves_like 'bidi streaming'
     end
-
-    describe 'without enough time to run' do
-      before(:each) do
-        @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s }
-        @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s }
-        server_port = create_test_server
-        @host = "localhost:#{server_port}"
-      end
-
-      it 'should fail with DeadlineExceeded', bidi: true do
-        @server.start
-        stub = GRPC::ClientStub.new(@host, @cq, :this_channel_is_insecure)
-        blk = proc do
-          e = stub.bidi_streamer(@method, @sent_msgs, noop, noop,
-                                 deadline: from_relative_time(0.001))
-          e.collect { |r| r }
-        end
-        expect(&blk).to raise_error GRPC::BadStatus, /Deadline Exceeded/
-      end
-    end
   end
 
   def run_server_streamer(expected_input, replys, status, **kw)
@@ -460,21 +442,18 @@ describe 'ClientStub' do
   end
 
   def create_test_server
-    @server_queue = GRPC::Core::CompletionQueue.new
-    @server = GRPC::Core::Server.new(@server_queue, nil)
+    @server = GRPC::Core::Server.new(nil)
     @server.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
   end
 
   def expect_server_to_be_invoked(notifier)
     @server.start
     notifier.notify(nil)
-    server_tag = Object.new
-    recvd_rpc = @server.request_call(@server_queue, server_tag,
-                                     INFINITE_FUTURE)
+    recvd_rpc = @server.request_call
     recvd_call = recvd_rpc.call
     recvd_call.metadata = recvd_rpc.metadata
-    recvd_call.run_batch(@server_queue, server_tag, Time.now + 2,
-                         SEND_INITIAL_METADATA => nil)
-    GRPC::ActiveCall.new(recvd_call, @server_queue, noop, noop, INFINITE_FUTURE)
+    recvd_call.run_batch(SEND_INITIAL_METADATA => nil)
+    GRPC::ActiveCall.new(recvd_call, noop, noop, INFINITE_FUTURE,
+                         metadata_received: true)
   end
 end
diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb
index 943502cea2..901c84fc78 100644
--- a/src/ruby/spec/generic/rpc_server_spec.rb
+++ b/src/ruby/spec/generic/rpc_server_spec.rb
@@ -135,8 +135,6 @@ describe GRPC::RpcServer do
     @pass = 0
     @fail = 1
     @noop = proc { |x| x }
-
-    @server_queue = GRPC::Core::CompletionQueue.new
   end
 
   describe '#new' do
@@ -148,28 +146,6 @@ describe GRPC::RpcServer do
       expect(&blk).not_to raise_error
     end
 
-    it 'can be created with a completion queue override' do
-      opts = {
-        server_args: { a_channel_arg: 'an_arg' },
-        completion_queue_override: @server_queue
-      }
-      blk = proc do
-        RpcServer.new(**opts)
-      end
-      expect(&blk).not_to raise_error
-    end
-
-    it 'cannot be created with a bad completion queue override' do
-      blk = proc do
-        opts = {
-          server_args: { a_channel_arg: 'an_arg' },
-          completion_queue_override: Object.new
-        }
-        RpcServer.new(**opts)
-      end
-      expect(&blk).to raise_error
-    end
-
     it 'cannot be created with invalid ServerCredentials' do
       blk = proc do
         opts = {
@@ -294,7 +270,6 @@ describe GRPC::RpcServer do
     context 'with no connect_metadata' do
       before(:each) do
         server_opts = {
-          completion_queue_override: @server_queue,
           poll_period: 1
         }
         @srv = RpcServer.new(**server_opts)
@@ -309,8 +284,7 @@ describe GRPC::RpcServer do
         @srv.wait_till_running
         req = EchoMsg.new
         blk = proc do
-          cq = GRPC::Core::CompletionQueue.new
-          stub = GRPC::ClientStub.new(@host, cq, :this_channel_is_insecure,
+          stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure,
                                       **client_opts)
           stub.request_response('/unknown', req, marshal, unmarshal)
         end
@@ -325,8 +299,7 @@ describe GRPC::RpcServer do
         @srv.wait_till_running
         req = EchoMsg.new
         blk = proc do
-          cq = GRPC::Core::CompletionQueue.new
-          stub = GRPC::ClientStub.new(@host, cq, :this_channel_is_insecure,
+          stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure,
                                       **client_opts)
           stub.request_response('/an_rpc', req, marshal, unmarshal)
         end
@@ -422,7 +395,6 @@ describe GRPC::RpcServer do
       it 'should return RESOURCE_EXHAUSTED on too many jobs', server: true do
         opts = {
           server_args: { a_channel_arg: 'an_arg' },
-          completion_queue_override: @server_queue,
           pool_size: 1,
           poll_period: 1,
           max_waiting_requests: 0
@@ -466,7 +438,6 @@ describe GRPC::RpcServer do
       end
       before(:each) do
         server_opts = {
-          completion_queue_override: @server_queue,
           poll_period: 1,
           connect_md_proc: test_md_proc
         }
@@ -502,7 +473,6 @@ describe GRPC::RpcServer do
     context 'with trailing metadata' do
       before(:each) do
         server_opts = {
-          completion_queue_override: @server_queue,
           poll_period: 1
         }
         @srv = RpcServer.new(**server_opts)
diff --git a/src/ruby/spec/pb/health/checker_spec.rb b/src/ruby/spec/pb/health/checker_spec.rb
index f3d121a31e..de11c9fedf 100644
--- a/src/ruby/spec/pb/health/checker_spec.rb
+++ b/src/ruby/spec/pb/health/checker_spec.rb
@@ -168,11 +168,9 @@ describe Grpc::Health::Checker do
     CheckerStub = Grpc::Health::Checker.rpc_stub_class
 
     before(:each) do
-      @server_queue = GRPC::Core::CompletionQueue.new
       server_host = '0.0.0.0:0'
       @client_opts = { channel_override: @ch }
       server_opts = {
-        completion_queue_override: @server_queue,
         poll_period: 1
       }
       @srv = RpcServer.new(**server_opts)
diff --git a/src/ruby/spec/server_spec.rb b/src/ruby/spec/server_spec.rb
index 439b19fb8d..003d8f69d5 100644
--- a/src/ruby/spec/server_spec.rb
+++ b/src/ruby/spec/server_spec.rb
@@ -43,19 +43,15 @@ describe Server do
     GRPC::Core::ServerCredentials.new(*load_test_certs)
   end
 
-  before(:each) do
-    @cq = GRPC::Core::CompletionQueue.new
-  end
-
   describe '#start' do
     it 'runs without failing' do
-      blk = proc { Server.new(@cq, nil).start }
+      blk = proc { Server.new(nil).start }
       expect(&blk).to_not raise_error
     end
 
     it 'fails if the server is closed' do
-      s = Server.new(@cq, nil)
-      s.close(@cq)
+      s = Server.new(nil)
+      s.close
       expect { s.start }.to raise_error(RuntimeError)
     end
   end
@@ -63,19 +59,19 @@ describe Server do
   describe '#destroy' do
     it 'destroys a server ok' do
       s = start_a_server
-      blk = proc { s.destroy(@cq) }
+      blk = proc { s.destroy }
       expect(&blk).to_not raise_error
     end
 
     it 'can be called more than once without error' do
       s = start_a_server
       begin
-        blk = proc { s.destroy(@cq) }
+        blk = proc { s.destroy }
         expect(&blk).to_not raise_error
         blk.call
         expect(&blk).to_not raise_error
       ensure
-        s.close(@cq)
+        s.close
       end
     end
   end
@@ -84,7 +80,7 @@ describe Server do
     it 'closes a server ok' do
       s = start_a_server
       begin
-        blk = proc { s.close(@cq) }
+        blk = proc { s.close }
         expect(&blk).to_not raise_error
       ensure
         s.close(@cq)
@@ -93,7 +89,7 @@ describe Server do
 
     it 'can be called more than once without error' do
       s = start_a_server
-      blk = proc { s.close(@cq) }
+      blk = proc { s.close }
       expect(&blk).to_not raise_error
       blk.call
       expect(&blk).to_not raise_error
@@ -104,16 +100,16 @@ describe Server do
     describe 'for insecure servers' do
       it 'runs without failing' do
         blk = proc do
-          s = Server.new(@cq, nil)
+          s = Server.new(nil)
           s.add_http2_port('localhost:0', :this_port_is_insecure)
-          s.close(@cq)
+          s.close
         end
         expect(&blk).to_not raise_error
       end
 
       it 'fails if the server is closed' do
-        s = Server.new(@cq, nil)
-        s.close(@cq)
+        s = Server.new(nil)
+        s.close
         blk = proc do
           s.add_http2_port('localhost:0', :this_port_is_insecure)
         end
@@ -125,16 +121,16 @@ describe Server do
       let(:cert) { create_test_cert }
       it 'runs without failing' do
         blk = proc do
-          s = Server.new(@cq, nil)
+          s = Server.new(nil)
           s.add_http2_port('localhost:0', cert)
-          s.close(@cq)
+          s.close
         end
         expect(&blk).to_not raise_error
       end
 
       it 'fails if the server is closed' do
-        s = Server.new(@cq, nil)
-        s.close(@cq)
+        s = Server.new(nil)
+        s.close
         blk = proc { s.add_http2_port('localhost:0', cert) }
         expect(&blk).to raise_error(RuntimeError)
       end
@@ -142,8 +138,8 @@ describe Server do
   end
 
   shared_examples '#new' do
-    it 'takes a completion queue with nil channel args' do
-      expect { Server.new(@cq, nil) }.to_not raise_error
+    it 'takes nil channel args' do
+      expect { Server.new(nil) }.to_not raise_error
     end
 
     it 'does not take a hash with bad keys as channel args' do
@@ -194,14 +190,14 @@ describe Server do
 
   describe '#new with an insecure channel' do
     def construct_with_args(a)
-      proc { Server.new(@cq, a) }
+      proc { Server.new(a) }
     end
 
     it_behaves_like '#new'
   end
 
   def start_a_server
-    s = Server.new(@cq, nil)
+    s = Server.new(nil)
     s.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
     s.start
     s
-- 
GitLab