From cdb7cccceb85a00e5518f7db916cbae24e24eea2 Mon Sep 17 00:00:00 2001
From: Tim Emiola <temiola@google.com>
Date: Thu, 13 Aug 2015 21:45:04 -0700
Subject: [PATCH] Adds the ruby timeout interop test.

Corrects some other issues

- fixes status return when calls fail by always returning the status

- resolves bidi_call client's failure to return an exception on bad
  status by swapping the wait for status to the read thread

  * this also improves the cancel_after_first_response test

Also

- adds a unit test that verifies that a bidi call will time out.
---
 src/ruby/bin/interop/interop_client.rb    | 13 +++++++++++-
 src/ruby/ext/grpc/rb_call.c               |  8 ++------
 src/ruby/lib/grpc/generic/bidi_call.rb    | 25 +++++++++++++++--------
 src/ruby/spec/generic/client_stub_spec.rb | 20 ++++++++++++++++++
 4 files changed, 50 insertions(+), 16 deletions(-)

diff --git a/src/ruby/bin/interop/interop_client.rb b/src/ruby/bin/interop/interop_client.rb
index da4caa842b..9d753a85ab 100755
--- a/src/ruby/bin/interop/interop_client.rb
+++ b/src/ruby/bin/interop/interop_client.rb
@@ -265,6 +265,17 @@ class NamedTests
     p 'OK: ping_pong'
   end
 
+  def timeout_on_sleeping_server
+    msg_sizes = [[27_182, 31_415]]
+    ppp = PingPongPlayer.new(msg_sizes)
+    resps = @stub.full_duplex_call(ppp.each_item, timeout: 0.001)
+    resps.each { |r| ppp.queue.push(r) }
+    fail 'Should have raised GRPC::BadStatus(DEADLINE_EXCEEDED)'
+  rescue GRPC::BadStatus => e
+    assert_equal(e.code, GRPC::Core::StatusCodes::DEADLINE_EXCEEDED)
+    p "OK: #{__callee__}"
+  end
+
   def cancel_after_begin
     msg_sizes = [27_182, 8, 1828, 45_904]
     reqs = msg_sizes.map do |x|
@@ -283,7 +294,7 @@ class NamedTests
     ppp = PingPongPlayer.new(msg_sizes)
     op = @stub.full_duplex_call(ppp.each_item, return_op: true)
     ppp.canceller_op = op  # causes ppp to cancel after the 1st message
-    op.execute.each { |r| ppp.queue.push(r) }
+    assert_raises(GRPC::Cancelled) { op.execute.each { |r| ppp.queue.push(r) } }
     op.wait
     assert(op.cancelled, 'call operation was not CANCELLED')
     p 'OK: cancel_after_first_response'
diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c
index 70f0795f29..b09d4e2cd9 100644
--- a/src/ruby/ext/grpc/rb_call.c
+++ b/src/ruby/ext/grpc/rb_call.c
@@ -629,13 +629,9 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag,
     rb_raise(grpc_rb_eOutOfTime, "grpc_call_start_batch timed out");
     return Qnil;
   }
-  if (!ev.success) {
-    grpc_run_batch_stack_cleanup(&st);
-    rb_raise(grpc_rb_eCallError, "start_batch completion failed");
-    return Qnil;
-  }
 
-  /* Build and return the BatchResult struct result */
+  /* Build and return the BatchResult struct result,
+     if there is an error, it's reflected in the status */
   result = grpc_run_batch_stack_build_result(&st);
   grpc_run_batch_stack_cleanup(&st);
   return result;
diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb
index 1b6a04a506..9dbbb74caf 100644
--- a/src/ruby/lib/grpc/generic/bidi_call.rb
+++ b/src/ruby/lib/grpc/generic/bidi_call.rb
@@ -97,7 +97,7 @@ module GRPC
     # @param gen_each_reply [Proc] generates the BiDi stream replies.
     def run_on_server(gen_each_reply)
       replys = gen_each_reply.call(each_queued_msg)
-      @loop_th = start_read_loop
+      @loop_th = start_read_loop(is_client: false)
       write_loop(replys, is_client: false)
     end
 
@@ -125,7 +125,7 @@ module GRPC
         count += 1
         req = @readq.pop
         GRPC.logger.debug("each_queued_msg: req = #{req}")
-        throw req if req.is_a? StandardError
+        fail req if req.is_a? StandardError
         break if req.equal?(END_OF_READS)
         yield req
       end
@@ -145,12 +145,9 @@ module GRPC
       GRPC.logger.debug("bidi-write-loop: #{count} writes done")
       if is_client
         GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting")
-        batch_result = @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
-                                       SEND_CLOSE_FROM_CLIENT => nil,
-                                       RECV_STATUS_ON_CLIENT => nil)
-        @call.status = batch_result.status
-        batch_result.check_status
-        GRPC.logger.debug("bidi-write-loop: done status #{@call.status}")
+        @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
+                        SEND_CLOSE_FROM_CLIENT => nil)
+        GRPC.logger.debug('bidi-write-loop: done')
         notify_done
       end
       GRPC.logger.debug('bidi-write-loop: finished')
@@ -162,7 +159,7 @@ module GRPC
     end
 
     # starts the read loop
-    def start_read_loop
+    def start_read_loop(is_client: true)
       Thread.new do
         GRPC.logger.debug('bidi-read-loop: starting')
         begin
@@ -175,9 +172,19 @@ module GRPC
             # TODO: ensure metadata is read if available, currently it's not
             batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE,
                                            RECV_MESSAGE => nil)
+
             # handle the next message
             if batch_result.message.nil?
               GRPC.logger.debug("bidi-read-loop: null batch #{batch_result}")
+
+              if is_client
+                batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE,
+                                               RECV_STATUS_ON_CLIENT => nil)
+                @call.status = batch_result.status
+                batch_result.check_status
+                GRPC.logger.debug("bidi-read-loop: done status #{@call.status}")
+              end
+
               @readq.push(END_OF_READS)
               GRPC.logger.debug('bidi-read-loop: done reading!')
               break
diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb
index 68d4b11790..edcc962a7d 100644
--- a/src/ruby/spec/generic/client_stub_spec.rb
+++ b/src/ruby/spec/generic/client_stub_spec.rb
@@ -408,6 +408,26 @@ describe 'ClientStub' do
 
       it_behaves_like 'bidi streaming'
     end
+
+    describe 'without enough time to run' do
+      before(:each) do
+        @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s }
+        @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s }
+        server_port = create_test_server
+        @host = "localhost:#{server_port}"
+      end
+
+      it 'should fail with DeadlineExceeded', bidi: true do
+        @server.start
+        stub = GRPC::ClientStub.new(@host, @cq)
+        blk = proc do
+          e = stub.bidi_streamer(@method, @sent_msgs, noop, noop,
+                                 timeout: 0.001)
+          e.collect { |r| r }
+        end
+        expect(&blk).to raise_error GRPC::BadStatus, /Deadline Exceeded/
+      end
+    end
   end
 
   def run_server_streamer(expected_input, replys, status, **kw)
-- 
GitLab