From 9c74487072abdef96ed1bd49c131aaf22fee5238 Mon Sep 17 00:00:00 2001
From: Alexander Polcyn <apolcyn@google.com>
Date: Fri, 12 Aug 2016 14:58:10 -0700
Subject: [PATCH] Use thread pool from concurrent-ruby on ruby server

---
 grpc.gemspec                                  |   1 +
 src/ruby/lib/grpc/generic/rpc_server.rb       | 143 +++---------------
 src/ruby/spec/generic/rpc_server_pool_spec.rb | 138 -----------------
 src/ruby/spec/generic/rpc_server_spec.rb      |  13 +-
 4 files changed, 32 insertions(+), 263 deletions(-)
 delete mode 100644 src/ruby/spec/generic/rpc_server_pool_spec.rb

diff --git a/grpc.gemspec b/grpc.gemspec
index 29d8afef9b..1b2c553dd3 100755
--- a/grpc.gemspec
+++ b/grpc.gemspec
@@ -29,6 +29,7 @@ Gem::Specification.new do |s|
 
   s.add_dependency 'google-protobuf', '~> 3.0.0.alpha.5.0.3'
   s.add_dependency 'googleauth',      '~> 0.5.1'
+  s.add_dependency 'concurrent-ruby'
 
   s.add_development_dependency 'bundler',            '~> 1.9'
   s.add_development_dependency 'facter',             '~> 2.4'
diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb
index 8ea798dce0..da0f6503db 100644
--- a/src/ruby/lib/grpc/generic/rpc_server.rb
+++ b/src/ruby/lib/grpc/generic/rpc_server.rb
@@ -31,113 +31,10 @@ require_relative '../grpc'
 require_relative 'active_call'
 require_relative 'service'
 require 'thread'
+require 'concurrent'
 
 # GRPC contains the General RPC module.
 module GRPC
-  # Pool is a simple thread pool.
-  class Pool
-    # Default keep alive period is 1s
-    DEFAULT_KEEP_ALIVE = 1
-
-    def initialize(size, keep_alive: DEFAULT_KEEP_ALIVE)
-      fail 'pool size must be positive' unless size > 0
-      @jobs = Queue.new
-      @size = size
-      @stopped = false
-      @stop_mutex = Mutex.new # needs to be held when accessing @stopped
-      @stop_cond = ConditionVariable.new
-      @workers = []
-      @keep_alive = keep_alive
-    end
-
-    # Returns the number of jobs waiting
-    def jobs_waiting
-      @jobs.size
-    end
-
-    # Runs the given block on the queue with the provided args.
-    #
-    # @param args the args passed blk when it is called
-    # @param blk the block to call
-    def schedule(*args, &blk)
-      return if blk.nil?
-      @stop_mutex.synchronize do
-        if @stopped
-          GRPC.logger.warn('did not schedule job, already stopped')
-          return
-        end
-        GRPC.logger.info('schedule another job')
-        @jobs << [blk, args]
-      end
-    end
-
-    # Starts running the jobs in the thread pool.
-    def start
-      @stop_mutex.synchronize do
-        fail 'already stopped' if @stopped
-      end
-      until @workers.size == @size.to_i
-        next_thread = Thread.new do
-          catch(:exit) do  # allows { throw :exit } to kill a thread
-            loop_execute_jobs
-          end
-          remove_current_thread
-        end
-        @workers << next_thread
-      end
-    end
-
-    # Stops the jobs in the pool
-    def stop
-      GRPC.logger.info('stopping, will wait for all the workers to exit')
-      @workers.size.times { schedule { throw :exit } }
-      @stop_mutex.synchronize do  # wait @keep_alive for works to stop
-        @stopped = true
-        @stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0
-      end
-      forcibly_stop_workers
-      GRPC.logger.info('stopped, all workers are shutdown')
-    end
-
-    protected
-
-    # Forcibly shutdown any threads that are still alive.
-    def forcibly_stop_workers
-      return unless @workers.size > 0
-      GRPC.logger.info("forcibly terminating #{@workers.size} worker(s)")
-      @workers.each do |t|
-        next unless t.alive?
-        begin
-          t.exit
-        rescue StandardError => e
-          GRPC.logger.warn('error while terminating a worker')
-          GRPC.logger.warn(e)
-        end
-      end
-    end
-
-    # removes the threads from workers, and signal when all the
-    # threads are complete.
-    def remove_current_thread
-      @stop_mutex.synchronize do
-        @workers.delete(Thread.current)
-        @stop_cond.signal if @workers.size.zero?
-      end
-    end
-
-    def loop_execute_jobs
-      loop do
-        begin
-          blk, args = @jobs.pop
-          blk.call(*args)
-        rescue StandardError => e
-          GRPC.logger.warn('Error in worker thread')
-          GRPC.logger.warn(e)
-        end
-      end
-    end
-  end
-
   # RpcServer hosts a number of services and makes them available on the
   # network.
   class RpcServer
@@ -147,11 +44,14 @@ module GRPC
 
     def_delegators :@server, :add_http2_port
 
-    # Default thread pool size is 3
-    DEFAULT_POOL_SIZE = 3
+    # Default max size of the thread pool size is 100
+    DEFAULT_MAX_POOL_SIZE = 100
+
+    # Default minimum size of the thread pool is 5
+    DEFAULT_MIN_POOL_SIZE = 5
 
-    # Default max_waiting_requests size is 20
-    DEFAULT_MAX_WAITING_REQUESTS = 20
+    # Default max_waiting_requests size is 60
+    DEFAULT_MAX_WAITING_REQUESTS = 60
 
     # Default poll period is 1s
     DEFAULT_POLL_PERIOD = 1
@@ -174,8 +74,8 @@ module GRPC
     # There are some specific keyword args used to configure the RpcServer
     # instance.
     #
-    # * pool_size: the size of the thread pool the server uses to run its
-    # threads
+    # * pool_size: the maximum size of the thread pool that the server's
+    # thread pool can reach.
     #
     # * max_waiting_requests: the maximum number of requests that are not
     # being handled to allow. When this limit is exceeded, the server responds
@@ -191,7 +91,8 @@ module GRPC
     #
     # * server_args:
     # A server arguments hash to be passed down to the underlying core server
-    def initialize(pool_size:DEFAULT_POOL_SIZE,
+    def initialize(pool_size:DEFAULT_MAX_POOL_SIZE,
+                   min_pool_size:DEFAULT_MIN_POOL_SIZE,
                    max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS,
                    poll_period:DEFAULT_POLL_PERIOD,
                    connect_md_proc:nil,
@@ -199,8 +100,12 @@ module GRPC
       @connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc)
       @max_waiting_requests = max_waiting_requests
       @poll_period = poll_period
-      @pool_size = pool_size
-      @pool = Pool.new(@pool_size)
+
+      @pool = Concurrent::ThreadPoolExecutor.new(
+        min_threads: [min_pool_size, pool_size].min,
+        max_threads: pool_size,
+        max_queue: max_waiting_requests,
+        fallback_policy: :discard)
       @run_cond = ConditionVariable.new
       @run_mutex = Mutex.new
       # running_state can take 4 values: :not_started, :running, :stopping, and
@@ -221,7 +126,8 @@ module GRPC
       end
       deadline = from_relative_time(@poll_period)
       @server.close(deadline)
-      @pool.stop
+      @pool.shutdown
+      @pool.wait_for_termination
     end
 
     def running_state
@@ -318,7 +224,6 @@ module GRPC
     def run
       @run_mutex.synchronize do
         fail 'cannot run without registering services' if rpc_descs.size.zero?
-        @pool.start
         @server.start
         transition_running_state(:running)
         @run_cond.broadcast
@@ -330,9 +235,11 @@ module GRPC
 
     # Sends RESOURCE_EXHAUSTED if there are too many unprocessed jobs
     def available?(an_rpc)
-      jobs_count, max = @pool.jobs_waiting, @max_waiting_requests
+      jobs_count, max = @pool.queue_length, @pool.max_queue
       GRPC.logger.info("waiting: #{jobs_count}, max: #{max}")
-      return an_rpc if @pool.jobs_waiting <= @max_waiting_requests
+
+      # remaining capacity for ThreadPoolExecutors is -1 if unbounded
+      return an_rpc if @pool.remaining_capacity != 0
       GRPC.logger.warn("NOT AVAILABLE: too many jobs_waiting: #{an_rpc}")
       noop = proc { |x| x }
 
@@ -368,7 +275,7 @@ module GRPC
           break if (!an_rpc.nil?) && an_rpc.call.nil?
           active_call = new_active_server_call(an_rpc)
           unless active_call.nil?
-            @pool.schedule(active_call) do |ac|
+            @pool.post(active_call) do |ac|
               c, mth = ac
               begin
                 rpc_descs[mth].run_server_method(c, rpc_handlers[mth])
diff --git a/src/ruby/spec/generic/rpc_server_pool_spec.rb b/src/ruby/spec/generic/rpc_server_pool_spec.rb
deleted file mode 100644
index b67008de48..0000000000
--- a/src/ruby/spec/generic/rpc_server_pool_spec.rb
+++ /dev/null
@@ -1,138 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-#     * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-#     * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-#     * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-require 'grpc'
-
-describe GRPC::Pool do
-  Pool = GRPC::Pool
-
-  describe '#new' do
-    it 'raises if a non-positive size is used' do
-      expect { Pool.new(0) }.to raise_error
-      expect { Pool.new(-1) }.to raise_error
-      expect { Pool.new(Object.new) }.to raise_error
-    end
-
-    it 'is constructed OK with a positive size' do
-      expect { Pool.new(1) }.not_to raise_error
-    end
-  end
-
-  describe '#jobs_waiting' do
-    it 'at start, it is zero' do
-      p = Pool.new(1)
-      expect(p.jobs_waiting).to be(0)
-    end
-
-    it 'it increases, with each scheduled job if the pool is not running' do
-      p = Pool.new(1)
-      job = proc {}
-      expect(p.jobs_waiting).to be(0)
-      5.times do |i|
-        p.schedule(&job)
-        expect(p.jobs_waiting).to be(i + 1)
-      end
-    end
-
-    it 'it decreases as jobs are run' do
-      p = Pool.new(1)
-      job = proc {}
-      expect(p.jobs_waiting).to be(0)
-      3.times do
-        p.schedule(&job)
-      end
-      p.start
-      sleep 2
-      expect(p.jobs_waiting).to be(0)
-    end
-  end
-
-  describe '#schedule' do
-    it 'return if the pool is already stopped' do
-      p = Pool.new(1)
-      p.stop
-      job = proc {}
-      expect { p.schedule(&job) }.to_not raise_error
-    end
-
-    it 'adds jobs that get run by the pool' do
-      p = Pool.new(1)
-      p.start
-      o, q = Object.new, Queue.new
-      job = proc { q.push(o) }
-      p.schedule(&job)
-      expect(q.pop).to be(o)
-      p.stop
-    end
-  end
-
-  describe '#stop' do
-    it 'works when there are no scheduled tasks' do
-      p = Pool.new(1)
-      expect { p.stop }.not_to raise_error
-    end
-
-    it 'stops jobs when there are long running jobs' do
-      p = Pool.new(1)
-      p.start
-      o, q = Object.new, Queue.new
-      job = proc do
-        sleep(5)  # long running
-        q.push(o)
-      end
-      p.schedule(&job)
-      sleep(1)  # should ensure the long job gets scheduled
-      expect { p.stop }.not_to raise_error
-    end
-  end
-
-  describe '#start' do
-    it 'runs pre-scheduled jobs' do
-      p = Pool.new(2)
-      o, q = Object.new, Queue.new
-      n = 5  # arbitrary
-      n.times { p.schedule(o, &q.method(:push)) }
-      p.start
-      n.times { expect(q.pop).to be(o) }
-      p.stop
-    end
-
-    it 'runs jobs as they are scheduled ' do
-      p = Pool.new(2)
-      o, q = Object.new, Queue.new
-      p.start
-      n = 5  # arbitrary
-      n.times do
-        p.schedule(o, &q.method(:push))
-        expect(q.pop).to be(o)
-      end
-      p.stop
-    end
-  end
-end
diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb
index 31157cf161..d362e48dee 100644
--- a/src/ruby/spec/generic/rpc_server_spec.rb
+++ b/src/ruby/spec/generic/rpc_server_spec.rb
@@ -395,9 +395,9 @@ describe GRPC::RpcServer do
       it 'should return RESOURCE_EXHAUSTED on too many jobs', server: true do
         opts = {
           server_args: { a_channel_arg: 'an_arg' },
-          pool_size: 1,
+          pool_size: 2,
           poll_period: 1,
-          max_waiting_requests: 0
+          max_waiting_requests: 1
         }
         alt_srv = RpcServer.new(**opts)
         alt_srv.handle(SlowService)
@@ -406,24 +406,23 @@ describe GRPC::RpcServer do
         t = Thread.new { alt_srv.run }
         alt_srv.wait_till_running
         req = EchoMsg.new
-        n = 5  # arbitrary, use as many to ensure the server pool is exceeded
+        n = 20 # arbitrary, use as many to ensure the server pool is exceeded
         threads = []
-        one_failed_as_unavailable = false
+        bad_status_code = nil
         n.times do
           threads << Thread.new do
             stub = SlowStub.new(alt_host, :this_channel_is_insecure)
             begin
               stub.an_rpc(req)
             rescue GRPC::BadStatus => e
-              one_failed_as_unavailable =
-                e.code == StatusCodes::RESOURCE_EXHAUSTED
+              bad_status_code = e.code
             end
           end
         end
         threads.each(&:join)
         alt_srv.stop
         t.join
-        expect(one_failed_as_unavailable).to be(true)
+        expect(bad_status_code).to be(StatusCodes::RESOURCE_EXHAUSTED)
       end
     end
 
-- 
GitLab