From 1132c6b7db990d83fd178c927c39d0b67fa02778 Mon Sep 17 00:00:00 2001
From: Vijay Pai <vpai@google.com>
Date: Thu, 11 Feb 2016 06:05:24 -0800
Subject: [PATCH] Simplify and properly implement open-loop testing. Revive it
 in tests as well

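The open-loop path is rebuilt around grpc::Alarm. Instead of per-thread
deadline lists, round-robin channel counters, and pools of idle contexts
guarded by per-channel mutexes, each RPC context is now a small explicit
state machine that schedules its own next issue time on the completion
queue it already uses for RPC events. NextIssueTime shrinks from a
bool-returning out-param API to

    gpr_timespec NextIssueTime(int thread_idx);  // open-loop only

and InterarrivalTimer hands back raw int64_t nanoseconds instead of
std::chrono::nanoseconds so the gpr time APIs can consume them directly,
which also lets the Apple-only TimePoint specialization in client.h go
away. Streaming and generic-streaming async clients lose their
closed-loop-only asserts, qps_openloop_test runs again (Makefile,
build.yaml, tools/run_tests/tests.json), and the qps tests switch to
STREAMING to exercise the new streaming paths.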
---
 Makefile                              |   2 +
 build.yaml                            |   1 -
 test/cpp/qps/client.h                 |  47 +--
 test/cpp/qps/client_async.cc          | 435 +++++++++++---------------
 test/cpp/qps/client_sync.cc           |   7 +-
 test/cpp/qps/interarrival.h           |   9 +-
 test/cpp/qps/qps_interarrival_test.cc |   2 +-
 test/cpp/qps/qps_openloop_test.cc     |   2 +-
 test/cpp/qps/qps_test.cc              |   2 +-
 tools/run_tests/tests.json            |  18 ++
 10 files changed, 218 insertions(+), 307 deletions(-)

diff --git a/Makefile b/Makefile
index 3c215b35b6..b377448581 100644
--- a/Makefile
+++ b/Makefile
@@ -1597,6 +1597,8 @@ test_cxx: test_zookeeper buildtests_cxx
 	$(Q) $(BINDIR)/$(CONFIG)/interop_test || ( echo test interop_test failed ; exit 1 )
 	$(E) "[RUN]     Testing mock_test"
 	$(Q) $(BINDIR)/$(CONFIG)/mock_test || ( echo test mock_test failed ; exit 1 )
+	$(E) "[RUN]     Testing qps_openloop_test"
+	$(Q) $(BINDIR)/$(CONFIG)/qps_openloop_test || ( echo test qps_openloop_test failed ; exit 1 )
 	$(E) "[RUN]     Testing qps_test"
 	$(Q) $(BINDIR)/$(CONFIG)/qps_test || ( echo test qps_test failed ; exit 1 )
 	$(E) "[RUN]     Testing secure_auth_context_test"
diff --git a/build.yaml b/build.yaml
index 7f33ef3f0e..eacd2f0c23 100644
--- a/build.yaml
+++ b/build.yaml
@@ -2299,7 +2299,6 @@ targets:
   - posix
 - name: qps_openloop_test
   build: test
-  run: false
   language: c++
   src:
   - test/cpp/qps/qps_openloop_test.cc
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index f62afffa47..fac610a32b 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -41,6 +41,7 @@
 #include <grpc++/support/byte_buffer.h>
 #include <grpc++/support/slice.h>
 #include <grpc/support/log.h>
+#include <grpc/support/time.h>
 
 #include "src/proto/grpc/testing/payloads.grpc.pb.h"
 #include "src/proto/grpc/testing/services.grpc.pb.h"
@@ -52,27 +53,8 @@
 #include "test/cpp/util/create_test_channel.h"
 
 namespace grpc {
-
-#if defined(__APPLE__)
-// Specialize Timepoint for high res clock as we need that
-template <>
-class TimePoint<std::chrono::high_resolution_clock::time_point> {
- public:
-  TimePoint(const std::chrono::high_resolution_clock::time_point& time) {
-    TimepointHR2Timespec(time, &time_);
-  }
-  gpr_timespec raw_time() const { return time_; }
-
- private:
-  gpr_timespec time_;
-};
-#endif
-
 namespace testing {
 
-typedef std::chrono::high_resolution_clock grpc_time_source;
-typedef std::chrono::time_point<grpc_time_source> grpc_time;
-
 template <class RequestType>
 class ClientRequestCreator {
  public:
@@ -218,26 +200,23 @@ class Client {
       closed_loop_ = false;
       // set up interarrival timer according to random dist
       interarrival_timer_.init(*random_dist, num_threads);
-      auto now = grpc_time_source::now();
+      auto now = gpr_now(GPR_CLOCK_MONOTONIC);
       for (size_t i = 0; i < num_threads; i++) {
         next_time_.push_back(
-            now +
-            std::chrono::duration_cast<grpc_time_source::duration>(
-                interarrival_timer_.next(i)));
+            gpr_time_add(now, gpr_time_from_nanos(interarrival_timer_.next(i),
+                                                  GPR_TIMESPAN)));
       }
     }
   }
 
-  bool NextIssueTime(int thread_idx, grpc_time* time_delay) {
-    if (closed_loop_) {
-      return false;
-    } else {
-      *time_delay = next_time_[thread_idx];
-      next_time_[thread_idx] +=
-          std::chrono::duration_cast<grpc_time_source::duration>(
-              interarrival_timer_.next(thread_idx));
-      return true;
-    }
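+  // Open-loop only: callers must check closed_loop_ before using this.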
+  gpr_timespec NextIssueTime(int thread_idx) {
+    gpr_timespec result = next_time_[thread_idx];
+    next_time_[thread_idx] =
+        gpr_time_add(next_time_[thread_idx],
+                     gpr_time_from_nanos(interarrival_timer_.next(thread_idx),
+                                         GPR_TIMESPAN));
+    return result;
   }
 
  private:
@@ -315,7 +291,7 @@ class Client {
   std::unique_ptr<Timer> timer_;
 
   InterarrivalTimer interarrival_timer_;
-  std::vector<grpc_time> next_time_;
+  std::vector<gpr_timespec> next_time_;
 };
 
 template <class StubType, class RequestType>
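[Reviewer note, not part of the patch: the schedule arithmetic above in
miniature. A hedged sketch only; the fixed 1ms gap stands in for
InterarrivalTimer's random table, and OpenLoopSketch is a made-up name.]

    #include <grpc/support/time.h>

    // Issue three ticks at a fixed 1ms gap.
    void OpenLoopSketch() {
      gpr_timespec next = gpr_now(GPR_CLOCK_MONOTONIC);
      for (int i = 0; i < 3; i++) {
        gpr_sleep_until(next);  // returns at once if already past due
        // ... issue one RPC here ...
        // Advance from the previous issue time, never from now(): the
        // schedule ignores completion latency, which is what makes the
        // load open-loop.
        next = gpr_time_add(next, gpr_time_from_nanos(1000000, GPR_TIMESPAN));
      }
    }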
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index f3f8f37051..7a09dd27a4 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -43,9 +43,9 @@
 #include <vector>
 
 #include <gflags/gflags.h>
+#include <grpc++/alarm.h>
 #include <grpc++/channel.h>
 #include <grpc++/client_context.h>
-#include <grpc++/client_context.h>
 #include <grpc++/generic/generic_stub.h>
 #include <grpc/grpc.h>
 #include <grpc/support/cpu.h>
@@ -60,11 +60,9 @@
 namespace grpc {
 namespace testing {
 
-typedef std::list<grpc_time> deadline_list;
-
 class ClientRpcContext {
  public:
-  explicit ClientRpcContext(int ch) : channel_id_(ch) {}
+  ClientRpcContext() {}
   virtual ~ClientRpcContext() {}
   // next state, return false if done. Collect stats when appropriate
   virtual bool RunNextState(bool, Histogram* hist) = 0;
@@ -74,72 +72,72 @@ class ClientRpcContext {
     return reinterpret_cast<ClientRpcContext*>(t);
   }
 
-  deadline_list::iterator deadline_posn() const { return deadline_posn_; }
-  void set_deadline_posn(const deadline_list::iterator& it) {
-    deadline_posn_ = it;
-  }
   virtual void Start(CompletionQueue* cq) = 0;
-  int channel_id() const { return channel_id_; }
-
- protected:
-  int channel_id_;
-
- private:
-  deadline_list::iterator deadline_posn_;
 };
 
 template <class RequestType, class ResponseType>
 class ClientRpcContextUnaryImpl : public ClientRpcContext {
  public:
   ClientRpcContextUnaryImpl(
-      int channel_id, BenchmarkService::Stub* stub, const RequestType& req,
+      BenchmarkService::Stub* stub, const RequestType& req,
+      std::function<gpr_timespec()> next_issue,
       std::function<
           std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
               BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
               CompletionQueue*)> start_req,
       std::function<void(grpc::Status, ResponseType*)> on_done)
-      : ClientRpcContext(channel_id),
-        context_(),
+      : context_(),
         stub_(stub),
+        cq_(nullptr),
         req_(req),
         response_(),
-        next_state_(&ClientRpcContextUnaryImpl::RespDone),
-        callback_(on_done),
-        start_req_(start_req) {}
+        next_state_(State::READY),
+        callback_(on_done),
+        next_issue_(next_issue),
+        start_req_(start_req) {}
+  ~ClientRpcContextUnaryImpl() GRPC_OVERRIDE {}
   void Start(CompletionQueue* cq) GRPC_OVERRIDE {
-    start_ = Timer::Now();
-    response_reader_ = start_req_(stub_, &context_, req_, cq);
-    response_reader_->Finish(&response_, &status_, ClientRpcContext::tag(this));
+    cq_ = cq;
+    if (!next_issue_) { // ready to issue
+      RunNextState(true, nullptr);
+    } else { // wait for the issue time
+      alarm_.reset(new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this)));
+    }
   }
-  ~ClientRpcContextUnaryImpl() GRPC_OVERRIDE {}
   bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
-    bool ret = (this->*next_state_)(ok);
-    if (!ret) {
-      hist->Add((Timer::Now() - start_) * 1e9);
+    switch (next_state_) {
+      case State::READY:
+        start_ = Timer::Now();
+        response_reader_ = start_req_(stub_, &context_, req_, cq_);
+        response_reader_->Finish(&response_, &status_,
+                                 ClientRpcContext::tag(this));
+        next_state_ = State::RESP_DONE;
+        return true;
+      case State::RESP_DONE:
+        hist->Add((Timer::Now() - start_) * 1e9);
+        callback_(status_, &response_);
+        next_state_ = State::INVALID;
+        return false;
+      default:
+        GPR_ASSERT(false);
+        return false;
     }
-    return ret;
   }
-
   ClientRpcContext* StartNewClone() GRPC_OVERRIDE {
-    return new ClientRpcContextUnaryImpl(channel_id_, stub_, req_, start_req_,
-                                         callback_);
+    return new ClientRpcContextUnaryImpl(stub_, req_, next_issue_,
+                                         start_req_, callback_);
   }
-
  private:
-  bool RespDone(bool) {
-    next_state_ = &ClientRpcContextUnaryImpl::DoCallBack;
-    return false;
-  }
-  bool DoCallBack(bool) {
-    callback_(status_, &response_);
-    return true;  // we're done, this'll be ignored
-  }
   grpc::ClientContext context_;
   BenchmarkService::Stub* stub_;
+  CompletionQueue* cq_;
+  std::unique_ptr<Alarm> alarm_;
   RequestType req_;
   ResponseType response_;
-  bool (ClientRpcContextUnaryImpl::*next_state_)(bool);
+  enum State {INVALID, READY, RESP_DONE};
+  State next_state_;
   std::function<void(grpc::Status, ResponseType*)> callback_;
+  std::function<gpr_timespec()> next_issue_;
   std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
       BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
       CompletionQueue*)> start_req_;
@@ -157,49 +155,36 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
   // member name resolution until the template types are fully resolved
  public:
   using Client::SetupLoadTest;
-  using Client::NextIssueTime;
   using Client::closed_loop_;
   using ClientImpl<StubType, RequestType>::cores_;
   using ClientImpl<StubType, RequestType>::channels_;
   using ClientImpl<StubType, RequestType>::request_;
   AsyncClient(const ClientConfig& config,
-              std::function<ClientRpcContext*(int, StubType*,
+              std::function<ClientRpcContext*(StubType*,
+                                              std::function<gpr_timespec()> next_issue,
                                               const RequestType&)> setup_ctx,
               std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)>
                   create_stub)
       : ClientImpl<StubType, RequestType>(config, create_stub),
-        num_async_threads_(NumThreads(config)),
-        channel_lock_(new std::mutex[config.client_channels()]),
-        contexts_(config.client_channels()),
-        max_outstanding_per_channel_(config.outstanding_rpcs_per_channel()),
-        channel_count_(config.client_channels()),
-        pref_channel_inc_(num_async_threads_) {
+        num_async_threads_(NumThreads(config)) {
     SetupLoadTest(config, num_async_threads_);
 
     for (int i = 0; i < num_async_threads_; i++) {
       cli_cqs_.emplace_back(new CompletionQueue);
-      if (!closed_loop_) {
-        rpc_deadlines_.emplace_back();
-        next_channel_.push_back(i % channel_count_);
-        issue_allowed_.emplace_back(true);
-
-        grpc_time next_issue;
-        NextIssueTime(i, &next_issue);
-        next_issue_.push_back(next_issue);
-      }
     }
 
     int t = 0;
     for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
-      for (int ch = 0; ch < channel_count_; ch++) {
+      for (int ch = 0; ch < config.client_channels(); ch++) {
         auto* cq = cli_cqs_[t].get();
-        t = (t + 1) % cli_cqs_.size();
-        auto ctx = setup_ctx(ch, channels_[ch].get_stub(), request_);
-        if (closed_loop_) {
-          ctx->Start(cq);
-        } else {
-          contexts_[ch].push_front(ctx);
+        std::function<gpr_timespec()> next_issue;
+        if (!closed_loop_) {
+          next_issue = std::bind(&Client::NextIssueTime, this, t);
         }
+        auto ctx = setup_ctx(channels_[ch].get_stub(),
+                             next_issue, request_);
+        ctx->Start(cq);
+        t = (t + 1) % cli_cqs_.size();
       }
     }
   }
@@ -212,44 +198,20 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
         delete ClientRpcContext::detag(got_tag);
       }
     }
-    // Now clear out all the pre-allocated idle contexts
-    for (int ch = 0; ch < channel_count_; ch++) {
-      while (!contexts_[ch].empty()) {
-        // Get an idle context from the front of the list
-        auto* ctx = *(contexts_[ch].begin());
-        contexts_[ch].pop_front();
-        delete ctx;
-      }
-    }
-    delete[] channel_lock_;
   }
 
   bool ThreadFunc(Histogram* histogram,
                   size_t thread_idx) GRPC_OVERRIDE GRPC_FINAL {
     void* got_tag;
     bool ok;
-    grpc_time deadline, short_deadline;
-    if (closed_loop_) {
-      deadline = grpc_time_source::now() + std::chrono::seconds(1);
-      short_deadline = deadline;
-    } else {
-      if (rpc_deadlines_[thread_idx].empty()) {
-        deadline = grpc_time_source::now() + std::chrono::seconds(1);
-      } else {
-        deadline = *(rpc_deadlines_[thread_idx].begin());
-      }
-      short_deadline =
-          issue_allowed_[thread_idx] ? next_issue_[thread_idx] : deadline;
-    }
-
     bool got_event;
 
-    switch (cli_cqs_[thread_idx]->AsyncNext(&got_tag, &ok, short_deadline)) {
+    // Next() returns bool rather than NextStatus; use AsyncNext with an
+    // infinite deadline so the switch cases below stay type-correct.
+    switch (cli_cqs_[thread_idx]->AsyncNext(
+        &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME))) {
       case CompletionQueue::SHUTDOWN:
         return false;
-      case CompletionQueue::TIMEOUT:
-        got_event = false;
-        break;
       case CompletionQueue::GOT_EVENT:
         got_event = true;
         break;
@@ -260,71 +219,12 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
     if (got_event) {
       ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
       if (ctx->RunNextState(ok, histogram) == false) {
-        // call the callback and then clone the ctx
-        ctx->RunNextState(ok, histogram);
+        // The RPC and callback are done, so clone the ctx
         ClientRpcContext* clone_ctx = ctx->StartNewClone();
-        if (closed_loop_) {
-          clone_ctx->Start(cli_cqs_[thread_idx].get());
-        } else {
-          // Remove the entry from the rpc deadlines list
-          rpc_deadlines_[thread_idx].erase(ctx->deadline_posn());
-          // Put the clone_ctx in the list of idle contexts for this channel
-          // Under lock
-          int ch = clone_ctx->channel_id();
-          std::lock_guard<std::mutex> g(channel_lock_[ch]);
-          contexts_[ch].push_front(clone_ctx);
-        }
+        clone_ctx->Start(cli_cqs_[thread_idx].get());
         // delete the old version
         delete ctx;
       }
-      if (!closed_loop_)
-        issue_allowed_[thread_idx] =
-            true;  // may be ok now even if it hadn't been
-    }
-    if (!closed_loop_ && issue_allowed_[thread_idx] &&
-        grpc_time_source::now() >= next_issue_[thread_idx]) {
-      // Attempt to issue
-      bool issued = false;
-      for (int num_attempts = 0, channel_attempt = next_channel_[thread_idx];
-           num_attempts < channel_count_ && !issued; num_attempts++) {
-        bool can_issue = false;
-        ClientRpcContext* ctx = nullptr;
-        {
-          std::lock_guard<std::mutex> g(channel_lock_[channel_attempt]);
-          if (!contexts_[channel_attempt].empty()) {
-            // Get an idle context from the front of the list
-            ctx = *(contexts_[channel_attempt].begin());
-            contexts_[channel_attempt].pop_front();
-            can_issue = true;
-          }
-        }
-        if (can_issue) {
-          // do the work to issue
-          rpc_deadlines_[thread_idx].emplace_back(grpc_time_source::now() +
-                                                  std::chrono::seconds(1));
-          auto it = rpc_deadlines_[thread_idx].end();
-          --it;
-          ctx->set_deadline_posn(it);
-          ctx->Start(cli_cqs_[thread_idx].get());
-          issued = true;
-          // If we did issue, then next time, try our thread's next
-          // preferred channel
-          next_channel_[thread_idx] += pref_channel_inc_;
-          if (next_channel_[thread_idx] >= channel_count_)
-            next_channel_[thread_idx] = (thread_idx % channel_count_);
-        } else {
-          // Do a modular increment of channel attempt if we couldn't issue
-          channel_attempt = (channel_attempt + 1) % channel_count_;
-        }
-      }
-      if (issued) {
-        // We issued one; see when we can issue the next
-        grpc_time next_issue;
-        NextIssueTime(thread_idx, &next_issue);
-        next_issue_[thread_idx] = next_issue;
-      } else {
-        issue_allowed_[thread_idx] = false;
-      }
     }
     return true;
   }
@@ -333,19 +233,6 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
   int num_async_threads_;
 
  private:
-  class boolean {  // exists only to avoid data-race on vector<bool>
-   public:
-    boolean() : val_(false) {}
-    boolean(bool b) : val_(b) {}
-    operator bool() const { return val_; }
-    boolean& operator=(bool b) {
-      val_ = b;
-      return *this;
-    }
-
-   private:
-    bool val_;
-  };
   int NumThreads(const ClientConfig& config) {
     int num_threads = config.async_client_threads();
     if (num_threads <= 0) {  // Use dynamic sizing
@@ -356,18 +243,6 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
   }
 
   std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
-
-  std::vector<deadline_list> rpc_deadlines_;  // per thread deadlines
-  std::vector<int> next_channel_;       // per thread round-robin channel ctr
-  std::vector<boolean> issue_allowed_;  // may this thread attempt to issue
-  std::vector<grpc_time> next_issue_;   // when should it issue?
-
-  std::mutex*
-      channel_lock_;  // a vector, but avoid std::vector for old compilers
-  std::vector<context_list> contexts_;  // per-channel list of idle contexts
-  int max_outstanding_per_channel_;
-  int channel_count_;
-  int pref_channel_inc_;
 };
 
 static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator(
@@ -391,11 +266,11 @@ class AsyncUnaryClient GRPC_FINAL
            const SimpleRequest& request, CompletionQueue* cq) {
     return stub->AsyncUnaryCall(ctx, request, cq);
   };
-  static ClientRpcContext* SetupCtx(int channel_id,
-                                    BenchmarkService::Stub* stub,
+  static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub,
+                                    std::function<gpr_timespec()> next_issue,
                                     const SimpleRequest& req) {
     return new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
-        channel_id, stub, req, AsyncUnaryClient::StartReq,
+        stub, req, next_issue, AsyncUnaryClient::StartReq,
         AsyncUnaryClient::CheckDone);
   }
 };
@@ -404,62 +279,85 @@ template <class RequestType, class ResponseType>
 class ClientRpcContextStreamingImpl : public ClientRpcContext {
  public:
   ClientRpcContextStreamingImpl(
-      int channel_id, BenchmarkService::Stub* stub, const RequestType& req,
+      BenchmarkService::Stub* stub, const RequestType& req,
+      std::function<gpr_timespec()> next_issue,
       std::function<std::unique_ptr<
           grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
           BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*,
           void*)> start_req,
       std::function<void(grpc::Status, ResponseType*)> on_done)
-      : ClientRpcContext(channel_id),
-        context_(),
+      : context_(),
         stub_(stub),
+        cq_(nullptr),
         req_(req),
         response_(),
-        next_state_(&ClientRpcContextStreamingImpl::ReqSent),
+        next_state_(State::INVALID),
         callback_(on_done),
+        next_issue_(next_issue),
         start_req_(start_req),
         start_(Timer::Now()) {}
   ~ClientRpcContextStreamingImpl() GRPC_OVERRIDE {}
+  void Start(CompletionQueue* cq) GRPC_OVERRIDE {
+    cq_ = cq;
+    stream_ = start_req_(stub_, &context_, cq, ClientRpcContext::tag(this));
+    next_state_ = State::STREAM_IDLE;
+  }
   bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
-    return (this->*next_state_)(ok, hist);
+    while (true) {
+      switch (next_state_) {
+        case State::STREAM_IDLE:
+          if (!next_issue_) { // ready to issue
+            next_state_ = State::READY_TO_WRITE;
+          } else {
+            next_state_ = State::WAIT;
+          }
+          break; // loop around, don't return
+        case State::WAIT:
+          alarm_.reset(new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this)));
+          next_state_ = State::READY_TO_WRITE;
+          return true;
+        case State::READY_TO_WRITE:
+          if (!ok) {
+            return false;
+          }
+          start_ = Timer::Now();
+          next_state_ = State::WRITE_DONE;
+          stream_->Write(req_, ClientRpcContext::tag(this));
+          return true;
+        case State::WRITE_DONE:
+          if (!ok) {
+            return false;
+          }
+          next_state_ = State::READ_DONE;
+          stream_->Read(&response_, ClientRpcContext::tag(this));
+          return true;
+        case State::READ_DONE:
+          hist->Add((Timer::Now() - start_) * 1e9);
+          callback_(status_, &response_);
+          next_state_ = State::STREAM_IDLE;
+          break; // loop around
+        default:
+          GPR_ASSERT(false);
+          return false;
+      }
+    }
   }
   ClientRpcContext* StartNewClone() GRPC_OVERRIDE {
-    return new ClientRpcContextStreamingImpl(channel_id_, stub_, req_,
+    return new ClientRpcContextStreamingImpl(stub_, req_, next_issue_,
                                              start_req_, callback_);
   }
-  void Start(CompletionQueue* cq) GRPC_OVERRIDE {
-    stream_ = start_req_(stub_, &context_, cq, ClientRpcContext::tag(this));
-  }
 
  private:
-  bool ReqSent(bool ok, Histogram*) { return StartWrite(ok); }
-  bool StartWrite(bool ok) {
-    if (!ok) {
-      return (false);
-    }
-    start_ = Timer::Now();
-    next_state_ = &ClientRpcContextStreamingImpl::WriteDone;
-    stream_->Write(req_, ClientRpcContext::tag(this));
-    return true;
-  }
-  bool WriteDone(bool ok, Histogram*) {
-    if (!ok) {
-      return (false);
-    }
-    next_state_ = &ClientRpcContextStreamingImpl::ReadDone;
-    stream_->Read(&response_, ClientRpcContext::tag(this));
-    return true;
-  }
-  bool ReadDone(bool ok, Histogram* hist) {
-    hist->Add((Timer::Now() - start_) * 1e9);
-    return StartWrite(ok);
-  }
   grpc::ClientContext context_;
   BenchmarkService::Stub* stub_;
+  CompletionQueue* cq_;
+  std::unique_ptr<Alarm> alarm_;
   RequestType req_;
   ResponseType response_;
-  bool (ClientRpcContextStreamingImpl::*next_state_)(bool, Histogram*);
+  enum State {INVALID, STREAM_IDLE, WAIT, READY_TO_WRITE, WRITE_DONE, READ_DONE};
+  State next_state_;
   std::function<void(grpc::Status, ResponseType*)> callback_;
+  std::function<gpr_timespec()> next_issue_;
   std::function<
       std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
           BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*,
@@ -475,9 +374,6 @@ class AsyncStreamingClient GRPC_FINAL
  public:
   explicit AsyncStreamingClient(const ClientConfig& config)
       : AsyncClient(config, SetupCtx, BenchmarkStubCreator) {
-    // async streaming currently only supports closed loop
-    GPR_ASSERT(closed_loop_);
-
     StartThreads(num_async_threads_);
   }
 
@@ -492,11 +388,11 @@ class AsyncStreamingClient GRPC_FINAL
     auto stream = stub->AsyncStreamingCall(ctx, cq, tag);
     return stream;
   };
-  static ClientRpcContext* SetupCtx(int channel_id,
-                                    BenchmarkService::Stub* stub,
+  static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub,
+                                    std::function<gpr_timespec()> next_issue,
                                     const SimpleRequest& req) {
     return new ClientRpcContextStreamingImpl<SimpleRequest, SimpleResponse>(
-        channel_id, stub, req, AsyncStreamingClient::StartReq,
+        stub, req, next_issue, AsyncStreamingClient::StartReq,
         AsyncStreamingClient::CheckDone);
   }
 };
@@ -504,64 +400,86 @@
 class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
  public:
   ClientRpcContextGenericStreamingImpl(
-      int channel_id, grpc::GenericStub* stub, const ByteBuffer& req,
+      grpc::GenericStub* stub, const ByteBuffer& req,
+      std::function<gpr_timespec()> next_issue,
       std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>(
           grpc::GenericStub*, grpc::ClientContext*,
           const grpc::string& method_name, CompletionQueue*, void*)> start_req,
       std::function<void(grpc::Status, ByteBuffer*)> on_done)
-      : ClientRpcContext(channel_id),
-        context_(),
+      : context_(),
         stub_(stub),
+        cq_(nullptr),
         req_(req),
         response_(),
-        next_state_(&ClientRpcContextGenericStreamingImpl::ReqSent),
+        next_state_(State::INVALID),
         callback_(on_done),
+        next_issue_(next_issue),
         start_req_(start_req),
         start_(Timer::Now()) {}
   ~ClientRpcContextGenericStreamingImpl() GRPC_OVERRIDE {}
-  bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
-    return (this->*next_state_)(ok, hist);
-  }
-  ClientRpcContext* StartNewClone() GRPC_OVERRIDE {
-    return new ClientRpcContextGenericStreamingImpl(channel_id_, stub_, req_,
-                                                    start_req_, callback_);
-  }
   void Start(CompletionQueue* cq) GRPC_OVERRIDE {
+    cq_ = cq;
     const grpc::string kMethodName(
         "/grpc.testing.BenchmarkService/StreamingCall");
     stream_ = start_req_(stub_, &context_, kMethodName, cq,
                          ClientRpcContext::tag(this));
+    next_state_ = State::STREAM_IDLE;
   }
-
- private:
-  bool ReqSent(bool ok, Histogram*) { return StartWrite(ok); }
-  bool StartWrite(bool ok) {
-    if (!ok) {
-      return (false);
-    }
-    start_ = Timer::Now();
-    next_state_ = &ClientRpcContextGenericStreamingImpl::WriteDone;
-    stream_->Write(req_, ClientRpcContext::tag(this));
-    return true;
-  }
-  bool WriteDone(bool ok, Histogram*) {
-    if (!ok) {
-      return (false);
+  bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
+    while (true) {
+      switch (next_state_) {
+        case State::STREAM_IDLE:
+          if (!next_issue_) { // ready to issue
+            next_state_ = State::READY_TO_WRITE;
+          } else {
+            next_state_ = State::WAIT;
+          }
+          break; // loop around, don't return
+        case State::WAIT:
+          alarm_.reset(new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this)));
+          next_state_ = State::READY_TO_WRITE;
+          return true;
+        case State::READY_TO_WRITE:
+          if (!ok) {
+            return false;
+          }
+          start_ = Timer::Now();
+          next_state_ = State::WRITE_DONE;
+          stream_->Write(req_, ClientRpcContext::tag(this));
+          return true;
+        case State::WRITE_DONE:
+          if (!ok) {
+            return false;
+          }
+          next_state_ = State::READ_DONE;
+          stream_->Read(&response_, ClientRpcContext::tag(this));
+          return true;
+        case State::READ_DONE:
+          hist->Add((Timer::Now() - start_) * 1e9);
+          callback_(status_, &response_);
+          next_state_ = State::STREAM_IDLE;
+          break; // loop around
+        default:
+          GPR_ASSERT(false);
+          return false;
+      }
     }
-    next_state_ = &ClientRpcContextGenericStreamingImpl::ReadDone;
-    stream_->Read(&response_, ClientRpcContext::tag(this));
-    return true;
   }
-  bool ReadDone(bool ok, Histogram* hist) {
-    hist->Add((Timer::Now() - start_) * 1e9);
-    return StartWrite(ok);
+  ClientRpcContext* StartNewClone() GRPC_OVERRIDE {
+    return new ClientRpcContextGenericStreamingImpl(stub_, req_, next_issue_,
+                                                    start_req_, callback_);
   }
+ private:
   grpc::ClientContext context_;
   grpc::GenericStub* stub_;
+  CompletionQueue* cq_;
+  std::unique_ptr<Alarm> alarm_;
   ByteBuffer req_;
   ByteBuffer response_;
-  bool (ClientRpcContextGenericStreamingImpl::*next_state_)(bool, Histogram*);
+  enum State {INVALID, STREAM_IDLE, WAIT, READY_TO_WRITE, WRITE_DONE, READ_DONE};
+  State next_state_;
   std::function<void(grpc::Status, ByteBuffer*)> callback_;
+  std::function<gpr_timespec()> next_issue_;
   std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>(
       grpc::GenericStub*, grpc::ClientContext*, const grpc::string&,
       CompletionQueue*, void*)> start_req_;
@@ -580,9 +499,6 @@ class GenericAsyncStreamingClient GRPC_FINAL
  public:
   explicit GenericAsyncStreamingClient(const ClientConfig& config)
       : AsyncClient(config, SetupCtx, GenericStubCreator) {
-    // async streaming currently only supports closed loop
-    GPR_ASSERT(closed_loop_);
-
     StartThreads(num_async_threads_);
   }
 
@@ -596,10 +512,11 @@ class GenericAsyncStreamingClient GRPC_FINAL
     auto stream = stub->Call(ctx, method_name, cq, tag);
     return stream;
   };
-  static ClientRpcContext* SetupCtx(int channel_id, grpc::GenericStub* stub,
+  static ClientRpcContext* SetupCtx(grpc::GenericStub* stub,
+                                    std::function<gpr_timespec()> next_issue,
                                     const ByteBuffer& req) {
     return new ClientRpcContextGenericStreamingImpl(
-        channel_id, stub, req, GenericAsyncStreamingClient::StartReq,
+        stub, req, next_issue, GenericAsyncStreamingClient::StartReq,
         GenericAsyncStreamingClient::CheckDone);
   }
 };
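[Reviewer note, not part of the patch: the Alarm-based delayed start shared
by all three context classes, reduced to a sketch. StartAt is an
illustrative name, not patch code.]

    #include <memory>

    #include <grpc++/alarm.h>
    #include <grpc++/completion_queue.h>
    #include <grpc/support/time.h>

    // Post `tag` on `cq` once `when` arrives. RunNextState then fires from
    // the same Next() loop that handles RPC completions, so issue timing
    // and RPC events share one event path per thread.
    void StartAt(grpc::CompletionQueue* cq, gpr_timespec when, void* tag,
                 std::unique_ptr<grpc::Alarm>* alarm) {
      alarm->reset(new grpc::Alarm(cq, when, tag));
    }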
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc
index d93537b279..edfc246a25 100644
--- a/test/cpp/qps/client_sync.cc
+++ b/test/cpp/qps/client_sync.cc
@@ -84,11 +84,8 @@ class SynchronousClient
 
  protected:
   void WaitToIssue(int thread_idx) {
-    grpc_time next_time;
-    if (NextIssueTime(thread_idx, &next_time)) {
-      gpr_timespec next_timespec;
-      TimepointHR2Timespec(next_time, &next_timespec);
-      gpr_sleep_until(next_timespec);
+    if (!closed_loop_) {
+      gpr_sleep_until(NextIssueTime(thread_idx));
     }
   }
 
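[Reviewer note, not part of the patch: a hedged sketch of how the sync
client stays open-loop; done and DoOneRpc are hypothetical stand-ins for
the surrounding thread loop.]

    while (!done) {
      WaitToIssue(thread_idx);  // closed loop: returns immediately;
                                // open loop: absolute-time sleep
      DoOneRpc();
    }

Because gpr_sleep_until targets an absolute time from the precomputed
schedule, an RPC that runs long only makes the next sleep return
immediately; the offered load is preserved instead of being pushed back.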
diff --git a/test/cpp/qps/interarrival.h b/test/cpp/qps/interarrival.h
index d1bfdcd756..33f102fe97 100644
--- a/test/cpp/qps/interarrival.h
+++ b/test/cpp/qps/interarrival.h
@@ -151,9 +151,8 @@ class InterarrivalTimer {
       // and that supports new and old compilers
       const double uniform_0_1 = static_cast<double>(rand())
           / static_cast<double>(RAND_MAX);
-      random_table_.push_back(
-          std::chrono::nanoseconds(static_cast<int64_t>(
-              1e9 * r.transform(uniform_0_1))));
+      random_table_.push_back(static_cast<int64_t>(
+          1e9 * r.transform(uniform_0_1)));
     }
     // Now set up the thread positions
     for (int i = 0; i < threads; i++) {
@@ -162,7 +161,7 @@ class InterarrivalTimer {
   }
   virtual ~InterarrivalTimer(){};
 
-  std::chrono::nanoseconds next(int thread_num) {
+  int64_t next(int thread_num) {
     auto ret = *(thread_posns_[thread_num]++);
     if (thread_posns_[thread_num] == random_table_.end())
       thread_posns_[thread_num] = random_table_.begin();
@@ -170,7 +169,7 @@ class InterarrivalTimer {
   }
 
  private:
-  typedef std::vector<std::chrono::nanoseconds> time_table;
+  typedef std::vector<int64_t> time_table;
   std::vector<time_table::const_iterator> thread_posns_;
   time_table random_table_;
 };
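[Reviewer note, not part of the patch: the table now stores plain int64_t
nanoseconds. Assuming a Poisson offered load, the transform being
tabulated is the exponential inverse CDF; NextGapNanos below is a
standalone sketch under that assumption, keeping rand() for parity with
the loop above.]

    #include <cmath>
    #include <cstdint>
    #include <cstdlib>

    // One interarrival gap in nanoseconds for a rate of `lambda` RPCs/sec.
    int64_t NextGapNanos(double lambda) {
      double u = static_cast<double>(rand()) / static_cast<double>(RAND_MAX);
      // Inverse CDF of Exp(lambda); u == 1.0 is the same rand() edge case
      // the table-building loop above has.
      return static_cast<int64_t>(1e9 * (-std::log(1.0 - u) / lambda));
    }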
diff --git a/test/cpp/qps/qps_interarrival_test.cc b/test/cpp/qps/qps_interarrival_test.cc
index dd32bcc0e3..48585af756 100644
--- a/test/cpp/qps/qps_interarrival_test.cc
+++ b/test/cpp/qps/qps_interarrival_test.cc
@@ -49,7 +49,7 @@ static void RunTest(RandomDistInterface &&r, int threads, std::string title) {
 
   for (int i = 0; i < 10000000; i++) {
     for (int j = 0; j < threads; j++) {
-      gpr_histogram_add(h, timer.next(j).count());
+      gpr_histogram_add(h, timer.next(j));
     }
   }
 
diff --git a/test/cpp/qps/qps_openloop_test.cc b/test/cpp/qps/qps_openloop_test.cc
index fe5f685b6e..0ac41d9f96 100644
--- a/test/cpp/qps/qps_openloop_test.cc
+++ b/test/cpp/qps/qps_openloop_test.cc
@@ -53,7 +53,7 @@ static void RunQPS() {
   client_config.set_outstanding_rpcs_per_channel(1000);
   client_config.set_client_channels(8);
   client_config.set_async_client_threads(8);
-  client_config.set_rpc_type(UNARY);
+  client_config.set_rpc_type(STREAMING);
   client_config.mutable_load_params()->mutable_poisson()->set_offered_load(
       1000.0);
 
diff --git a/test/cpp/qps/qps_test.cc b/test/cpp/qps/qps_test.cc
index 15054db892..27aaf137f6 100644
--- a/test/cpp/qps/qps_test.cc
+++ b/test/cpp/qps/qps_test.cc
@@ -53,7 +53,7 @@ static void RunQPS() {
   client_config.set_outstanding_rpcs_per_channel(1000);
   client_config.set_client_channels(8);
   client_config.set_async_client_threads(8);
-  client_config.set_rpc_type(UNARY);
+  client_config.set_rpc_type(STREAMING);
   client_config.mutable_load_params()->mutable_closed_loop();
 
   ServerConfig server_config;
diff --git a/tools/run_tests/tests.json b/tools/run_tests/tests.json
index 2c73c40d1f..ddf4a1293f 100644
--- a/tools/run_tests/tests.json
+++ b/tools/run_tests/tests.json
@@ -2077,6 +2077,24 @@
       "windows"
     ]
   }, 
+  {
+    "args": [], 
+    "ci_platforms": [
+      "linux", 
+      "mac", 
+      "posix"
+    ], 
+    "cpu_cost": 1.0, 
+    "exclude_configs": [], 
+    "flaky": false, 
+    "language": "c++", 
+    "name": "qps_openloop_test", 
+    "platforms": [
+      "linux", 
+      "mac", 
+      "posix"
+    ]
+  }, 
   {
     "args": [], 
     "ci_platforms": [
-- 
GitLab