diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb
index c2ac3c4dafea671df25614066f7e1f4ebecf7281..75ddff0bfd91e35c0543423b88087051c88d0ca9 100644
--- a/src/ruby/lib/grpc/generic/bidi_call.rb
+++ b/src/ruby/lib/grpc/generic/bidi_call.rb
@@ -61,7 +61,6 @@ module GRPC
       @call = call
       @marshal = marshal
       @op_notifier = nil  # signals completion on clients
-      @readq = Queue.new
       @unmarshal = unmarshal
       @metadata_received = metadata_received
       @reads_complete = false
@@ -81,8 +80,7 @@ module GRPC
     def run_on_client(requests, op_notifier, &blk)
       @op_notifier = op_notifier
       @enq_th = Thread.new { write_loop(requests) }
-      @loop_th = start_read_loop
-      each_queued_msg(&blk)
+      read_loop(&blk)
     end
 
     # Begins orchestration of the Bidi stream for a server generating replies.
@@ -97,8 +95,7 @@ module GRPC
     #
     # @param gen_each_reply [Proc] generates the BiDi stream replies.
     def run_on_server(gen_each_reply)
-      replys = gen_each_reply.call(each_queued_msg)
-      @loop_th = start_read_loop(is_client: false)
+      replys = gen_each_reply.call(read_loop(is_client: false))
       write_loop(replys, is_client: false)
     end
 
@@ -135,24 +132,6 @@ module GRPC
       batch_result
     end
 
-    # each_queued_msg yields each message on this instances readq
-    #
-    # - messages are added to the readq by #read_loop
-    # - iteration ends when the instance itself is added
-    def each_queued_msg
-      return enum_for(:each_queued_msg) unless block_given?
-      count = 0
-      loop do
-        GRPC.logger.debug("each_queued_msg: waiting##{count}")
-        count += 1
-        req = @readq.pop
-        GRPC.logger.debug("each_queued_msg: req = #{req}")
-        fail req if req.is_a? StandardError
-        break if req.equal?(END_OF_READS)
-        yield req
-      end
-    end
-
     def write_loop(requests, is_client: true)
       GRPC.logger.debug('bidi-write-loop: starting')
       count = 0
@@ -190,47 +169,45 @@ module GRPC
       raise e
     end
 
-    # starts the read loop
-    def start_read_loop(is_client: true)
-      Thread.new do
-        GRPC.logger.debug('bidi-read-loop: starting')
-        begin
-          count = 0
-          # queue the initial read before beginning the loop
-          loop do
-            GRPC.logger.debug("bidi-read-loop: #{count}")
-            count += 1
-            batch_result = read_using_run_batch
-
-            # handle the next message
-            if batch_result.message.nil?
-              GRPC.logger.debug("bidi-read-loop: null batch #{batch_result}")
-
-              if is_client
-                batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil)
-                @call.status = batch_result.status
-                batch_result.check_status
-                GRPC.logger.debug("bidi-read-loop: done status #{@call.status}")
-              end
-
-              @readq.push(END_OF_READS)
-              GRPC.logger.debug('bidi-read-loop: done reading!')
-              break
+    # Provides an enumerator that yields results of remote reads
+    def read_loop(is_client: true)
+      return enum_for(:read_loop,
+                      is_client: is_client) unless block_given?
+      GRPC.logger.debug('bidi-read-loop: starting')
+      begin
+        count = 0
+        # queue the initial read before beginning the loop
+        loop do
+          GRPC.logger.debug("bidi-read-loop: #{count}")
+          count += 1
+          batch_result = read_using_run_batch
+
+          # handle the next message
+          if batch_result.message.nil?
+            GRPC.logger.debug("bidi-read-loop: null batch #{batch_result}")
+
+            if is_client
+              batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil)
+              @call.status = batch_result.status
+              batch_result.check_status
+              GRPC.logger.debug("bidi-read-loop: done status #{@call.status}")
             end
 
-            # push the latest read onto the queue and continue reading
-            res = @unmarshal.call(batch_result.message)
-            @readq.push(res)
+            GRPC.logger.debug('bidi-read-loop: done reading!')
+            break
           end
-        rescue StandardError => e
-          GRPC.logger.warn('bidi: read-loop failed')
-          GRPC.logger.warn(e)
-          @readq.push(e)  # let each_queued_msg terminate with this error
+
+          res = @unmarshal.call(batch_result.message)
+          yield res
         end
-        GRPC.logger.debug('bidi-read-loop: finished')
-        @reads_complete = true
-        finished
+      rescue StandardError => e
+        GRPC.logger.warn('bidi: read-loop failed')
+        GRPC.logger.warn(e)
+        raise e
       end
+      GRPC.logger.debug('bidi-read-loop: finished')
+      @reads_complete = true
+      finished
     end
   end
 end