diff --git a/src/ruby/bin/interop/interop_client.rb b/src/ruby/bin/interop/interop_client.rb
index af7a1d5b15d799eb1d7f3bdebf900119ea12bd42..6f1fe2614f46a8ddc19dcb81ab6a9f9f2afe69fd 100755
--- a/src/ruby/bin/interop/interop_client.rb
+++ b/src/ruby/bin/interop/interop_client.rb
@@ -136,12 +136,14 @@ class PingPongPlayer
   include Grpc::Testing::PayloadType
   attr_accessor :assertions # required by Minitest::Assertions
   attr_accessor :queue
+  attr_accessor :canceller_op
 
   # reqs is the enumerator over the requests
   def initialize(msg_sizes)
     @queue = Queue.new
     @msg_sizes = msg_sizes
     @assertions = 0  # required by Minitest::Assertions
+    @canceller_op = nil  # used to cancel after the first response
   end
 
   def each_item
@@ -155,12 +157,15 @@ class PingPongPlayer
                         response_parameters: [p_cls.new(size: resp_size)])
       yield req
       resp = @queue.pop
-      assert_equal(:COMPRESSABLE, resp.payload.type,
-                   'payload type is wrong')
+      assert_equal(:COMPRESSABLE, resp.payload.type, 'payload type is wrong')
       assert_equal(resp_size, resp.payload.body.length,
-                   'payload body #{i} has the wrong length')
+                   "payload body #{count} has the wrong length")
       p "OK: ping_pong #{count}"
       count += 1
+      unless @canceller_op.nil?
+        @canceller_op.cancel
+        break
+      end
     end
   end
 end
@@ -260,6 +265,27 @@ class NamedTests
     p 'OK: ping_pong'
   end
 
+  def cancel_after_begin
+    msg_sizes = [27_182, 8, 1828, 45_904]
+    reqs = msg_sizes.map do |x|
+      req = Payload.new(body: nulls(x))
+      StreamingInputCallRequest.new(payload: req)
+    end
+    op = @stub.streaming_input_call(reqs, return_op: true)
+    op.cancel
+    assert_raises(GRPC::Cancelled) { op.execute }
+    p 'OK: cancel_after_begin'
+  end
+
+  def cancel_after_first
+    msg_sizes = [[27_182, 31_415], [8, 9], [1828, 2653], [45_904, 58_979]]
+    ppp = PingPongPlayer.new(msg_sizes)
+    op = @stub.full_duplex_call(ppp.each_item, return_op: true)
+    ppp.canceller_op = op  # causes ppp to cancel after the 1st message
+    assert_raises(GRPC::Cancelled) { op.execute.each { |r| ppp.queue.push(r) } }
+    p 'OK: cancel_after_first'
+  end
+
   def all
     all_methods = NamedTests.instance_methods(false).map(&:to_s)
     all_methods.each do |m|
diff --git a/src/ruby/lib/grpc/errors.rb b/src/ruby/lib/grpc/errors.rb
index b23793730f7b4e96ddd0411e16433c064d0325fb..35e9c02a946364fa1a528e40274f1d4b7c85f710 100644
--- a/src/ruby/lib/grpc/errors.rb
+++ b/src/ruby/lib/grpc/errors.rb
@@ -54,4 +54,8 @@ module GRPC
       Status.new(code, details)
     end
   end
+
+  # Cancelled is an exception class that indicates that an rpc was cancelled.
+  class Cancelled < StandardError
+  end
 end
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb
index 489349c2c996fb4578c0fc0a13563b51d4613413..8d63de41450b78c05250ff2c5f3680b1e2ff3391 100644
--- a/src/ruby/lib/grpc/generic/active_call.rb
+++ b/src/ruby/lib/grpc/generic/active_call.rb
@@ -30,6 +30,22 @@
 require 'forwardable'
 require 'grpc/generic/bidi_call'
 
+class Struct
+  # BatchResult is the struct returned by calls to call#start_batch.
+  class BatchResult
+    # check_status returns the status, raising an error if the status
+    # is non-nil and not OK.
+    def check_status
+      return nil if status.nil?
+      fail GRPC::Cancelled if status.code == GRPC::Core::StatusCodes::CANCELLED
+      if status.code != GRPC::Core::StatusCodes::OK
+        fail GRPC::BadStatus.new(status.code, status.details)
+      end
+      status
+    end
+  end
+end
+
 # GRPC contains the General RPC module.
 module GRPC
   # The ActiveCall class provides simple methods for sending marshallable
@@ -38,7 +54,9 @@ module GRPC
     include Core::StatusCodes
     include Core::TimeConsts
     include Core::CallOps
+    extend Forwardable
     attr_reader(:deadline)
+    def_delegators :@call, :cancel, :metadata
 
     # client_invoke begins a client invocation.
     #
@@ -101,50 +119,6 @@ module GRPC
       @metadata_tag = metadata_tag
     end
 
-    # Obtains the status of the call.
-    #
-    # this value is nil until the call completes
-    # @return this call's status
-    def status
-      @call.status
-    end
-
-    # Obtains the metadata of the call.
-    #
-    # At the start of the call this will be nil.  During the call this gets
-    # some values as soon as the other end of the connection acknowledges the
-    # request.
-    #
-    # @return this calls's metadata
-    def metadata
-      @call.metadata
-    end
-
-    # Cancels the call.
-    #
-    # Cancels the call.  The call does not return any result, but once this it
-    # has been called, the call should eventually terminate.  Due to potential
-    # races between the execution of the cancel and the in-flight request, the
-    # result of the call after calling #cancel is indeterminate:
-    #
-    # - the call may terminate with a BadStatus exception, with code=CANCELLED
-    # - the call may terminate with OK Status, and return a response
-    # - the call may terminate with a different BadStatus exception if that
-    #   was happening
-    def cancel
-      @call.cancel
-    end
-
-    # indicates if the call is shutdown
-    def shutdown
-      @shutdown ||= false
-    end
-
-    # indicates if the call is cancelled.
-    def cancelled
-      @cancelled ||= false
-    end
-
     # multi_req_view provides a restricted view of this ActiveCall for use
     # in a server client-streaming handler.
     def multi_req_view
@@ -176,9 +150,9 @@ module GRPC
         SEND_CLOSE_FROM_CLIENT => nil
       }
       ops[RECV_STATUS_ON_CLIENT] = nil if assert_finished
-      @call.run_batch(@cq, self, INFINITE_FUTURE, ops)
+      batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, ops)
       return unless assert_finished
-      @call.status
+      batch_result.check_status
     end
 
     # finished waits until a client call is completed.
@@ -192,17 +166,12 @@ module GRPC
       elsif !batch_result.metadata.nil?
         @call.metadata.merge!(batch_result.metadata)
       end
-      if batch_result.status.code != Core::StatusCodes::OK
-        fail BadStatus.new(batch_result.status.code,
-                           batch_result.status.details)
-      end
-      batch_result
+      batch_result.check_status
     end
 
     # remote_send sends a request to the remote endpoint.
     #
-    # It blocks until the remote endpoint acknowledges by sending a
-    # WRITE_ACCEPTED.  req can be marshalled already.
+    # It blocks until the remote endpoint accepts the message.
     #
     # @param req [Object, String] the object to send or it's marshal form.
     # @param marshalled [false, true] indicates if the object is already
@@ -332,6 +301,9 @@ module GRPC
       response = remote_read
       finished unless response.is_a? Struct::Status
       response
+    rescue GRPC::Core::CallError => e
+      finished  # checks for Cancelled
+      raise e
     end
 
     # client_streamer sends a stream of requests to a GRPC server, and
@@ -355,6 +327,9 @@ module GRPC
       response = remote_read
       finished unless response.is_a? Struct::Status
       response
+    rescue GRPC::Core::CallError => e
+      finished  # checks for Cancelled
+      raise e
     end
 
     # server_streamer sends one request to the GRPC server, which yields a
@@ -381,6 +356,9 @@ module GRPC
       replies = enum_for(:each_remote_read_then_finish)
       return replies unless block_given?
       replies.each { |r| yield r }
+    rescue GRPC::Core::CallError => e
+      finished  # checks for Cancelled
+      raise e
     end
 
     # bidi_streamer sends a stream of requests to the GRPC server, and yields
@@ -416,6 +394,9 @@ module GRPC
       start_call(**kw) unless @started
       bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline)
       bd.run_on_client(requests, &blk)
+    rescue GRPC::Core::CallError => e
+      finished  # checks for Cancelled
+      raise e
     end
 
     # run_server_bidi orchestrates a BiDi stream processing on a server.
@@ -436,9 +417,10 @@ module GRPC
 
     private
 
+    # Starts the call if not already started
     def start_call(**kw)
-      tags = ActiveCall.client_invoke(@call, @cq, @deadline, **kw)
-      @finished_tag, @read_metadata_tag = tags
+      return if @started
+      @metadata_tag = ActiveCall.client_invoke(@call, @cq, @deadline, **kw)
       @started = true
     end
 
@@ -466,6 +448,6 @@ module GRPC
     # Operation limits access to an ActiveCall's methods for use as
     # a Operation on the client.
     Operation = view_class(:cancel, :cancelled, :deadline, :execute,
-                           :metadata, :status)
+                           :metadata, :status, :start_call)
   end
 end
diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb
index 1c1b3b0db78a20d5491d81bb639559b74eda57e8..b813ab5b542420df20986b40513c5cc81e1003bb 100644
--- a/src/ruby/lib/grpc/generic/bidi_call.rb
+++ b/src/ruby/lib/grpc/generic/bidi_call.rb
@@ -78,13 +78,11 @@ module GRPC
     # @param requests the Enumerable of requests to send
     # @return an Enumerator of requests to yield
     def run_on_client(requests, &blk)
-      enq_th = start_write_loop(requests)
-      loop_th = start_read_loop
+      @enq_th = start_write_loop(requests)
+      @loop_th = start_read_loop
       replies = each_queued_msg
       return replies if blk.nil?
       replies.each { |r| blk.call(r) }
-      enq_th.join
-      loop_th.join
     end
 
     # Begins orchestration of the Bidi stream for a server generating replies.
@@ -100,10 +98,8 @@ module GRPC
     # @param gen_each_reply [Proc] generates the BiDi stream replies.
     def run_on_server(gen_each_reply)
       replys = gen_each_reply.call(each_queued_msg)
-      enq_th = start_write_loop(replys, is_client: false)
-      loop_th = start_read_loop
-      loop_th.join
-      enq_th.join
+      @enq_th = start_write_loop(replys, is_client: false)
+      @loop_th = start_read_loop
     end
 
     private
@@ -122,10 +118,13 @@ module GRPC
         logger.debug("each_queued_msg: msg##{count}")
         count += 1
         req = @readq.pop
+        logger.debug("each_queued_msg: req = #{req}")
         throw req if req.is_a? StandardError
         break if req.equal?(END_OF_READS)
         yield req
       end
+      @loop_th.join
+      @enq_th.join
     end
 
     # during bidi-streaming, read the requests to send from a separate thread
@@ -136,20 +135,23 @@ module GRPC
         begin
           count = 0
           requests.each do |req|
+            logger.debug("bidi-write-loop: #{count}")
             count += 1
             payload = @marshal.call(req)
             @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
                             SEND_MESSAGE => payload)
           end
           if is_client
-            logger.debug("bidi-client: sent #{count} reqs, waiting to finish")
-            @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
-                            SEND_CLOSE_FROM_CLIENT => nil,
-                            RECV_STATUS_ON_CLIENT => nil)
+            logger.debug("bidi-write-loop: sent #{count}, waiting to finish")
+            batch_result = @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
+                                           SEND_CLOSE_FROM_CLIENT => nil,
+                                           RECV_STATUS_ON_CLIENT => nil)
+            batch_result.check_status
           end
         rescue StandardError => e
-          logger.warn('bidi: write_loop failed')
+          logger.warn('bidi-write-loop: failed')
           logger.warn(e)
+          raise e
         end
       end
     end
@@ -163,7 +165,7 @@ module GRPC
 
           # queue the initial read before beginning the loop
           loop do
-            logger.debug("waiting for read #{count}")
+            logger.debug("bidi-read-loop: #{count}")
             count += 1
             # TODO: ensure metadata is read if available, currently it's not
             batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE,
@@ -171,7 +173,7 @@ module GRPC
             # handle the next message
             if batch_result.message.nil?
               @readq.push(END_OF_READS)
-              logger.debug('done reading!')
+              logger.debug('bidi-read-loop: done reading!')
               break
             end
 
diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb
index 245999ea03018798d104bed98ce028c589fa901f..1323bacfa60e9fc40b39bdf8f77a37bdc82c0dd9 100644
--- a/src/ruby/spec/generic/rpc_server_spec.rb
+++ b/src/ruby/spec/generic/rpc_server_spec.rb
@@ -388,6 +388,23 @@ describe GRPC::RpcServer do
         t.join
       end
 
+      it 'should handle cancellation correctly', server: true do
+        service = SlowService.new
+        @srv.handle(service)
+        t = Thread.new { @srv.run }
+        @srv.wait_till_running
+        req = EchoMsg.new
+        stub = SlowStub.new(@host, **@client_opts)
+        op = stub.an_rpc(req, k1: 'v1', k2: 'v2', return_op: true)
+        Thread.new do  # cancel the call
+          sleep 0.1
+          op.cancel
+        end
+        expect { op.execute }.to raise_error GRPC::Cancelled
+        @srv.stop
+        t.join
+      end
+
       it 'should receive updated metadata', server: true do
         service = EchoService.new
         @srv.handle(service)