From 6a608020b0e10f258d64f7b2a5b5cc02643ed9bc Mon Sep 17 00:00:00 2001
From: vjpai <vpai@google.com>
Date: Mon, 18 May 2015 09:16:53 -0700
Subject: [PATCH] WIP

---
 test/cpp/qps/client_async.cc | 41 ++++++++++++++++++++----------------
 1 file changed, 23 insertions(+), 18 deletions(-)

diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index fa1a799f1b..d0510ec67a 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -140,7 +140,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
 class AsyncClient : public Client {
  public:
   explicit AsyncClient(const ClientConfig& config,
-		       std::function<void(CompletionQueue*, TestService::Stub*,
+		       std::function<ClientRpcContext*(CompletionQueue*, TestService::Stub*,
 					  const SimpleRequest&)> setup_ctx) :
       Client(config) {
     for (int i = 0; i < config.async_client_threads(); i++) {
@@ -158,18 +158,22 @@ class AsyncClient : public Client {
     if (!closed_loop_) {
       for (auto channel = channels_.begin(); channel != channels_.end();
 	   channel++) {
-	channel_rpc_count_lock.emplace_back();
+	channel_rpc_lock_.emplace_back();
 	rpcs_outstanding_.push_back(0);
       }
     }
-    else {
-      int t = 0;
-      for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
-	for (auto channel = channels_.begin(); channel != channels_.end();
-	     channel++) {
-	  auto* cq = cli_cqs_[t].get();
-	  t = (t + 1) % cli_cqs_.size();
-	  setup_ctx(cq, channel->get_stub(), request_);
+
+    int t = 0;
+    for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
+      for (auto channel = channels_.begin(); channel != channels_.end();
+	   channel++) {
+	auto* cq = cli_cqs_[t].get();
+	t = (t + 1) % cli_cqs_.size();
+	ClientRpcContext *ctx = setup_ctx(cq, channel->get_stub(), request_);
+	if (closed_loop_) {
+	  // only relevant for closed_loop unary, but harmless for
+	  // closed_loop streaming
+	  ctx->Start();
 	}
       }
     }
@@ -222,12 +226,13 @@ class AsyncClient : public Client {
      }
      issue_allowed_[thread_idx] = true; // may be ok now even if it hadn't been
    }
-   if (issue_allowed && grpc_time_source::now() >= next_issue_[thread_idx]) {
+   if (issue_allowed_[thread_idx] &&
+       grpc_time_source::now() >= next_issue_[thread_idx]) {
      // Attempt to issue                                                                                                                 
      bool issued = false;
      for (int num_attempts = 0; num_attempts < channel_count_ && !issued;
 	  num_attempts++, next_channel_[thread_idx] = (next_channel_[thread_idx]+1)%channel_count_) {
-       std::lock_guard g(channel_rpc_count_lock_[next_channel_[thread_idx]]);
+       std::lock_guard g(channel_rpc_lock_[next_channel_[thread_idx]]);
        if (rpcs_outstanding[next_channel_[thread_idx]] < max_outstanding_per_channel_) {
 	 // do the work to issue
 	 rpcs_outstanding[next_channel_[thread_idx]]++;
@@ -247,7 +252,7 @@ class AsyncClient : public Client {
   std::vector<bool> issue_allowed_; // may this thread attempt to issue
   std::vector<grpc_time> next_issue_; // when should it issue?
 
-  std::vector<std::mutex> channel_rpc_count_lock_;
+  std::vector<std::mutex> channel_rpc_lock_;
   std::vector<int> rpcs_outstanding_; // per-channel vector
   int max_outstanding_per_channel_;
   int channel_count_;
@@ -261,15 +266,15 @@ class AsyncUnaryClient GRPC_FINAL : public AsyncClient {
   }
   ~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); }
 private:
-  static void SetupCtx(CompletionQueue* cq, TestService::Stub* stub,
+  static ClientRpcContext *SetupCtx(CompletionQueue* cq, TestService::Stub* stub,
                        const SimpleRequest& req) {
     auto check_done = [](grpc::Status s, SimpleResponse* response) {};
     auto start_req = [cq](TestService::Stub* stub, grpc::ClientContext* ctx,
                           const SimpleRequest& request) {
       return stub->AsyncUnaryCall(ctx, request, cq);
     };
-    new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
-        stub, req, start_req, check_done);
+    return new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
+       stub, req, start_req, check_done);
   }
   
 };
@@ -350,7 +355,7 @@ class AsyncStreamingClient GRPC_FINAL : public AsyncClient {
 
   ~AsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); }
 private:
-  static void SetupCtx(CompletionQueue* cq, TestService::Stub* stub,
+  static ClientRpcContext *SetupCtx(CompletionQueue* cq, TestService::Stub* stub,
                        const SimpleRequest& req)  {
     auto check_done = [](grpc::Status s, SimpleResponse* response) {};
     auto start_req = [cq](TestService::Stub *stub, grpc::ClientContext *ctx,
@@ -358,7 +363,7 @@ private:
       auto stream = stub->AsyncStreamingCall(ctx, cq, tag);
       return stream;
     };
-    new ClientRpcContextStreamingImpl<SimpleRequest, SimpleResponse>(
+    return new ClientRpcContextStreamingImpl<SimpleRequest, SimpleResponse>(
         stub, req, start_req, check_done);
   }
 };
-- 
GitLab