Skip to content
Snippets Groups Projects
Commit f9e77b39 authored by Tim Emiola's avatar Tim Emiola
Browse files

Refactor: Shorter methods in RpcServer

parent e6be7f31
No related branches found
No related tags found
No related merge requests found
...@@ -167,6 +167,24 @@ module GRPC ...@@ -167,6 +167,24 @@ module GRPC
%w(INT TERM).each { |sig| trap(sig) { $grpc_signals << sig } } %w(INT TERM).each { |sig| trap(sig) { $grpc_signals << sig } }
end end
# setup_cq is used by #initialize to constuct a Core::CompletionQueue from
# its arguments.
def self.setup_cq(alt_cq)
return Core::CompletionQueue.new if alt_cq.nil?
unless alt_cq.is_a? Core::CompletionQueue
fail(TypeError, '!CompletionQueue')
end
alt_cq
end
# setup_srv is used by #initialize to constuct a Core::Server from its
# arguments.
def self.setup_srv(alt_srv, cq, **kw)
return Core::Server.new(cq, kw) if alt_srv.nil?
fail(TypeError, '!Server') unless alt_srv.is_a? Core::Server
alt_srv
end
# Creates a new RpcServer. # Creates a new RpcServer.
# #
# The RPC server is configured using keyword arguments. # The RPC server is configured using keyword arguments.
...@@ -200,24 +218,8 @@ module GRPC ...@@ -200,24 +218,8 @@ module GRPC
completion_queue_override:nil, completion_queue_override:nil,
server_override:nil, server_override:nil,
**kw) **kw)
if completion_queue_override.nil? @cq = RpcServer.setup_cq(completion_queue_override)
cq = Core::CompletionQueue.new @server = RpcServer.setup_srv(server_override, @cq, **kw)
else
cq = completion_queue_override
unless cq.is_a? Core::CompletionQueue
fail(ArgumentError, 'not a CompletionQueue')
end
end
@cq = cq
if server_override.nil?
srv = Core::Server.new(@cq, kw)
else
srv = server_override
fail(ArgumentError, 'not a Server') unless srv.is_a? Core::Server
end
@server = srv
@pool_size = pool_size @pool_size = pool_size
@max_waiting_requests = max_waiting_requests @max_waiting_requests = max_waiting_requests
@poll_period = poll_period @poll_period = poll_period
...@@ -356,19 +358,7 @@ module GRPC ...@@ -356,19 +358,7 @@ module GRPC
end end
@pool.start @pool.start
@server.start @server.start
request_call_tag = Object.new loop_handle_server_calls
until stopped?
deadline = from_relative_time(@poll_period)
an_rpc = @server.request_call(@cq, request_call_tag, deadline)
next if an_rpc.nil?
c = new_active_server_call(an_rpc)
unless c.nil?
mth = an_rpc.method.to_sym
@pool.schedule(c) do |call|
rpc_descs[mth].run_server_method(call, rpc_handlers[mth])
end
end
end
@running = false @running = false
end end
...@@ -395,6 +385,23 @@ module GRPC ...@@ -395,6 +385,23 @@ module GRPC
nil nil
end end
# handles calls to the server
def loop_handle_server_calls
fail 'not running' unless @running
request_call_tag = Object.new
until stopped?
deadline = from_relative_time(@poll_period)
an_rpc = @server.request_call(@cq, request_call_tag, deadline)
c = new_active_server_call(an_rpc)
unless c.nil?
mth = an_rpc.method.to_sym
@pool.schedule(c) do |call|
rpc_descs[mth].run_server_method(call, rpc_handlers[mth])
end
end
end
end
def new_active_server_call(an_rpc) def new_active_server_call(an_rpc)
# Accept the call. This is necessary even if a status is to be sent # Accept the call. This is necessary even if a status is to be sent
# back immediately # back immediately
...@@ -427,11 +434,9 @@ module GRPC ...@@ -427,11 +434,9 @@ module GRPC
@rpc_handlers ||= {} @rpc_handlers ||= {}
end end
private
def assert_valid_service_class(cls) def assert_valid_service_class(cls)
unless cls.include?(GenericService) unless cls.include?(GenericService)
fail "#{cls} should 'include GenericService'" fail "#{cls} must 'include GenericService'"
end end
if cls.rpc_descs.size == 0 if cls.rpc_descs.size == 0
fail "#{cls} should specify some rpc descriptions" fail "#{cls} should specify some rpc descriptions"
...@@ -441,21 +446,17 @@ module GRPC ...@@ -441,21 +446,17 @@ module GRPC
def add_rpc_descs_for(service) def add_rpc_descs_for(service)
cls = service.is_a?(Class) ? service : service.class cls = service.is_a?(Class) ? service : service.class
specs = rpc_descs specs, handlers = rpc_descs, rpc_handlers
handlers = rpc_handlers
cls.rpc_descs.each_pair do |name, spec| cls.rpc_descs.each_pair do |name, spec|
route = "/#{cls.service_name}/#{name}".to_sym route = "/#{cls.service_name}/#{name}".to_sym
if specs.key? route fail "already registered: rpc #{route} from #{spec}" if specs.key? route
fail "Cannot add rpc #{route} from #{spec}, already registered" specs[route] = spec
if service.is_a?(Class)
handlers[route] = cls.new.method(name.to_s.underscore.to_sym)
else else
specs[route] = spec handlers[route] = service.method(name.to_s.underscore.to_sym)
if service.is_a?(Class)
handlers[route] = cls.new.method(name.to_s.underscore.to_sym)
else
handlers[route] = service.method(name.to_s.underscore.to_sym)
end
logger.info("handling #{route} with #{handlers[route]}")
end end
logger.info("handling #{route} with #{handlers[route]}")
end end
end end
end end
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment