diff --git a/src/ruby/bin/interop/interop_client.rb b/src/ruby/bin/interop/interop_client.rb
index 8df03ffb3cd2b480d45b63d1de12b93d62b76de5..16fb1b199dd064ccc11402192a00c2c36c8db3b5 100755
--- a/src/ruby/bin/interop/interop_client.rb
+++ b/src/ruby/bin/interop/interop_client.rb
@@ -274,6 +274,7 @@ class NamedTests
     op = @stub.streaming_input_call(reqs, return_op: true)
     op.cancel
     assert_raises(GRPC::Cancelled) { op.execute }
+    assert(op.cancelled, 'call operation should be CANCELLED')
     p 'OK: cancel_after_begin'
   end
 
@@ -282,7 +283,8 @@ class NamedTests
     ppp = PingPongPlayer.new(msg_sizes)
     op = @stub.full_duplex_call(ppp.each_item, return_op: true)
     ppp.canceller_op = op  # causes ppp to cancel after the 1st message
-    assert_raises(GRPC::Cancelled) { op.execute.each { |r| ppp.queue.push(r) } }
+    op.execute.each { |r| ppp.queue.push(r) }
+    assert(op.cancelled, 'call operation should be CANCELLED')
     p 'OK: cancel_after_first_response'
   end
 
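The assertions above switch from relying only on the raised exception to also checking the final status through the new ActiveCall#cancelled predicate (added below). The same pattern from an ordinary client of a client-streaming RPC looks roughly like this; the stub and requests here are placeholders, not part of this change:

    op = stub.streaming_input_call(requests, return_op: true)  # placeholder stub/requests
    op.cancel
    begin
      op.execute               # still raises, as cancel_after_begin asserts
    rescue GRPC::Cancelled
      # expected
    end
    op.cancelled               # => true: the recorded status code is CANCELLED
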
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb
index 04abab8ac37de2d2004c86a16cb626044ce5c0d2..3814ef34b416ca2fbaebf860e7ca79a6e956b86f 100644
--- a/src/ruby/lib/grpc/generic/active_call.rb
+++ b/src/ruby/lib/grpc/generic/active_call.rb
@@ -55,7 +55,6 @@ module GRPC
   # The ActiveCall class provides simple methods for sending marshallable
   # data to a call
   class ActiveCall
-    include Core::StatusCodes
     include Core::TimeConsts
     include Core::CallOps
     extend Forwardable
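
Dropping the include is safe because the one status-code constant the class still needs is now written out in full as Core::StatusCodes::CANCELLED (see the new predicate below). A toy illustration of the constant-resolution difference, using a stand-in module rather than the real gRPC one:

    module Core
      module StatusCodes
        CANCELLED = 1  # stand-in value for the real constant
      end
    end

    class WithInclude
      include Core::StatusCodes
      def code
        CANCELLED  # bare constant resolves through the included module
      end
    end

    class WithoutInclude
      def code
        Core::StatusCodes::CANCELLED  # fully qualified, as the new predicate does
      end
    end
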
@@ -129,6 +128,11 @@ module GRPC
       @output_metadata ||= {}
     end
 
+    # cancelled indicates if the call was cancelled
+    def cancelled
+      !@call.status.nil? && @call.status.code == Core::StatusCodes::CANCELLED
+    end
+
     # multi_req_view provides a restricted view of this ActiveCall for use
     # in a server client-streaming handler.
     def multi_req_view
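
The predicate is nil-safe: it reports false until one of the batches below has stored a final status on the wrapped call. A small sketch of the same check with a plain Struct standing in for the status object returned by run_batch (field names are assumed):

    require 'grpc'

    Status = Struct.new(:code, :details, :metadata)
    call = Struct.new(:status).new(nil)

    !call.status.nil? && call.status.code == GRPC::Core::StatusCodes::CANCELLED
    # => false: no status recorded yet

    call.status = Status.new(GRPC::Core::StatusCodes::CANCELLED, 'Cancelled', {})
    !call.status.nil? && call.status.code == GRPC::Core::StatusCodes::CANCELLED
    # => true
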
@@ -162,6 +166,7 @@ module GRPC
       ops[RECV_STATUS_ON_CLIENT] = nil if assert_finished
       batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, ops)
       return unless assert_finished
+      @call.status = batch_result.status
       batch_result.check_status
     end
 
@@ -178,6 +183,7 @@ module GRPC
           @call.metadata.merge!(batch_result.status.metadata)
         end
       end
+      @call.status = batch_result.status
       batch_result.check_status
     end
 
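Both finish paths copy the status onto the wrapped call before calling check_status, because check_status raises for a non-OK code and a later assignment would never run. A reduced, self-contained sketch of why the ordering matters (all names here are stand-ins, not gRPC's API):

    CANCELLED = 1                              # stand-in status code
    CancelledError = Class.new(StandardError)  # stand-in for GRPC::Cancelled
    Status = Struct.new(:code, :details)
    call = Struct.new(:status).new(nil)

    check = lambda do |status|                 # stand-in for BatchResult#check_status
      raise CancelledError, status.details if status.code == CANCELLED
    end

    final = Status.new(CANCELLED, 'Cancelled')
    call.status = final     # recorded first, so the raise below cannot skip it
    begin
      check.call(final)
    rescue CancelledError
      call.status.code      # => 1: the final status is still observable afterwards
    end
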
@@ -410,9 +416,6 @@ module GRPC
       start_call(**kw) unless @started
       bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline)
       bd.run_on_client(requests, &blk)
-    rescue GRPC::Core::CallError => e
-      finished  # checks for Cancelled
-      raise e
     end
 
     # run_server_bidi orchestrates a BiDi stream processing on a server.
diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb
index f1b9f6b00da56b541c2a2afbb2eaf482dbd3e9fe..489dd5162a02ab3016b55c4f7b6edf9b6f14cdf6 100644
--- a/src/ruby/lib/grpc/generic/bidi_call.rb
+++ b/src/ruby/lib/grpc/generic/bidi_call.rb
@@ -78,11 +78,9 @@ module GRPC
     # @param requests the Enumerable of requests to send
     # @return an Enumerator of requests to yield
     def run_on_client(requests, &blk)
-      @enq_th = start_write_loop(requests)
+      @enq_th = Thread.new { write_loop(requests) }
       @loop_th = start_read_loop
-      replies = each_queued_msg
-      return replies if blk.nil?
-      replies.each { |r| blk.call(r) }
+      each_queued_msg(&blk)
     end
 
     # Begins orchestration of the Bidi stream for a server generating replies.
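
run_on_client now simply forwards its block to each_queued_msg. Assuming each_queued_msg uses the usual enum_for pattern (returning an Enumerator when called without a block), both call styles keep working: with a block the replies are yielded to it, and without one the caller gets an Enumerator back. A generic sketch of that delegation pattern, not gRPC code:

    def each_item
      return enum_for(:each_item) unless block_given?
      [1, 2, 3].each { |i| yield i }
    end

    def forward(&blk)
      each_item(&blk)      # blk.nil? => returns the Enumerator; otherwise yields to blk
    end

    forward { |i| p i }    # prints 1, 2, 3
    forward.to_a           # => [1, 2, 3]
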
@@ -98,9 +96,8 @@ module GRPC
     # @param gen_each_reply [Proc] generates the BiDi stream replies.
     def run_on_server(gen_each_reply)
       replys = gen_each_reply.call(each_queued_msg)
-      @enq_th = start_write_loop(replys, is_client: false)
       @loop_th = start_read_loop
-      @enq_th.join if @enq_th.alive?
+      write_loop(replys, is_client: false)
     end
 
     private
@@ -126,37 +123,34 @@
       end
     end
 
-    # during bidi-streaming, read the requests to send from a separate thread
-    # read so that read_loop does not block waiting for requests to read.
-    def start_write_loop(requests, is_client: true)
-      Thread.new do  # TODO: run on a thread pool
-        GRPC.logger.debug('bidi-write-loop: starting')
-        begin
-          write_tag = Object.new
-          count = 0
-          requests.each do |req|
-            GRPC.logger.debug("bidi-write-loop: #{count}")
-            count += 1
-            payload = @marshal.call(req)
-            @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
-                            SEND_MESSAGE => payload)
-          end
-          GRPC.logger.debug("bidi-write-loop: #{count} writes done")
-          if is_client
-            GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting")
-            @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
-                            SEND_CLOSE_FROM_CLIENT => nil)
-            batch_result = @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
-                                           RECV_STATUS_ON_CLIENT => nil)
-            batch_result.check_status
-          end
-        rescue StandardError => e
-          GRPC.logger.warn('bidi-write-loop: failed')
-          GRPC.logger.warn(e)
-          raise e
-        end
-        GRPC.logger.debug('bidi-write-loop: finished')
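+    # write_loop sends each request on the call; on the client it then
+    # half-closes and waits for the server's final status in a single batch.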
+    def write_loop(requests, is_client: true)
+      GRPC.logger.debug('bidi-write-loop: starting')
+      write_tag = Object.new
+      count = 0
+      requests.each do |req|
+        GRPC.logger.debug("bidi-write-loop: #{count}")
+        count += 1
+        payload = @marshal.call(req)
+        @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
+                        SEND_MESSAGE => payload)
+      end
+      GRPC.logger.debug("bidi-write-loop: #{count} writes done")
+      if is_client
+        GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting")
+        batch_result = @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
+                                       SEND_CLOSE_FROM_CLIENT => nil,
+                                       RECV_STATUS_ON_CLIENT => nil)
+        @call.status = batch_result.status
+        batch_result.check_status
+        GRPC.logger.debug("bidi-write-loop: done status #{@call.status}")
       end
+      GRPC.logger.debug('bidi-write-loop: finished')
+    rescue StandardError => e
+      GRPC.logger.warn('bidi-write-loop: failed')
+      GRPC.logger.warn(e)
+      raise e
     end
 
     # starts the read loop
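
Because the combined SEND_CLOSE_FROM_CLIENT / RECV_STATUS_ON_CLIENT batch now records the final status on the call, a cancelled bidi RPC is observable without an exception reaching the reading side, which is what the revised cancel_after_first_response test relies on. A rough caller-side sketch (the requests and handle names are placeholders):

    op = stub.full_duplex_call(requests, return_op: true)  # placeholder stub/requests
    # ... op.cancel fires after the first response, as in the interop test ...
    op.execute.each { |resp| handle(resp) }  # drains the responses already received
    op.cancelled                             # => true: write_loop stored the CANCELLED status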