diff --git a/include/grpc++/dynamic_thread_pool.h b/include/grpc++/dynamic_thread_pool.h index e01063ced2351b66eec74f9c9cdaed1ab633d0db..bc4e2d4d749da309e9e3c3888419bb1e6d23d492 100644 --- a/include/grpc++/dynamic_thread_pool.h +++ b/include/grpc++/dynamic_thread_pool.h @@ -70,7 +70,6 @@ class DynamicThreadPool GRPC_FINAL : public ThreadPoolInterface { int reserve_threads_; int nthreads_; int threads_waiting_; - std::list<DynamicThread*> live_threads_; std::list<DynamicThread*> dead_threads_; void ThreadFunc(); diff --git a/src/cpp/server/dynamic_thread_pool.cc b/src/cpp/server/dynamic_thread_pool.cc index c42563103c6f1a5561ed21676b11c9eba808e505..7e9b01143aa5bcfed2297dac5e9aec00411acec5 100644 --- a/src/cpp/server/dynamic_thread_pool.cc +++ b/src/cpp/server/dynamic_thread_pool.cc @@ -50,14 +50,9 @@ void DynamicThreadPool::DynamicThread::ThreadFunc() { // Now that we have killed ourselves, we should reduce the thread count grpc::unique_lock<grpc::mutex> lock(pool_->mu_); pool_->nthreads_--; - // Move ourselves from live list to dead list - for (auto t = pool_->live_threads_.begin(); t != pool_->live_threads_.end(); - t++) { - if ((*t) == this) { - t = pool_->live_threads_.erase(t); - pool_->dead_threads_.push_back(this); - } - } + // Move ourselves to dead list + pool_->dead_threads_.push_back(this); + if ((pool_->shutdown_) && (pool_->nthreads_ == 0)) { pool_->shutdown_cv_.notify_one(); } @@ -69,7 +64,7 @@ void DynamicThreadPool::ThreadFunc() { grpc::unique_lock<grpc::mutex> lock(mu_); if (!shutdown_ && callbacks_.empty()) { // If there are too many threads waiting, then quit this thread - if (threads_waiting_ == reserve_threads_) { + if (threads_waiting_ >= reserve_threads_) { break; } threads_waiting_++; @@ -90,11 +85,12 @@ void DynamicThreadPool::ThreadFunc() { } DynamicThreadPool::DynamicThreadPool(int reserve_threads) : - shutdown_(false), reserve_threads_(reserve_threads), threads_waiting_(0) { + shutdown_(false), reserve_threads_(reserve_threads), nthreads_(0), + threads_waiting_(0) { for (int i = 0; i < reserve_threads_; i++) { grpc::lock_guard<grpc::mutex> lock(mu_); nthreads_++; - live_threads_.push_back(new DynamicThread(this)); + new DynamicThread(this); } } @@ -117,13 +113,16 @@ DynamicThreadPool::~DynamicThreadPool() { void DynamicThreadPool::Add(const std::function<void()>& callback) { grpc::lock_guard<grpc::mutex> lock(mu_); + // Add works to the callbacks list + callbacks_.push(callback); + // Increase pool size or notify as needed if (threads_waiting_ == 0) { // Kick off a new thread nthreads_++; - live_threads_.push_back(new DynamicThread(this)); + new DynamicThread(this); + } else { + cv_.notify_one(); } - callbacks_.push(callback); - cv_.notify_one(); // Also use this chance to harvest dead threads if (!dead_threads_.empty()) { ReapThreads(&dead_threads_);