diff --git a/src/ruby/.rubocop_todo.yml b/src/ruby/.rubocop_todo.yml index 02136a81a919a6b289d78fcb56ae4c7162add2ed..ed4a4438b3f893d2a31c5fe5cdb23039ac206ef0 100644 --- a/src/ruby/.rubocop_todo.yml +++ b/src/ruby/.rubocop_todo.yml @@ -1,18 +1,18 @@ # This configuration was generated by `rubocop --auto-gen-config` -# on 2015-04-16 12:30:09 -0700 using RuboCop version 0.30.0. +# on 2015-04-17 14:43:27 -0700 using RuboCop version 0.30.0. # The point is for the user to remove these configuration records # one by one as the offenses are removed from the code base. # Note that changes in the inspected code, or installation of new # versions of RuboCop, may require this file to be generated again. -# Offense count: 34 +# Offense count: 30 Metrics/AbcSize: - Max: 36 + Max: 40 # Offense count: 3 # Configuration parameters: CountComments. Metrics/ClassLength: - Max: 185 + Max: 184 # Offense count: 35 # Configuration parameters: CountComments. diff --git a/src/ruby/lib/grpc/errors.rb b/src/ruby/lib/grpc/errors.rb index 35e9c02a946364fa1a528e40274f1d4b7c85f710..f1201c17040737c350552aad505ea2fca455bd5f 100644 --- a/src/ruby/lib/grpc/errors.rb +++ b/src/ruby/lib/grpc/errors.rb @@ -36,14 +36,15 @@ module GRPC # error should be returned to the other end of a GRPC connection; when # caught it means that this end received a status error. class BadStatus < StandardError - attr_reader :code, :details + attr_reader :code, :details, :metadata # @param code [Numeric] the status code # @param details [String] the details of the exception - def initialize(code, details = 'unknown cause') + def initialize(code, details = 'unknown cause', **kw) super("#{code}:#{details}") @code = code @details = details + @metadata = kw end # Converts the exception to a GRPC::Status for use in the networking @@ -51,7 +52,7 @@ module GRPC # # @return [Status] with the same code and details def to_status - Status.new(code, details) + Struct::Status.new(code, details, @metadata) end end diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index 8d63de41450b78c05250ff2c5f3680b1e2ff3391..43ba549905986c5f175a5d7b80ca0ad73f9a5486 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -39,7 +39,10 @@ class Struct return nil if status.nil? fail GRPC::Cancelled if status.code == GRPC::Core::StatusCodes::CANCELLED if status.code != GRPC::Core::StatusCodes::OK - fail GRPC::BadStatus.new(status.code, status.details) + # raise BadStatus, propagating the metadata if present. + md = status.metadata + with_sym_keys = Hash[md.each_pair.collect { |x, y| [x.to_sym, y] }] + fail GRPC::BadStatus.new(status.code, status.details, **with_sym_keys) end status end @@ -119,6 +122,12 @@ module GRPC @metadata_tag = metadata_tag end + # output_metadata are provides access to hash that can be used to + # save metadata to be sent as trailer + def output_metadata + @output_metadata ||= {} + end + # multi_req_view provides a restricted view of this ActiveCall for use # in a server client-streaming handler. def multi_req_view @@ -161,10 +170,12 @@ module GRPC def finished batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, RECV_STATUS_ON_CLIENT => nil) - if @call.metadata.nil? - @call.metadata = batch_result.metadata - elsif !batch_result.metadata.nil? - @call.metadata.merge!(batch_result.metadata) + unless batch_result.status.nil? + if @call.metadata.nil? + @call.metadata = batch_result.status.metadata + else + @call.metadata.merge!(batch_result.status.metadata) + end end batch_result.check_status end @@ -192,9 +203,13 @@ module GRPC # @param details [String] details # @param assert_finished [true, false] when true(default), waits for # FINISHED. - def send_status(code = OK, details = '', assert_finished = false) + # + # == Keyword Arguments == + # any keyword arguments are treated as metadata to be sent to the server + # if a keyword value is a list, multiple metadata for it's key are sent + def send_status(code = OK, details = '', assert_finished = false, **kw) ops = { - SEND_STATUS_FROM_SERVER => Struct::Status.new(code, details) + SEND_STATUS_FROM_SERVER => Struct::Status.new(code, details, kw) } ops[RECV_CLOSE_ON_SERVER] = nil if assert_finished @call.run_batch(@cq, self, INFINITE_FUTURE, ops) @@ -438,12 +453,13 @@ module GRPC # SingleReqView limits access to an ActiveCall's methods for use in server # handlers that receive just one request. - SingleReqView = view_class(:cancelled, :deadline, :metadata) + SingleReqView = view_class(:cancelled, :deadline, :metadata, + :output_metadata) # MultiReqView limits access to an ActiveCall's methods for use in # server client_streamer handlers. MultiReqView = view_class(:cancelled, :deadline, :each_queued_msg, - :each_remote_read, :metadata) + :each_remote_read, :metadata, :output_metadata) # Operation limits access to an ActiveCall's methods for use as # a Operation on the client. diff --git a/src/ruby/lib/grpc/generic/rpc_desc.rb b/src/ruby/lib/grpc/generic/rpc_desc.rb index 3e48b8e51d723dba00f54e058b4df809d610d28c..10211ae2397b7a89c1f45f2482d172bb387adf22 100644 --- a/src/ruby/lib/grpc/generic/rpc_desc.rb +++ b/src/ruby/lib/grpc/generic/rpc_desc.rb @@ -80,12 +80,12 @@ module GRPC else # is a bidi_stream active_call.run_server_bidi(mth) end - send_status(active_call, OK, 'OK') + send_status(active_call, OK, 'OK', **active_call.output_metadata) rescue BadStatus => e - # this is raised by handlers that want GRPC to send an application - # error code and detail message. + # this is raised by handlers that want GRPC to send an application error + # code and detail message and some additional app-specific metadata. logger.debug("app err: #{active_call}, status:#{e.code}:#{e.details}") - send_status(active_call, e.code, e.details) + send_status(active_call, e.code, e.details, **e.metadata) rescue Core::CallError => e # This is raised by GRPC internals but should rarely, if ever happen. # Log it, but don't notify the other endpoint.. @@ -135,9 +135,9 @@ module GRPC "##{mth.name}: bad arg count; got:#{mth.arity}, want:#{want}, #{msg}" end - def send_status(active_client, code, details) + def send_status(active_client, code, details, **kw) details = 'Not sure why' if details.nil? - active_client.send_status(code, details, code == OK) + active_client.send_status(code, details, code == OK, **kw) rescue StandardError => e logger.warn("Could not send status #{code}:#{details}") logger.warn(e) diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index c7c8267fa36c2f2844b5fc67a259801cbd3fe7bf..88c24aa92bbdc18e14e13694e025c69ae3eaa1d0 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -37,6 +37,120 @@ $grpc_signals = [] # GRPC contains the General RPC module. module GRPC + # Handles the signals in $grpc_signals. + # + # @return false if the server should exit, true if not. + def handle_signals + loop do + sig = $grpc_signals.shift + case sig + when 'INT' + return false + when 'TERM' + return false + end + end + true + end + module_function :handle_signals + + # Pool is a simple thread pool. + class Pool + # Default keep alive period is 1s + DEFAULT_KEEP_ALIVE = 1 + + def initialize(size, keep_alive: DEFAULT_KEEP_ALIVE) + fail 'pool size must be positive' unless size > 0 + @jobs = Queue.new + @size = size + @stopped = false + @stop_mutex = Mutex.new + @stop_cond = ConditionVariable.new + @workers = [] + @keep_alive = keep_alive + end + + # Returns the number of jobs waiting + def jobs_waiting + @jobs.size + end + + # Runs the given block on the queue with the provided args. + # + # @param args the args passed blk when it is called + # @param blk the block to call + def schedule(*args, &blk) + fail 'already stopped' if @stopped + return if blk.nil? + logger.info('schedule another job') + @jobs << [blk, args] + end + + # Starts running the jobs in the thread pool. + def start + fail 'already stopped' if @stopped + until @workers.size == @size.to_i + next_thread = Thread.new do + catch(:exit) do # allows { throw :exit } to kill a thread + loop_execute_jobs + end + remove_current_thread + end + @workers << next_thread + end + end + + # Stops the jobs in the pool + def stop + logger.info('stopping, will wait for all the workers to exit') + @workers.size.times { schedule { throw :exit } } + @stopped = true + @stop_mutex.synchronize do # wait @keep_alive for works to stop + @stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0 + end + forcibly_stop_workers + logger.info('stopped, all workers are shutdown') + end + + protected + + # Forcibly shutdown any threads that are still alive. + def forcibly_stop_workers + return unless @workers.size > 0 + logger.info("forcibly terminating #{@workers.size} worker(s)") + @workers.each do |t| + next unless t.alive? + begin + t.exit + rescue StandardError => e + logger.warn('error while terminating a worker') + logger.warn(e) + end + end + end + + # removes the threads from workers, and signal when all the + # threads are complete. + def remove_current_thread + @stop_mutex.synchronize do + @workers.delete(Thread.current) + @stop_cond.signal if @workers.size == 0 + end + end + + def loop_execute_jobs + loop do + begin + blk, args = @jobs.pop + blk.call(*args) + rescue StandardError => e + logger.warn('Error in worker thread') + logger.warn(e) + end + end + end + end + # RpcServer hosts a number of services and makes them available on the # network. class RpcServer @@ -69,6 +183,32 @@ module GRPC %w(INT TERM).each { |sig| trap(sig) { $grpc_signals << sig } } end + # setup_cq is used by #initialize to constuct a Core::CompletionQueue from + # its arguments. + def self.setup_cq(alt_cq) + return Core::CompletionQueue.new if alt_cq.nil? + unless alt_cq.is_a? Core::CompletionQueue + fail(TypeError, '!CompletionQueue') + end + alt_cq + end + + # setup_srv is used by #initialize to constuct a Core::Server from its + # arguments. + def self.setup_srv(alt_srv, cq, **kw) + return Core::Server.new(cq, kw) if alt_srv.nil? + fail(TypeError, '!Server') unless alt_srv.is_a? Core::Server + alt_srv + end + + # setup_connect_md_proc is used by #initialize to validate the + # connect_md_proc. + def self.setup_connect_md_proc(a_proc) + return nil if a_proc.nil? + fail(TypeError, '!Proc') unless a_proc.is_a? Proc + a_proc + end + # Creates a new RpcServer. # # The RPC server is configured using keyword arguments. @@ -96,30 +236,21 @@ module GRPC # * max_waiting_requests: the maximum number of requests that are not # being handled to allow. When this limit is exceeded, the server responds # with not available to new requests + # + # * connect_md_proc: + # when non-nil is a proc for determining metadata to to send back the client + # on receiving an invocation req. The proc signature is: + # {key: val, ..} func(method_name, {key: val, ...}) def initialize(pool_size:DEFAULT_POOL_SIZE, max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS, poll_period:DEFAULT_POLL_PERIOD, completion_queue_override:nil, server_override:nil, + connect_md_proc:nil, **kw) - if completion_queue_override.nil? - cq = Core::CompletionQueue.new - else - cq = completion_queue_override - unless cq.is_a? Core::CompletionQueue - fail(ArgumentError, 'not a CompletionQueue') - end - end - @cq = cq - - if server_override.nil? - srv = Core::Server.new(@cq, kw) - else - srv = server_override - fail(ArgumentError, 'not a Server') unless srv.is_a? Core::Server - end - @server = srv - + @cq = RpcServer.setup_cq(completion_queue_override) + @server = RpcServer.setup_srv(server_override, @cq, **kw) + @connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc) @pool_size = pool_size @max_waiting_requests = max_waiting_requests @poll_period = poll_period @@ -179,22 +310,6 @@ module GRPC t.join end - # Handles the signals in $grpc_signals. - # - # @return false if the server should exit, true if not. - def handle_signals - loop do - sig = $grpc_signals.shift - case sig - when 'INT' - return false - when 'TERM' - return false - end - end - true - end - # Determines if the server is currently stopped def stopped? @stopped ||= false @@ -258,19 +373,7 @@ module GRPC end @pool.start @server.start - request_call_tag = Object.new - until stopped? - deadline = from_relative_time(@poll_period) - an_rpc = @server.request_call(@cq, request_call_tag, deadline) - next if an_rpc.nil? - c = new_active_server_call(an_rpc) - unless c.nil? - mth = an_rpc.method.to_sym - @pool.schedule(c) do |call| - rpc_descs[mth].run_server_method(call, rpc_handlers[mth]) - end - end - end + loop_handle_server_calls @running = false end @@ -297,17 +400,35 @@ module GRPC nil end + # handles calls to the server + def loop_handle_server_calls + fail 'not running' unless @running + request_call_tag = Object.new + until stopped? + deadline = from_relative_time(@poll_period) + an_rpc = @server.request_call(@cq, request_call_tag, deadline) + c = new_active_server_call(an_rpc) + unless c.nil? + mth = an_rpc.method.to_sym + @pool.schedule(c) do |call| + rpc_descs[mth].run_server_method(call, rpc_handlers[mth]) + end + end + end + end + def new_active_server_call(an_rpc) - # Accept the call. This is necessary even if a status is to be sent - # back immediately return nil if an_rpc.nil? || an_rpc.call.nil? # allow the metadata to be accessed from the call handle_call_tag = Object.new - an_rpc.call.metadata = an_rpc.metadata - # TODO: add a hook to send md + an_rpc.call.metadata = an_rpc.metadata # attaches md to call for handlers + connect_md = nil + unless @connect_md_proc.nil? + connect_md = @connect_md_proc.call(an_rpc.method, an_rpc.metadata) + end an_rpc.call.run_batch(@cq, handle_call_tag, INFINITE_FUTURE, - SEND_INITIAL_METADATA => nil) + SEND_INITIAL_METADATA => connect_md) return nil unless available?(an_rpc) return nil unless found?(an_rpc) @@ -319,93 +440,6 @@ module GRPC an_rpc.deadline) end - # Pool is a simple thread pool for running server requests. - class Pool - # Default keep alive period is 1s - DEFAULT_KEEP_ALIVE = 1 - - def initialize(size, keep_alive: DEFAULT_KEEP_ALIVE) - fail 'pool size must be positive' unless size > 0 - @jobs = Queue.new - @size = size - @stopped = false - @stop_mutex = Mutex.new - @stop_cond = ConditionVariable.new - @workers = [] - @keep_alive = keep_alive - end - - # Returns the number of jobs waiting - def jobs_waiting - @jobs.size - end - - # Runs the given block on the queue with the provided args. - # - # @param args the args passed blk when it is called - # @param blk the block to call - def schedule(*args, &blk) - fail 'already stopped' if @stopped - return if blk.nil? - logger.info('schedule another job') - @jobs << [blk, args] - end - - # Starts running the jobs in the thread pool. - def start - fail 'already stopped' if @stopped - until @workers.size == @size.to_i - next_thread = Thread.new do - catch(:exit) do # allows { throw :exit } to kill a thread - loop do - begin - blk, args = @jobs.pop - blk.call(*args) - rescue StandardError => e - logger.warn('Error in worker thread') - logger.warn(e) - end - end - end - - # removes the threads from workers, and signal when all the - # threads are complete. - @stop_mutex.synchronize do - @workers.delete(Thread.current) - @stop_cond.signal if @workers.size == 0 - end - end - @workers << next_thread - end - end - - # Stops the jobs in the pool - def stop - logger.info('stopping, will wait for all the workers to exit') - @workers.size.times { schedule { throw :exit } } - @stopped = true - - @stop_mutex.synchronize do - @stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0 - end - - # Forcibly shutdown any threads that are still alive. - if @workers.size > 0 - logger.info("forcibly terminating #{@workers.size} worker(s)") - @workers.each do |t| - next unless t.alive? - begin - t.exit - rescue StandardError => e - logger.warn('error while terminating a worker') - logger.warn(e) - end - end - end - logger.info('stopped, all workers are shutdown') - end - end - protected def rpc_descs @@ -416,11 +450,9 @@ module GRPC @rpc_handlers ||= {} end - private - def assert_valid_service_class(cls) unless cls.include?(GenericService) - fail "#{cls} should 'include GenericService'" + fail "#{cls} must 'include GenericService'" end if cls.rpc_descs.size == 0 fail "#{cls} should specify some rpc descriptions" @@ -430,21 +462,17 @@ module GRPC def add_rpc_descs_for(service) cls = service.is_a?(Class) ? service : service.class - specs = rpc_descs - handlers = rpc_handlers + specs, handlers = rpc_descs, rpc_handlers cls.rpc_descs.each_pair do |name, spec| route = "/#{cls.service_name}/#{name}".to_sym - if specs.key? route - fail "Cannot add rpc #{route} from #{spec}, already registered" + fail "already registered: rpc #{route} from #{spec}" if specs.key? route + specs[route] = spec + if service.is_a?(Class) + handlers[route] = cls.new.method(name.to_s.underscore.to_sym) else - specs[route] = spec - if service.is_a?(Class) - handlers[route] = cls.new.method(name.to_s.underscore.to_sym) - else - handlers[route] = service.method(name.to_s.underscore.to_sym) - end - logger.info("handling #{route} with #{handlers[route]}") + handlers[route] = service.method(name.to_s.underscore.to_sym) end + logger.info("handling #{route} with #{handlers[route]}") end end end diff --git a/src/ruby/spec/generic/rpc_desc_spec.rb b/src/ruby/spec/generic/rpc_desc_spec.rb index a68299465ce35e46433c6cb2cd75d552c61a41c7..083632a080fbe3f4b7a6e1688f1b412500c93c28 100644 --- a/src/ruby/spec/generic/rpc_desc_spec.rb +++ b/src/ruby/spec/generic/rpc_desc_spec.rb @@ -52,41 +52,49 @@ describe GRPC::RpcDesc do @ok_response = Object.new end + shared_examples 'it handles errors' do + it 'sends the specified status if BadStatus is raised' do + expect(@call).to receive(:remote_read).once.and_return(Object.new) + expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK', false, + {}) + this_desc.run_server_method(@call, method(:bad_status)) + end + + it 'sends status UNKNOWN if other StandardErrors are raised' do + expect(@call).to receive(:remote_read).once.and_return(Object.new) + expect(@call).to receive(:send_status) .once.with(UNKNOWN, @no_reason, + false, {}) + this_desc.run_server_method(@call, method(:other_error)) + end + + it 'absorbs CallError with no further action' do + expect(@call).to receive(:remote_read).once.and_raise(CallError) + blk = proc do + this_desc.run_server_method(@call, method(:fake_reqresp)) + end + expect(&blk).to_not raise_error + end + end + describe '#run_server_method' do + let(:fake_md) { { k1: 'v1', k2: 'v2' } } describe 'for request responses' do + let(:this_desc) { @request_response } before(:each) do @call = double('active_call') allow(@call).to receive(:single_req_view).and_return(@call) - allow(@call).to receive(:gc) - end - - it 'sends the specified status if BadStatus is raised' do - expect(@call).to receive(:remote_read).once.and_return(Object.new) - expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK', false) - @request_response.run_server_method(@call, method(:bad_status)) - end - - it 'sends status UNKNOWN if other StandardErrors are raised' do - expect(@call).to receive(:remote_read).once.and_return(Object.new) - expect(@call).to receive(:send_status) .once.with(UNKNOWN, @no_reason, - false) - @request_response.run_server_method(@call, method(:other_error)) end - it 'absorbs CallError with no further action' do - expect(@call).to receive(:remote_read).once.and_raise(CallError) - blk = proc do - @request_response.run_server_method(@call, method(:fake_reqresp)) - end - expect(&blk).to_not raise_error - end + it_behaves_like 'it handles errors' it 'sends a response and closes the stream if there no errors' do req = Object.new expect(@call).to receive(:remote_read).once.and_return(req) expect(@call).to receive(:remote_send).once.with(@ok_response) - expect(@call).to receive(:send_status).once.with(OK, 'OK', true) - @request_response.run_server_method(@call, method(:fake_reqresp)) + expect(@call).to receive(:output_metadata).and_return(fake_md) + expect(@call).to receive(:send_status).once.with(OK, 'OK', true, + **fake_md) + this_desc.run_server_method(@call, method(:fake_reqresp)) end end @@ -94,17 +102,17 @@ describe GRPC::RpcDesc do before(:each) do @call = double('active_call') allow(@call).to receive(:multi_req_view).and_return(@call) - allow(@call).to receive(:gc) end it 'sends the specified status if BadStatus is raised' do - expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK', false) + expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK', false, + {}) @client_streamer.run_server_method(@call, method(:bad_status_alt)) end it 'sends status UNKNOWN if other StandardErrors are raised' do expect(@call).to receive(:send_status) .once.with(UNKNOWN, @no_reason, - false) + false, {}) @client_streamer.run_server_method(@call, method(:other_error_alt)) end @@ -118,44 +126,29 @@ describe GRPC::RpcDesc do it 'sends a response and closes the stream if there no errors' do expect(@call).to receive(:remote_send).once.with(@ok_response) - expect(@call).to receive(:send_status).once.with(OK, 'OK', true) + expect(@call).to receive(:output_metadata).and_return(fake_md) + expect(@call).to receive(:send_status).once.with(OK, 'OK', true, + **fake_md) @client_streamer.run_server_method(@call, method(:fake_clstream)) end end describe 'for server streaming' do + let(:this_desc) { @request_response } before(:each) do @call = double('active_call') allow(@call).to receive(:single_req_view).and_return(@call) - allow(@call).to receive(:gc) end - it 'sends the specified status if BadStatus is raised' do - expect(@call).to receive(:remote_read).once.and_return(Object.new) - expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK', false) - @server_streamer.run_server_method(@call, method(:bad_status)) - end - - it 'sends status UNKNOWN if other StandardErrors are raised' do - expect(@call).to receive(:remote_read).once.and_return(Object.new) - expect(@call).to receive(:send_status) .once.with(UNKNOWN, @no_reason, - false) - @server_streamer.run_server_method(@call, method(:other_error)) - end - - it 'absorbs CallError with no further action' do - expect(@call).to receive(:remote_read).once.and_raise(CallError) - blk = proc do - @server_streamer.run_server_method(@call, method(:fake_svstream)) - end - expect(&blk).to_not raise_error - end + it_behaves_like 'it handles errors' it 'sends a response and closes the stream if there no errors' do req = Object.new expect(@call).to receive(:remote_read).once.and_return(req) expect(@call).to receive(:remote_send).twice.with(@ok_response) - expect(@call).to receive(:send_status).once.with(OK, 'OK', true) + expect(@call).to receive(:output_metadata).and_return(fake_md) + expect(@call).to receive(:send_status).once.with(OK, 'OK', true, + **fake_md) @server_streamer.run_server_method(@call, method(:fake_svstream)) end end @@ -166,26 +159,28 @@ describe GRPC::RpcDesc do enq_th, rwl_th = double('enqueue_th'), ('read_write_loop_th') allow(enq_th).to receive(:join) allow(rwl_th).to receive(:join) - allow(@call).to receive(:gc) end it 'sends the specified status if BadStatus is raised' do e = GRPC::BadStatus.new(@bs_code, 'NOK') expect(@call).to receive(:run_server_bidi).and_raise(e) - expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK', false) + expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK', false, + {}) @bidi_streamer.run_server_method(@call, method(:bad_status_alt)) end it 'sends status UNKNOWN if other StandardErrors are raised' do expect(@call).to receive(:run_server_bidi).and_raise(StandardError) expect(@call).to receive(:send_status).once.with(UNKNOWN, @no_reason, - false) + false, {}) @bidi_streamer.run_server_method(@call, method(:other_error_alt)) end it 'closes the stream if there no errors' do expect(@call).to receive(:run_server_bidi) - expect(@call).to receive(:send_status).once.with(OK, 'OK', true) + expect(@call).to receive(:output_metadata).and_return(fake_md) + expect(@call).to receive(:send_status).once.with(OK, 'OK', true, + **fake_md) @bidi_streamer.run_server_method(@call, method(:fake_bidistream)) end end diff --git a/src/ruby/spec/generic/rpc_server_pool_spec.rb b/src/ruby/spec/generic/rpc_server_pool_spec.rb index 3bbc71b822fadf6cf070214d68b782644be4c0f6..aae3a7d7cbfaae3de5e05a62ec3adda62e3e9199 100644 --- a/src/ruby/spec/generic/rpc_server_pool_spec.rb +++ b/src/ruby/spec/generic/rpc_server_pool_spec.rb @@ -29,9 +29,9 @@ require 'grpc' -Pool = GRPC::RpcServer::Pool +describe GRPC::Pool do + Pool = GRPC::Pool -describe Pool do describe '#new' do it 'raises if a non-positive size is used' do expect { Pool.new(0) }.to raise_error diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb index e091c589f51463077471c3b080de434f72e1161e..2cd21a15e34dfe3fc2bf3b35416f5456abe0ed78 100644 --- a/src/ruby/spec/generic/rpc_server_spec.rb +++ b/src/ruby/spec/generic/rpc_server_spec.rb @@ -57,18 +57,20 @@ class NoRpcImplementation rpc :an_rpc, EchoMsg, EchoMsg end -# A test service with an implementation. +# A test service with an echo implementation. class EchoService include GRPC::GenericService rpc :an_rpc, EchoMsg, EchoMsg attr_reader :received_md - def initialize(_default_var = 'ignored') + def initialize(**kw) + @trailing_metadata = kw @received_md = [] end def an_rpc(req, call) logger.info('echo service received a request') + call.output_metadata.update(@trailing_metadata) @received_md << call.metadata unless call.metadata.nil? req end @@ -76,6 +78,25 @@ end EchoStub = EchoService.rpc_stub_class +# A test service with an implementation that fails with BadStatus +class FailingService + include GRPC::GenericService + rpc :an_rpc, EchoMsg, EchoMsg + attr_reader :details, :code, :md + + def initialize(_default_var = 'ignored') + @details = 'app error' + @code = 101 + @md = { failed_method: 'an_rpc' } + end + + def an_rpc(_req, _call) + fail GRPC::BadStatus.new(@code, @details, **@md) + end +end + +FailingStub = FailingService.rpc_stub_class + # A slow test service. class SlowService include GRPC::GenericService @@ -300,21 +321,20 @@ describe GRPC::RpcServer do end describe '#run' do - before(:each) do - @client_opts = { - channel_override: @ch - } - @marshal = EchoService.rpc_descs[:an_rpc].marshal_proc - @unmarshal = EchoService.rpc_descs[:an_rpc].unmarshal_proc(:output) - server_opts = { - server_override: @server, - completion_queue_override: @server_queue, - poll_period: 1 - } - @srv = RpcServer.new(**server_opts) - end + let(:client_opts) { { channel_override: @ch } } + let(:marshal) { EchoService.rpc_descs[:an_rpc].marshal_proc } + let(:unmarshal) { EchoService.rpc_descs[:an_rpc].unmarshal_proc(:output) } + + context 'with no connect_metadata' do + before(:each) do + server_opts = { + server_override: @server, + completion_queue_override: @server_queue, + poll_period: 1 + } + @srv = RpcServer.new(**server_opts) + end - describe 'when running' do it 'should return NOT_FOUND status on unknown methods', server: true do @srv.handle(EchoService) t = Thread.new { @srv.run } @@ -322,8 +342,8 @@ describe GRPC::RpcServer do req = EchoMsg.new blk = proc do cq = GRPC::Core::CompletionQueue.new - stub = GRPC::ClientStub.new(@host, cq, **@client_opts) - stub.request_response('/unknown', req, @marshal, @unmarshal) + stub = GRPC::ClientStub.new(@host, cq, **client_opts) + stub.request_response('/unknown', req, marshal, unmarshal) end expect(&blk).to raise_error GRPC::BadStatus @srv.stop @@ -336,7 +356,7 @@ describe GRPC::RpcServer do @srv.wait_till_running req = EchoMsg.new n = 5 # arbitrary - stub = EchoStub.new(@host, **@client_opts) + stub = EchoStub.new(@host, **client_opts) n.times { expect(stub.an_rpc(req)).to be_a(EchoMsg) } @srv.stop t.join @@ -348,7 +368,7 @@ describe GRPC::RpcServer do t = Thread.new { @srv.run } @srv.wait_till_running req = EchoMsg.new - stub = EchoStub.new(@host, **@client_opts) + stub = EchoStub.new(@host, **client_opts) expect(stub.an_rpc(req, k1: 'v1', k2: 'v2')).to be_a(EchoMsg) wanted_md = [{ 'k1' => 'v1', 'k2' => 'v2' }] expect(service.received_md).to eq(wanted_md) @@ -362,7 +382,7 @@ describe GRPC::RpcServer do t = Thread.new { @srv.run } @srv.wait_till_running req = EchoMsg.new - stub = SlowStub.new(@host, **@client_opts) + stub = SlowStub.new(@host, **client_opts) deadline = service.delay + 1.0 # wait for long enough expect(stub.an_rpc(req, deadline, k1: 'v1', k2: 'v2')).to be_a(EchoMsg) wanted_md = [{ 'k1' => 'v1', 'k2' => 'v2' }] @@ -377,7 +397,7 @@ describe GRPC::RpcServer do t = Thread.new { @srv.run } @srv.wait_till_running req = EchoMsg.new - stub = SlowStub.new(@host, **@client_opts) + stub = SlowStub.new(@host, **client_opts) deadline = 0.1 # too short for SlowService to respond blk = proc { stub.an_rpc(req, deadline, k1: 'v1', k2: 'v2') } expect(&blk).to raise_error GRPC::BadStatus @@ -393,7 +413,7 @@ describe GRPC::RpcServer do t = Thread.new { @srv.run } @srv.wait_till_running req = EchoMsg.new - stub = SlowStub.new(@host, **@client_opts) + stub = SlowStub.new(@host, **client_opts) op = stub.an_rpc(req, k1: 'v1', k2: 'v2', return_op: true) Thread.new do # cancel the call sleep 0.1 @@ -410,11 +430,11 @@ describe GRPC::RpcServer do t = Thread.new { @srv.run } @srv.wait_till_running req = EchoMsg.new - @client_opts[:update_metadata] = proc do |md| + client_opts[:update_metadata] = proc do |md| md[:k1] = 'updated-v1' md end - stub = EchoStub.new(@host, **@client_opts) + stub = EchoStub.new(@host, **client_opts) expect(stub.an_rpc(req, k1: 'v1', k2: 'v2')).to be_a(EchoMsg) wanted_md = [{ 'k1' => 'updated-v1', 'k2' => 'v2', 'jwt_aud_uri' => "https://#{@host}/EchoService" }] @@ -432,7 +452,7 @@ describe GRPC::RpcServer do threads = [] n.times do threads << Thread.new do - stub = EchoStub.new(@host, **@client_opts) + stub = EchoStub.new(@host, **client_opts) q << stub.an_rpc(req) end end @@ -460,7 +480,7 @@ describe GRPC::RpcServer do one_failed_as_unavailable = false n.times do threads << Thread.new do - stub = SlowStub.new(@host, **@client_opts) + stub = SlowStub.new(@host, **client_opts) begin stub.an_rpc(req) rescue GRPC::BadStatus => e @@ -473,5 +493,97 @@ describe GRPC::RpcServer do expect(one_failed_as_unavailable).to be(true) end end + + context 'with connect metadata' do + let(:test_md_proc) do + proc do |mth, md| + res = md.clone + res['method'] = mth + res['connect_k1'] = 'connect_v1' + res + end + end + before(:each) do + server_opts = { + server_override: @server, + completion_queue_override: @server_queue, + poll_period: 1, + connect_md_proc: test_md_proc + } + @srv = RpcServer.new(**server_opts) + end + + it 'should send connect metadata to the client', server: true do + service = EchoService.new + @srv.handle(service) + t = Thread.new { @srv.run } + @srv.wait_till_running + req = EchoMsg.new + stub = EchoStub.new(@host, **client_opts) + op = stub.an_rpc(req, k1: 'v1', k2: 'v2', return_op: true) + expect(op.metadata).to be nil + expect(op.execute).to be_a(EchoMsg) + wanted_md = { + 'k1' => 'v1', + 'k2' => 'v2', + 'method' => '/EchoService/an_rpc', + 'connect_k1' => 'connect_v1' + } + expect(op.metadata).to eq(wanted_md) + @srv.stop + t.join + end + end + + context 'with trailing metadata' do + before(:each) do + server_opts = { + server_override: @server, + completion_queue_override: @server_queue, + poll_period: 1 + } + @srv = RpcServer.new(**server_opts) + end + + it 'should be added to BadStatus when requests fail', server: true do + service = FailingService.new + @srv.handle(service) + t = Thread.new { @srv.run } + @srv.wait_till_running + req = EchoMsg.new + stub = FailingStub.new(@host, **client_opts) + blk = proc { stub.an_rpc(req) } + + # confirm it raise the expected error + expect(&blk).to raise_error GRPC::BadStatus + + # call again and confirm exception contained the trailing metadata. + begin + blk.call + rescue GRPC::BadStatus => e + expect(e.code).to eq(service.code) + expect(e.details).to eq(service.details) + expect(e.metadata).to eq(service.md) + end + @srv.stop + t.join + end + + it 'should be received by the client', server: true do + wanted_trailers = { 'k1' => 'out_v1', 'k2' => 'out_v2' } + service = EchoService.new(k1: 'out_v1', k2: 'out_v2') + @srv.handle(service) + t = Thread.new { @srv.run } + @srv.wait_till_running + req = EchoMsg.new + stub = EchoStub.new(@host, **client_opts) + op = stub.an_rpc(req, k1: 'v1', k2: 'v2', return_op: true) + expect(op.metadata).to be nil + expect(op.execute).to be_a(EchoMsg) + expect(op.metadata).to eq(wanted_trailers) + @srv.stop + t.join + end + end end end