Commit 269e6ccb authored by Alexander Polcyn

remove wait queue from ruby thread pool to avoid deadlock

parent aa3863b4
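The diff below replaces the pool's shared @jobs wait queue with one queue per worker: an idle worker parks its personal queue in @ready_workers, and schedule hands a job directly to an idle worker (or fails fast when none is available), so a job can no longer sit on a shared queue behind workers that may be blocked. As a rough illustration of that pattern, here is a minimal, self-contained Ruby sketch; the class and method names are made up for illustration and this is not the gRPC implementation itself:

# Illustrative sketch only; names such as SketchPool are hypothetical.
class SketchPool
  def initialize(size)
    @ready_workers = Queue.new    # queues of workers that are currently idle
    @worker_queues = []
    @threads = size.times.map do
      q = Queue.new
      @worker_queues << q
      @ready_workers << q         # every worker starts out idle
      Thread.new(q) do |jobs|
        loop do
          blk, args = jobs.pop
          break if blk.nil?       # nil is this sketch's shutdown signal
          blk.call(*args)
          @ready_workers << jobs  # job done: park the queue again
        end
      end
    end
  end

  def ready_for_work?
    !@ready_workers.empty?
  end

  # Hands the job straight to an idle worker and fails fast if every worker
  # is busy, instead of queueing the job behind a possibly-blocked worker.
  def schedule(*args, &blk)
    worker_queue = @ready_workers.pop(true) # non-blocking pop
    worker_queue << [blk, args]
  rescue ThreadError
    fail 'no worker threads available'
  end

  def stop
    @worker_queues.each { |q| q << [nil, []] }
    @threads.each(&:join)
  end
end

pool = SketchPool.new(2)
pool.schedule(1) { |n| puts "ran job #{n}" }
pool.schedule(2) { |n| puts "ran job #{n}" }
pool.stop

The real pool, shown in the diff, additionally guards startup and shutdown with @stop_mutex/@stop_cond and uses catch(:exit)/throw(:exit) to stop workers.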
@@ -48,6 +48,10 @@ module GRPC
@stop_cond = ConditionVariable.new
@workers = []
@keep_alive = keep_alive
# Each worker thread has its own queue to push and pull jobs;
# these queues are put into @ready_workers when that worker is idle
@ready_workers = Queue.new
end
# Returns the number of jobs waiting
@@ -55,6 +59,13 @@ module GRPC
@jobs.size
end
def ready_for_work?
# Busy worker threads are either doing work, or have a single job
# waiting on them. Workers that are idle with no jobs waiting
# have their "queues" in @ready_workers
!@ready_workers.empty?
end
# Runs the given block on the queue with the provided args.
#
# @param args the args passed blk when it is called
@@ -67,7 +78,11 @@ module GRPC
return
end
GRPC.logger.info('schedule another job')
@jobs << [blk, args]
fail 'No worker threads available' if @ready_workers.empty?
worker_queue = @ready_workers.pop
fail 'worker already has a task waiting' unless worker_queue.empty?
worker_queue << [blk, args]
end
end
@@ -77,9 +92,11 @@ module GRPC
fail 'already stopped' if @stopped
end
until @workers.size == @size.to_i
next_thread = Thread.new do
new_worker_queue = Queue.new
@ready_workers << new_worker_queue
next_thread = Thread.new(new_worker_queue) do |jobs|
catch(:exit) do # allows { throw :exit } to kill a thread
loop_execute_jobs
loop_execute_jobs(jobs)
end
remove_current_thread
end
@@ -90,7 +107,7 @@ module GRPC
# Stops the jobs in the pool
def stop
GRPC.logger.info('stopping, will wait for all the workers to exit')
@workers.size.times { schedule { throw :exit } }
schedule { throw :exit } while ready_for_work?
@stop_mutex.synchronize do # wait @keep_alive for works to stop
@stopped = true
@stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0
@@ -125,15 +142,18 @@ module GRPC
end
end
def loop_execute_jobs
def loop_execute_jobs(worker_queue)
loop do
begin
blk, args = @jobs.pop
blk, args = worker_queue.pop
blk.call(*args)
rescue StandardError => e
GRPC.logger.warn('Error in worker thread')
GRPC.logger.warn(e)
end
# there shouldn't be any work given to this thread while it's busy
fail('received a task while busy') unless worker_queue.empty?
@ready_workers << worker_queue
end
end
end
@@ -147,10 +167,10 @@ module GRPC
def_delegators :@server, :add_http2_port
# Default thread pool size is 3
DEFAULT_POOL_SIZE = 3
# Default thread pool size is 30
DEFAULT_POOL_SIZE = 30
# Default max_waiting_requests size is 20
# Deprecated due to internal changes to the thread pool
DEFAULT_MAX_WAITING_REQUESTS = 20
# Default poll period is 1s
@@ -175,11 +195,11 @@ module GRPC
# instance.
#
# * pool_size: the size of the thread pool the server uses to run its
# threads
# threads. The server cannot handle more concurrent requests than the size
# of the thread pool
#
# * max_waiting_requests: the maximum number of requests that are not
# being handled to allow. When this limit is exceeded, the server responds
# with not available to new requests
# * max_waiting_requests: Deprecated due to internal changes to the thread
# pool. This is still an argument for compatibility but is ignored.
#
# * poll_period: when present, the server polls for new events with this
# period
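As a hedged usage sketch of the options documented above (the keyword arguments are as listed here; the address and the commented-out handler registration are illustrative):

require 'grpc'

# pool_size caps how many RPCs are handled concurrently; max_waiting_requests
# is still accepted for compatibility but is now ignored.
server = GRPC::RpcServer.new(pool_size: 30,
                             max_waiting_requests: 20,
                             poll_period: 1)
server.add_http2_port('0.0.0.0:50051', :this_port_is_insecure)
# server.handle(SomeServiceImpl.new) and server.run_till_terminated would
# follow in a real server; SomeServiceImpl is a hypothetical service class.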
@@ -330,10 +350,8 @@ module GRPC
# Sends RESOURCE_EXHAUSTED if there are too many unprocessed jobs
def available?(an_rpc)
jobs_count, max = @pool.jobs_waiting, @max_waiting_requests
GRPC.logger.info("waiting: #{jobs_count}, max: #{max}")
return an_rpc if @pool.jobs_waiting <= @max_waiting_requests
GRPC.logger.warn("NOT AVAILABLE: too many jobs_waiting: #{an_rpc}")
return an_rpc if @pool.ready_for_work?
GRPC.logger.warn('no free worker threads currently')
noop = proc { |x| x }
c = ActiveCall.new(an_rpc.call, noop, noop, an_rpc.deadline,
metadata_received: true)
......
@@ -29,6 +29,8 @@
require 'grpc'
Thread.abort_on_exception = true
describe GRPC::Pool do
Pool = GRPC::Pool
@@ -44,32 +46,34 @@ describe GRPC::Pool do
end
end
describe '#jobs_waiting' do
it 'at start, it is zero' do
describe '#ready_for_work?' do
it 'before start it is not ready' do
p = Pool.new(1)
expect(p.jobs_waiting).to be(0)
expect(p.ready_for_work?).to be(false)
end
it 'it increases, with each scheduled job if the pool is not running' do
p = Pool.new(1)
job = proc {}
expect(p.jobs_waiting).to be(0)
5.times do |i|
it 'it stops being ready once all workers have jobs waiting or running' do
p = Pool.new(5)
p.start
job = proc { sleep(3) } # sleep so workers busy when done scheduling
5.times do
expect(p.ready_for_work?).to be(true)
p.schedule(&job)
expect(p.jobs_waiting).to be(i + 1)
end
expect(p.ready_for_work?).to be(false)
end
it 'it decreases as jobs are run' do
p = Pool.new(1)
p = Pool.new(5)
p.start
job = proc {}
expect(p.jobs_waiting).to be(0)
3.times do
5.times do
expect(p.ready_for_work?).to be(true)
p.schedule(&job)
end
p.start
sleep 2
expect(p.jobs_waiting).to be(0)
expect(p.ready_for_work?).to be(false)
sleep 2 # give the pool time to get at least one task done
expect(p.ready_for_work?).to be(true)
end
end
@@ -90,6 +94,18 @@ describe GRPC::Pool do
expect(q.pop).to be(o)
p.stop
end
it 'it throws an error if all of the workers have tasks to do' do
p = Pool.new(5)
p.start
job = proc {}
5.times do
expect(p.ready_for_work?).to be(true)
p.schedule(&job)
end
expect { p.schedule(&job) }.to raise_error
expect { p.schedule(&job) }.to raise_error
end
end
describe '#stop' do
@@ -114,17 +130,17 @@
describe '#start' do
it 'runs pre-scheduled jobs' do
p = Pool.new(2)
p = Pool.new(5)
p.start
o, q = Object.new, Queue.new
n = 5 # arbitrary
n.times { p.schedule(o, &q.method(:push)) }
p.start
n.times { expect(q.pop).to be(o) }
p.stop
end
it 'runs jobs as they are scheduled ' do
p = Pool.new(2)
it 'runs jobs as they are scheduled' do
p = Pool.new(5)
o, q = Object.new, Queue.new
p.start
n = 5 # arbitrary
......