diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 6fea908b66bc3d2a58d2c7663fd69cc25da62a3c..423f347acd29af05b69490d159e21ef0e041a3fb 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -330,7 +330,17 @@ class Server::SyncRequestThreadManager : public ThreadManager { void Shutdown() override { server_cq_->Shutdown(); -ThreadManager::Shutdown(); + ThreadManager::Shutdown(); + } + + void Wait() override { + ThreadManager::Wait(); + // Drain any pending items from the queue + void* tag; + bool ok; + while (server_cq_->Next(&tag, &ok)) { + // Do nothing + } } void Start() { diff --git a/src/cpp/thread_manager/thread_manager.cc b/src/cpp/thread_manager/thread_manager.cc index b6d57545113a5225c6d422e551dab47fa3c038cd..39b9691b5f524f054c6b72ce0d72d6a803c9ab8a 100644 --- a/src/cpp/thread_manager/thread_manager.cc +++ b/src/cpp/thread_manager/thread_manager.cc @@ -100,6 +100,8 @@ void ThreadManager::MarkAsCompleted(WorkerThread* thd) { void ThreadManager::CleanupCompletedThreads() { std::list<WorkerThread*> completed_threads; { + // swap out the completed threads list: allows other threads to clean up + // more quickly std::unique_lock<std::mutex> lock(list_mu_); completed_threads.swap(completed_threads_); } @@ -112,20 +114,6 @@ void ThreadManager::Initialize() { } } -// If the number of pollers (i.e threads currently blocked in PollForWork()) is -// less than max threshold (i.e max_pollers_) and the total number of threads is -// below the maximum threshold, we can let the current thread continue as poller -bool ThreadManager::MaybeContinueAsPoller(bool work_found) { - std::unique_lock<std::mutex> lock(mu_); - gpr_log(GPR_DEBUG, "s=%d wf=%d np=%d mp=%d", shutdown_, work_found, num_pollers_, max_pollers_); - if (shutdown_ || (!work_found && num_pollers_ > max_pollers_)) { - return false; - } - - num_pollers_++; - return true; -} - // Create a new poller if the current number of pollers i.e num_pollers_ (i.e // threads currently blocked in PollForWork()) is below the threshold (i.e // min_pollers_) and the total number of threads is below the maximum threshold @@ -143,48 +131,48 @@ void ThreadManager::MaybeCreatePoller() { } void ThreadManager::MainWorkLoop() { - void* tag; - bool ok; - - /* - 1. Poll for work (i.e PollForWork()) - 2. After returning from PollForWork, reduce the number of pollers by 1. If - PollForWork() returned a TIMEOUT, then it may indicate that we have more - polling threads than needed. Check if the number of pollers is greater - than min_pollers and if so, terminate the thread. - 3. Since we are short of one poller now, see if a new poller has to be - created (i.e see MaybeCreatePoller() for more details) - 4. Do the actual work (DoWork()) - 5. After doing the work, see it this thread can resume polling work (i.e - see MaybeContinueAsPoller() for more details) */ - WorkStatus work_status; while (true) { - bool done = false; - work_status = PollForWork(&tag, &ok); + void* tag; + bool ok; + WorkStatus work_status = PollForWork(&tag, &ok); std::unique_lock<std::mutex> lock(mu_); + // Reduce the number of pollers by 1 and check what happened with the poll num_pollers_--; - gpr_log(GPR_DEBUG, "%p: work_status:%d num_pollers:%d min_pollers:%d max_pollers:%d num_threads:%d shutdown:%d", this, work_status, num_pollers_, min_pollers_, max_pollers_, num_threads_, shutdown_); + bool done = false; switch (work_status) { - case TIMEOUT: - if (shutdown_ || num_pollers_ >= max_pollers_) done = true; - break; - case SHUTDOWN: done = true; break; - case WORK_FOUND: - if (!shutdown_ && num_pollers_ < min_pollers_) { - num_pollers_++; - num_threads_++; - lock.unlock(); - new WorkerThread(this); - } else { - lock.unlock(); - } - DoWork(tag, ok); - lock.lock(); - if (shutdown_) done = true; - break; + case TIMEOUT: + // If we timed out and we have more pollers than we need (or we are + // shutdown), finish this thread + if (shutdown_ || num_pollers_ > max_pollers_) done = true; + break; + case SHUTDOWN: + // If the thread manager is shutdown, finish this thread + done = true; + break; + case WORK_FOUND: + // If we got work and there are now insufficient pollers, start a new + // one + if (!shutdown_ && num_pollers_ < min_pollers_) { + num_pollers_++; + num_threads_++; + lock.unlock(); + new WorkerThread(this); + } else { + lock.unlock(); + } + DoWork(tag, ok); + lock.lock(); + // If we're shutdown, we should finish at this point. + // If not, there's a chance that we'll exceed the max poller count: that + // is explicitly ok - we'll decrease after one poll timeout, and prevent + // some thrashing starting up and shutting down threads + if (shutdown_) done = true; + break; } -if (done) break; + // If we decided to finish the thread, break out of the while loop + if (done) break; + // ... otherwise increase poller count and continue num_pollers_++; }; diff --git a/src/cpp/thread_manager/thread_manager.h b/src/cpp/thread_manager/thread_manager.h index 7d832ad16a6c1b3d5222806ec03e32753c2539c0..c9435011f95ba321abe86b65b8c23a293e1d2177 100644 --- a/src/cpp/thread_manager/thread_manager.h +++ b/src/cpp/thread_manager/thread_manager.h @@ -96,7 +96,7 @@ class ThreadManager { // A blocking call that returns only after the ThreadManager has shutdown and // all the threads have drained all the outstanding work - void Wait(); + virtual void Wait(); private: // Helper wrapper class around std::thread. This takes a ThreadManager object @@ -126,10 +126,6 @@ class ThreadManager { // minimum number of pollers needed (i.e min_pollers). void MaybeCreatePoller(); - // Returns true if the current thread can resume as a poller. i.e if the - // current number of pollers is less than the max_pollers. - bool MaybeContinueAsPoller(bool work_found); - void MarkAsCompleted(WorkerThread* thd); void CleanupCompletedThreads();