From 862acb9f3a42cf4bacf75ba9dd831a539c93a4f1 Mon Sep 17 00:00:00 2001
From: Sree Kuchibhotla <sreek@google.com>
Date: Mon, 26 Sep 2016 09:48:48 -0700
Subject: [PATCH] Fix crash during server shutdown
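
Use a completion queue that is local to ShutdownInternal() for the
server shutdown notification instead of the shutdown_cq_ member, and
have each SyncRequestManager shut down and drain its own server
completion queue (ShutdownAndDrainCompletionQueue()) once its threads
have terminated. Also drain the shutdown queue in case AsyncNext()
timed out with the shutdown tag still pending, drop the
GPR_ASSERT(shutdown_) from ~GrpcRpcManager(), and lower
kRpcPollingTimeoutMsec from 500 to 10.

A minimal sketch of the shutdown-and-drain pattern applied here (the
free-standing ShutdownAndDrain() helper below is illustrative only and
is not part of this patch):

  #include <grpc++/completion_queue.h>

  // Shut down a completion queue and drain every pending event.
  // After Shutdown(), Next() keeps returning queued events until the
  // queue is empty and then returns false.
  void ShutdownAndDrain(grpc::CompletionQueue* cq) {
    cq->Shutdown();
    void* tag;
    bool ok;
    while (cq->Next(&tag, &ok)) {
      // Nothing to be done here; we only want to empty the queue.
    }
  }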

---
 include/grpc++/server.h                |  3 --
 src/cpp/rpcmanager/grpc_rpc_manager.cc |  2 -
 src/cpp/server/server_cc.cc            | 61 +++++++++++++-------------
 3 files changed, 31 insertions(+), 35 deletions(-)

diff --git a/include/grpc++/server.h b/include/grpc++/server.h
index 6bbc265bc4..5b4cb6f214 100644
--- a/include/grpc++/server.h
+++ b/include/grpc++/server.h
@@ -208,9 +208,6 @@ class Server GRPC_FINAL : public ServerInterface, private GrpcLibraryCodegen {
   bool shutdown_;
   bool shutdown_notified_;
 
-  /// The completion queue to use for server shutdown completion notification
-  CompletionQueue shutdown_cq_;
-
   // TODO (sreek) : Remove num_running_cb_ and callback_cv_;
   // The number of threads which are running callbacks.
   // int num_running_cb_;
diff --git a/src/cpp/rpcmanager/grpc_rpc_manager.cc b/src/cpp/rpcmanager/grpc_rpc_manager.cc
index c47f76b5af..58b337da63 100644
--- a/src/cpp/rpcmanager/grpc_rpc_manager.cc
+++ b/src/cpp/rpcmanager/grpc_rpc_manager.cc
@@ -64,8 +64,6 @@ GrpcRpcManager::GrpcRpcManager(int min_pollers, int max_pollers)
 
 GrpcRpcManager::~GrpcRpcManager() {
   std::unique_lock<grpc::mutex> lock(mu_);
-  // ShutdownRpcManager() and Wait() must be called before destroying the object
-  GPR_ASSERT(shutdown_);
   GPR_ASSERT(num_threads_ == 0);
 
   CleanupCompletedThreads();
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc
index 4ab531df42..54ac25d76b 100644
--- a/src/cpp/server/server_cc.cc
+++ b/src/cpp/server/server_cc.cc
@@ -129,9 +129,7 @@ class Server::ShutdownRequest GRPC_FINAL : public CompletionQueueTag {
 
 class ShutdownTag : public CompletionQueueTag {
  public:
-  bool FinalizeResult(void** tag, bool *status) {
-    return false;
-  }
+  bool FinalizeResult(void** tag, bool* status) { return false; }
 };
 
 class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
@@ -196,9 +194,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
     cq_ = nullptr;
   }
 
-  void ResetRequest() {
-    in_flight_ = false;
-  }
+  void ResetRequest() { in_flight_ = false; }
 
   void Request(grpc_server* server, grpc_completion_queue* notify_cq) {
     GPR_ASSERT(cq_ && !in_flight_);
@@ -301,7 +297,7 @@ class Server::SyncRequestManager : public GrpcRpcManager {
         server_cq_(server_cq),
         global_callbacks_(global_callbacks) {}
 
-  static const int kRpcPollingTimeoutMsec = 500;
+  static const int kRpcPollingTimeoutMsec = 10;
 
   WorkStatus PollForWork(void** tag, bool* ok) GRPC_OVERRIDE {
     *tag = nullptr;
@@ -368,6 +364,17 @@ class Server::SyncRequestManager : public GrpcRpcManager {
     }
   }
 
+  void ShutdownAndDrainCompletionQueue() {
+    server_cq_->Shutdown();
+
+    // Drain any pending items from the queue
+    void* tag;
+    bool ok;
+    while (server_cq_->Next(&tag, &ok)) {
+      // Nothing to be done here
+    }
+  }
+
   void Start() {
     if (!sync_methods_.empty()) {
       for (auto m = sync_methods_.begin(); m != sync_methods_.end(); m++) {
@@ -420,23 +427,17 @@ Server::Server(
 
 Server::~Server() {
   {
+    // TODO (sreek) Check if we can just call Shutdown() even in the case
+    // where started_ == false. This would make things much simpler.
     grpc::unique_lock<grpc::mutex> lock(mu_);
     if (started_ && !shutdown_) {
       lock.unlock();
       Shutdown();
     } else if (!started_) {
-      // TODO (sreek): Check if we can just do this once in ~Server() (i.e
-      // Do not 'shutdown' queues in Shutdown() function and do it here in the
-      // destructor
-      for (auto it = sync_server_cqs_->begin(); it != sync_server_cqs_->end();
-           it++) {
-        (*it)->Shutdown();
+      // Shut down and drain the completion queues
+      for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
+        (*it)->ShutdownAndDrainCompletionQueue();
       }
-
-      // TODO (sreek) Delete this
-      /*
-      cq_.Shutdown();
-      */
     }
   }
 
@@ -571,8 +572,10 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
   if (started_ && !shutdown_) {
     shutdown_ = true;
 
+    // The completion queue to use for server shutdown completion notification
+    CompletionQueue shutdown_cq;
     ShutdownTag shutdown_tag;  // Dummy shutdown tag
-    grpc_server_shutdown_and_notify(server_, shutdown_cq_.cq(), &shutdown_tag);
+    grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag);
 
     // Shutdown all RpcManagers. This will try to gracefully stop all the
     // threads in the RpcManagers (once they process any inflight requests)
@@ -580,16 +583,15 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
       (*it)->ShutdownRpcManager();
     }
 
-    shutdown_cq_.Shutdown();
+    shutdown_cq.Shutdown();
 
     void* tag;
     bool ok;
     CompletionQueue::NextStatus status =
-        shutdown_cq_.AsyncNext(&tag, &ok, deadline);
+        shutdown_cq.AsyncNext(&tag, &ok, deadline);
 
-    // If this timed out, it means we are done with the grace-period for
-    // a clean shutdown. We should force a shutdown now by cancelling all
-    // inflight calls
+    // If this timed out, it means we are done with the grace period for a clean
+    // shutdown. We should force a shutdown now by cancelling all inflight calls
     if (status == CompletionQueue::NextStatus::TIMEOUT) {
       grpc_server_cancel_all_calls(server_);
     }
@@ -599,14 +601,13 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
     // Wait for threads in all RpcManagers to terminate
     for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
       (*it)->Wait();
+      (*it)->ShutdownAndDrainCompletionQueue();
     }
 
-    // Shutdown the completion queues
-    // TODO (sreek) Move this into SyncRequestManager (or move it to Server
-    // destructor)
-    for (auto it = sync_server_cqs_->begin(); it != sync_server_cqs_->end();
-         it++) {
-      (*it)->Shutdown();
+    // Drain the shutdown queue (if the previous call to AsyncNext() timed
+    // out, the shutdown tag has not been removed from the queue yet)
+    while (shutdown_cq.Next(&tag, &ok)) {
+      // Nothing to be done here
     }
 
     /*
-- 
GitLab