From 7b172b2411717d564f201c13317687132822964b Mon Sep 17 00:00:00 2001
From: Vijay Pai <vpai@google.com>
Date: Fri, 5 Jun 2015 02:03:18 -0700
Subject: [PATCH] Get the code to stop crashing by fixing bugs

---
 test/cpp/qps/client_async.cc | 119 +++++++++++++++++++++--------------
 1 file changed, 72 insertions(+), 47 deletions(-)

diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index 3c3814df23..ad0cffabda 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -34,6 +34,7 @@
 #include <cassert>
 #include <forward_list>
 #include <functional>
+#include <list>
 #include <memory>
 #include <mutex>
 #include <string>
@@ -57,8 +58,8 @@
 namespace grpc {
 namespace testing {
 
-typedef std::forward_list<grpc_time> deadline_list;
-  
+typedef std::list<grpc_time> deadline_list;
+
 class ClientRpcContext {
  public:
   ClientRpcContext(int ch): channel_id_(ch) {}
@@ -72,8 +73,8 @@ class ClientRpcContext {
   }
 
   deadline_list::iterator deadline_posn() const {return deadline_posn_;}
-  void set_deadline_posn(deadline_list::iterator&& it) {deadline_posn_ = it;}
-  virtual void Start() = 0;
+  void set_deadline_posn(const deadline_list::iterator& it) {deadline_posn_ = it;}
+  virtual void Start(CompletionQueue *cq) = 0;
   int channel_id() const {return channel_id_;}
  protected:
   int channel_id_;
@@ -88,7 +89,8 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
       TestService::Stub* stub, const RequestType& req,
       std::function<
           std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
-              TestService::Stub*, grpc::ClientContext*, const RequestType&)>
+              TestService::Stub*, grpc::ClientContext*, const RequestType&,
+              CompletionQueue*)>
           start_req,
       std::function<void(grpc::Status, ResponseType*)> on_done)
     : ClientRpcContext(channel_id), context_(),
@@ -99,9 +101,9 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
         callback_(on_done),
         start_req_(start_req) {
   }
-  void Start() GRPC_OVERRIDE {
+  void Start(CompletionQueue *cq) GRPC_OVERRIDE {
     start_ = Timer::Now();
-    response_reader_ = start_req_(stub_, &context_, req_);
+    response_reader_ = start_req_(stub_, &context_, req_, cq);
     response_reader_->Finish(&response_, &status_, ClientRpcContext::tag(this));
   }
   ~ClientRpcContextUnaryImpl() GRPC_OVERRIDE {}
@@ -115,7 +117,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
 
   ClientRpcContext* StartNewClone() GRPC_OVERRIDE {
     return new ClientRpcContextUnaryImpl(channel_id_,
-					 stub_, req_, start_req_, callback_);
+                                         stub_, req_, start_req_, callback_);
   }
 
  private:
@@ -123,9 +125,9 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
     next_state_ = &ClientRpcContextUnaryImpl::DoCallBack;
     return false;
   }
-  bool DoCallBack(bool) {
+  bool DoCallBack (bool) {
     callback_(status_, &response_);
-    return false;
+    return true; // we're done, this'll be ignored
   }
   grpc::ClientContext context_;
   TestService::Stub* stub_;
@@ -134,7 +136,8 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
   bool (ClientRpcContextUnaryImpl::*next_state_)(bool);
   std::function<void(grpc::Status, ResponseType*)> callback_;
   std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
-      TestService::Stub*, grpc::ClientContext*, const RequestType&)> start_req_;
+      TestService::Stub*, grpc::ClientContext*,
+      const RequestType&, CompletionQueue *)> start_req_;
   grpc::Status status_;
   double start_;
   std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>
@@ -146,11 +149,11 @@ typedef std::forward_list<ClientRpcContext *> context_list;
 class AsyncClient : public Client {
  public:
   explicit AsyncClient(const ClientConfig& config,
-		       std::function<ClientRpcContext*(int, CompletionQueue*, TestService::Stub*,
+		       std::function<ClientRpcContext*(int, TestService::Stub*,
 					  const SimpleRequest&)> setup_ctx) :
       Client(config), channel_lock_(config.client_channels()),
-      max_outstanding_per_channel_(config.outstanding_rpcs_per_channel()),
       contexts_(config.client_channels()),
+      max_outstanding_per_channel_(config.outstanding_rpcs_per_channel()),
       channel_count_(config.client_channels()) {
 
     SetupLoadTest(config, config.async_client_threads());
@@ -169,8 +172,8 @@ class AsyncClient : public Client {
     }
     if (!closed_loop_) {
       for (auto channel = channels_.begin(); channel != channels_.end();
-	   channel++) {
-	rpcs_outstanding_.push_back(0);
+           channel++) {
+        rpcs_outstanding_.push_back(0);
       }
     }
 
@@ -178,14 +181,12 @@ class AsyncClient : public Client {
     for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
       for (int ch = 0; ch < channel_count_; ch++) {
         auto& channel = channels_[ch];
-	auto* cq = cli_cqs_[t].get();
-	t = (t + 1) % cli_cqs_.size();
-	auto ctx = setup_ctx(ch, cq, channel.get_stub(), request_);
-	if (closed_loop_) {
-	  // only relevant for closed_loop unary, but harmless for
-	  // closed_loop streaming
-	  ctx->Start();
-	}
+        auto* cq = cli_cqs_[t].get();
+        t = (t + 1) % cli_cqs_.size();
+        auto ctx = setup_ctx(ch, channel.get_stub(), request_);
+        if (closed_loop_) {
+          ctx->Start(cq);
+        }
         else {
           contexts_[ch].push_front(ctx);
         }
@@ -238,28 +239,36 @@ class AsyncClient : public Client {
     }
     if ((closed_loop_ || !rpc_deadlines_[thread_idx].empty()) &&
         grpc_time_source::now() > deadline) {
-      // we have missed some 1-second deadline, which is too much                            gpr_log(GPR_INFO, "Missed an RPC deadline, giving up");
+      // we have missed some 1-second deadline, which is too much
+      gpr_log(GPR_INFO, "Missed an RPC deadline, giving up");
       return false;
     }
     if (got_event) {
      ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
      if (ctx->RunNextState(ok, histogram) == false) {
-       // call the callback and then delete it
-       rpc_deadlines_[thread_idx].erase_after(ctx->deadline_posn());
+       // call the callback and then clone the ctx
        ctx->RunNextState(ok, histogram);
        ClientRpcContext *clone_ctx = ctx->StartNewClone();
-       delete ctx;
-       if (!closed_loop_) {
-         // Put this in the list of idle contexts for this channel
+       if (closed_loop_) {
+         clone_ctx->Start(cli_cqs_[thread_idx].get());
+       }
+       else {
+         // Remove the entry from the rpc deadlines list
+         rpc_deadlines_[thread_idx].erase(ctx->deadline_posn());
+         // Put the clone_ctx in the list of idle contexts for this channel
 	 // Under lock
 	 int ch = clone_ctx->channel_id();
 	 std::lock_guard<std::mutex> g(channel_lock_[ch]);
-	 contexts_[ch].push_front(ctx);
+         rpcs_outstanding_[ch]--;
+	 contexts_[ch].push_front(clone_ctx);
        }
+       // delete the old version
+       delete ctx;
      }
-     issue_allowed_[thread_idx] = true; // may be ok now even if it hadn't been
+     if (!closed_loop_)
+       issue_allowed_[thread_idx] = true; // may be ok now even if it hadn't been
    }
-   if (issue_allowed_[thread_idx] &&
+   if (!closed_loop_ && issue_allowed_[thread_idx] &&
        grpc_time_source::now() >= next_issue_[thread_idx]) {
      // Attempt to issue
      bool issued = false;
@@ -273,16 +282,27 @@ class AsyncClient : public Client {
             max_outstanding_per_channel_) &&
            !contexts_[next_channel_[thread_idx]].empty()) {
          // Get an idle context from the front of the list
-         auto ctx = contexts_[next_channel_[thread_idx]].begin();
+         auto ctx = *(contexts_[next_channel_[thread_idx]].begin());
          contexts_[next_channel_[thread_idx]].pop_front();
 	 // do the work to issue
-         (*ctx)->Start();
+         rpc_deadlines_[thread_idx].emplace_back(
+             grpc_time_source::now() + std::chrono::seconds(1));
+         auto it = rpc_deadlines_[thread_idx].end();
+         --it;
+         ctx->set_deadline_posn(it);
+         ctx->Start(cli_cqs_[thread_idx].get());
 	 rpcs_outstanding_[next_channel_[thread_idx]]++;
 	 issued = true;
        }
      }
-     if (!issued)
+     if (issued) {
+       grpc_time next_issue;
+       NextIssueTime(thread_idx, &next_issue);
+       next_issue_[thread_idx]=next_issue;
+     }
+     else {
        issue_allowed_[thread_idx] = false;
+     }
    }
    return true;
   }
@@ -311,12 +331,11 @@ class AsyncUnaryClient GRPC_FINAL : public AsyncClient {
   ~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); }
 private:
   static ClientRpcContext *SetupCtx(int channel_id,
-				    CompletionQueue* cq,
 				    TestService::Stub* stub,
 				    const SimpleRequest& req) {
     auto check_done = [](grpc::Status s, SimpleResponse* response) {};
-    auto start_req = [cq](TestService::Stub* stub, grpc::ClientContext* ctx,
-                          const SimpleRequest& request) {
+    auto start_req = [](TestService::Stub* stub, grpc::ClientContext* ctx,
+                        const SimpleRequest& request, CompletionQueue* cq) {
       return stub->AsyncUnaryCall(ctx, request, cq);
     };
     return new ClientRpcContextUnaryImpl<SimpleRequest,
@@ -333,7 +352,8 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
       TestService::Stub* stub, const RequestType& req,
       std::function<std::unique_ptr<
           grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
-          TestService::Stub*, grpc::ClientContext*, void*)> start_req,
+              TestService::Stub*, grpc::ClientContext*, CompletionQueue*,
+              void*)> start_req,
       std::function<void(grpc::Status, ResponseType*)> on_done)
       : ClientRpcContext(channel_id),
 	context_(),
@@ -343,8 +363,7 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
         next_state_(&ClientRpcContextStreamingImpl::ReqSent),
         callback_(on_done),
         start_req_(start_req),
-        start_(Timer::Now()),
-        stream_(start_req_(stub_, &context_, ClientRpcContext::tag(this))) {}
+        start_(Timer::Now()) {}
   ~ClientRpcContextStreamingImpl() GRPC_OVERRIDE {}
   bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
     return (this->*next_state_)(ok, hist);
@@ -353,7 +372,9 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
     return new ClientRpcContextStreamingImpl(channel_id_,
 					     stub_, req_, start_req_, callback_);
   }
-  void Start() GRPC_OVERRIDE {}
+  void Start(CompletionQueue *cq) GRPC_OVERRIDE {
+    stream_ = start_req_(stub_, &context_, cq, ClientRpcContext::tag(this));
+  }
  private:
   bool ReqSent(bool ok, Histogram*) { return StartWrite(ok); }
   bool StartWrite(bool ok) {
@@ -385,7 +406,8 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
   std::function<void(grpc::Status, ResponseType*)> callback_;
   std::function<
       std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
-          TestService::Stub*, grpc::ClientContext*, void*)> start_req_;
+          TestService::Stub*, grpc::ClientContext*,
+          CompletionQueue *, void*)> start_req_;
   grpc::Status status_;
   double start_;
   std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>
@@ -396,17 +418,20 @@ class AsyncStreamingClient GRPC_FINAL : public AsyncClient {
  public:
   explicit AsyncStreamingClient(const ClientConfig& config)
       : AsyncClient(config, SetupCtx) {
+    // async streaming currently only supported closed loop
+    GPR_ASSERT(config.load_type() == CLOSED_LOOP);
+
     StartThreads(config.async_client_threads());
   }
 
   ~AsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); }
 private:
   static ClientRpcContext *SetupCtx(int channel_id,
-				    CompletionQueue* cq, TestService::Stub* stub,
-                       const SimpleRequest& req)  {
+                                    TestService::Stub* stub,
+                                    const SimpleRequest& req)  {
     auto check_done = [](grpc::Status s, SimpleResponse* response) {};
-    auto start_req = [cq](TestService::Stub* stub, grpc::ClientContext* ctx,
-                          void* tag) {
+    auto start_req = [](TestService::Stub* stub, grpc::ClientContext* ctx,
+                          CompletionQueue *cq, void* tag) {
       auto stream = stub->AsyncStreamingCall(ctx, cq, tag);
       return stream;
     };
-- 
GitLab