From 74e6e135e90b1ab47d14f9f835d90e93ba59f533 Mon Sep 17 00:00:00 2001
From: vjpai <vpai@google.com>
Date: Wed, 8 Jun 2016 13:32:16 -0700
Subject: [PATCH] I was trigger-happy with changes to qps_worker in the hope of
 improving stability. I believe that this change was unneeded and actually may
 hurt matters by holding a lock for too long. Partially undoes #5444 ,
 particular commit e7042b5

---
 test/cpp/qps/client.h | 35 ++++++++++++++++++++++++++---------
 1 file changed, 26 insertions(+), 9 deletions(-)

diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index 2a89eb8018..047bd16408 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -125,13 +125,15 @@ class Client {
     if (reset) {
       Histogram* to_merge = new Histogram[threads_.size()];
       for (size_t i = 0; i < threads_.size(); i++) {
-        threads_[i]->Swap(&to_merge[i]);
-        latencies.Merge(to_merge[i]);
+        threads_[i]->BeginSwap(&to_merge[i]);
       }
-      delete[] to_merge;
-
       std::unique_ptr<UsageTimer> timer(new UsageTimer);
       timer_.swap(timer);
+      for (size_t i = 0; i < threads_.size(); i++) {
+        threads_[i]->EndSwap();
+        latencies.Merge(to_merge[i]);
+      }
+      delete[] to_merge;
       timer_result = timer->Mark();
     } else {
       // merge snapshots of each thread histogram
@@ -213,6 +215,7 @@ class Client {
    public:
     Thread(Client* client, size_t idx)
         : done_(false),
+          new_stats_(nullptr),
           client_(client),
           idx_(idx),
           impl_(&Thread::ThreadFunc, this) {}
@@ -225,9 +228,16 @@ class Client {
       impl_.join();
     }
 
-    void Swap(Histogram* n) {
+    void BeginSwap(Histogram* n) {
       std::lock_guard<std::mutex> g(mu_);
-      n->Swap(&histogram_);
+      new_stats_ = n;
+    }
+
+    void EndSwap() {
+      std::unique_lock<std::mutex> g(mu_);
+      while (new_stats_ != nullptr) {
+        cv_.wait(g);
+      };
     }
 
     void MergeStatsInto(Histogram* hist) {
@@ -241,11 +251,10 @@ class Client {
 
     void ThreadFunc() {
       for (;;) {
-        // lock since the thread should only be doing one thing at a time
-        std::lock_guard<std::mutex> g(mu_);
         // run the loop body
         const bool thread_still_ok = client_->ThreadFunc(&histogram_, idx_);
-        // see if we're done
+        // lock, see if we're done
+        std::lock_guard<std::mutex> g(mu_);
         if (!thread_still_ok) {
           gpr_log(GPR_ERROR, "Finishing client thread due to RPC error");
           done_ = true;
@@ -253,11 +262,19 @@ class Client {
         if (done_) {
           return;
         }
+        // check if we're resetting stats, swap out the histogram if so
+        if (new_stats_) {
+          new_stats_->Swap(&histogram_);
+          new_stats_ = nullptr;
+          cv_.notify_one();
+        }
       }
     }
 
     std::mutex mu_;
+    std::condition_variable cv_;
     bool done_;
+    Histogram* new_stats_;
     Histogram histogram_;
     Client* client_;
     const size_t idx_;
-- 
GitLab