diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index 8d91c31a651ee500a82b1b05f04d2ee8979e8f22..aa6c7e098951af954bb3652ef1e1cdf2ae85d528 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -38,6 +38,23 @@ $grpc_signals = [] # GRPC contains the General RPC module. module GRPC + # Handles the signals in $grpc_signals. + # + # @return false if the server should exit, true if not. + def handle_signals + loop do + sig = $grpc_signals.shift + case sig + when 'INT' + return false + when 'TERM' + return false + end + end + true + end + module_function :handle_signals + # Pool is a simple thread pool. class Pool # Default keep alive period is 1s @@ -185,6 +202,14 @@ module GRPC alt_srv end + # setup_connect_md_proc is used by #initialize to validate the + # connect_md_proc. + def self.setup_connect_md_proc(a_proc) + return nil if a_proc.nil? + fail(TypeError, '!Proc') unless a_proc.is_a? Proc + a_proc + end + # Creates a new RpcServer. # # The RPC server is configured using keyword arguments. @@ -212,14 +237,21 @@ module GRPC # * max_waiting_requests: the maximum number of requests that are not # being handled to allow. When this limit is exceeded, the server responds # with not available to new requests + # + # * connect_md_proc: + # when non-nil is a proc for determining metadata to to send back the client + # on receiving an invocation req. The proc signature is: + # {key: val, ..} func(method_name, {key: val, ...}) def initialize(pool_size:DEFAULT_POOL_SIZE, max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS, poll_period:DEFAULT_POLL_PERIOD, completion_queue_override:nil, server_override:nil, + connect_md_proc:nil, **kw) @cq = RpcServer.setup_cq(completion_queue_override) @server = RpcServer.setup_srv(server_override, @cq, **kw) + @connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc) @pool_size = pool_size @max_waiting_requests = max_waiting_requests @poll_period = poll_period @@ -279,22 +311,6 @@ module GRPC t.join end - # Handles the signals in $grpc_signals. - # - # @return false if the server should exit, true if not. - def handle_signals - loop do - sig = $grpc_signals.shift - case sig - when 'INT' - return false - when 'TERM' - return false - end - end - true - end - # Determines if the server is currently stopped def stopped? @stopped ||= false @@ -403,16 +419,17 @@ module GRPC end def new_active_server_call(an_rpc) - # Accept the call. This is necessary even if a status is to be sent - # back immediately return nil if an_rpc.nil? || an_rpc.call.nil? # allow the metadata to be accessed from the call handle_call_tag = Object.new - an_rpc.call.metadata = an_rpc.metadata - # TODO: add a hook to send md + an_rpc.call.metadata = an_rpc.metadata # attaches md to call for handlers + connect_md = nil + unless @connect_md_proc.nil? + connect_md = @connect_md_proc.call(an_rpc.method, an_rpc.metadata) + end an_rpc.call.run_batch(@cq, handle_call_tag, INFINITE_FUTURE, - SEND_INITIAL_METADATA => nil) + SEND_INITIAL_METADATA => connect_md) return nil unless available?(an_rpc) return nil unless found?(an_rpc) diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb index 1323bacfa60e9fc40b39bdf8f77a37bdc82c0dd9..202576c673d18e793a16f07a6352905faf167f92 100644 --- a/src/ruby/spec/generic/rpc_server_spec.rb +++ b/src/ruby/spec/generic/rpc_server_spec.rb @@ -301,21 +301,20 @@ describe GRPC::RpcServer do end describe '#run' do - before(:each) do - @client_opts = { - channel_override: @ch - } - @marshal = EchoService.rpc_descs[:an_rpc].marshal_proc - @unmarshal = EchoService.rpc_descs[:an_rpc].unmarshal_proc(:output) - server_opts = { - server_override: @server, - completion_queue_override: @server_queue, - poll_period: 1 - } - @srv = RpcServer.new(**server_opts) - end + let(:client_opts) { { channel_override: @ch } } + let(:marshal) { EchoService.rpc_descs[:an_rpc].marshal_proc } + let(:unmarshal) { EchoService.rpc_descs[:an_rpc].unmarshal_proc(:output) } + + context 'with no connect_metadata' do + before(:each) do + server_opts = { + server_override: @server, + completion_queue_override: @server_queue, + poll_period: 1 + } + @srv = RpcServer.new(**server_opts) + end - describe 'when running' do it 'should return NOT_FOUND status on unknown methods', server: true do @srv.handle(EchoService) t = Thread.new { @srv.run } @@ -323,8 +322,8 @@ describe GRPC::RpcServer do req = EchoMsg.new blk = proc do cq = GRPC::Core::CompletionQueue.new - stub = GRPC::ClientStub.new(@host, cq, **@client_opts) - stub.request_response('/unknown', req, @marshal, @unmarshal) + stub = GRPC::ClientStub.new(@host, cq, **client_opts) + stub.request_response('/unknown', req, marshal, unmarshal) end expect(&blk).to raise_error GRPC::BadStatus @srv.stop @@ -337,7 +336,7 @@ describe GRPC::RpcServer do @srv.wait_till_running req = EchoMsg.new n = 5 # arbitrary - stub = EchoStub.new(@host, **@client_opts) + stub = EchoStub.new(@host, **client_opts) n.times { expect(stub.an_rpc(req)).to be_a(EchoMsg) } @srv.stop t.join @@ -349,7 +348,7 @@ describe GRPC::RpcServer do t = Thread.new { @srv.run } @srv.wait_till_running req = EchoMsg.new - stub = EchoStub.new(@host, **@client_opts) + stub = EchoStub.new(@host, **client_opts) expect(stub.an_rpc(req, k1: 'v1', k2: 'v2')).to be_a(EchoMsg) wanted_md = [{ 'k1' => 'v1', 'k2' => 'v2' }] expect(service.received_md).to eq(wanted_md) @@ -363,7 +362,7 @@ describe GRPC::RpcServer do t = Thread.new { @srv.run } @srv.wait_till_running req = EchoMsg.new - stub = SlowStub.new(@host, **@client_opts) + stub = SlowStub.new(@host, **client_opts) deadline = service.delay + 1.0 # wait for long enough expect(stub.an_rpc(req, deadline, k1: 'v1', k2: 'v2')).to be_a(EchoMsg) wanted_md = [{ 'k1' => 'v1', 'k2' => 'v2' }] @@ -378,7 +377,7 @@ describe GRPC::RpcServer do t = Thread.new { @srv.run } @srv.wait_till_running req = EchoMsg.new - stub = SlowStub.new(@host, **@client_opts) + stub = SlowStub.new(@host, **client_opts) deadline = 0.1 # too short for SlowService to respond blk = proc { stub.an_rpc(req, deadline, k1: 'v1', k2: 'v2') } expect(&blk).to raise_error GRPC::BadStatus @@ -394,7 +393,7 @@ describe GRPC::RpcServer do t = Thread.new { @srv.run } @srv.wait_till_running req = EchoMsg.new - stub = SlowStub.new(@host, **@client_opts) + stub = SlowStub.new(@host, **client_opts) op = stub.an_rpc(req, k1: 'v1', k2: 'v2', return_op: true) Thread.new do # cancel the call sleep 0.1 @@ -411,11 +410,11 @@ describe GRPC::RpcServer do t = Thread.new { @srv.run } @srv.wait_till_running req = EchoMsg.new - @client_opts[:update_metadata] = proc do |md| + client_opts[:update_metadata] = proc do |md| md[:k1] = 'updated-v1' md end - stub = EchoStub.new(@host, **@client_opts) + stub = EchoStub.new(@host, **client_opts) expect(stub.an_rpc(req, k1: 'v1', k2: 'v2')).to be_a(EchoMsg) wanted_md = [{ 'k1' => 'updated-v1', 'k2' => 'v2', 'jwt_aud_uri' => "https://#{@host}/EchoService" }] @@ -433,7 +432,7 @@ describe GRPC::RpcServer do threads = [] n.times do threads << Thread.new do - stub = EchoStub.new(@host, **@client_opts) + stub = EchoStub.new(@host, **client_opts) q << stub.an_rpc(req) end end @@ -461,7 +460,7 @@ describe GRPC::RpcServer do one_failed_as_unavailable = false n.times do threads << Thread.new do - stub = SlowStub.new(@host, **@client_opts) + stub = SlowStub.new(@host, **client_opts) begin stub.an_rpc(req) rescue GRPC::BadStatus => e @@ -474,5 +473,46 @@ describe GRPC::RpcServer do expect(one_failed_as_unavailable).to be(true) end end + + context 'with connect metadata' do + let(:test_md_proc) do + proc do |mth, md| + res = md.clone + res['method'] = mth + res['connect_k1'] = 'connect_v1' + res + end + end + before(:each) do + server_opts = { + server_override: @server, + completion_queue_override: @server_queue, + poll_period: 1, + connect_md_proc: test_md_proc + } + @srv = RpcServer.new(**server_opts) + end + + it 'should send connect metadata to the client', server: true do + service = EchoService.new + @srv.handle(service) + t = Thread.new { @srv.run } + @srv.wait_till_running + req = EchoMsg.new + stub = EchoStub.new(@host, **client_opts) + op = stub.an_rpc(req, k1: 'v1', k2: 'v2', return_op: true) + expect(op.metadata).to be nil + expect(op.execute).to be_a(EchoMsg) + wanted_md = { + 'k1' => 'v1', + 'k2' => 'v2', + 'method' => '/EchoService/an_rpc', + 'connect_k1' => 'connect_v1' + } + expect(op.metadata).to eq(wanted_md) + @srv.stop + t.join + end + end end end