Skip to content
Snippets Groups Projects
Commit fec23f2c authored by Jan Tattermusch's avatar Jan Tattermusch
Browse files

Merge pull request #1302 from tbetbetbe/grpc_ruby_rpc_server_simple_signal_handling

Grpc ruby rpc server simple signal handling
parents 720a208b bf6d78c8
No related branches found
No related tags found
No related merge requests found
# This configuration was generated by `rubocop --auto-gen-config` # This configuration was generated by `rubocop --auto-gen-config`
# on 2015-04-15 18:43:23 -0700 using RuboCop version 0.30.0. # on 2015-04-16 12:30:09 -0700 using RuboCop version 0.30.0.
# The point is for the user to remove these configuration records # The point is for the user to remove these configuration records
# one by one as the offenses are removed from the code base. # one by one as the offenses are removed from the code base.
# Note that changes in the inspected code, or installation of new # Note that changes in the inspected code, or installation of new
# versions of RuboCop, may require this file to be generated again. # versions of RuboCop, may require this file to be generated again.
# Offense count: 32 # Offense count: 34
Metrics/AbcSize: Metrics/AbcSize:
Max: 36 Max: 36
# Offense count: 3 # Offense count: 3
# Configuration parameters: CountComments. # Configuration parameters: CountComments.
Metrics/ClassLength: Metrics/ClassLength:
Max: 183 Max: 185
# Offense count: 35 # Offense count: 35
# Configuration parameters: CountComments. # Configuration parameters: CountComments.
...@@ -24,7 +24,7 @@ Metrics/MethodLength: ...@@ -24,7 +24,7 @@ Metrics/MethodLength:
Metrics/ParameterLists: Metrics/ParameterLists:
Max: 8 Max: 8
# Offense count: 6 # Offense count: 9
# Configuration parameters: AllowedVariables. # Configuration parameters: AllowedVariables.
Style/GlobalVars: Style/GlobalVars:
Enabled: false Enabled: false
......
...@@ -185,7 +185,7 @@ def main ...@@ -185,7 +185,7 @@ def main
logger.info("... running insecurely on #{host}") logger.info("... running insecurely on #{host}")
end end
s.handle(TestTarget) s.handle(TestTarget)
s.run s.run_till_terminated
end end
main main
...@@ -183,7 +183,7 @@ def main ...@@ -183,7 +183,7 @@ def main
end end
s.handle(Calculator) s.handle(Calculator)
s.run s.run_till_terminated
end end
main main
...@@ -105,7 +105,7 @@ def main ...@@ -105,7 +105,7 @@ def main
end end
s.handle(NoProto) s.handle(NoProto)
s.run s.run_till_terminated
end end
main main
...@@ -33,6 +33,9 @@ require 'grpc/generic/service' ...@@ -33,6 +33,9 @@ require 'grpc/generic/service'
require 'thread' require 'thread'
require 'xray/thread_dump_signal_handler' require 'xray/thread_dump_signal_handler'
# A global that contains signals the gRPC servers should respond to.
$grpc_signals = []
# GRPC contains the General RPC module. # GRPC contains the General RPC module.
module GRPC module GRPC
# RpcServer hosts a number of services and makes them available on the # RpcServer hosts a number of services and makes them available on the
...@@ -50,6 +53,23 @@ module GRPC ...@@ -50,6 +53,23 @@ module GRPC
# Default max_waiting_requests size is 20 # Default max_waiting_requests size is 20
DEFAULT_MAX_WAITING_REQUESTS = 20 DEFAULT_MAX_WAITING_REQUESTS = 20
# Default poll period is 1s
DEFAULT_POLL_PERIOD = 1
# Signal check period is 0.25s
SIGNAL_CHECK_PERIOD = 0.25
# Sets up a signal handler that adds signals to the signal handling global.
#
# Signal handlers should do as little as humanly possible.
# Here, they just add themselves to $grpc_signals
#
# RpcServer (and later other parts of gRPC) monitors the signals
# $grpc_signals in its own non-signal context.
def self.trap_signals
%w(INT TERM).each { |sig| trap(sig) { $grpc_signals << sig } }
end
# Creates a new RpcServer. # Creates a new RpcServer.
# #
# The RPC server is configured using keyword arguments. # The RPC server is configured using keyword arguments.
...@@ -79,7 +99,7 @@ module GRPC ...@@ -79,7 +99,7 @@ module GRPC
# with not available to new requests # with not available to new requests
def initialize(pool_size:DEFAULT_POOL_SIZE, def initialize(pool_size:DEFAULT_POOL_SIZE,
max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS, max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS,
poll_period:INFINITE_FUTURE, poll_period:DEFAULT_POLL_PERIOD,
completion_queue_override:nil, completion_queue_override:nil,
server_override:nil, server_override:nil,
**kw) **kw)
...@@ -117,6 +137,13 @@ module GRPC ...@@ -117,6 +137,13 @@ module GRPC
return unless @running return unless @running
@stopped = true @stopped = true
@pool.stop @pool.stop
# TODO: uncomment this:
#
# This segfaults in the c layer, so its commented out for now. Shutdown
# still occurs, but the c layer has to do the cleanup.
#
# @server.close
end end
# determines if the server is currently running # determines if the server is currently running
...@@ -139,7 +166,37 @@ module GRPC ...@@ -139,7 +166,37 @@ module GRPC
running? running?
end end
# determines if the server is currently stopped # Runs the server in its own thread, then waits for signal INT or TERM on
# the current thread to terminate it.
def run_till_terminated
self.class.trap_signals
t = Thread.new { run }
wait_till_running
loop do
sleep SIGNAL_CHECK_PERIOD
break unless handle_signals
end
stop
t.join
end
# Handles the signals in $grpc_signals.
#
# @return false if the server should exit, true if not.
def handle_signals
loop do
sig = $grpc_signals.shift
case sig
when 'INT'
return false
when 'TERM'
return false
end
end
true
end
# Determines if the server is currently stopped
def stopped? def stopped?
@stopped ||= false @stopped ||= false
end end
...@@ -265,7 +322,10 @@ module GRPC ...@@ -265,7 +322,10 @@ module GRPC
# Pool is a simple thread pool for running server requests. # Pool is a simple thread pool for running server requests.
class Pool class Pool
def initialize(size) # Default keep alive period is 1s
DEFAULT_KEEP_ALIVE = 1
def initialize(size, keep_alive: DEFAULT_KEEP_ALIVE)
fail 'pool size must be positive' unless size > 0 fail 'pool size must be positive' unless size > 0
@jobs = Queue.new @jobs = Queue.new
@size = size @size = size
...@@ -273,6 +333,7 @@ module GRPC ...@@ -273,6 +333,7 @@ module GRPC
@stop_mutex = Mutex.new @stop_mutex = Mutex.new
@stop_cond = ConditionVariable.new @stop_cond = ConditionVariable.new
@workers = [] @workers = []
@keep_alive = keep_alive
end end
# Returns the number of jobs waiting # Returns the number of jobs waiting
...@@ -325,15 +386,13 @@ module GRPC ...@@ -325,15 +386,13 @@ module GRPC
@workers.size.times { schedule { throw :exit } } @workers.size.times { schedule { throw :exit } }
@stopped = true @stopped = true
# TODO: allow configuration of the keepalive period
keep_alive = 5
@stop_mutex.synchronize do @stop_mutex.synchronize do
@stop_cond.wait(@stop_mutex, keep_alive) if @workers.size > 0 @stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0
end end
# Forcibly shutdown any threads that are still alive. # Forcibly shutdown any threads that are still alive.
if @workers.size > 0 if @workers.size > 0
logger.warn("forcibly terminating #{@workers.size} worker(s)") logger.info("forcibly terminating #{@workers.size} worker(s)")
@workers.each do |t| @workers.each do |t|
next unless t.alive? next unless t.alive?
begin begin
...@@ -344,7 +403,6 @@ module GRPC ...@@ -344,7 +403,6 @@ module GRPC
end end
end end
end end
logger.info('stopped, all workers are shutdown') logger.info('stopped, all workers are shutdown')
end end
end end
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment