diff --git a/src/ruby/bin/interop/interop_client.rb b/src/ruby/bin/interop/interop_client.rb index da4caa842b6702192ec63ba00a6fc63af58db030..9d753a85abf0e6653800da2cbb0ddf3a6a39d029 100755 --- a/src/ruby/bin/interop/interop_client.rb +++ b/src/ruby/bin/interop/interop_client.rb @@ -265,6 +265,17 @@ class NamedTests p 'OK: ping_pong' end + def timeout_on_sleeping_server + msg_sizes = [[27_182, 31_415]] + ppp = PingPongPlayer.new(msg_sizes) + resps = @stub.full_duplex_call(ppp.each_item, timeout: 0.001) + resps.each { |r| ppp.queue.push(r) } + fail 'Should have raised GRPC::BadStatus(DEADLINE_EXCEEDED)' + rescue GRPC::BadStatus => e + assert_equal(e.code, GRPC::Core::StatusCodes::DEADLINE_EXCEEDED) + p "OK: #{__callee__}" + end + def cancel_after_begin msg_sizes = [27_182, 8, 1828, 45_904] reqs = msg_sizes.map do |x| @@ -283,7 +294,7 @@ class NamedTests ppp = PingPongPlayer.new(msg_sizes) op = @stub.full_duplex_call(ppp.each_item, return_op: true) ppp.canceller_op = op # causes ppp to cancel after the 1st message - op.execute.each { |r| ppp.queue.push(r) } + assert_raises(GRPC::Cancelled) { op.execute.each { |r| ppp.queue.push(r) } } op.wait assert(op.cancelled, 'call operation was not CANCELLED') p 'OK: cancel_after_first_response' diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c index 70f0795f29e0e1ada6a79c1d3e01aea77cff74c0..b09d4e2cd954e99bd15edaeec82886a31ca7770c 100644 --- a/src/ruby/ext/grpc/rb_call.c +++ b/src/ruby/ext/grpc/rb_call.c @@ -629,13 +629,9 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag, rb_raise(grpc_rb_eOutOfTime, "grpc_call_start_batch timed out"); return Qnil; } - if (!ev.success) { - grpc_run_batch_stack_cleanup(&st); - rb_raise(grpc_rb_eCallError, "start_batch completion failed"); - return Qnil; - } - /* Build and return the BatchResult struct result */ + /* Build and return the BatchResult struct result, + if there is an error, it's reflected in the status */ result = grpc_run_batch_stack_build_result(&st); grpc_run_batch_stack_cleanup(&st); return result; diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index 215c0069a3f0ceb158454ba8da724730c16d8c1c..17da401c6bb139180a5023d9167f16b84079547c 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -74,8 +74,7 @@ module GRPC # # @param call [Call] a call on which to start and invocation # @param q [CompletionQueue] the completion queue - # @param deadline [Fixnum,TimeSpec] the deadline - def self.client_invoke(call, q, _deadline, **kw) + def self.client_invoke(call, q, **kw) fail(TypeError, '!Core::Call') unless call.is_a? Core::Call unless q.is_a? Core::CompletionQueue fail(TypeError, '!Core::CompletionQueue') @@ -418,7 +417,7 @@ module GRPC # @return [Enumerator, nil] a response Enumerator def bidi_streamer(requests, **kw, &blk) start_call(**kw) unless @started - bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline) + bd = BidiCall.new(@call, @cq, @marshal, @unmarshal) bd.run_on_client(requests, @op_notifier, &blk) end @@ -434,7 +433,7 @@ module GRPC # # @param gen_each_reply [Proc] generates the BiDi stream replies def run_server_bidi(gen_each_reply) - bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline) + bd = BidiCall.new(@call, @cq, @marshal, @unmarshal) bd.run_on_server(gen_each_reply) end @@ -456,7 +455,7 @@ module GRPC # Starts the call if not already started def start_call(**kw) return if @started - @metadata_tag = ActiveCall.client_invoke(@call, @cq, @deadline, **kw) + @metadata_tag = ActiveCall.client_invoke(@call, @cq, **kw) @started = true end diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb index 3b0c71395cea560be04572b312bfa87a55e92947..9dbbb74caff191cce3fcf31cdf52d1b949f4a0be 100644 --- a/src/ruby/lib/grpc/generic/bidi_call.rb +++ b/src/ruby/lib/grpc/generic/bidi_call.rb @@ -56,15 +56,13 @@ module GRPC # the call # @param marshal [Function] f(obj)->string that marshal requests # @param unmarshal [Function] f(string)->obj that unmarshals responses - # @param deadline [Fixnum] the deadline for the call to complete - def initialize(call, q, marshal, unmarshal, deadline) + def initialize(call, q, marshal, unmarshal) fail(ArgumentError, 'not a call') unless call.is_a? Core::Call unless q.is_a? Core::CompletionQueue fail(ArgumentError, 'not a CompletionQueue') end @call = call @cq = q - @deadline = deadline @marshal = marshal @op_notifier = nil # signals completion on clients @readq = Queue.new @@ -99,7 +97,7 @@ module GRPC # @param gen_each_reply [Proc] generates the BiDi stream replies. def run_on_server(gen_each_reply) replys = gen_each_reply.call(each_queued_msg) - @loop_th = start_read_loop + @loop_th = start_read_loop(is_client: false) write_loop(replys, is_client: false) end @@ -127,7 +125,7 @@ module GRPC count += 1 req = @readq.pop GRPC.logger.debug("each_queued_msg: req = #{req}") - throw req if req.is_a? StandardError + fail req if req.is_a? StandardError break if req.equal?(END_OF_READS) yield req end @@ -147,12 +145,9 @@ module GRPC GRPC.logger.debug("bidi-write-loop: #{count} writes done") if is_client GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting") - batch_result = @call.run_batch(@cq, write_tag, INFINITE_FUTURE, - SEND_CLOSE_FROM_CLIENT => nil, - RECV_STATUS_ON_CLIENT => nil) - @call.status = batch_result.status - batch_result.check_status - GRPC.logger.debug("bidi-write-loop: done status #{@call.status}") + @call.run_batch(@cq, write_tag, INFINITE_FUTURE, + SEND_CLOSE_FROM_CLIENT => nil) + GRPC.logger.debug('bidi-write-loop: done') notify_done end GRPC.logger.debug('bidi-write-loop: finished') @@ -164,7 +159,7 @@ module GRPC end # starts the read loop - def start_read_loop + def start_read_loop(is_client: true) Thread.new do GRPC.logger.debug('bidi-read-loop: starting') begin @@ -177,9 +172,19 @@ module GRPC # TODO: ensure metadata is read if available, currently it's not batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE, RECV_MESSAGE => nil) + # handle the next message if batch_result.message.nil? GRPC.logger.debug("bidi-read-loop: null batch #{batch_result}") + + if is_client + batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE, + RECV_STATUS_ON_CLIENT => nil) + @call.status = batch_result.status + batch_result.check_status + GRPC.logger.debug("bidi-read-loop: done status #{@call.status}") + end + @readq.push(END_OF_READS) GRPC.logger.debug('bidi-read-loop: done reading!') break diff --git a/src/ruby/lib/grpc/generic/client_stub.rb b/src/ruby/lib/grpc/generic/client_stub.rb index cce718537cdf55e45e0339d4100a19d7ea076885..24ec1793f63e3f8238f162f8bbcf1576f4962630 100644 --- a/src/ruby/lib/grpc/generic/client_stub.rb +++ b/src/ruby/lib/grpc/generic/client_stub.rb @@ -161,15 +161,21 @@ module GRPC # @param marshal [Function] f(obj)->string that marshals requests # @param unmarshal [Function] f(string)->obj that unmarshals responses # @param timeout [Numeric] (optional) the max completion time in seconds + # @param deadline [Time] (optional) the time the request should complete # @param parent [Core::Call] a prior call whose reserved metadata # will be propagated by this one. # @param return_op [true|false] return an Operation if true # @return [Object] the response received from the server - def request_response(method, req, marshal, unmarshal, timeout = nil, + def request_response(method, req, marshal, unmarshal, + deadline: nil, + timeout: nil, return_op: false, parent: parent, **kw) - c = new_active_call(method, marshal, unmarshal, timeout, parent: parent) + c = new_active_call(method, marshal, unmarshal, + deadline: deadline, + timeout: timeout, + parent: parent) kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method) md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri) return c.request_response(req, **md) unless return_op @@ -222,16 +228,22 @@ module GRPC # @param requests [Object] an Enumerable of requests to send # @param marshal [Function] f(obj)->string that marshals requests # @param unmarshal [Function] f(string)->obj that unmarshals responses - # @param timeout [Numeric] the max completion time in seconds + # @param timeout [Numeric] (optional) the max completion time in seconds + # @param deadline [Time] (optional) the time the request should complete # @param return_op [true|false] return an Operation if true # @param parent [Core::Call] a prior call whose reserved metadata # will be propagated by this one. # @return [Object|Operation] the response received from the server - def client_streamer(method, requests, marshal, unmarshal, timeout = nil, + def client_streamer(method, requests, marshal, unmarshal, + deadline: nil, + timeout: nil, return_op: false, parent: nil, **kw) - c = new_active_call(method, marshal, unmarshal, timeout, parent: parent) + c = new_active_call(method, marshal, unmarshal, + deadline: deadline, + timeout: timeout, + parent: parent) kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method) md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri) return c.client_streamer(requests, **md) unless return_op @@ -292,18 +304,24 @@ module GRPC # @param req [Object] the request sent to the server # @param marshal [Function] f(obj)->string that marshals requests # @param unmarshal [Function] f(string)->obj that unmarshals responses - # @param timeout [Numeric] the max completion time in seconds + # @param timeout [Numeric] (optional) the max completion time in seconds + # @param deadline [Time] (optional) the time the request should complete # @param return_op [true|false]return an Operation if true # @param parent [Core::Call] a prior call whose reserved metadata # will be propagated by this one. # @param blk [Block] when provided, is executed for each response # @return [Enumerator|Operation|nil] as discussed above - def server_streamer(method, req, marshal, unmarshal, timeout = nil, + def server_streamer(method, req, marshal, unmarshal, + deadline: nil, + timeout: nil, return_op: false, parent: nil, **kw, &blk) - c = new_active_call(method, marshal, unmarshal, timeout, parent: parent) + c = new_active_call(method, marshal, unmarshal, + deadline: deadline, + timeout: timeout, + parent: parent) kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method) md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri) return c.server_streamer(req, **md, &blk) unless return_op @@ -404,17 +422,23 @@ module GRPC # @param marshal [Function] f(obj)->string that marshals requests # @param unmarshal [Function] f(string)->obj that unmarshals responses # @param timeout [Numeric] (optional) the max completion time in seconds + # @param deadline [Time] (optional) the time the request should complete # @param parent [Core::Call] a prior call whose reserved metadata # will be propagated by this one. # @param return_op [true|false] return an Operation if true # @param blk [Block] when provided, is executed for each response # @return [Enumerator|nil|Operation] as discussed above - def bidi_streamer(method, requests, marshal, unmarshal, timeout = nil, + def bidi_streamer(method, requests, marshal, unmarshal, + deadline: nil, + timeout: nil, return_op: false, parent: nil, **kw, &blk) - c = new_active_call(method, marshal, unmarshal, timeout, parent: parent) + c = new_active_call(method, marshal, unmarshal, + deadline: deadline, + timeout: timeout, + parent: parent) kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method) md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri) return c.bidi_streamer(requests, **md, &blk) unless return_op @@ -438,8 +462,13 @@ module GRPC # @param parent [Grpc::Call] a parent call, available when calls are # made from server # @param timeout [TimeConst] - def new_active_call(method, marshal, unmarshal, timeout = nil, parent: nil) - deadline = from_relative_time(timeout.nil? ? @timeout : timeout) + def new_active_call(method, marshal, unmarshal, + deadline: nil, + timeout: nil, + parent: nil) + if deadline.nil? + deadline = from_relative_time(timeout.nil? ? @timeout : timeout) + end call = @ch.create_call(@queue, parent, # parent call @propagate_mask, # propagation options diff --git a/src/ruby/lib/grpc/generic/service.rb b/src/ruby/lib/grpc/generic/service.rb index 3b9743ea668c1facc90810c7b9d5beb8bfc736ad..80ff669cca63fec2516f7e1d0799e3dfc0b11065 100644 --- a/src/ruby/lib/grpc/generic/service.rb +++ b/src/ruby/lib/grpc/generic/service.rb @@ -174,26 +174,24 @@ module GRPC unmarshal = desc.unmarshal_proc(:output) route = "/#{route_prefix}/#{name}" if desc.request_response? - define_method(mth_name) do |req, deadline = nil, **kw| + define_method(mth_name) do |req, **kw| GRPC.logger.debug("calling #{@host}:#{route}") - request_response(route, req, marshal, unmarshal, deadline, **kw) + request_response(route, req, marshal, unmarshal, **kw) end elsif desc.client_streamer? - define_method(mth_name) do |reqs, deadline = nil, **kw| + define_method(mth_name) do |reqs, **kw| GRPC.logger.debug("calling #{@host}:#{route}") - client_streamer(route, reqs, marshal, unmarshal, deadline, **kw) + client_streamer(route, reqs, marshal, unmarshal, **kw) end elsif desc.server_streamer? - define_method(mth_name) do |req, deadline = nil, **kw, &blk| + define_method(mth_name) do |req, **kw, &blk| GRPC.logger.debug("calling #{@host}:#{route}") - server_streamer(route, req, marshal, unmarshal, deadline, **kw, - &blk) + server_streamer(route, req, marshal, unmarshal, **kw, &blk) end else # is a bidi_stream - define_method(mth_name) do |reqs, deadline = nil, **kw, &blk| + define_method(mth_name) do |reqs, **kw, &blk| GRPC.logger.debug("calling #{@host}:#{route}") - bidi_streamer(route, reqs, marshal, unmarshal, deadline, **kw, - &blk) + bidi_streamer(route, reqs, marshal, unmarshal, **kw, &blk) end end end diff --git a/src/ruby/spec/generic/active_call_spec.rb b/src/ruby/spec/generic/active_call_spec.rb index 0bf65ba2e9404fa2c4b4da27e56d3e537255b8ef..26208b714a41a421261459ff85d158ea7657456d 100644 --- a/src/ruby/spec/generic/active_call_spec.rb +++ b/src/ruby/spec/generic/active_call_spec.rb @@ -57,7 +57,7 @@ describe GRPC::ActiveCall do describe 'restricted view methods' do before(:each) do call = make_test_call - md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) + md_tag = ActiveCall.client_invoke(call, @client_queue) @client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, metadata_tag: md_tag) @@ -87,7 +87,7 @@ describe GRPC::ActiveCall do describe '#remote_send' do it 'allows a client to send a payload to the server' do call = make_test_call - md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) + md_tag = ActiveCall.client_invoke(call, @client_queue) @client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, metadata_tag: md_tag) @@ -111,7 +111,7 @@ describe GRPC::ActiveCall do it 'marshals the payload using the marshal func' do call = make_test_call - ActiveCall.client_invoke(call, @client_queue, deadline) + ActiveCall.client_invoke(call, @client_queue) marshal = proc { |x| 'marshalled:' + x } client_call = ActiveCall.new(call, @client_queue, marshal, @pass_through, deadline) @@ -134,8 +134,7 @@ describe GRPC::ActiveCall do describe '#client_invoke' do it 'sends keywords as metadata to the server when the are present' do call = make_test_call - ActiveCall.client_invoke(call, @client_queue, deadline, - k1: 'v1', k2: 'v2') + ActiveCall.client_invoke(call, @client_queue, k1: 'v1', k2: 'v2') recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline) recvd_call = recvd_rpc.call expect(recvd_call).to_not be_nil @@ -148,7 +147,7 @@ describe GRPC::ActiveCall do describe '#remote_read' do it 'reads the response sent by a server' do call = make_test_call - md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) + md_tag = ActiveCall.client_invoke(call, @client_queue) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, metadata_tag: md_tag) @@ -161,7 +160,7 @@ describe GRPC::ActiveCall do it 'saves no metadata when the server adds no metadata' do call = make_test_call - md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) + md_tag = ActiveCall.client_invoke(call, @client_queue) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, metadata_tag: md_tag) @@ -176,7 +175,7 @@ describe GRPC::ActiveCall do it 'saves metadata add by the server' do call = make_test_call - md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) + md_tag = ActiveCall.client_invoke(call, @client_queue) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, metadata_tag: md_tag) @@ -192,7 +191,7 @@ describe GRPC::ActiveCall do it 'get a nil msg before a status when an OK status is sent' do call = make_test_call - md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) + md_tag = ActiveCall.client_invoke(call, @client_queue) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, metadata_tag: md_tag) @@ -209,7 +208,7 @@ describe GRPC::ActiveCall do it 'unmarshals the response using the unmarshal func' do call = make_test_call - md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) + md_tag = ActiveCall.client_invoke(call, @client_queue) unmarshal = proc { |x| 'unmarshalled:' + x } client_call = ActiveCall.new(call, @client_queue, @pass_through, unmarshal, deadline, @@ -234,7 +233,7 @@ describe GRPC::ActiveCall do it 'the returns an enumerator that can read n responses' do call = make_test_call - md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) + md_tag = ActiveCall.client_invoke(call, @client_queue) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, metadata_tag: md_tag) @@ -252,7 +251,7 @@ describe GRPC::ActiveCall do it 'the returns an enumerator that stops after an OK Status' do call = make_test_call - md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) + md_tag = ActiveCall.client_invoke(call, @client_queue) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, metadata_tag: md_tag) @@ -275,7 +274,7 @@ describe GRPC::ActiveCall do describe '#writes_done' do it 'finishes ok if the server sends a status response' do call = make_test_call - md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) + md_tag = ActiveCall.client_invoke(call, @client_queue) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, metadata_tag: md_tag) @@ -291,7 +290,7 @@ describe GRPC::ActiveCall do it 'finishes ok if the server sends an early status response' do call = make_test_call - md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) + md_tag = ActiveCall.client_invoke(call, @client_queue) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, metadata_tag: md_tag) @@ -307,7 +306,7 @@ describe GRPC::ActiveCall do it 'finishes ok if writes_done is true' do call = make_test_call - md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) + md_tag = ActiveCall.client_invoke(call, @client_queue) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, metadata_tag: md_tag) diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb index 68d4b117905c44ad7edff98c377999f9bdec3233..edcc962a7dbc3e7c5cd66f1245c02cd4315bd0fe 100644 --- a/src/ruby/spec/generic/client_stub_spec.rb +++ b/src/ruby/spec/generic/client_stub_spec.rb @@ -408,6 +408,26 @@ describe 'ClientStub' do it_behaves_like 'bidi streaming' end + + describe 'without enough time to run' do + before(:each) do + @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s } + @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s } + server_port = create_test_server + @host = "localhost:#{server_port}" + end + + it 'should fail with DeadlineExceeded', bidi: true do + @server.start + stub = GRPC::ClientStub.new(@host, @cq) + blk = proc do + e = stub.bidi_streamer(@method, @sent_msgs, noop, noop, + timeout: 0.001) + e.collect { |r| r } + end + expect(&blk).to raise_error GRPC::BadStatus, /Deadline Exceeded/ + end + end end def run_server_streamer(expected_input, replys, status, **kw) diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb index 0326f6e894b32ddcaf673c8487b2136de890f902..1295fd7fddcf94e42ff1792551a61666d14d25f3 100644 --- a/src/ruby/spec/generic/rpc_server_spec.rb +++ b/src/ruby/spec/generic/rpc_server_spec.rb @@ -396,8 +396,9 @@ describe GRPC::RpcServer do @srv.wait_till_running req = EchoMsg.new stub = SlowStub.new(@host, **client_opts) - deadline = service.delay + 1.0 # wait for long enough - expect(stub.an_rpc(req, deadline, k1: 'v1', k2: 'v2')).to be_a(EchoMsg) + timeout = service.delay + 1.0 # wait for long enough + resp = stub.an_rpc(req, timeout: timeout, k1: 'v1', k2: 'v2') + expect(resp).to be_a(EchoMsg) wanted_md = [{ 'k1' => 'v1', 'k2' => 'v2' }] check_md(wanted_md, service.received_md) @srv.stop @@ -411,8 +412,8 @@ describe GRPC::RpcServer do @srv.wait_till_running req = EchoMsg.new stub = SlowStub.new(@host, **client_opts) - deadline = 0.1 # too short for SlowService to respond - blk = proc { stub.an_rpc(req, deadline, k1: 'v1', k2: 'v2') } + timeout = 0.1 # too short for SlowService to respond + blk = proc { stub.an_rpc(req, timeout: timeout, k1: 'v1', k2: 'v2') } expect(&blk).to raise_error GRPC::BadStatus wanted_md = [] expect(service.received_md).to eq(wanted_md)