Skip to content
Snippets Groups Projects
Commit 3fd2be2e authored by Tim Emiola's avatar Tim Emiola
Browse files

Adds a hook for returning the client connect metadata

parent f9e77b39
No related branches found
No related tags found
No related merge requests found
...@@ -38,6 +38,23 @@ $grpc_signals = [] ...@@ -38,6 +38,23 @@ $grpc_signals = []
# GRPC contains the General RPC module. # GRPC contains the General RPC module.
module GRPC module GRPC
# Handles the signals in $grpc_signals.
#
# @return false if the server should exit, true if not.
def handle_signals
loop do
sig = $grpc_signals.shift
case sig
when 'INT'
return false
when 'TERM'
return false
end
end
true
end
module_function :handle_signals
# Pool is a simple thread pool. # Pool is a simple thread pool.
class Pool class Pool
# Default keep alive period is 1s # Default keep alive period is 1s
...@@ -185,6 +202,14 @@ module GRPC ...@@ -185,6 +202,14 @@ module GRPC
alt_srv alt_srv
end end
# setup_connect_md_proc is used by #initialize to validate the
# connect_md_proc.
def self.setup_connect_md_proc(a_proc)
return nil if a_proc.nil?
fail(TypeError, '!Proc') unless a_proc.is_a? Proc
a_proc
end
# Creates a new RpcServer. # Creates a new RpcServer.
# #
# The RPC server is configured using keyword arguments. # The RPC server is configured using keyword arguments.
...@@ -212,14 +237,21 @@ module GRPC ...@@ -212,14 +237,21 @@ module GRPC
# * max_waiting_requests: the maximum number of requests that are not # * max_waiting_requests: the maximum number of requests that are not
# being handled to allow. When this limit is exceeded, the server responds # being handled to allow. When this limit is exceeded, the server responds
# with not available to new requests # with not available to new requests
#
# * connect_md_proc:
# when non-nil is a proc for determining metadata to to send back the client
# on receiving an invocation req. The proc signature is:
# {key: val, ..} func(method_name, {key: val, ...})
def initialize(pool_size:DEFAULT_POOL_SIZE, def initialize(pool_size:DEFAULT_POOL_SIZE,
max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS, max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS,
poll_period:DEFAULT_POLL_PERIOD, poll_period:DEFAULT_POLL_PERIOD,
completion_queue_override:nil, completion_queue_override:nil,
server_override:nil, server_override:nil,
connect_md_proc:nil,
**kw) **kw)
@cq = RpcServer.setup_cq(completion_queue_override) @cq = RpcServer.setup_cq(completion_queue_override)
@server = RpcServer.setup_srv(server_override, @cq, **kw) @server = RpcServer.setup_srv(server_override, @cq, **kw)
@connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc)
@pool_size = pool_size @pool_size = pool_size
@max_waiting_requests = max_waiting_requests @max_waiting_requests = max_waiting_requests
@poll_period = poll_period @poll_period = poll_period
...@@ -279,22 +311,6 @@ module GRPC ...@@ -279,22 +311,6 @@ module GRPC
t.join t.join
end end
# Handles the signals in $grpc_signals.
#
# @return false if the server should exit, true if not.
def handle_signals
loop do
sig = $grpc_signals.shift
case sig
when 'INT'
return false
when 'TERM'
return false
end
end
true
end
# Determines if the server is currently stopped # Determines if the server is currently stopped
def stopped? def stopped?
@stopped ||= false @stopped ||= false
...@@ -403,16 +419,17 @@ module GRPC ...@@ -403,16 +419,17 @@ module GRPC
end end
def new_active_server_call(an_rpc) def new_active_server_call(an_rpc)
# Accept the call. This is necessary even if a status is to be sent
# back immediately
return nil if an_rpc.nil? || an_rpc.call.nil? return nil if an_rpc.nil? || an_rpc.call.nil?
# allow the metadata to be accessed from the call # allow the metadata to be accessed from the call
handle_call_tag = Object.new handle_call_tag = Object.new
an_rpc.call.metadata = an_rpc.metadata an_rpc.call.metadata = an_rpc.metadata # attaches md to call for handlers
# TODO: add a hook to send md connect_md = nil
unless @connect_md_proc.nil?
connect_md = @connect_md_proc.call(an_rpc.method, an_rpc.metadata)
end
an_rpc.call.run_batch(@cq, handle_call_tag, INFINITE_FUTURE, an_rpc.call.run_batch(@cq, handle_call_tag, INFINITE_FUTURE,
SEND_INITIAL_METADATA => nil) SEND_INITIAL_METADATA => connect_md)
return nil unless available?(an_rpc) return nil unless available?(an_rpc)
return nil unless found?(an_rpc) return nil unless found?(an_rpc)
......
...@@ -301,21 +301,20 @@ describe GRPC::RpcServer do ...@@ -301,21 +301,20 @@ describe GRPC::RpcServer do
end end
describe '#run' do describe '#run' do
before(:each) do let(:client_opts) { { channel_override: @ch } }
@client_opts = { let(:marshal) { EchoService.rpc_descs[:an_rpc].marshal_proc }
channel_override: @ch let(:unmarshal) { EchoService.rpc_descs[:an_rpc].unmarshal_proc(:output) }
}
@marshal = EchoService.rpc_descs[:an_rpc].marshal_proc context 'with no connect_metadata' do
@unmarshal = EchoService.rpc_descs[:an_rpc].unmarshal_proc(:output) before(:each) do
server_opts = { server_opts = {
server_override: @server, server_override: @server,
completion_queue_override: @server_queue, completion_queue_override: @server_queue,
poll_period: 1 poll_period: 1
} }
@srv = RpcServer.new(**server_opts) @srv = RpcServer.new(**server_opts)
end end
describe 'when running' do
it 'should return NOT_FOUND status on unknown methods', server: true do it 'should return NOT_FOUND status on unknown methods', server: true do
@srv.handle(EchoService) @srv.handle(EchoService)
t = Thread.new { @srv.run } t = Thread.new { @srv.run }
...@@ -323,8 +322,8 @@ describe GRPC::RpcServer do ...@@ -323,8 +322,8 @@ describe GRPC::RpcServer do
req = EchoMsg.new req = EchoMsg.new
blk = proc do blk = proc do
cq = GRPC::Core::CompletionQueue.new cq = GRPC::Core::CompletionQueue.new
stub = GRPC::ClientStub.new(@host, cq, **@client_opts) stub = GRPC::ClientStub.new(@host, cq, **client_opts)
stub.request_response('/unknown', req, @marshal, @unmarshal) stub.request_response('/unknown', req, marshal, unmarshal)
end end
expect(&blk).to raise_error GRPC::BadStatus expect(&blk).to raise_error GRPC::BadStatus
@srv.stop @srv.stop
...@@ -337,7 +336,7 @@ describe GRPC::RpcServer do ...@@ -337,7 +336,7 @@ describe GRPC::RpcServer do
@srv.wait_till_running @srv.wait_till_running
req = EchoMsg.new req = EchoMsg.new
n = 5 # arbitrary n = 5 # arbitrary
stub = EchoStub.new(@host, **@client_opts) stub = EchoStub.new(@host, **client_opts)
n.times { expect(stub.an_rpc(req)).to be_a(EchoMsg) } n.times { expect(stub.an_rpc(req)).to be_a(EchoMsg) }
@srv.stop @srv.stop
t.join t.join
...@@ -349,7 +348,7 @@ describe GRPC::RpcServer do ...@@ -349,7 +348,7 @@ describe GRPC::RpcServer do
t = Thread.new { @srv.run } t = Thread.new { @srv.run }
@srv.wait_till_running @srv.wait_till_running
req = EchoMsg.new req = EchoMsg.new
stub = EchoStub.new(@host, **@client_opts) stub = EchoStub.new(@host, **client_opts)
expect(stub.an_rpc(req, k1: 'v1', k2: 'v2')).to be_a(EchoMsg) expect(stub.an_rpc(req, k1: 'v1', k2: 'v2')).to be_a(EchoMsg)
wanted_md = [{ 'k1' => 'v1', 'k2' => 'v2' }] wanted_md = [{ 'k1' => 'v1', 'k2' => 'v2' }]
expect(service.received_md).to eq(wanted_md) expect(service.received_md).to eq(wanted_md)
...@@ -363,7 +362,7 @@ describe GRPC::RpcServer do ...@@ -363,7 +362,7 @@ describe GRPC::RpcServer do
t = Thread.new { @srv.run } t = Thread.new { @srv.run }
@srv.wait_till_running @srv.wait_till_running
req = EchoMsg.new req = EchoMsg.new
stub = SlowStub.new(@host, **@client_opts) stub = SlowStub.new(@host, **client_opts)
deadline = service.delay + 1.0 # wait for long enough deadline = service.delay + 1.0 # wait for long enough
expect(stub.an_rpc(req, deadline, k1: 'v1', k2: 'v2')).to be_a(EchoMsg) expect(stub.an_rpc(req, deadline, k1: 'v1', k2: 'v2')).to be_a(EchoMsg)
wanted_md = [{ 'k1' => 'v1', 'k2' => 'v2' }] wanted_md = [{ 'k1' => 'v1', 'k2' => 'v2' }]
...@@ -378,7 +377,7 @@ describe GRPC::RpcServer do ...@@ -378,7 +377,7 @@ describe GRPC::RpcServer do
t = Thread.new { @srv.run } t = Thread.new { @srv.run }
@srv.wait_till_running @srv.wait_till_running
req = EchoMsg.new req = EchoMsg.new
stub = SlowStub.new(@host, **@client_opts) stub = SlowStub.new(@host, **client_opts)
deadline = 0.1 # too short for SlowService to respond deadline = 0.1 # too short for SlowService to respond
blk = proc { stub.an_rpc(req, deadline, k1: 'v1', k2: 'v2') } blk = proc { stub.an_rpc(req, deadline, k1: 'v1', k2: 'v2') }
expect(&blk).to raise_error GRPC::BadStatus expect(&blk).to raise_error GRPC::BadStatus
...@@ -394,7 +393,7 @@ describe GRPC::RpcServer do ...@@ -394,7 +393,7 @@ describe GRPC::RpcServer do
t = Thread.new { @srv.run } t = Thread.new { @srv.run }
@srv.wait_till_running @srv.wait_till_running
req = EchoMsg.new req = EchoMsg.new
stub = SlowStub.new(@host, **@client_opts) stub = SlowStub.new(@host, **client_opts)
op = stub.an_rpc(req, k1: 'v1', k2: 'v2', return_op: true) op = stub.an_rpc(req, k1: 'v1', k2: 'v2', return_op: true)
Thread.new do # cancel the call Thread.new do # cancel the call
sleep 0.1 sleep 0.1
...@@ -411,11 +410,11 @@ describe GRPC::RpcServer do ...@@ -411,11 +410,11 @@ describe GRPC::RpcServer do
t = Thread.new { @srv.run } t = Thread.new { @srv.run }
@srv.wait_till_running @srv.wait_till_running
req = EchoMsg.new req = EchoMsg.new
@client_opts[:update_metadata] = proc do |md| client_opts[:update_metadata] = proc do |md|
md[:k1] = 'updated-v1' md[:k1] = 'updated-v1'
md md
end end
stub = EchoStub.new(@host, **@client_opts) stub = EchoStub.new(@host, **client_opts)
expect(stub.an_rpc(req, k1: 'v1', k2: 'v2')).to be_a(EchoMsg) expect(stub.an_rpc(req, k1: 'v1', k2: 'v2')).to be_a(EchoMsg)
wanted_md = [{ 'k1' => 'updated-v1', 'k2' => 'v2', wanted_md = [{ 'k1' => 'updated-v1', 'k2' => 'v2',
'jwt_aud_uri' => "https://#{@host}/EchoService" }] 'jwt_aud_uri' => "https://#{@host}/EchoService" }]
...@@ -433,7 +432,7 @@ describe GRPC::RpcServer do ...@@ -433,7 +432,7 @@ describe GRPC::RpcServer do
threads = [] threads = []
n.times do n.times do
threads << Thread.new do threads << Thread.new do
stub = EchoStub.new(@host, **@client_opts) stub = EchoStub.new(@host, **client_opts)
q << stub.an_rpc(req) q << stub.an_rpc(req)
end end
end end
...@@ -461,7 +460,7 @@ describe GRPC::RpcServer do ...@@ -461,7 +460,7 @@ describe GRPC::RpcServer do
one_failed_as_unavailable = false one_failed_as_unavailable = false
n.times do n.times do
threads << Thread.new do threads << Thread.new do
stub = SlowStub.new(@host, **@client_opts) stub = SlowStub.new(@host, **client_opts)
begin begin
stub.an_rpc(req) stub.an_rpc(req)
rescue GRPC::BadStatus => e rescue GRPC::BadStatus => e
...@@ -474,5 +473,46 @@ describe GRPC::RpcServer do ...@@ -474,5 +473,46 @@ describe GRPC::RpcServer do
expect(one_failed_as_unavailable).to be(true) expect(one_failed_as_unavailable).to be(true)
end end
end end
context 'with connect metadata' do
let(:test_md_proc) do
proc do |mth, md|
res = md.clone
res['method'] = mth
res['connect_k1'] = 'connect_v1'
res
end
end
before(:each) do
server_opts = {
server_override: @server,
completion_queue_override: @server_queue,
poll_period: 1,
connect_md_proc: test_md_proc
}
@srv = RpcServer.new(**server_opts)
end
it 'should send connect metadata to the client', server: true do
service = EchoService.new
@srv.handle(service)
t = Thread.new { @srv.run }
@srv.wait_till_running
req = EchoMsg.new
stub = EchoStub.new(@host, **client_opts)
op = stub.an_rpc(req, k1: 'v1', k2: 'v2', return_op: true)
expect(op.metadata).to be nil
expect(op.execute).to be_a(EchoMsg)
wanted_md = {
'k1' => 'v1',
'k2' => 'v2',
'method' => '/EchoService/an_rpc',
'connect_k1' => 'connect_v1'
}
expect(op.metadata).to eq(wanted_md)
@srv.stop
t.join
end
end
end end
end end
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment