diff --git a/src/ruby/pb/test/client.rb b/src/ruby/pb/test/client.rb
index 73494e6aab1a9905e9cf21cba1eae2b78dd9a20b..33f40c2b9d0d39155fd410758972e6fdfc226f81 100755
--- a/src/ruby/pb/test/client.rb
+++ b/src/ruby/pb/test/client.rb
@@ -236,29 +236,18 @@ end
 # Wraps a Queue to yield items to it.
 # Intended to be used to wrap a call_op as well, and to adjust
 # the write flag of the call_op in between messages yielded to it.
-class WriteFlagSettingEnumeratorQueue
-  extend Forwardable
-  def_delegators :@q, :push
+class WriteFlagSettingStreamingInputEnumerable
   attr_accessor :call_op
 
-  def initialize(sentinel)
-    @q = Queue.new
-    @sentinel = sentinel
-    @received_notes = {}
+  def initialize(requests_and_write_flags)
+    @requests_and_write_flags = requests_and_write_flags
   end
 
-  def each_item
-    return enum_for(:each_item) unless block_given?
-    loop do
-      request_and_write_flag = @q.pop
-      break if request_and_write_flag.equal?(@sentinel)
-      fail request_and_write_flag if
-        request_and_write_flag.is_a? Exception
-
-      @call_op.write_flag = request_and_write_flag[:write_flag] if
-        request_and_write_flag[:write_flag]
-
-      yield request_and_write_flag[:request]
+  def each
+    @requests_and_write_flags.each do |request_and_flag|
+      @call_op.write_flag = request_and_flag[:write_flag] if
+        request_and_flag[:write_flag]
+      yield request_and_flag[:request]
     end
   end
 end
@@ -415,35 +404,25 @@ class NamedTests
                                  metadata: request_uncompressed_args)
     end
 
-    # Create the deferred enumerator, start the streaming call with it, and
-    # set the enumerator's call_op to the call.
-    requests = WriteFlagSettingEnumeratorQueue.new(self)
-    call_op = @stub.streaming_input_call(requests.each_item,
-                                         return_op: true)
-    requests.call_op = call_op
-
-    request_thread = Thread.new do
-      call_op.execute
-    end
-
-    # send a compressed request
-    requests.push({ request: first_request })
-
-    # send an uncompressed request
     second_request = StreamingInputCallRequest.new(
       payload: Payload.new(type: :COMPRESSABLE, body: nulls(45_904)),
       expect_compressed: BoolValue.new(value: false)
     )
 
-    requests.push(
+    # Create the requests messages and the corresponding write flags
+    # for each message
+    requests = WriteFlagSettingStreamingInputEnumerable.new([
+      { request: first_request },
       { request: second_request,
-        write_flag: GRPC::Core::WriteFlags::NO_COMPRESS
-      })
+        write_flag: GRPC::Core::WriteFlags::NO_COMPRESS }
+    ])
 
-    # Close the input stream
-    requests.push(self)
-
-    resp = request_thread.value
+    # Create the call_op, pass it to the requests enumerable, and
+    # run the call
+    call_op = @stub.streaming_input_call(requests,
+                                         return_op: true)
+    requests.call_op = call_op
+    resp = call_op.execute
 
     wanted_aggregate_size = 73_086