From 999ac157e648d6bccdec16a696842bdbf5416e27 Mon Sep 17 00:00:00 2001
From: Yuxuan Li <yuxuanli@google.com>
Date: Wed, 3 May 2017 21:36:36 -0700
Subject: [PATCH] initial implementation.

---
 src/core/lib/surface/completion_queue.c       |  8 +++++++
 src/core/lib/surface/completion_queue.h       |  2 ++
 src/proto/grpc/testing/control.proto          |  2 ++
 src/proto/grpc/testing/stats.proto            |  2 ++
 test/cpp/microbenchmarks/fullstack_fixtures.h | 22 +++++++++++++++++++
 test/cpp/microbenchmarks/helpers.cc           |  4 ++++
 test/cpp/microbenchmarks/helpers.h            |  2 ++
 test/cpp/qps/client.h                         | 15 ++++++++++++-
 test/cpp/qps/client_async.cc                  | 15 +++++++++++++
 test/cpp/qps/driver.cc                        |  3 +++
 test/cpp/qps/qps_json_driver.cc               |  1 +
 test/cpp/qps/report.cc                        | 19 ++++++++++++++++
 test/cpp/qps/report.h                         |  7 ++++++
 13 files changed, 101 insertions(+), 1 deletion(-)

diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index eae3f103b1..bfdd7f22fd 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -227,6 +227,7 @@ struct grpc_completion_queue {
   /* TODO: sreek - This will no longer be needed. Use polling_type set */
   int is_non_listening_server_cq;
   int num_pluckers;
+  gpr_atm num_poll;
   plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
   grpc_closure pollset_shutdown_done;
 
@@ -292,6 +293,7 @@ grpc_completion_queue *grpc_completion_queue_create_internal(
   cc->is_server_cq = 0;
   cc->is_non_listening_server_cq = 0;
   cc->num_pluckers = 0;
+  gpr_atm_no_barrier_store(&cc->num_poll, 0);
   gpr_atm_no_barrier_store(&cc->things_queued_ever, 0);
 #ifndef NDEBUG
   cc->outstanding_tag_count = 0;
@@ -308,6 +310,10 @@ grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc) {
   return cc->completion_type;
 }
 
+gpr_atm grpc_get_cq_poll_num(grpc_completion_queue *cc) {
+  return gpr_atm_no_barrier_load(&cc->num_poll);
+}
+
 #ifdef GRPC_CQ_REF_COUNT_DEBUG
 void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason,
                           const char *file, int line) {
@@ -592,6 +598,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
       gpr_mu_lock(cc->mu);
       continue;
     } else {
+      cc->num_poll++;
       grpc_error *err = cc->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cc),
                                                 NULL, now, iteration_deadline);
       if (err != GRPC_ERROR_NONE) {
@@ -784,6 +791,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
       grpc_exec_ctx_flush(&exec_ctx);
       gpr_mu_lock(cc->mu);
     } else {
+      gpr_atm_no_barrier_fetch_add(&cc->num_poll, 1);
       grpc_error *err = cc->poller_vtable->work(
           &exec_ctx, POLLSET_FROM_CQ(cc), &worker, now, iteration_deadline);
       if (err != GRPC_ERROR_NONE) {
diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h
index a932087939..d8c812f2ae 100644
--- a/src/core/lib/surface/completion_queue.h
+++ b/src/core/lib/surface/completion_queue.h
@@ -100,6 +100,8 @@ bool grpc_cq_can_listen(grpc_completion_queue *cc);
 
 grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc);
 
+gpr_atm grpc_get_cq_poll_num(grpc_completion_queue *cc);
+
 grpc_completion_queue *grpc_completion_queue_create_internal(
     grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type);
 
diff --git a/src/proto/grpc/testing/control.proto b/src/proto/grpc/testing/control.proto
index acee86678d..a55483e3e0 100644
--- a/src/proto/grpc/testing/control.proto
+++ b/src/proto/grpc/testing/control.proto
@@ -241,6 +241,8 @@ message ScenarioResultSummary
   // Number of requests that succeeded/failed
   double successful_requests_per_second = 13;
   double failed_requests_per_second = 14;
+
+  double client_polls_per_request = 15;
 }
 
 // Results of a single benchmark scenario.
diff --git a/src/proto/grpc/testing/stats.proto b/src/proto/grpc/testing/stats.proto
index 80014161a1..8818f70cc5 100644
--- a/src/proto/grpc/testing/stats.proto
+++ b/src/proto/grpc/testing/stats.proto
@@ -81,4 +81,6 @@ message ClientStats {
 
   // Number of failed requests (one row per status code seen)
   repeated RequestResultCount request_results = 5;
+
+  uint64 cq_poll_count = 6;
 }
diff --git a/test/cpp/microbenchmarks/fullstack_fixtures.h b/test/cpp/microbenchmarks/fullstack_fixtures.h
index 98aca1c346..cb96ac5d71 100644
--- a/test/cpp/microbenchmarks/fullstack_fixtures.h
+++ b/test/cpp/microbenchmarks/fullstack_fixtures.h
@@ -100,6 +100,17 @@ class FullstackFixture : public BaseFixture {
     }
   }
 
+  void Finish(benchmark::State &state) {
+    std::ostringstream out;
+    AddToLabel(out, state);
+    AppendToLabel(out, "polls/iter", (double)grpc_get_cq_poll_num(this->cq()->cq())/state.iterations());
+    auto label = out.str();
+    if (label.length() && label[0] == ' ') {
+      label = label.substr(1);
+    }
+    state.SetLabel(label);
+  }
+
   ServerCompletionQueue* cq() { return cq_.get(); }
   std::shared_ptr<Channel> channel() { return channel_; }
 
@@ -212,6 +223,17 @@ class EndpointPairFixture : public BaseFixture {
     }
   }
 
+  void Finish(benchmark::State &state) {
+    std::ostringstream out;
+    AddToLabel(out, state);
+    AppendToLabel(out, "polls/iter", (double)grpc_get_cq_poll_num(this->cq()->cq())/state.iterations());
+    auto label = out.str();
+    if (label.length() && label[0] == ' ') {
+      label = label.substr(1);
+    }
+    state.SetLabel(label);
+  }
+  
   ServerCompletionQueue* cq() { return cq_.get(); }
   std::shared_ptr<Channel> channel() { return channel_; }
 
diff --git a/test/cpp/microbenchmarks/helpers.cc b/test/cpp/microbenchmarks/helpers.cc
index 6550742453..3bf67c3c01 100644
--- a/test/cpp/microbenchmarks/helpers.cc
+++ b/test/cpp/microbenchmarks/helpers.cc
@@ -67,3 +67,7 @@ void TrackCounters::AddToLabel(std::ostream &out, benchmark::State &state) {
           (double)state.iterations());
 #endif
 }
+
+void TrackCounters::AppendToLabel(std::ostream& out, std::string metric, double value) {
+  out << " " << key << ":" << value;
+}
diff --git a/test/cpp/microbenchmarks/helpers.h b/test/cpp/microbenchmarks/helpers.h
index 7360a1c9f2..d2402a2d96 100644
--- a/test/cpp/microbenchmarks/helpers.h
+++ b/test/cpp/microbenchmarks/helpers.h
@@ -35,6 +35,7 @@
 #define TEST_CPP_MICROBENCHMARKS_COUNTERS_H
 
 #include <sstream>
+#include <string>
 
 extern "C" {
 #include <grpc/support/port_platform.h>
@@ -79,6 +80,7 @@ class TrackCounters {
  public:
   virtual void Finish(benchmark::State& state);
   virtual void AddToLabel(std::ostream& out, benchmark::State& state);
+  virtual void AppendToLabel(std::ostream& out, std::string metric, double value);
 
  private:
 #ifdef GPR_LOW_LEVEL_COUNTERS
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index 25a19a5a74..92c6c7a3a3 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -150,7 +150,8 @@ class Client {
   Client()
       : timer_(new UsageTimer),
         interarrival_timer_(),
-        started_requests_(false) {
+        started_requests_(false),
+        last_reset_poll_count_(0) {
     gpr_event_init(&start_requests_);
   }
   virtual ~Client() {}
@@ -162,6 +163,7 @@ class Client {
 
     MaybeStartRequests();
 
+    int last_reset_poll_count_to_use = last_reset_poll_count_;
     if (reset) {
       std::vector<Histogram> to_merge(threads_.size());
       std::vector<StatusHistogram> to_merge_status(threads_.size());
@@ -176,6 +178,7 @@ class Client {
         MergeStatusHistogram(to_merge_status[i], &statuses);
       }
       timer_result = timer->Mark();
+      last_reset_poll_count_ = GetPollCount();
     } else {
       // merge snapshots of each thread histogram
       for (size_t i = 0; i < threads_.size(); i++) {
@@ -195,6 +198,9 @@ class Client {
     stats.set_time_elapsed(timer_result.wall);
     stats.set_time_system(timer_result.system);
     stats.set_time_user(timer_result.user);
+    gpr_log(GPR_INFO, "*****poll count : %d %d %d", GetPollCount(), last_reset_poll_count_, last_reset_poll_count_to_use);
+
+    stats.set_cq_poll_count(GetPollCount() - last_reset_poll_count_to_use);
     return stats;
   }
 
@@ -209,6 +215,11 @@ class Client {
     }
   }
 
+  virtual int GetPollCount() {
+    // For sync client.
+    return 0;
+  }
+
  protected:
   bool closed_loop_;
   gpr_atm thread_pool_done_;
@@ -351,6 +362,8 @@ class Client {
   gpr_event start_requests_;
   bool started_requests_;
 
+  int last_reset_poll_count_;
+
   void MaybeStartRequests() {
     if (!started_requests_) {
       started_requests_ = true;
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index 29a79e7343..2e4b7acba7 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -54,6 +54,10 @@
 #include "test/cpp/qps/usage_timer.h"
 #include "test/cpp/util/create_test_channel.h"
 
+extern "C" {
+#include "src/core/lib/surface/completion_queue.h"
+}
+
 namespace grpc {
 namespace testing {
 
@@ -205,6 +209,17 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
     }
   }
 
+int GetPollCount() {
+  int count = 0;
+  int i = 0;
+  for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
+    int k = (int)grpc_get_cq_poll_num((*cq)->cq());
+    gpr_log(GPR_INFO, "%d: per cq poll:%d", i++, k);
+    count += k;
+  }
+  return count;
+}
+
  protected:
   const int num_async_threads_;
 
diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc
index 74fe3662c1..c4cae286fd 100644
--- a/test/cpp/qps/driver.cc
+++ b/test/cpp/qps/driver.cc
@@ -112,6 +112,7 @@ static deque<string> get_workers(const string& env_name) {
 static double WallTime(ClientStats s) { return s.time_elapsed(); }
 static double SystemTime(ClientStats s) { return s.time_system(); }
 static double UserTime(ClientStats s) { return s.time_user(); }
+static double PollCount(ClientStats s) { return s.cq_poll_count(); }
 static double ServerWallTime(ServerStats s) { return s.time_elapsed(); }
 static double ServerSystemTime(ServerStats s) { return s.time_system(); }
 static double ServerUserTime(ServerStats s) { return s.time_user(); }
@@ -180,6 +181,8 @@ static void postprocess_scenario_result(ScenarioResult* result) {
     result->mutable_summary()->set_failed_requests_per_second(failures /
                                                               time_estimate);
   }
+  gpr_log(GPR_INFO, "poll count : %f", sum(result->client_stats(), PollCount));
+  result->mutable_summary()->set_client_polls_per_request(sum(result->client_stats(), PollCount)/histogram.Count());
 }
 
 std::unique_ptr<ScenarioResult> RunScenario(
diff --git a/test/cpp/qps/qps_json_driver.cc b/test/cpp/qps/qps_json_driver.cc
index a906137474..f00f771ea0 100644
--- a/test/cpp/qps/qps_json_driver.cc
+++ b/test/cpp/qps/qps_json_driver.cc
@@ -94,6 +94,7 @@ static std::unique_ptr<ScenarioResult> RunAndReport(const Scenario& scenario,
   GetReporter()->ReportLatency(*result);
   GetReporter()->ReportTimes(*result);
   GetReporter()->ReportCpuUsage(*result);
+  GetReporter()->ReportPollCount(*result);
 
   for (int i = 0; *success && i < result->client_success_size(); i++) {
     *success = result->client_success(i);
diff --git a/test/cpp/qps/report.cc b/test/cpp/qps/report.cc
index a9130bf5d4..ae56b0e857 100644
--- a/test/cpp/qps/report.cc
+++ b/test/cpp/qps/report.cc
@@ -80,6 +80,12 @@ void CompositeReporter::ReportCpuUsage(const ScenarioResult& result) {
   }
 }
 
+void CompositeReporter::ReportPollCount(const ScenarioResult& result) {
+  for (size_t i = 0; i < reporters_.size(); ++i) {
+    reporters_[i]->ReportPollCount(result);
+  }
+}
+
 void GprLogReporter::ReportQPS(const ScenarioResult& result) {
   gpr_log(GPR_INFO, "QPS: %.1f", result.summary().qps());
   if (result.summary().failed_requests_per_second() > 0) {
@@ -121,6 +127,11 @@ void GprLogReporter::ReportCpuUsage(const ScenarioResult& result) {
           result.summary().server_cpu_usage());
 }
 
+void GprLogReporter::ReportPollCount(const ScenarioResult& result) {
+  gpr_log(GPR_INFO, "Client Polls per Request: %.2f%%",
+          result.summary().client_polls_per_request());
+}
+
 void JsonReporter::ReportQPS(const ScenarioResult& result) {
   grpc::string json_string =
       SerializeJson(result, "type.googleapis.com/grpc.testing.ScenarioResult");
@@ -145,6 +156,10 @@ void JsonReporter::ReportCpuUsage(const ScenarioResult& result) {
   // NOP - all reporting is handled by ReportQPS.
 }
 
+void JsonReporter::ReportPollCount(const ScenarioResult& result) {
+  // NOP - all reporting is handled by ReportQPS.
+}
+
 void RpcReporter::ReportQPS(const ScenarioResult& result) {
   grpc::ClientContext context;
   grpc::Status status;
@@ -177,5 +192,9 @@ void RpcReporter::ReportCpuUsage(const ScenarioResult& result) {
   // NOP - all reporting is handled by ReportQPS.
 }
 
+void RpcReporter::ReportPollCount(const ScenarioResult& result) {
+  // NOP - all reporting is handled by ReportQPS.
+}
+
 }  // namespace testing
 }  // namespace grpc
diff --git a/test/cpp/qps/report.h b/test/cpp/qps/report.h
index 1749be98c6..6ed3ea1b0d 100644
--- a/test/cpp/qps/report.h
+++ b/test/cpp/qps/report.h
@@ -76,6 +76,9 @@ class Reporter {
   /** Reports server cpu usage. */
   virtual void ReportCpuUsage(const ScenarioResult& result) = 0;
 
+  /** Reports server cpu usage. */
+  virtual void ReportPollCount(const ScenarioResult& result) = 0;
+
  private:
   const string name_;
 };
@@ -93,6 +96,7 @@ class CompositeReporter : public Reporter {
   void ReportLatency(const ScenarioResult& result) override;
   void ReportTimes(const ScenarioResult& result) override;
   void ReportCpuUsage(const ScenarioResult& result) override;
+  void ReportPollCount(const ScenarioResult& result) override;
 
  private:
   std::vector<std::unique_ptr<Reporter> > reporters_;
@@ -109,6 +113,7 @@ class GprLogReporter : public Reporter {
   void ReportLatency(const ScenarioResult& result) override;
   void ReportTimes(const ScenarioResult& result) override;
   void ReportCpuUsage(const ScenarioResult& result) override;
+  void ReportPollCount(const ScenarioResult& result) override;
 };
 
 /** Dumps the report to a JSON file. */
@@ -123,6 +128,7 @@ class JsonReporter : public Reporter {
   void ReportLatency(const ScenarioResult& result) override;
   void ReportTimes(const ScenarioResult& result) override;
   void ReportCpuUsage(const ScenarioResult& result) override;
+  void ReportPollCount(const ScenarioResult& result) override;
 
   const string report_file_;
 };
@@ -138,6 +144,7 @@ class RpcReporter : public Reporter {
   void ReportLatency(const ScenarioResult& result) override;
   void ReportTimes(const ScenarioResult& result) override;
   void ReportCpuUsage(const ScenarioResult& result) override;
+  void ReportPollCount(const ScenarioResult& result) override;
 
   std::unique_ptr<ReportQpsScenarioService::Stub> stub_;
 };
-- 
GitLab