Skip to content
Snippets Groups Projects
Commit 4aba3566 authored by Tim Emiola's avatar Tim Emiola
Browse files

Various tweaks to improve server stability

parent b79f33c8
No related branches found
No related tags found
No related merge requests found
# This configuration was generated by `rubocop --auto-gen-config`
# on 2015-04-17 14:43:27 -0700 using RuboCop version 0.30.0.
# on 2015-05-22 13:23:34 -0700 using RuboCop version 0.30.1.
# The point is for the user to remove these configuration records
# one by one as the offenses are removed from the code base.
# Note that changes in the inspected code, or installation of new
......@@ -7,12 +7,12 @@
# Offense count: 30
Metrics/AbcSize:
Max: 40
Max: 38
# Offense count: 3
# Configuration parameters: CountComments.
Metrics/ClassLength:
Max: 184
Max: 192
# Offense count: 35
# Configuration parameters: CountComments.
......
......@@ -76,7 +76,7 @@ module GRPC
@jobs = Queue.new
@size = size
@stopped = false
@stop_mutex = Mutex.new
@stop_mutex = Mutex.new # needs to be held when accessing @stopped
@stop_cond = ConditionVariable.new
@workers = []
@keep_alive = keep_alive
......@@ -92,10 +92,15 @@ module GRPC
# @param args the args passed blk when it is called
# @param blk the block to call
def schedule(*args, &blk)
fail 'already stopped' if @stopped
return if blk.nil?
GRPC.logger.info('schedule another job')
@jobs << [blk, args]
@stop_mutex.synchronize do
if @stopped
GRPC.logger.warn('did not schedule job, already stopped')
return
end
GRPC.logger.info('schedule another job')
@jobs << [blk, args]
end
end
# Starts running the jobs in the thread pool.
......@@ -116,8 +121,8 @@ module GRPC
def stop
GRPC.logger.info('stopping, will wait for all the workers to exit')
@workers.size.times { schedule { throw :exit } }
@stopped = true
@stop_mutex.synchronize do # wait @keep_alive for works to stop
@stopped = true
@stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0
end
forcibly_stop_workers
......@@ -249,15 +254,18 @@ module GRPC
server_override:nil,
connect_md_proc:nil,
**kw)
@cq = RpcServer.setup_cq(completion_queue_override)
@server = RpcServer.setup_srv(server_override, @cq, **kw)
@connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc)
@pool_size = pool_size
@cq = RpcServer.setup_cq(completion_queue_override)
@max_waiting_requests = max_waiting_requests
@poll_period = poll_period
@run_mutex = Mutex.new
@run_cond = ConditionVariable.new
@pool_size = pool_size
@pool = Pool.new(@pool_size)
@run_cond = ConditionVariable.new
@run_mutex = Mutex.new
@running = false
@server = RpcServer.setup_srv(server_override, @cq, **kw)
@stopped = false
@stop_mutex = Mutex.new
end
# stops a running server
......@@ -266,20 +274,23 @@ module GRPC
# server's current call loop is it's last.
def stop
return unless @running
@stopped = true
@stop_mutex.synchronize do
@stopped = true
end
@pool.stop
@server.close
end
# TODO: uncomment this:
#
# This segfaults in the c layer, so its commented out for now. Shutdown
# still occurs, but the c layer has to do the cleanup.
#
# @server.close
# determines if the server has been stopped
def stopped?
@stop_mutex.synchronize do
return @stopped
end
end
# determines if the server is currently running
def running?
@running ||= false
@running
end
# Is called from other threads to wait for #run to start up the server.
......@@ -311,11 +322,6 @@ module GRPC
t.join
end
# Determines if the server is currently stopped
def stopped?
@stopped ||= false
end
# handle registration of classes
#
# service is either a class that includes GRPC::GenericService and whose
......@@ -407,7 +413,13 @@ module GRPC
request_call_tag = Object.new
until stopped?
deadline = from_relative_time(@poll_period)
an_rpc = @server.request_call(@cq, request_call_tag, deadline)
begin
an_rpc = @server.request_call(@cq, request_call_tag, deadline)
rescue Core::CallError, RuntimeError => e
# can happen during server shutdown
GRPC.logger.warn("server call failed: #{e}")
next
end
c = new_active_server_call(an_rpc)
unless c.nil?
mth = an_rpc.method.to_sym
......
......@@ -74,11 +74,11 @@ describe GRPC::Pool do
end
describe '#schedule' do
it 'throws if the pool is already stopped' do
it 'return if the pool is already stopped' do
p = Pool.new(1)
p.stop
job = proc {}
expect { p.schedule(&job) }.to raise_error
expect { p.schedule(&job) }.to_not raise_error
end
it 'adds jobs that get run by the pool' do
......
......@@ -212,10 +212,14 @@ describe GRPC::RpcServer do
describe '#stopped?' do
before(:each) do
opts = { a_channel_arg: 'an_arg', poll_period: 1 }
opts = { a_channel_arg: 'an_arg', poll_period: 1.5 }
@srv = RpcServer.new(**opts)
end
after(:each) do
@srv.stop
end
it 'starts out false' do
expect(@srv.stopped?).to be(false)
end
......@@ -225,7 +229,7 @@ describe GRPC::RpcServer do
expect(@srv.stopped?).to be(false)
end
it 'stays false after the server starts running' do
it 'stays false after the server starts running', server: true do
@srv.handle(EchoService)
t = Thread.new { @srv.run }
@srv.wait_till_running
......@@ -234,7 +238,7 @@ describe GRPC::RpcServer do
t.join
end
it 'is true after a running server is stopped' do
it 'is true after a running server is stopped', server: true do
@srv.handle(EchoService)
t = Thread.new { @srv.run }
@srv.wait_till_running
......@@ -251,21 +255,22 @@ describe GRPC::RpcServer do
expect(r.running?).to be(false)
end
it 'is false after run is called with no services registered' do
it 'is false if run is called with no services registered', server: true do
opts = {
a_channel_arg: 'an_arg',
poll_period: 1,
poll_period: 2,
server_override: @server
}
r = RpcServer.new(**opts)
r.run
expect(r.running?).to be(false)
r.stop
end
it 'is true after run is called with a registered service' do
opts = {
a_channel_arg: 'an_arg',
poll_period: 1,
poll_period: 2.5,
server_override: @server
}
r = RpcServer.new(**opts)
......@@ -284,6 +289,10 @@ describe GRPC::RpcServer do
@srv = RpcServer.new(**@opts)
end
after(:each) do
@srv.stop
end
it 'raises if #run has already been called' do
@srv.handle(EchoService)
t = Thread.new { @srv.run }
......@@ -335,6 +344,10 @@ describe GRPC::RpcServer do
@srv = RpcServer.new(**server_opts)
end
after(:each) do
@srv.stop
end
it 'should return NOT_FOUND status on unknown methods', server: true do
@srv.handle(EchoService)
t = Thread.new { @srv.run }
......@@ -376,7 +389,7 @@ describe GRPC::RpcServer do
t.join
end
it 'should receive metadata when a deadline is specified', server: true do
it 'should receive metadata if a deadline is specified', server: true do
service = SlowService.new
@srv.handle(service)
t = Thread.new { @srv.run }
......@@ -445,11 +458,11 @@ describe GRPC::RpcServer do
it 'should handle multiple parallel requests', server: true do
@srv.handle(EchoService)
Thread.new { @srv.run }
t = Thread.new { @srv.run }
@srv.wait_till_running
req, q = EchoMsg.new, Queue.new
n = 5 # arbitrary
threads = []
threads = [t]
n.times do
threads << Thread.new do
stub = EchoStub.new(@host, **client_opts)
......@@ -472,7 +485,7 @@ describe GRPC::RpcServer do
}
alt_srv = RpcServer.new(**opts)
alt_srv.handle(SlowService)
Thread.new { alt_srv.run }
t = Thread.new { alt_srv.run }
alt_srv.wait_till_running
req = EchoMsg.new
n = 5 # arbitrary, use as many to ensure the server pool is exceeded
......@@ -490,6 +503,7 @@ describe GRPC::RpcServer do
end
threads.each(&:join)
alt_srv.stop
t.join
expect(one_failed_as_unavailable).to be(true)
end
end
......@@ -513,6 +527,10 @@ describe GRPC::RpcServer do
@srv = RpcServer.new(**server_opts)
end
after(:each) do
@srv.stop
end
it 'should send connect metadata to the client', server: true do
service = EchoService.new
@srv.handle(service)
......@@ -545,6 +563,10 @@ describe GRPC::RpcServer do
@srv = RpcServer.new(**server_opts)
end
after(:each) do
@srv.stop
end
it 'should be added to BadStatus when requests fail', server: true do
service = FailingService.new
@srv.handle(service)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment