diff --git a/src/cpp/rpcmanager/grpc_rpc_manager.cc b/src/cpp/rpcmanager/grpc_rpc_manager.cc index 5bfd4eedadae47778c3454a97878e07ee825dd20..1c7d5adeaf2d93a2a8406ad881647c57103f1ba8 100644 --- a/src/cpp/rpcmanager/grpc_rpc_manager.cc +++ b/src/cpp/rpcmanager/grpc_rpc_manager.cc @@ -83,7 +83,7 @@ void GrpcRpcManager::Wait() { } // For testing only -void GrpcRpcManager::Shutdown() { +void GrpcRpcManager::ShutdownRpcManager() { std::unique_lock<grpc::mutex> lock(mu_); shutdown_ = true; } @@ -131,9 +131,10 @@ void GrpcRpcManager::MaybeCreatePoller() { void GrpcRpcManager::MainWorkLoop() { bool is_work_found = false; + void *tag; do { - PollForWork(is_work_found); + PollForWork(is_work_found, &tag); // Decrement num_pollers since this thread is no longer polling { @@ -146,7 +147,7 @@ void GrpcRpcManager::MainWorkLoop() { MaybeCreatePoller(); // Do actual work - DoWork(); + DoWork(tag); } // Continue to loop if this thread can continue as a poller @@ -158,7 +159,7 @@ void GrpcRpcManager::MainWorkLoop() { grpc::unique_lock<grpc::mutex> lock(mu_); num_threads_--; if (num_threads_ == 0) { - shutdown_cv_.notify_one(); + shutdown_cv_.notify_all(); } } diff --git a/src/cpp/rpcmanager/grpc_rpc_manager.h b/src/cpp/rpcmanager/grpc_rpc_manager.h index 5f89c1599d793952608e722041d0b6d069c1c4ab..a8cc6eb80f228a87749a133a886b99ba005ab816 100644 --- a/src/cpp/rpcmanager/grpc_rpc_manager.h +++ b/src/cpp/rpcmanager/grpc_rpc_manager.h @@ -50,12 +50,12 @@ class GrpcRpcManager { // This function MUST be called before using the object void Initialize(); - virtual void PollForWork(bool& is_work_found) = 0; - virtual void DoWork() = 0; + virtual void PollForWork(bool& is_work_found, void **tag) = 0; + virtual void DoWork(void *tag) = 0; - // Use this for testing purposes only + // Use the following two functions for testing purposes only void Wait(); - void Shutdown(); + void ShutdownRpcManager(); private: // Helper wrapper class around std::thread. This takes a GrpcRpcManager object diff --git a/test/cpp/rpcmanager/grpc_rpc_manager_test.cc b/test/cpp/rpcmanager/grpc_rpc_manager_test.cc index b2e601d95ee283c11ad90fed9c23cd6e35073259..2a306e48ad6852ce685ee2136189163296a37503 100644 --- a/test/cpp/rpcmanager/grpc_rpc_manager_test.cc +++ b/test/cpp/rpcmanager/grpc_rpc_manager_test.cc @@ -45,14 +45,15 @@ using grpc::testing::GrpcRpcManagerTest; // TODO: sreek - Rewrite this test. Find a better test case -void GrpcRpcManagerTest::PollForWork(bool& is_work_found) { +void GrpcRpcManagerTest::PollForWork(bool& is_work_found, void **tag) { { std::unique_lock<grpc::mutex> lock(mu_); std::cout << "Poll: " << std::this_thread::get_id() << std::endl; } is_work_found = true; + *tag = NULL; - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); { std::unique_lock<grpc::mutex> lock(mu_); @@ -60,12 +61,12 @@ void GrpcRpcManagerTest::PollForWork(bool& is_work_found) { if (num_calls_ > 50) { std::cout << "poll: False" << std::endl; is_work_found = false; - Shutdown(); + ShutdownRpcManager(); } } } -void GrpcRpcManagerTest::DoWork() { +void GrpcRpcManagerTest::DoWork(void *tag) { { std::unique_lock<grpc::mutex> lock(mu_); std::cout << "Work: " << std::this_thread::get_id() << std::endl; diff --git a/test/cpp/rpcmanager/grpc_rpc_manager_test.h b/test/cpp/rpcmanager/grpc_rpc_manager_test.h index 5073abd8f1138c64fb795786bd03de2cea6b53cf..42e3549ed13479cbe31b58b411f2d68aac6c3f0f 100644 --- a/test/cpp/rpcmanager/grpc_rpc_manager_test.h +++ b/test/cpp/rpcmanager/grpc_rpc_manager_test.h @@ -43,8 +43,8 @@ class GrpcRpcManagerTest GRPC_FINAL : public GrpcRpcManager { GrpcRpcManagerTest(int min_pollers, int max_pollers, int max_threads) : GrpcRpcManager(min_pollers, max_pollers, max_threads), num_calls_(0){}; - void PollForWork(bool &is_work_found) GRPC_OVERRIDE; - void DoWork() GRPC_OVERRIDE; + void PollForWork(bool &is_work_found, void **tag) GRPC_OVERRIDE; + void DoWork(void *tag) GRPC_OVERRIDE; private: grpc::mutex mu_;