From f782465fba11864293d858ba91d5e715fc481d7d Mon Sep 17 00:00:00 2001
From: Vijay Pai <vpai@ubivpai.dls.corp.google.com>
Date: Fri, 8 Jul 2016 10:33:10 -0700
Subject: [PATCH] Fix some shutdown errors related to CQ/join ordering

---
 test/cpp/qps/client_async.cc | 43 +++++++++++++++++++++++++-----------
 test/cpp/qps/qps_worker.cc   |  2 ++
 2 files changed, 32 insertions(+), 13 deletions(-)

diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index 963a1e1cd0..057e5a0d6b 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -189,14 +189,7 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
     }
   }
   virtual ~AsyncClient() {
-    for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
-      (*cq)->Shutdown();
-      void* got_tag;
-      bool ok;
-      while ((*cq)->Next(&got_tag, &ok)) {
-        delete ClientRpcContext::detag(got_tag);
-      }
-    }
+    FinalShutdownCQs();
   }
 
   bool ThreadFunc(HistogramEntry* entry,
@@ -216,14 +209,29 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
         delete ctx;
       }
       return true;
-    } else {  // queue is shutting down
-      return false;
+    } else {  // queue is shutting down, so we must be done
+      return true;
     }
   }
 
  protected:
   const int num_async_threads_;
 
+  void ShutdownCQs() {
+    for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
+      (*cq)->Shutdown();
+    }
+  }
+  void FinalShutdownCQs() {
+    for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
+      void* got_tag;
+      bool ok;
+      while ((*cq)->Next(&got_tag, &ok)) {
+        delete ClientRpcContext::detag(got_tag);
+      }
+    }
+  }
+
  private:
   int NumThreads(const ClientConfig& config) {
     int num_threads = config.async_client_threads();
@@ -251,7 +259,10 @@ class AsyncUnaryClient GRPC_FINAL
             config, SetupCtx, BenchmarkStubCreator) {
     StartThreads(num_async_threads_);
   }
-  ~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); }
+  ~AsyncUnaryClient() GRPC_OVERRIDE {
+    ShutdownCQs();
+    EndThreads();
+  }
 
  private:
   static void CheckDone(grpc::Status s, SimpleResponse* response) {}
@@ -380,7 +391,10 @@ class AsyncStreamingClient GRPC_FINAL
     StartThreads(num_async_threads_);
   }
 
-  ~AsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); }
+  ~AsyncStreamingClient() GRPC_OVERRIDE {
+    ShutdownCQs();
+    EndThreads();
+  }
 
  private:
   static void CheckDone(grpc::Status s, SimpleResponse* response) {}
@@ -516,7 +530,10 @@ class GenericAsyncStreamingClient GRPC_FINAL
     StartThreads(num_async_threads_);
   }
 
-  ~GenericAsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); }
+  ~GenericAsyncStreamingClient() GRPC_OVERRIDE {
+    ShutdownCQs();
+    EndThreads();
+  }
 
  private:
   static void CheckDone(grpc::Status s, ByteBuffer* response) {}
diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc
index 8456fde0ed..49ef52895c 100644
--- a/test/cpp/qps/qps_worker.cc
+++ b/test/cpp/qps/qps_worker.cc
@@ -128,6 +128,7 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
 
     ScopedProfile profile("qps_client.prof", false);
     Status ret = RunClientBody(ctx, stream);
+    gpr_log(GPR_INFO, "RunClient: Returning");
     return ret;
   }
 
@@ -141,6 +142,7 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
 
     ScopedProfile profile("qps_server.prof", false);
     Status ret = RunServerBody(ctx, stream);
+    gpr_log(GPR_INFO, "RunServer: Returning");
     return ret;
   }
 
-- 
GitLab