From 924d459c271acb6cf940da8183bcb8eaacb6b662 Mon Sep 17 00:00:00 2001
From: vjpai <vpai@google.com>
Date: Tue, 2 Jun 2015 10:26:52 -0700
Subject: [PATCH] Fix timer issues

---
 test/cpp/qps/client.h        | 15 ++++++++-------
 test/cpp/qps/client_async.cc | 21 ++++++++++++---------
 test/cpp/qps/client_sync.cc  |  2 +-
 3 files changed, 21 insertions(+), 17 deletions(-)

diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index 2b227ec909..dd37b88fb4 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -45,6 +45,9 @@
 namespace grpc {
 namespace testing {
 
+typedef std::chrono::system_clock grpc_time_source;
+typedef std::chrono::time_point<grpc_time_source> grpc_time;
+
 class Client {
  public:
   explicit Client(const ClientConfig& config) : timer_(new Timer),
@@ -145,19 +148,18 @@ class Client {
 
       interarrival_timer_.init(*random_dist, num_threads);
       for (size_t i = 0; i<num_threads; i++) {
-        next_time_.push_back(std::chrono::high_resolution_clock::now()
-                             + interarrival_timer_(i));
+        next_time_.push_back(grpc_time_source::now() +
+			     std::chrono::duration_cast<grpc_time_source::duration>(interarrival_timer_(i)));
       }
     }
   }
-  template<class Timepoint>
-    bool NextIssueTime(int thread_idx, Timepoint *time_delay) {
+  bool NextIssueTime(int thread_idx, grpc_time *time_delay) {
     if (closed_loop_) {
       return false;
     }
     else {
       *time_delay = next_time_[thread_idx];
-      next_time_[thread_idx] += interarrival_timer_(thread_idx);
+      next_time_[thread_idx] += std::chrono::duration_cast<grpc_time_source::duration>(interarrival_timer_(thread_idx));
       return true;
     }
   }
@@ -226,8 +228,7 @@ class Client {
   std::unique_ptr<Timer> timer_;
 
   InterarrivalTimer interarrival_timer_;
-  std::vector<std::chrono::time_point
-             <std::chrono::high_resolution_clock>> next_time_;
+  std::vector<grpc_time> next_time_;
 };
 
 std::unique_ptr<Client> CreateSynchronousUnaryClient(const ClientConfig& args);
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index 2d23192767..bd77424578 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -32,8 +32,10 @@
  */
 
 #include <cassert>
+#include <forward_list>
 #include <functional>
 #include <memory>
+#include <mutex>
 #include <string>
 #include <thread>
 #include <vector>
@@ -55,8 +57,6 @@
 namespace grpc {
 namespace testing {
 
-typedef std::chrono::high_resolution_clock grpc_time_source;
-typedef std::chrono::time_point<grpc_time_source> grpc_time;
 typedef std::forward_list<grpc_time> deadline_list;
 
 class ClientRpcContext {
@@ -98,7 +98,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
   }
   void Start() GRPC_OVERRIDE {
     start_ = Timer::Now();
-    response_reader_.reset(start_req(stub_, &context_, req_));
+    response_reader_ = start_req_(stub_, &context_, req_);
     response_reader_->Finish(&response_, &status_, ClientRpcContext::tag(this));
   }
   ~ClientRpcContextUnaryImpl() GRPC_OVERRIDE {}
@@ -142,7 +142,7 @@ class AsyncClient : public Client {
   explicit AsyncClient(const ClientConfig& config,
 		       std::function<ClientRpcContext*(CompletionQueue*, TestService::Stub*,
 					  const SimpleRequest&)> setup_ctx) :
-      Client(config) {
+    Client(config), channel_rpc_lock_(config.client_channels()) {
     for (int i = 0; i < config.async_client_threads(); i++) {
       cli_cqs_.emplace_back(new CompletionQueue);
       if (!closed_loop_) {
@@ -158,7 +158,6 @@ class AsyncClient : public Client {
     if (!closed_loop_) {
       for (auto channel = channels_.begin(); channel != channels_.end();
 	   channel++) {
-	channel_rpc_lock_.emplace_back();
 	rpcs_outstanding_.push_back(0);
       }
     }
@@ -202,6 +201,9 @@ class AsyncClient : public Client {
       short_deadline = issue_allowed_[thread_idx] ?
 	next_issue_[thread_idx] : deadline;
     }
+    
+    bool got_event;
+
     switch (cli_cqs_[thread_idx]->AsyncNext(&got_tag, &ok, short_deadline)) {
       case CompletionQueue::SHUTDOWN: return false;
       case CompletionQueue::TIMEOUT:
@@ -232,15 +234,16 @@ class AsyncClient : public Client {
      bool issued = false;
      for (int num_attempts = 0; num_attempts < channel_count_ && !issued;
 	  num_attempts++, next_channel_[thread_idx] = (next_channel_[thread_idx]+1)%channel_count_) {
-       std::lock_guard g(channel_rpc_lock_[next_channel_[thread_idx]]);
-       if (rpcs_outstanding[next_channel_[thread_idx]] < max_outstanding_per_channel_) {
+       std::lock_guard<std::mutex>
+	 g(channel_rpc_lock_[next_channel_[thread_idx]]);
+       if (rpcs_outstanding_[next_channel_[thread_idx]] < max_outstanding_per_channel_) {
 	 // do the work to issue
-	 rpcs_outstanding[next_channel_[thread_idx]]++;
+	 rpcs_outstanding_[next_channel_[thread_idx]]++;
 	 issued = true;
        }
      }
      if (!issued)
-       issue_allowed = false;   
+       issue_allowed_[thread_idx] = false;   
    }
    return true;
   }
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc
index 98297d3abb..d1682caf06 100644
--- a/test/cpp/qps/client_sync.cc
+++ b/test/cpp/qps/client_sync.cc
@@ -77,7 +77,7 @@ class SynchronousClient : public Client {
 
  protected:
   void WaitToIssue(int thread_idx) {
-    std::chrono::time_point<std::chrono::high_resolution_clock> next_time;
+    grpc_time next_time;
     if (NextIssueTime(thread_idx, &next_time)) {
       std::this_thread::sleep_until(next_time);
     }
-- 
GitLab