Skip to content
Snippets Groups Projects
Commit 4306eeee authored by Sree Kuchibhotla's avatar Sree Kuchibhotla
Browse files

Minor changes

parent aabada97
No related branches found
No related tags found
No related merge requests found
...@@ -34,6 +34,7 @@ ...@@ -34,6 +34,7 @@
#include <grpc++/impl/sync.h> #include <grpc++/impl/sync.h>
#include <grpc++/impl/thd.h> #include <grpc++/impl/thd.h>
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include <climits>
#include "src/cpp/rpcmanager/grpc_rpc_manager.h" #include "src/cpp/rpcmanager/grpc_rpc_manager.h"
...@@ -58,7 +59,7 @@ GrpcRpcManager::GrpcRpcManager(int min_pollers, int max_pollers) ...@@ -58,7 +59,7 @@ GrpcRpcManager::GrpcRpcManager(int min_pollers, int max_pollers)
: shutdown_(false), : shutdown_(false),
num_pollers_(0), num_pollers_(0),
min_pollers_(min_pollers), min_pollers_(min_pollers),
max_pollers_(max_pollers), max_pollers_(max_pollers == -1 ? INT_MAX: max_pollers),
num_threads_(0) {} num_threads_(0) {}
GrpcRpcManager::~GrpcRpcManager() { GrpcRpcManager::~GrpcRpcManager() {
...@@ -111,6 +112,7 @@ void GrpcRpcManager::Initialize() { ...@@ -111,6 +112,7 @@ void GrpcRpcManager::Initialize() {
// below the maximum threshold, we can let the current thread continue as poller // below the maximum threshold, we can let the current thread continue as poller
bool GrpcRpcManager::MaybeContinueAsPoller() { bool GrpcRpcManager::MaybeContinueAsPoller() {
std::unique_lock<grpc::mutex> lock(mu_); std::unique_lock<grpc::mutex> lock(mu_);
if (shutdown_ || num_pollers_ > max_pollers_) { if (shutdown_ || num_pollers_ > max_pollers_) {
return false; return false;
} }
...@@ -169,8 +171,8 @@ void GrpcRpcManager::MainWorkLoop() { ...@@ -169,8 +171,8 @@ void GrpcRpcManager::MainWorkLoop() {
} }
} while (MaybeContinueAsPoller()); } while (MaybeContinueAsPoller());
// If we are here, it means that the GrpcRpcManager already has enough threads // If we are here, either GrpcRpcManager is shutting down or it already has
// and that the current thread can be terminated // enough threads. In both cases, current thread can be terminated
{ {
grpc::unique_lock<grpc::mutex> lock(mu_); grpc::unique_lock<grpc::mutex> lock(mu_);
num_threads_--; num_threads_--;
......
...@@ -306,7 +306,14 @@ class Server::SyncRequestManager : public GrpcRpcManager { ...@@ -306,7 +306,14 @@ class Server::SyncRequestManager : public GrpcRpcManager {
void DoWork(void* tag, bool ok) GRPC_OVERRIDE { void DoWork(void* tag, bool ok) GRPC_OVERRIDE {
SyncRequest* sync_req = static_cast<SyncRequest*>(tag); SyncRequest* sync_req = static_cast<SyncRequest*>(tag);
if (ok && sync_req) {
if (!sync_req) {
// No tag. Nothing to work on
// TODO (sreek) - Log a warning here since this is an unlikely case
return;
}
if (ok) {
SyncRequest::CallData cd(server_, sync_req); SyncRequest::CallData cd(server_, sync_req);
{ {
sync_req->SetupRequest(); sync_req->SetupRequest();
...@@ -318,9 +325,13 @@ class Server::SyncRequestManager : public GrpcRpcManager { ...@@ -318,9 +325,13 @@ class Server::SyncRequestManager : public GrpcRpcManager {
} }
GPR_TIMER_SCOPE("cd.Run()", 0); GPR_TIMER_SCOPE("cd.Run()", 0);
cd.Run(global_callbacks_); cd.Run(global_callbacks_);
} else {
// ok is false. For some reason, the tag was returned but event was not
// successful. In this case, request again unless we are shutting down
if (!IsShutdown()) {
sync_req->Request(server_->c_server(), server_cq_->cq());
}
} }
// TODO (sreek): If ok == false, log an error
} }
void AddSyncMethod(RpcServiceMethod* method, void* tag) { void AddSyncMethod(RpcServiceMethod* method, void* tag) {
...@@ -395,7 +406,15 @@ Server::~Server() { ...@@ -395,7 +406,15 @@ Server::~Server() {
lock.unlock(); lock.unlock();
Shutdown(); Shutdown();
} else if (!started_) { } else if (!started_) {
// TODO (sreek): Shutdown all cqs // TODO (sreek): Check if we can just do this once in ~Server() (i.e
// Do not 'shutdown' queues in Shutdown() function and do it here in the
// destructor
for (auto it = sync_server_cqs_->begin(); it != sync_server_cqs_->end();
it++) {
(*it).Shutdown();
}
// TODO (sreek) Delete this
/* /*
cq_.Shutdown(); cq_.Shutdown();
*/ */
...@@ -511,7 +530,7 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { ...@@ -511,7 +530,7 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
(*it)->Start(); (*it)->Start();
} }
/* TODO (Sreek) - Do this for all cqs */ /* TODO (Sreek) - No longer needed (being done in (*it)->Start above) */
/* /*
// Start processing rpcs. // Start processing rpcs.
if (!sync_methods_->empty()) { if (!sync_methods_->empty()) {
...@@ -527,7 +546,7 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { ...@@ -527,7 +546,7 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
return true; return true;
} }
// TODO (sreek) - Reimplement this /* TODO (sreek) check if started_ and shutdown_ are needed anymore */
void Server::ShutdownInternal(gpr_timespec deadline) { void Server::ShutdownInternal(gpr_timespec deadline) {
grpc::unique_lock<grpc::mutex> lock(mu_); grpc::unique_lock<grpc::mutex> lock(mu_);
if (started_ && !shutdown_) { if (started_ && !shutdown_) {
...@@ -564,7 +583,8 @@ void Server::ShutdownInternal(gpr_timespec deadline) { ...@@ -564,7 +583,8 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
} }
// Shutdown the completion queues // Shutdown the completion queues
// TODO (sreek) Move this into SyncRequestManager // TODO (sreek) Move this into SyncRequestManager (or move it to Server
// destructor)
for (auto it = sync_server_cqs_->begin(); it != sync_server_cqs_->end(); for (auto it = sync_server_cqs_->begin(); it != sync_server_cqs_->end();
it++) { it++) {
(*it).Shutdown(); (*it).Shutdown();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment