diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb
index 6256330e88e420a9d1c270b8760e0bdf7a9ffb45..489349c2c996fb4578c0fc0a13563b51d4613413 100644
--- a/src/ruby/lib/grpc/generic/active_call.rb
+++ b/src/ruby/lib/grpc/generic/active_call.rb
@@ -30,20 +30,14 @@
 require 'forwardable'
 require 'grpc/generic/bidi_call'
 
-def assert_event_type(ev, want)
-  fail OutOfTime if ev.nil?
-  got = ev.type
-  fail "Unexpected rpc event: got #{got}, want #{want}" unless got == want
-end
-
 # GRPC contains the General RPC module.
 module GRPC
   # The ActiveCall class provides simple methods for sending marshallable
   # data to a call
   class ActiveCall
-    include Core::CompletionType
     include Core::StatusCodes
     include Core::TimeConsts
+    include Core::CallOps
     attr_reader(:deadline)
 
     # client_invoke begins a client invocation.
@@ -61,15 +55,14 @@ module GRPC
     # @param q [CompletionQueue] the completion queue
     # @param deadline [Fixnum,TimeSpec] the deadline
     def self.client_invoke(call, q, _deadline, **kw)
-      fail(ArgumentError, 'not a call') unless call.is_a? Core::Call
+      fail(TypeError, '!Core::Call') unless call.is_a? Core::Call
       unless q.is_a? Core::CompletionQueue
-        fail(ArgumentError, 'not a CompletionQueue')
+        fail(TypeError, '!Core::CompletionQueue')
       end
-      call.add_metadata(kw) if kw.length > 0
-      client_metadata_read = Object.new
-      finished_tag = Object.new
-      call.invoke(q, client_metadata_read, finished_tag)
-      [finished_tag, client_metadata_read]
+      metadata_tag = Object.new
+      call.run_batch(q, metadata_tag, INFINITE_FUTURE,
+                     SEND_INITIAL_METADATA => kw)
+      metadata_tag
     end
 
     # Creates an ActiveCall.
@@ -91,25 +84,21 @@ module GRPC
     # @param marshal [Function] f(obj)->string that marshal requests
     # @param unmarshal [Function] f(string)->obj that unmarshals responses
     # @param deadline [Fixnum] the deadline for the call to complete
-    # @param finished_tag [Object] the object used as the call's finish tag,
-    #                              if the call has begun
-    # @param read_metadata_tag [Object] the object used as the call's finish
-    #                                   tag, if the call has begun
+    # @param metadata_tag [Object] the object use obtain metadata for clients
     # @param started [true|false] indicates if the call has begun
-    def initialize(call, q, marshal, unmarshal, deadline, finished_tag: nil,
-                   read_metadata_tag: nil, started: true)
-      fail(ArgumentError, 'not a call') unless call.is_a? Core::Call
+    def initialize(call, q, marshal, unmarshal, deadline, started: true,
+                   metadata_tag: nil)
+      fail(TypeError, '!Core::Call') unless call.is_a? Core::Call
       unless q.is_a? Core::CompletionQueue
-        fail(ArgumentError, 'not a CompletionQueue')
+        fail(TypeError, '!Core::CompletionQueue')
       end
       @call = call
       @cq = q
       @deadline = deadline
-      @finished_tag = finished_tag
-      @read_metadata_tag = read_metadata_tag
       @marshal = marshal
       @started = started
       @unmarshal = unmarshal
+      @metadata_tag = metadata_tag
     end
 
     # Obtains the status of the call.
@@ -176,51 +165,38 @@ module GRPC
 
     # writes_done indicates that all writes are completed.
     #
-    # It blocks until the remote endpoint acknowledges by sending a FINISHED
-    # event, unless assert_finished is set to false.  Any calls to
-    # #remote_send after this call will fail.
+    # It blocks until the remote endpoint acknowledges with at status unless
+    # assert_finished is set to false.  Any calls to #remote_send after this
+    # call will fail.
     #
     # @param assert_finished [true, false] when true(default), waits for
     # FINISHED.
     def writes_done(assert_finished = true)
-      @call.writes_done(self)
-      ev = @cq.pluck(self, INFINITE_FUTURE)
-      begin
-        assert_event_type(ev, FINISH_ACCEPTED)
-        logger.debug("Writes done: waiting for finish? #{assert_finished}")
-      ensure
-        ev.close
-      end
-
+      ops = {
+        SEND_CLOSE_FROM_CLIENT => nil
+      }
+      ops[RECV_STATUS_ON_CLIENT] = nil if assert_finished
+      @call.run_batch(@cq, self, INFINITE_FUTURE, ops)
       return unless assert_finished
-      ev = @cq.pluck(@finished_tag, INFINITE_FUTURE)
-      fail 'unexpected nil event' if ev.nil?
-      ev.close
       @call.status
     end
 
-    # finished waits until the call is completed.
+    # finished waits until a client call is completed.
     #
-    # It blocks until the remote endpoint acknowledges by sending a FINISHED
-    # event.
+    # It blocks until the remote endpoint acknowledges by sending a status.
     def finished
-      ev = @cq.pluck(@finished_tag, INFINITE_FUTURE)
-      begin
-        fail "unexpected event: #{ev.inspect}" unless ev.type == FINISHED
-        if @call.metadata.nil?
-          @call.metadata = ev.result.metadata
-        else
-          @call.metadata.merge!(ev.result.metadata)
-        end
-
-        if ev.result.code != Core::StatusCodes::OK
-          fail BadStatus.new(ev.result.code, ev.result.details)
-        end
-        res = ev.result
-      ensure
-        ev.close
+      batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE,
+                                     RECV_STATUS_ON_CLIENT => nil)
+      if @call.metadata.nil?
+        @call.metadata = batch_result.metadata
+      elsif !batch_result.metadata.nil?
+        @call.metadata.merge!(batch_result.metadata)
       end
-      res
+      if batch_result.status.code != Core::StatusCodes::OK
+        fail BadStatus.new(batch_result.status.code,
+                           batch_result.status.details)
+      end
+      batch_result
     end
 
     # remote_send sends a request to the remote endpoint.
@@ -232,72 +208,50 @@ module GRPC
     # @param marshalled [false, true] indicates if the object is already
     # marshalled.
     def remote_send(req, marshalled = false)
-      assert_queue_is_ready
       logger.debug("sending #{req.inspect}, marshalled? #{marshalled}")
       if marshalled
         payload = req
       else
         payload = @marshal.call(req)
       end
-      @call.start_write(Core::ByteBuffer.new(payload), self)
-
-      # call queue#pluck, and wait for WRITE_ACCEPTED, so as not to return
-      # until the flow control allows another send on this call.
-      ev = @cq.pluck(self, INFINITE_FUTURE)
-      begin
-        assert_event_type(ev, WRITE_ACCEPTED)
-      ensure
-        ev.close
-      end
+      @call.run_batch(@cq, self, INFINITE_FUTURE, SEND_MESSAGE => payload)
     end
 
-    # send_status sends a status to the remote endpoint
+    # send_status sends a status to the remote endpoint.
     #
     # @param code [int] the status code to send
     # @param details [String] details
     # @param assert_finished [true, false] when true(default), waits for
     # FINISHED.
     def send_status(code = OK, details = '', assert_finished = false)
-      assert_queue_is_ready
-      @call.start_write_status(code, details, self)
-      ev = @cq.pluck(self, INFINITE_FUTURE)
-      begin
-        assert_event_type(ev, FINISH_ACCEPTED)
-      ensure
-        ev.close
-      end
-      logger.debug("Status sent: #{code}:'#{details}'")
-      return finished if assert_finished
+      ops = {
+        SEND_STATUS_FROM_SERVER => Struct::Status.new(code, details)
+      }
+      ops[RECV_CLOSE_ON_SERVER] = nil if assert_finished
+      @call.run_batch(@cq, self, INFINITE_FUTURE, ops)
       nil
     end
 
     # remote_read reads a response from the remote endpoint.
     #
-    # It blocks until the remote endpoint sends a READ or FINISHED event.  On
-    # a READ, it returns the response after unmarshalling it. On
-    # FINISHED, it returns nil if the status is OK, otherwise raising
-    # BadStatus
+    # It blocks until the remote endpoint replies with a message or status.
+    # On receiving a message, it returns the response after unmarshalling it.
+    # On receiving a status, it returns nil if the status is OK, otherwise
+    # raising BadStatus
     def remote_read
-      if @call.metadata.nil? && !@read_metadata_tag.nil?
-        ev = @cq.pluck(@read_metadata_tag, INFINITE_FUTURE)
-        assert_event_type(ev, CLIENT_METADATA_READ)
-        @call.metadata = ev.result
-        @read_metadata_tag = nil
+      ops = { RECV_MESSAGE => nil }
+      ops[RECV_INITIAL_METADATA] = nil unless @metadata_tag.nil?
+      batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, ops)
+      unless @metadata_tag.nil?
+        @call.metadata = batch_result.metadata
+        @metadata_tag = nil
       end
-
-      @call.start_read(self)
-      ev = @cq.pluck(self, INFINITE_FUTURE)
-      begin
-        assert_event_type(ev, READ)
-        logger.debug("received req: #{ev.result.inspect}")
-        unless ev.result.nil?
-          logger.debug("received req.to_s: #{ev.result}")
-          res = @unmarshal.call(ev.result.to_s)
-          logger.debug("received_req (unmarshalled): #{res.inspect}")
-          return res
-        end
-      ensure
-        ev.close
+      logger.debug("received req: #{batch_result}")
+      unless batch_result.nil? || batch_result.message.nil?
+        logger.debug("received req.to_s: #{batch_result.message}")
+        res = @unmarshal.call(batch_result.message)
+        logger.debug("received_req (unmarshalled): #{res.inspect}")
+        return res
       end
       logger.debug('found nil; the final response has been sent')
       nil
@@ -324,7 +278,6 @@ module GRPC
       return enum_for(:each_remote_read) unless block_given?
       loop do
         resp = remote_read
-        break if resp.is_a? Struct::Status  # is an OK status
         break if resp.nil?  # the last response was received
         yield resp
       end
@@ -461,8 +414,7 @@ module GRPC
     # @return [Enumerator, nil] a response Enumerator
     def bidi_streamer(requests, **kw, &blk)
       start_call(**kw) unless @started
-      bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline,
-                        @finished_tag)
+      bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline)
       bd.run_on_client(requests, &blk)
     end
 
@@ -478,8 +430,7 @@ module GRPC
     #
     # @param gen_each_reply [Proc] generates the BiDi stream replies
     def run_server_bidi(gen_each_reply)
-      bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline,
-                        @finished_tag)
+      bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline)
       bd.run_on_server(gen_each_reply)
     end
 
@@ -516,21 +467,5 @@ module GRPC
     # a Operation on the client.
     Operation = view_class(:cancel, :cancelled, :deadline, :execute,
                            :metadata, :status)
-
-    # confirms that no events are enqueued, and that the queue is not
-    # shutdown.
-    def assert_queue_is_ready
-      ev = nil
-      begin
-        ev = @cq.pluck(self, ZERO)
-        fail "unexpected event #{ev.inspect}" unless ev.nil?
-      rescue OutOfTime
-        logging.debug('timed out waiting for next event')
-        # expected, nothing should be on the queue and the deadline was ZERO,
-        # except things using another tag
-      ensure
-        ev.close unless ev.nil?
-      end
-    end
   end
 end
diff --git a/src/ruby/lib/grpc/generic/rpc_desc.rb b/src/ruby/lib/grpc/generic/rpc_desc.rb
index 2cb3d2eebf43744be89af19c5caa972c1985fbe5..4074ae49336fa7cc114b868dc6348ac54f5d2247 100644
--- a/src/ruby/lib/grpc/generic/rpc_desc.rb
+++ b/src/ruby/lib/grpc/generic/rpc_desc.rb
@@ -81,7 +81,6 @@ module GRPC
         active_call.run_server_bidi(mth)
       end
       send_status(active_call, OK, 'OK')
-      active_call.finished
     rescue BadStatus => e
       # this is raised by handlers that want GRPC to send an application
       # error code and detail message.
@@ -96,10 +95,6 @@ module GRPC
       # event.  Send a status of deadline exceeded
       logger.warn("late call: #{active_call}")
       send_status(active_call, DEADLINE_EXCEEDED, 'late')
-    rescue Core::EventError => e
-      # This is raised by GRPC internals but should rarely, if ever happen.
-      # Log it, but don't notify the other endpoint..
-      logger.warn("failed call: #{active_call}\n#{e}")
     rescue StandardError => e
       # This will usuaally be an unhandled error in the handling code.
       # Send back a UNKNOWN status to the client
@@ -142,7 +137,7 @@ module GRPC
 
     def send_status(active_client, code, details)
       details = 'Not sure why' if details.nil?
-      active_client.send_status(code, details)
+      active_client.send_status(code, details, code == OK)
     rescue StandardError => e
       logger.warn("Could not send status #{code}:#{details}")
       logger.warn(e)
diff --git a/src/ruby/spec/generic/active_call_spec.rb b/src/ruby/spec/generic/active_call_spec.rb
index 1306336941c5fe9904734730c2d5154793a60c14..575871afb110aa1de18dad5c34ec6c2fc215c3be 100644
--- a/src/ruby/spec/generic/active_call_spec.rb
+++ b/src/ruby/spec/generic/active_call_spec.rb
@@ -34,12 +34,11 @@ include GRPC::Core::StatusCodes
 describe GRPC::ActiveCall do
   ActiveCall = GRPC::ActiveCall
   Call = GRPC::Core::Call
-  CompletionType = GRPC::Core::CompletionType
+  CallOps = GRPC::Core::CallOps
 
   before(:each) do
     @pass_through = proc { |x| x }
     @server_tag = Object.new
-    @server_done_tag = Object.new
     @tag = Object.new
 
     @client_queue = GRPC::Core::CompletionQueue.new
@@ -48,7 +47,7 @@ describe GRPC::ActiveCall do
     @server = GRPC::Core::Server.new(@server_queue, nil)
     server_port = @server.add_http2_port(host)
     @server.start
-    @ch = GRPC::Core::Channel.new("localhost:#{server_port}", nil)
+    @ch = GRPC::Core::Channel.new("0.0.0.0:#{server_port}", nil)
   end
 
   after(:each) do
@@ -58,12 +57,10 @@ describe GRPC::ActiveCall do
   describe 'restricted view methods' do
     before(:each) do
       call = make_test_call
-      done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
-                                                    deadline)
+      md_tag = ActiveCall.client_invoke(call, @client_queue, deadline)
       @client_call = ActiveCall.new(call, @client_queue, @pass_through,
                                     @pass_through, deadline,
-                                    finished_tag: done_tag,
-                                    read_metadata_tag: meta_tag)
+                                    metadata_tag: md_tag)
     end
 
     describe '#multi_req_view' do
@@ -90,48 +87,45 @@ describe GRPC::ActiveCall do
   describe '#remote_send' do
     it 'allows a client to send a payload to the server' do
       call = make_test_call
-      done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
-                                                    deadline)
+      md_tag = ActiveCall.client_invoke(call, @client_queue, deadline)
       @client_call = ActiveCall.new(call, @client_queue, @pass_through,
                                     @pass_through, deadline,
-                                    finished_tag: done_tag,
-                                    read_metadata_tag: meta_tag)
+                                    metadata_tag: md_tag)
       msg = 'message is a string'
       @client_call.remote_send(msg)
 
       # check that server rpc new was received
-      @server.request_call(@server_tag)
-      ev = @server_queue.next(deadline)
-      expect(ev.type).to be(CompletionType::SERVER_RPC_NEW)
-      expect(ev.call).to be_a(Call)
-      expect(ev.tag).to be(@server_tag)
+      recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline)
+      expect(recvd_rpc).to_not eq nil
+      recvd_call = recvd_rpc.call
 
       # Accept the call, and verify that the server reads the response ok.
-      ev.call.server_accept(@client_queue, @server_tag)
-      ev.call.server_end_initial_metadata
-      server_call = ActiveCall.new(ev.call, @client_queue, @pass_through,
+      server_ops = {
+        CallOps::SEND_INITIAL_METADATA => {}
+      }
+      recvd_call.run_batch(@server_queue, @server_tag, deadline, server_ops)
+      server_call = ActiveCall.new(recvd_call, @server_queue, @pass_through,
                                    @pass_through, deadline)
       expect(server_call.remote_read).to eq(msg)
     end
 
     it 'marshals the payload using the marshal func' do
       call = make_test_call
-      done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
-                                                    deadline)
+      ActiveCall.client_invoke(call, @client_queue, deadline)
       marshal = proc { |x| 'marshalled:' + x }
       client_call = ActiveCall.new(call, @client_queue, marshal,
-                                   @pass_through, deadline,
-                                   finished_tag: done_tag,
-                                   read_metadata_tag: meta_tag)
+                                   @pass_through, deadline)
       msg = 'message is a string'
       client_call.remote_send(msg)
 
       # confirm that the message was marshalled
-      @server.request_call(@server_tag)
-      ev = @server_queue.next(deadline)
-      ev.call.server_accept(@client_queue, @server_tag)
-      ev.call.server_end_initial_metadata
-      server_call = ActiveCall.new(ev.call, @client_queue, @pass_through,
+      recvd_rpc =  @server.request_call(@server_queue, @server_tag, deadline)
+      recvd_call = recvd_rpc.call
+      server_ops = {
+        CallOps::SEND_INITIAL_METADATA => nil
+      }
+      recvd_call.run_batch(@server_queue, @server_tag, deadline, server_ops)
+      server_call = ActiveCall.new(recvd_call, @server_queue, @pass_through,
                                    @pass_through, deadline)
       expect(server_call.remote_read).to eq('marshalled:' + msg)
     end
@@ -142,23 +136,22 @@ describe GRPC::ActiveCall do
       call = make_test_call
       ActiveCall.client_invoke(call, @client_queue, deadline,
                                k1: 'v1', k2: 'v2')
-      @server.request_call(@server_tag)
-      ev = @server_queue.next(deadline)
-      expect(ev).to_not be_nil
-      expect(ev.result.metadata['k1']).to eq('v1')
-      expect(ev.result.metadata['k2']).to eq('v2')
+      recvd_rpc =  @server.request_call(@server_queue, @server_tag, deadline)
+      recvd_call = recvd_rpc.call
+      expect(recvd_call).to_not be_nil
+      expect(recvd_rpc.metadata).to_not be_nil
+      expect(recvd_rpc.metadata['k1']).to eq('v1')
+      expect(recvd_rpc.metadata['k2']).to eq('v2')
     end
   end
 
   describe '#remote_read' do
     it 'reads the response sent by a server' do
       call = make_test_call
-      done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
-                                                    deadline)
+      md_tag = ActiveCall.client_invoke(call, @client_queue, deadline)
       client_call = ActiveCall.new(call, @client_queue, @pass_through,
                                    @pass_through, deadline,
-                                   finished_tag: done_tag,
-                                   read_metadata_tag: meta_tag)
+                                   metadata_tag: md_tag)
       msg = 'message is a string'
       client_call.remote_send(msg)
       server_call = expect_server_to_receive(msg)
@@ -168,12 +161,10 @@ describe GRPC::ActiveCall do
 
     it 'saves no metadata when the server adds no metadata' do
       call = make_test_call
-      done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
-                                                    deadline)
+      md_tag = ActiveCall.client_invoke(call, @client_queue, deadline)
       client_call = ActiveCall.new(call, @client_queue, @pass_through,
                                    @pass_through, deadline,
-                                   finished_tag: done_tag,
-                                   read_metadata_tag: meta_tag)
+                                   metadata_tag: md_tag)
       msg = 'message is a string'
       client_call.remote_send(msg)
       server_call = expect_server_to_receive(msg)
@@ -185,12 +176,10 @@ describe GRPC::ActiveCall do
 
     it 'saves metadata add by the server' do
       call = make_test_call
-      done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
-                                                    deadline)
+      md_tag = ActiveCall.client_invoke(call, @client_queue, deadline)
       client_call = ActiveCall.new(call, @client_queue, @pass_through,
                                    @pass_through, deadline,
-                                   finished_tag: done_tag,
-                                   read_metadata_tag: meta_tag)
+                                   metadata_tag: md_tag)
       msg = 'message is a string'
       client_call.remote_send(msg)
       server_call = expect_server_to_receive(msg, k1: 'v1', k2: 'v2')
@@ -203,12 +192,10 @@ describe GRPC::ActiveCall do
 
     it 'get a nil msg before a status when an OK status is sent' do
       call = make_test_call
-      done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
-                                                    deadline)
+      md_tag = ActiveCall.client_invoke(call, @client_queue, deadline)
       client_call = ActiveCall.new(call, @client_queue, @pass_through,
                                    @pass_through, deadline,
-                                   finished_tag: done_tag,
-                                   read_metadata_tag: meta_tag)
+                                   metadata_tag: md_tag)
       msg = 'message is a string'
       client_call.remote_send(msg)
       client_call.writes_done(false)
@@ -222,13 +209,11 @@ describe GRPC::ActiveCall do
 
     it 'unmarshals the response using the unmarshal func' do
       call = make_test_call
-      done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
-                                                    deadline)
+      md_tag = ActiveCall.client_invoke(call, @client_queue, deadline)
       unmarshal = proc { |x| 'unmarshalled:' + x }
       client_call = ActiveCall.new(call, @client_queue, @pass_through,
                                    unmarshal, deadline,
-                                   finished_tag: done_tag,
-                                   read_metadata_tag: meta_tag)
+                                   metadata_tag: md_tag)
 
       # confirm the client receives the unmarshalled message
       msg = 'message is a string'
@@ -249,13 +234,11 @@ describe GRPC::ActiveCall do
 
     it 'the returns an enumerator that can read n responses' do
       call = make_test_call
-      done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
-                                                    deadline)
+      md_tag = ActiveCall.client_invoke(call, @client_queue, deadline)
       client_call = ActiveCall.new(call, @client_queue, @pass_through,
                                    @pass_through, deadline,
-                                   finished_tag: done_tag,
-                                   read_metadata_tag: meta_tag)
-      msg = 'message is 4a string'
+                                   metadata_tag: md_tag)
+      msg = 'message is a string'
       reply = 'server_response'
       client_call.remote_send(msg)
       server_call = expect_server_to_receive(msg)
@@ -269,12 +252,10 @@ describe GRPC::ActiveCall do
 
     it 'the returns an enumerator that stops after an OK Status' do
       call = make_test_call
-      done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
-                                                    deadline)
+      md_tag = ActiveCall.client_invoke(call, @client_queue, deadline)
       client_call = ActiveCall.new(call, @client_queue, @pass_through,
                                    @pass_through, deadline,
-                                   read_metadata_tag: meta_tag,
-                                   finished_tag: done_tag)
+                                   metadata_tag: md_tag)
       msg = 'message is a string'
       reply = 'server_response'
       client_call.remote_send(msg)
@@ -294,12 +275,10 @@ describe GRPC::ActiveCall do
   describe '#writes_done' do
     it 'finishes ok if the server sends a status response' do
       call = make_test_call
-      done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
-                                                    deadline)
+      md_tag = ActiveCall.client_invoke(call, @client_queue, deadline)
       client_call = ActiveCall.new(call, @client_queue, @pass_through,
                                    @pass_through, deadline,
-                                   finished_tag: done_tag,
-                                   read_metadata_tag: meta_tag)
+                                   metadata_tag: md_tag)
       msg = 'message is a string'
       client_call.remote_send(msg)
       expect { client_call.writes_done(false) }.to_not raise_error
@@ -312,12 +291,10 @@ describe GRPC::ActiveCall do
 
     it 'finishes ok if the server sends an early status response' do
       call = make_test_call
-      done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
-                                                    deadline)
+      md_tag = ActiveCall.client_invoke(call, @client_queue, deadline)
       client_call = ActiveCall.new(call, @client_queue, @pass_through,
                                    @pass_through, deadline,
-                                   read_metadata_tag: meta_tag,
-                                   finished_tag: done_tag)
+                                   metadata_tag: md_tag)
       msg = 'message is a string'
       client_call.remote_send(msg)
       server_call = expect_server_to_receive(msg)
@@ -330,12 +307,10 @@ describe GRPC::ActiveCall do
 
     it 'finishes ok if writes_done is true' do
       call = make_test_call
-      done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
-                                                    deadline)
+      md_tag = ActiveCall.client_invoke(call, @client_queue, deadline)
       client_call = ActiveCall.new(call, @client_queue, @pass_through,
                                    @pass_through, deadline,
-                                   read_metadata_tag: meta_tag,
-                                   finished_tag: done_tag)
+                                   metadata_tag: md_tag)
       msg = 'message is a string'
       client_call.remote_send(msg)
       server_call = expect_server_to_receive(msg)
@@ -353,21 +328,20 @@ describe GRPC::ActiveCall do
   end
 
   def expect_server_to_be_invoked(**kw)
-    @server.request_call(@server_tag)
-    ev = @server_queue.next(deadline)
-    ev.call.add_metadata(kw)
-    ev.call.server_accept(@client_queue, @server_done_tag)
-    ev.call.server_end_initial_metadata
-    ActiveCall.new(ev.call, @client_queue, @pass_through,
-                   @pass_through, deadline,
-                   finished_tag: @server_done_tag)
+    recvd_rpc =  @server.request_call(@server_queue, @server_tag, deadline)
+    expect(recvd_rpc).to_not eq nil
+    recvd_call = recvd_rpc.call
+    recvd_call.run_batch(@server_queue, @server_tag, deadline,
+                         CallOps::SEND_INITIAL_METADATA => kw)
+    ActiveCall.new(recvd_call, @server_queue, @pass_through,
+                   @pass_through, deadline)
   end
 
   def make_test_call
-    @ch.create_call(@client_queue, 'dummy_method', 'dummy_host', deadline)
+    @ch.create_call(@client_queue, '/method', 'a.dummy.host', deadline)
   end
 
   def deadline
-    Time.now + 1  # in 1 second; arbitrary
+    Time.now + 2  # in 2 seconds; arbitrary
   end
 end
diff --git a/src/ruby/spec/generic/rpc_desc_spec.rb b/src/ruby/spec/generic/rpc_desc_spec.rb
index 39d1e83748bf4d2812278851ed2b1fbd3fef7a56..a68299465ce35e46433c6cb2cd75d552c61a41c7 100644
--- a/src/ruby/spec/generic/rpc_desc_spec.rb
+++ b/src/ruby/spec/generic/rpc_desc_spec.rb
@@ -37,7 +37,6 @@ describe GRPC::RpcDesc do
   INTERNAL = GRPC::Core::StatusCodes::INTERNAL
   UNKNOWN = GRPC::Core::StatusCodes::UNKNOWN
   CallError = GRPC::Core::CallError
-  EventError = GRPC::Core::EventError
 
   before(:each) do
     @request_response = RpcDesc.new('rr', Object.new, Object.new, 'encode',
@@ -63,24 +62,17 @@ describe GRPC::RpcDesc do
 
       it 'sends the specified status if BadStatus is raised' do
         expect(@call).to receive(:remote_read).once.and_return(Object.new)
-        expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK')
+        expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK', false)
         @request_response.run_server_method(@call, method(:bad_status))
       end
 
       it 'sends status UNKNOWN if other StandardErrors are raised' do
         expect(@call).to receive(:remote_read).once.and_return(Object.new)
-        expect(@call).to receive(:send_status) .once.with(UNKNOWN, @no_reason)
+        expect(@call).to receive(:send_status) .once.with(UNKNOWN, @no_reason,
+                                                          false)
         @request_response.run_server_method(@call, method(:other_error))
       end
 
-      it 'absorbs EventError  with no further action' do
-        expect(@call).to receive(:remote_read).once.and_raise(EventError)
-        blk = proc do
-          @request_response.run_server_method(@call, method(:fake_reqresp))
-        end
-        expect(&blk).to_not raise_error
-      end
-
       it 'absorbs CallError with no further action' do
         expect(@call).to receive(:remote_read).once.and_raise(CallError)
         blk = proc do
@@ -93,8 +85,7 @@ describe GRPC::RpcDesc do
         req = Object.new
         expect(@call).to receive(:remote_read).once.and_return(req)
         expect(@call).to receive(:remote_send).once.with(@ok_response)
-        expect(@call).to receive(:send_status).once.with(OK, 'OK')
-        expect(@call).to receive(:finished).once
+        expect(@call).to receive(:send_status).once.with(OK, 'OK', true)
         @request_response.run_server_method(@call, method(:fake_reqresp))
       end
     end
@@ -107,23 +98,16 @@ describe GRPC::RpcDesc do
       end
 
       it 'sends the specified status if BadStatus is raised' do
-        expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK')
+        expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK', false)
         @client_streamer.run_server_method(@call, method(:bad_status_alt))
       end
 
       it 'sends status UNKNOWN if other StandardErrors are raised' do
-        expect(@call).to receive(:send_status) .once.with(UNKNOWN, @no_reason)
+        expect(@call).to receive(:send_status) .once.with(UNKNOWN, @no_reason,
+                                                          false)
         @client_streamer.run_server_method(@call, method(:other_error_alt))
       end
 
-      it 'absorbs EventError  with no further action' do
-        expect(@call).to receive(:remote_send).once.and_raise(EventError)
-        blk = proc do
-          @client_streamer.run_server_method(@call, method(:fake_clstream))
-        end
-        expect(&blk).to_not raise_error
-      end
-
       it 'absorbs CallError with no further action' do
         expect(@call).to receive(:remote_send).once.and_raise(CallError)
         blk = proc do
@@ -134,8 +118,7 @@ describe GRPC::RpcDesc do
 
       it 'sends a response and closes the stream if there no errors' do
         expect(@call).to receive(:remote_send).once.with(@ok_response)
-        expect(@call).to receive(:send_status).once.with(OK, 'OK')
-        expect(@call).to receive(:finished).once
+        expect(@call).to receive(:send_status).once.with(OK, 'OK', true)
         @client_streamer.run_server_method(@call, method(:fake_clstream))
       end
     end
@@ -149,24 +132,17 @@ describe GRPC::RpcDesc do
 
       it 'sends the specified status if BadStatus is raised' do
         expect(@call).to receive(:remote_read).once.and_return(Object.new)
-        expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK')
+        expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK', false)
         @server_streamer.run_server_method(@call, method(:bad_status))
       end
 
       it 'sends status UNKNOWN if other StandardErrors are raised' do
         expect(@call).to receive(:remote_read).once.and_return(Object.new)
-        expect(@call).to receive(:send_status) .once.with(UNKNOWN, @no_reason)
+        expect(@call).to receive(:send_status) .once.with(UNKNOWN, @no_reason,
+                                                          false)
         @server_streamer.run_server_method(@call, method(:other_error))
       end
 
-      it 'absorbs EventError  with no further action' do
-        expect(@call).to receive(:remote_read).once.and_raise(EventError)
-        blk = proc do
-          @server_streamer.run_server_method(@call, method(:fake_svstream))
-        end
-        expect(&blk).to_not raise_error
-      end
-
       it 'absorbs CallError with no further action' do
         expect(@call).to receive(:remote_read).once.and_raise(CallError)
         blk = proc do
@@ -179,8 +155,7 @@ describe GRPC::RpcDesc do
         req = Object.new
         expect(@call).to receive(:remote_read).once.and_return(req)
         expect(@call).to receive(:remote_send).twice.with(@ok_response)
-        expect(@call).to receive(:send_status).once.with(OK, 'OK')
-        expect(@call).to receive(:finished).once
+        expect(@call).to receive(:send_status).once.with(OK, 'OK', true)
         @server_streamer.run_server_method(@call, method(:fake_svstream))
       end
     end
@@ -197,20 +172,20 @@ describe GRPC::RpcDesc do
       it 'sends the specified status if BadStatus is raised' do
         e = GRPC::BadStatus.new(@bs_code, 'NOK')
         expect(@call).to receive(:run_server_bidi).and_raise(e)
-        expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK')
+        expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK', false)
         @bidi_streamer.run_server_method(@call, method(:bad_status_alt))
       end
 
       it 'sends status UNKNOWN if other StandardErrors are raised' do
         expect(@call).to receive(:run_server_bidi).and_raise(StandardError)
-        expect(@call).to receive(:send_status).once.with(UNKNOWN, @no_reason)
+        expect(@call).to receive(:send_status).once.with(UNKNOWN, @no_reason,
+                                                         false)
         @bidi_streamer.run_server_method(@call, method(:other_error_alt))
       end
 
       it 'closes the stream if there no errors' do
         expect(@call).to receive(:run_server_bidi)
-        expect(@call).to receive(:send_status).once.with(OK, 'OK')
-        expect(@call).to receive(:finished).once
+        expect(@call).to receive(:send_status).once.with(OK, 'OK', true)
         @bidi_streamer.run_server_method(@call, method(:fake_bidistream))
       end
     end