diff --git a/src/cpp/rpcmanager/grpc_rpc_manager.cc b/src/cpp/rpcmanager/grpc_rpc_manager.cc index 58b337da63476030448696b6cb8172974174152b..2299dbdcd38537ab093e3ccefddd143816913375 100644 --- a/src/cpp/rpcmanager/grpc_rpc_manager.cc +++ b/src/cpp/rpcmanager/grpc_rpc_manager.cc @@ -59,12 +59,14 @@ GrpcRpcManager::GrpcRpcManager(int min_pollers, int max_pollers) : shutdown_(false), num_pollers_(0), min_pollers_(min_pollers), - max_pollers_(max_pollers == -1 ? INT_MAX: max_pollers), + max_pollers_(max_pollers == -1 ? INT_MAX : max_pollers), num_threads_(0) {} GrpcRpcManager::~GrpcRpcManager() { - std::unique_lock<grpc::mutex> lock(mu_); - GPR_ASSERT(num_threads_ == 0); + { + std::unique_lock<grpc::mutex> lock(mu_); + GPR_ASSERT(num_threads_ == 0); + } CleanupCompletedThreads(); } @@ -87,8 +89,16 @@ bool GrpcRpcManager::IsShutdown() { } void GrpcRpcManager::MarkAsCompleted(GrpcRpcManagerThread* thd) { - std::unique_lock<grpc::mutex> lock(list_mu_); - completed_threads_.push_back(thd); + { + std::unique_lock<grpc::mutex> list_lock(list_mu_); + completed_threads_.push_back(thd); + } + + grpc::unique_lock<grpc::mutex> lock(mu_); + num_threads_--; + if (num_threads_ == 0) { + shutdown_cv_.notify_one(); + } } void GrpcRpcManager::CleanupCompletedThreads() { @@ -169,17 +179,10 @@ void GrpcRpcManager::MainWorkLoop() { } } while (MaybeContinueAsPoller()); - // If we are here, either GrpcRpcManager is shutting down or it already has - // enough threads. In both cases, current thread can be terminated - { - grpc::unique_lock<grpc::mutex> lock(mu_); - num_threads_--; - if (num_threads_ == 0) { - shutdown_cv_.notify_one(); - } - } - CleanupCompletedThreads(); + + // If we are here, either GrpcRpcManager is shutting down or it already has + // enough threads. } } // namespace grpc