Skip to content
Snippets Groups Projects
Commit 02b80549 authored by vjpai's avatar vjpai
Browse files

Bug fixes

parent 67ab9105
No related branches found
No related tags found
No related merge requests found
...@@ -70,7 +70,6 @@ class DynamicThreadPool GRPC_FINAL : public ThreadPoolInterface { ...@@ -70,7 +70,6 @@ class DynamicThreadPool GRPC_FINAL : public ThreadPoolInterface {
int reserve_threads_; int reserve_threads_;
int nthreads_; int nthreads_;
int threads_waiting_; int threads_waiting_;
std::list<DynamicThread*> live_threads_;
std::list<DynamicThread*> dead_threads_; std::list<DynamicThread*> dead_threads_;
void ThreadFunc(); void ThreadFunc();
......
...@@ -50,14 +50,9 @@ void DynamicThreadPool::DynamicThread::ThreadFunc() { ...@@ -50,14 +50,9 @@ void DynamicThreadPool::DynamicThread::ThreadFunc() {
// Now that we have killed ourselves, we should reduce the thread count // Now that we have killed ourselves, we should reduce the thread count
grpc::unique_lock<grpc::mutex> lock(pool_->mu_); grpc::unique_lock<grpc::mutex> lock(pool_->mu_);
pool_->nthreads_--; pool_->nthreads_--;
// Move ourselves from live list to dead list // Move ourselves to dead list
for (auto t = pool_->live_threads_.begin(); t != pool_->live_threads_.end(); pool_->dead_threads_.push_back(this);
t++) {
if ((*t) == this) {
t = pool_->live_threads_.erase(t);
pool_->dead_threads_.push_back(this);
}
}
if ((pool_->shutdown_) && (pool_->nthreads_ == 0)) { if ((pool_->shutdown_) && (pool_->nthreads_ == 0)) {
pool_->shutdown_cv_.notify_one(); pool_->shutdown_cv_.notify_one();
} }
...@@ -69,7 +64,7 @@ void DynamicThreadPool::ThreadFunc() { ...@@ -69,7 +64,7 @@ void DynamicThreadPool::ThreadFunc() {
grpc::unique_lock<grpc::mutex> lock(mu_); grpc::unique_lock<grpc::mutex> lock(mu_);
if (!shutdown_ && callbacks_.empty()) { if (!shutdown_ && callbacks_.empty()) {
// If there are too many threads waiting, then quit this thread // If there are too many threads waiting, then quit this thread
if (threads_waiting_ == reserve_threads_) { if (threads_waiting_ >= reserve_threads_) {
break; break;
} }
threads_waiting_++; threads_waiting_++;
...@@ -90,11 +85,12 @@ void DynamicThreadPool::ThreadFunc() { ...@@ -90,11 +85,12 @@ void DynamicThreadPool::ThreadFunc() {
} }
DynamicThreadPool::DynamicThreadPool(int reserve_threads) : DynamicThreadPool::DynamicThreadPool(int reserve_threads) :
shutdown_(false), reserve_threads_(reserve_threads), threads_waiting_(0) { shutdown_(false), reserve_threads_(reserve_threads), nthreads_(0),
threads_waiting_(0) {
for (int i = 0; i < reserve_threads_; i++) { for (int i = 0; i < reserve_threads_; i++) {
grpc::lock_guard<grpc::mutex> lock(mu_); grpc::lock_guard<grpc::mutex> lock(mu_);
nthreads_++; nthreads_++;
live_threads_.push_back(new DynamicThread(this)); new DynamicThread(this);
} }
} }
...@@ -117,13 +113,16 @@ DynamicThreadPool::~DynamicThreadPool() { ...@@ -117,13 +113,16 @@ DynamicThreadPool::~DynamicThreadPool() {
void DynamicThreadPool::Add(const std::function<void()>& callback) { void DynamicThreadPool::Add(const std::function<void()>& callback) {
grpc::lock_guard<grpc::mutex> lock(mu_); grpc::lock_guard<grpc::mutex> lock(mu_);
// Add works to the callbacks list
callbacks_.push(callback);
// Increase pool size or notify as needed
if (threads_waiting_ == 0) { if (threads_waiting_ == 0) {
// Kick off a new thread // Kick off a new thread
nthreads_++; nthreads_++;
live_threads_.push_back(new DynamicThread(this)); new DynamicThread(this);
} else {
cv_.notify_one();
} }
callbacks_.push(callback);
cv_.notify_one();
// Also use this chance to harvest dead threads // Also use this chance to harvest dead threads
if (!dead_threads_.empty()) { if (!dead_threads_.empty()) {
ReapThreads(&dead_threads_); ReapThreads(&dead_threads_);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment