From cdd420304207bd0c60f23580a04f1ddcfd649fd4 Mon Sep 17 00:00:00 2001
From: Alexander Polcyn <apolcyn@google.com>
Date: Tue, 22 Nov 2016 11:28:28 +0100
Subject: [PATCH] use thread pool from v1.0.x but keep server call life cycle
 in master

---
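Reviewer note: a minimal sketch of how the restored Pool is driven, using only the
methods this patch adds (Pool.new, start, ready_for_work?, schedule, stop). The pool
size of 4, the :job_id argument, and the log message are illustrative, not taken from
the change itself.

    pool = GRPC::Pool.new(4)
    pool.start                        # spawn the worker threads
    if pool.ready_for_work?           # true while an idle worker queue is available
      pool.schedule(:job_id) do |id|  # block runs on an idle worker with the given args
        GRPC.logger.info("handled #{id}")
      end
    end
    pool.stop                         # ask idle workers to exit, then force-kill stragglers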
 grpc.gemspec                            |   1 -
 src/ruby/lib/grpc/generic/rpc_server.rb | 155 ++++++++++++++++++++----
 2 files changed, 134 insertions(+), 22 deletions(-)

diff --git a/grpc.gemspec b/grpc.gemspec
index e2a2ffc21a..cce6f2cfcc 100755
--- a/grpc.gemspec
+++ b/grpc.gemspec
@@ -29,7 +29,6 @@ Gem::Specification.new do |s|
 
   s.add_dependency 'google-protobuf', '~> 3.0.2'
   s.add_dependency 'googleauth',      '~> 0.5.1'
-  s.add_dependency 'concurrent-ruby'
 
   s.add_development_dependency 'bundler',            '~> 1.9'
   s.add_development_dependency 'facter',             '~> 2.4'
diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb
index ae07439fcf..00e0db71be 100644
--- a/src/ruby/lib/grpc/generic/rpc_server.rb
+++ b/src/ruby/lib/grpc/generic/rpc_server.rb
@@ -31,10 +31,132 @@ require_relative '../grpc'
 require_relative 'active_call'
 require_relative 'service'
 require 'thread'
-require 'concurrent'
 
 # GRPC contains the General RPC module.
 module GRPC
+  # Pool is a simple thread pool.
+  class Pool
+    # Default keep alive period is 1s
+    DEFAULT_KEEP_ALIVE = 1
+
+    def initialize(size, keep_alive: DEFAULT_KEEP_ALIVE)
+      fail 'pool size must be positive' unless size > 0
+      @jobs = Queue.new
+      @size = size
+      @stopped = false
+      @stop_mutex = Mutex.new # needs to be held when accessing @stopped
+      @stop_cond = ConditionVariable.new
+      @workers = []
+      @keep_alive = keep_alive
+
+      # Each worker thread has its own queue to push and pull jobs
+      # these queues are put into @ready_workers when that worker is idle
+      @ready_workers = Queue.new
+    end
+
+    # Returns the number of jobs waiting
+    def jobs_waiting
+      @jobs.size
+    end
+
+    def ready_for_work?
+      # Busy worker threads are either doing work, or have a single job
+      # waiting on them. Workers that are idle with no jobs waiting
+      # have their "queues" in @ready_workers
+      !@ready_workers.empty?
+    end
+
+    # Runs the given block on the queue with the provided args.
+    #
+    # @param args the args passed to blk when it is called
+    # @param blk the block to call
+    def schedule(*args, &blk)
+      return if blk.nil?
+      @stop_mutex.synchronize do
+        if @stopped
+          GRPC.logger.warn('did not schedule job, already stopped')
+          return
+        end
+        GRPC.logger.info('schedule another job')
+        fail 'No worker threads available' if @ready_workers.empty?
+        worker_queue = @ready_workers.pop
+
+        fail 'worker already has a task waiting' unless worker_queue.empty?
+        worker_queue << [blk, args]
+      end
+    end
+
+    # Starts running the jobs in the thread pool.
+    def start
+      @stop_mutex.synchronize do
+        fail 'already stopped' if @stopped
+      end
+      until @workers.size == @size.to_i
+        new_worker_queue = Queue.new
+        @ready_workers << new_worker_queue
+        next_thread = Thread.new(new_worker_queue) do |jobs|
+          catch(:exit) do  # allows { throw :exit } to kill a thread
+            loop_execute_jobs(jobs)
+          end
+          remove_current_thread
+        end
+        @workers << next_thread
+      end
+    end
+
+    # Stops the jobs in the pool
+    def stop
+      GRPC.logger.info('stopping, will wait for all the workers to exit')
+      schedule { throw :exit } while ready_for_work?
+      @stop_mutex.synchronize do  # wait @keep_alive for workers to stop
+        @stopped = true
+        @stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0
+      end
+      forcibly_stop_workers
+      GRPC.logger.info('stopped, all workers are shut down')
+    end
+
+    protected
+
+    # Forcibly shut down any threads that are still alive.
+    def forcibly_stop_workers
+      return unless @workers.size > 0
+      GRPC.logger.info("forcibly terminating #{@workers.size} worker(s)")
+      @workers.each do |t|
+        next unless t.alive?
+        begin
+          t.exit
+        rescue StandardError => e
+          GRPC.logger.warn('error while terminating a worker')
+          GRPC.logger.warn(e)
+        end
+      end
+    end
+
+    # Removes the current thread from @workers, and signals when all
+    # worker threads have completed.
+    def remove_current_thread
+      @stop_mutex.synchronize do
+        @workers.delete(Thread.current)
+        @stop_cond.signal if @workers.size.zero?
+      end
+    end
+
+    def loop_execute_jobs(worker_queue)
+      loop do
+        begin
+          blk, args = worker_queue.pop
+          blk.call(*args)
+        rescue StandardError => e
+          GRPC.logger.warn('Error in worker thread')
+          GRPC.logger.warn(e)
+        end
+        # there shouldn't be any work given to this thread while it's busy
+        fail('received a task while busy') unless worker_queue.empty?
+        @ready_workers << worker_queue
+      end
+    end
+  end
 
   # RpcServer hosts a number of services and makes them available on the
   # network.
@@ -45,17 +167,13 @@ module GRPC
 
     def_delegators :@server, :add_http2_port
 
-    # Default max size of the thread pool size is 100
-    DEFAULT_MAX_POOL_SIZE = 100
-
-    # Default minimum size of the thread pool is 5
-    DEFAULT_MIN_POOL_SIZE = 5
+    # Default thread pool size is 30
+    DEFAULT_POOL_SIZE = 30
 
     # Deprecated due to internal changes to the thread pool
     DEFAULT_MAX_WAITING_REQUESTS = 20
 
     # Default poll period is 1s
-    # Used for grpc server shutdown and thread pool shutdown timeouts
     DEFAULT_POLL_PERIOD = 1
 
     # Signal check period is 0.25s
@@ -76,9 +194,9 @@ module GRPC
     # There are some specific keyword args used to configure the RpcServer
     # instance.
     #
-    # * pool_size: the maximum size of the thread pool that the server's
-    # thread pool can reach. No more concurrent requests can be made than
-    # the size of the thread pool
+    # * pool_size: the size of the thread pool used to run handler threads.
+    # No more concurrent requests can be handled at a time than the size
+    # of the thread pool
     #
     # * max_waiting_requests: Deprecated due to internal changes to the thread
     # pool. This is still an argument for compatibility but is ignored.
@@ -93,8 +211,7 @@ module GRPC
     #
     # * server_args:
     # A server arguments hash to be passed down to the underlying core server
-    def initialize(pool_size:DEFAULT_MAX_POOL_SIZE,
-                   min_pool_size:DEFAULT_MIN_POOL_SIZE,
+    def initialize(pool_size:DEFAULT_POOL_SIZE,
                    max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS,
                    poll_period:DEFAULT_POLL_PERIOD,
                    connect_md_proc:nil,
@@ -102,12 +219,8 @@ module GRPC
       @connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc)
       @max_waiting_requests = max_waiting_requests
       @poll_period = poll_period
-
-      @pool = Concurrent::ThreadPoolExecutor.new(
-        min_threads: [min_pool_size, pool_size].min,
-        max_threads: pool_size,
-        max_queue: max_waiting_requests,
-        fallback_policy: :discard)
+      @pool_size = pool_size
+      @pool = Pool.new(@pool_size)
       @run_cond = ConditionVariable.new
       @run_mutex = Mutex.new
       # running_state can take 4 values: :not_started, :running, :stopping, and
@@ -128,8 +241,7 @@ module GRPC
       end
       deadline = from_relative_time(@poll_period)
       @server.close(deadline)
-      @pool.shutdown
-      @pool.wait_for_termination(@poll_period)
+      @pool.stop
     end
 
     def running_state
@@ -226,6 +338,7 @@ module GRPC
     def run
       @run_mutex.synchronize do
         fail 'cannot run without registering services' if rpc_descs.size.zero?
+        @pool.start
         @server.start
         transition_running_state(:running)
         @run_cond.broadcast
@@ -273,7 +386,7 @@ module GRPC
           break if (!an_rpc.nil?) && an_rpc.call.nil?
           active_call = new_active_server_call(an_rpc)
           unless active_call.nil?
-            @pool.post(active_call) do |ac|
+            @pool.schedule(active_call) do |ac|
               c, mth = ac
               begin
                 rpc_descs[mth].run_server_method(c, rpc_handlers[mth])
-- 
GitLab