From e9a6eb7332eef19805e1177e813dfd301096743d Mon Sep 17 00:00:00 2001
From: Craig Tiller <craig.tiller@gmail.com>
Date: Thu, 9 Apr 2015 15:51:41 -0700
Subject: [PATCH] Allow RunScenarios to spawn in-process workers

This allows us to get back to single binary tests where appropriate, which will help in-depth profiling efforts.
I've built this atop my smoke_test changes as they inspired me to get this done.
---
 Makefile                    |  23 ++--
 build.json                  |  10 +-
 test/core/util/port_posix.c |  33 ++++-
 test/cpp/qps/driver.cc      |  28 ++++-
 test/cpp/qps/driver.h       |   3 +-
 test/cpp/qps/qps_driver.cc  |   4 +-
 test/cpp/qps/qps_worker.cc  | 233 ++++++++++++++++++++++++++++++++++++
 test/cpp/qps/qps_worker.h   |  60 ++++++++++
 test/cpp/qps/smoke_test.cc  |   8 +-
 test/cpp/qps/smoke_test.sh  |  28 -----
 test/cpp/qps/worker.cc      | 189 +----------------------------
 11 files changed, 385 insertions(+), 234 deletions(-)
 create mode 100644 test/cpp/qps/qps_worker.cc
 create mode 100644 test/cpp/qps/qps_worker.h
 delete mode 100755 test/cpp/qps/smoke_test.sh

diff --git a/Makefile b/Makefile
index 6665eb418c..47ce0cff05 100644
--- a/Makefile
+++ b/Makefile
@@ -3728,7 +3728,12 @@ $(OBJDIR)/$(CONFIG)/examples/pubsub/subscriber.o:     $(GENDIR)/examples/pubsub/
 
 LIBQPS_SRC = \
     $(GENDIR)/test/cpp/qps/qpstest.pb.cc \
+    test/cpp/qps/client_async.cc \
+    test/cpp/qps/client_sync.cc \
     test/cpp/qps/driver.cc \
+    test/cpp/qps/qps_worker.cc \
+    test/cpp/qps/server_async.cc \
+    test/cpp/qps/server_sync.cc \
     test/cpp/qps/timer.cc \
 
 
@@ -3757,7 +3762,12 @@ ifneq ($(OPENSSL_DEP),)
 # installing headers to their final destination on the drive. We need this
 # otherwise parallel compilation will fail if a source is compiled first.
 test/cpp/qps/qpstest.proto: $(OPENSSL_DEP)
+test/cpp/qps/client_async.cc: $(OPENSSL_DEP)
+test/cpp/qps/client_sync.cc: $(OPENSSL_DEP)
 test/cpp/qps/driver.cc: $(OPENSSL_DEP)
+test/cpp/qps/qps_worker.cc: $(OPENSSL_DEP)
+test/cpp/qps/server_async.cc: $(OPENSSL_DEP)
+test/cpp/qps/server_sync.cc: $(OPENSSL_DEP)
 test/cpp/qps/timer.cc: $(OPENSSL_DEP)
 endif
 
@@ -3784,7 +3794,12 @@ endif
 endif
 
 
+$(OBJDIR)/$(CONFIG)/test/cpp/qps/client_async.o:     $(GENDIR)/test/cpp/qps/qpstest.pb.cc
+$(OBJDIR)/$(CONFIG)/test/cpp/qps/client_sync.o:     $(GENDIR)/test/cpp/qps/qpstest.pb.cc
 $(OBJDIR)/$(CONFIG)/test/cpp/qps/driver.o:     $(GENDIR)/test/cpp/qps/qpstest.pb.cc
+$(OBJDIR)/$(CONFIG)/test/cpp/qps/qps_worker.o:     $(GENDIR)/test/cpp/qps/qpstest.pb.cc
+$(OBJDIR)/$(CONFIG)/test/cpp/qps/server_async.o:     $(GENDIR)/test/cpp/qps/qpstest.pb.cc
+$(OBJDIR)/$(CONFIG)/test/cpp/qps/server_sync.o:     $(GENDIR)/test/cpp/qps/qpstest.pb.cc
 $(OBJDIR)/$(CONFIG)/test/cpp/qps/timer.o:     $(GENDIR)/test/cpp/qps/qpstest.pb.cc
 
 
@@ -8775,10 +8790,6 @@ endif
 
 
 QPS_WORKER_SRC = \
-    test/cpp/qps/client_async.cc \
-    test/cpp/qps/client_sync.cc \
-    test/cpp/qps/server_async.cc \
-    test/cpp/qps/server_sync.cc \
     test/cpp/qps/worker.cc \
 
 QPS_WORKER_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(QPS_WORKER_SRC))))
@@ -8809,10 +8820,6 @@ endif
 
 endif
 
-$(OBJDIR)/$(CONFIG)/test/cpp/qps/client_async.o:  $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
-$(OBJDIR)/$(CONFIG)/test/cpp/qps/client_sync.o:  $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
-$(OBJDIR)/$(CONFIG)/test/cpp/qps/server_async.o:  $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
-$(OBJDIR)/$(CONFIG)/test/cpp/qps/server_sync.o:  $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
 $(OBJDIR)/$(CONFIG)/test/cpp/qps/worker.o:  $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
 
 deps_qps_worker: $(QPS_WORKER_OBJS:.o=.dep)
diff --git a/build.json b/build.json
index f2a39c6f2c..adf6128339 100644
--- a/build.json
+++ b/build.json
@@ -554,11 +554,17 @@
       "language": "c++",
       "headers": [
         "test/cpp/qps/driver.h",
+        "test/cpp/qps/qps_worker.h",
         "test/cpp/qps/timer.h"
       ],
       "src": [
         "test/cpp/qps/qpstest.proto",
+        "test/cpp/qps/client_async.cc",
+        "test/cpp/qps/client_sync.cc",
         "test/cpp/qps/driver.cc",
+        "test/cpp/qps/qps_worker.cc",
+        "test/cpp/qps/server_async.cc",
+        "test/cpp/qps/server_sync.cc",
         "test/cpp/qps/timer.cc"
       ]
     },
@@ -2007,10 +2013,6 @@
         "test/cpp/qps/server.h"
       ],
       "src": [
-        "test/cpp/qps/client_async.cc",
-        "test/cpp/qps/client_sync.cc",
-        "test/cpp/qps/server_async.cc",
-        "test/cpp/qps/server_sync.cc",
         "test/cpp/qps/worker.cc"
       ],
       "deps": [
diff --git a/test/core/util/port_posix.c b/test/core/util/port_posix.c
index 7467c2f9ea..726ee3bd6c 100644
--- a/test/core/util/port_posix.c
+++ b/test/core/util/port_posix.c
@@ -44,10 +44,37 @@
 #include <string.h>
 #include <unistd.h>
 
+#include <grpc/support/alloc.h>
 #include <grpc/support/log.h>
 
 #define NUM_RANDOM_PORTS_TO_PICK 100
 
+static int *chosen_ports = NULL;
+static size_t num_chosen_ports = 0;
+
+static int has_port_been_chosen(int port) {
+  size_t i;
+  for (i = 0; i < num_chosen_ports; i++) {
+    if (chosen_ports[i] == port) {
+      return 1;
+    }
+  }
+  return 0;
+}
+
+static void free_chosen_ports() {
+  gpr_free(chosen_ports);
+}
+
+static void chose_port(int port) {
+  if (chosen_ports == NULL) {
+    atexit(free_chosen_ports);
+  }
+  num_chosen_ports++;
+  chosen_ports = gpr_realloc(chosen_ports, sizeof(int) * num_chosen_ports);
+  chosen_ports[num_chosen_ports - 1] = port;
+}
+
 static int is_port_available(int *port, int is_tcp) {
   const int proto = is_tcp ? IPPROTO_TCP : 0;
   const int fd = socket(AF_INET, is_tcp ? SOCK_STREAM : SOCK_DGRAM, proto);
@@ -127,6 +154,10 @@ int grpc_pick_unused_port(void) {
       port = 0;
     }
 
+    if (has_port_been_chosen(port)) {
+      continue;
+    }
+
     if (!is_port_available(&port, is_tcp)) {
       continue;
     }
@@ -142,7 +173,7 @@ int grpc_pick_unused_port(void) {
 
     /* TODO(ctiller): consider caching this port in some structure, to avoid
                       handing it out again */
-
+    chose_port(port);
     return port;
   }
 
diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc
index f44883783d..9f7d3b56a4 100644
--- a/test/cpp/qps/driver.cc
+++ b/test/cpp/qps/driver.cc
@@ -42,21 +42,25 @@
 #include <grpc++/stream.h>
 #include <list>
 #include <thread>
+#include <deque>
 #include <vector>
 #include "test/cpp/qps/histogram.h"
+#include "test/cpp/qps/qps_worker.h"
+#include "test/core/util/port.h"
 
 using std::list;
 using std::thread;
 using std::unique_ptr;
+using std::deque;
 using std::vector;
 
 namespace grpc {
 namespace testing {
-static vector<string> get_hosts(const string& name) {
+static deque<string> get_hosts(const string& name) {
   char* env = gpr_getenv(name.c_str());
-  if (!env) return vector<string>();
+  if (!env) return deque<string>();
 
-  vector<string> out;
+  deque<string> out;
   char* p = env;
   for (;;) {
     char* comma = strchr(p, ',');
@@ -76,7 +80,8 @@ ScenarioResult RunScenario(const ClientConfig& initial_client_config,
                            const ServerConfig& server_config,
                            size_t num_servers,
                            int warmup_seconds,
-                           int benchmark_seconds) {
+                           int benchmark_seconds,
+                           int spawn_local_worker_count) {
   // ClientContext allocator (all are destroyed at scope exit)
   list<ClientContext> contexts;
   auto alloc_context = [&contexts]() {
@@ -88,6 +93,21 @@ ScenarioResult RunScenario(const ClientConfig& initial_client_config,
   auto workers = get_hosts("QPS_WORKERS");
   ClientConfig client_config = initial_client_config;
 
+  // Spawn some local workers if desired
+  vector<unique_ptr<QpsWorker>> local_workers;
+  for (int i = 0; i < abs(spawn_local_worker_count); i++) {
+    int driver_port = grpc_pick_unused_port_or_die();
+    int benchmark_port = grpc_pick_unused_port_or_die();
+    local_workers.emplace_back(new QpsWorker(driver_port, benchmark_port));
+    char addr[256];
+    sprintf(addr, "localhost:%d", driver_port);
+    if (spawn_local_worker_count < 0) {
+      workers.push_front(addr);
+    } else {
+      workers.push_back(addr);
+    }
+  }
+
   // TODO(ctiller): support running multiple configurations, and binpack
   // client/server pairs
   // to available workers
diff --git a/test/cpp/qps/driver.h b/test/cpp/qps/driver.h
index 7a81b701c4..bfa0e68ff8 100644
--- a/test/cpp/qps/driver.h
+++ b/test/cpp/qps/driver.h
@@ -56,7 +56,8 @@ ScenarioResult RunScenario(const grpc::testing::ClientConfig& client_config,
                            const grpc::testing::ServerConfig& server_config,
                            size_t num_servers,
                            int warmup_seconds,
-                           int benchmark_seconds);
+                           int benchmark_seconds,
+                           int spawn_local_worker_count);
 
 }  // namespace testing
 }  // namespace grpc
diff --git a/test/cpp/qps/qps_driver.cc b/test/cpp/qps/qps_driver.cc
index f42d538b16..e1f1649af4 100644
--- a/test/cpp/qps/qps_driver.cc
+++ b/test/cpp/qps/qps_driver.cc
@@ -42,6 +42,7 @@ DEFINE_int32(num_servers, 1, "Number of server binaries");
 
 DEFINE_int32(warmup_seconds, 5, "Warmup time (in seconds)");
 DEFINE_int32(benchmark_seconds, 30, "Benchmark time (in seconds)");
+DEFINE_int32(local_workers, 0, "Number of local workers to start");
 
 // Common config
 DEFINE_bool(enable_ssl, false, "Use SSL");
@@ -103,7 +104,8 @@ int main(int argc, char** argv) {
 
   auto result = RunScenario(client_config, FLAGS_num_clients,
                             server_config, FLAGS_num_servers,
-                            FLAGS_warmup_seconds, FLAGS_benchmark_seconds);
+                            FLAGS_warmup_seconds, FLAGS_benchmark_seconds,
+                            FLAGS_local_workers);
 
   gpr_log(GPR_INFO, "QPS: %.1f",
           result.latencies.Count() /
diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc
new file mode 100644
index 0000000000..46d70dce52
--- /dev/null
+++ b/test/cpp/qps/qps_worker.cc
@@ -0,0 +1,233 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *     * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "qps_worker.h"
+
+#include <cassert>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <thread>
+#include <vector>
+#include <sstream>
+
+#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/histogram.h>
+#include <grpc/support/log.h>
+#include <grpc/support/host_port.h>
+#include <grpc++/client_context.h>
+#include <grpc++/status.h>
+#include <grpc++/server.h>
+#include <grpc++/server_builder.h>
+#include <grpc++/server_credentials.h>
+#include <grpc++/stream.h>
+#include "test/core/util/grpc_profiler.h"
+#include "test/cpp/util/create_test_channel.h"
+#include "test/cpp/qps/qpstest.pb.h"
+#include "test/cpp/qps/client.h"
+#include "test/cpp/qps/server.h"
+
+namespace grpc {
+namespace testing {
+
+std::unique_ptr<Client> CreateClient(const ClientConfig& config) {
+  switch (config.client_type()) {
+    case ClientType::SYNCHRONOUS_CLIENT:
+      return (config.rpc_type() == RpcType::UNARY) ?
+	CreateSynchronousUnaryClient(config) :
+	CreateSynchronousStreamingClient(config);
+    case ClientType::ASYNC_CLIENT:
+      return (config.rpc_type() == RpcType::UNARY) ?
+	CreateAsyncUnaryClient(config) : CreateAsyncStreamingClient(config);
+  }
+  abort();
+}
+
+std::unique_ptr<Server> CreateServer(const ServerConfig& config, int server_port) {
+  switch (config.server_type()) {
+    case ServerType::SYNCHRONOUS_SERVER:
+      return CreateSynchronousServer(config, server_port);
+    case ServerType::ASYNC_SERVER:
+      return CreateAsyncServer(config, server_port);
+  }
+  abort();
+}
+
+class WorkerImpl GRPC_FINAL : public Worker::Service {
+ public:
+  explicit WorkerImpl(int server_port) : server_port_(server_port), acquired_(false) {}
+
+  Status RunTest(ServerContext* ctx,
+                 ServerReaderWriter<ClientStatus, ClientArgs>* stream)
+      GRPC_OVERRIDE {
+    InstanceGuard g(this);
+    if (!g.Acquired()) {
+      return Status(RESOURCE_EXHAUSTED);
+    }
+
+    grpc_profiler_start("qps_client.prof");
+    Status ret = RunTestBody(ctx,stream);
+    grpc_profiler_stop();
+    return ret;
+  }
+
+  Status RunServer(ServerContext* ctx,
+                   ServerReaderWriter<ServerStatus, ServerArgs>* stream)
+      GRPC_OVERRIDE {
+    InstanceGuard g(this);
+    if (!g.Acquired()) {
+      return Status(RESOURCE_EXHAUSTED);
+    }
+
+    grpc_profiler_start("qps_server.prof");
+    Status ret = RunServerBody(ctx,stream);
+    grpc_profiler_stop();
+    return ret;
+  }
+
+ private:
+  // Protect against multiple clients using this worker at once.
+  class InstanceGuard {
+   public:
+    InstanceGuard(WorkerImpl* impl)
+        : impl_(impl), acquired_(impl->TryAcquireInstance()) {}
+    ~InstanceGuard() {
+      if (acquired_) {
+        impl_->ReleaseInstance();
+      }
+    }
+
+    bool Acquired() const { return acquired_; }
+
+   private:
+    WorkerImpl* const impl_;
+    const bool acquired_;
+  };
+
+  bool TryAcquireInstance() {
+    std::lock_guard<std::mutex> g(mu_);
+    if (acquired_) return false;
+    acquired_ = true;
+    return true;
+  }
+
+  void ReleaseInstance() {
+    std::lock_guard<std::mutex> g(mu_);
+    GPR_ASSERT(acquired_);
+    acquired_ = false;
+  }
+
+  Status RunTestBody(ServerContext* ctx,
+                     ServerReaderWriter<ClientStatus, ClientArgs>* stream) {
+    ClientArgs args;
+    if (!stream->Read(&args)) {
+      return Status(INVALID_ARGUMENT);
+    }
+    if (!args.has_setup()) {
+      return Status(INVALID_ARGUMENT);
+    }
+    auto client = CreateClient(args.setup());
+    if (!client) {
+      return Status(INVALID_ARGUMENT);
+    }
+    ClientStatus status;
+    if (!stream->Write(status)) {
+      return Status(UNKNOWN);
+    }
+    while (stream->Read(&args)) {
+      if (!args.has_mark()) {
+        return Status(INVALID_ARGUMENT);
+      }
+      *status.mutable_stats() = client->Mark();
+      stream->Write(status);
+    }
+
+    return Status::OK;
+  }
+
+  Status RunServerBody(ServerContext* ctx,
+                       ServerReaderWriter<ServerStatus, ServerArgs>* stream) {
+    ServerArgs args;
+    if (!stream->Read(&args)) {
+      return Status(INVALID_ARGUMENT);
+    }
+    if (!args.has_setup()) {
+      return Status(INVALID_ARGUMENT);
+    }
+    auto server = CreateServer(args.setup(), server_port_);
+    if (!server) {
+      return Status(INVALID_ARGUMENT);
+    }
+    ServerStatus status;
+    status.set_port(server_port_);
+    if (!stream->Write(status)) {
+      return Status(UNKNOWN);
+    }
+    while (stream->Read(&args)) {
+      if (!args.has_mark()) {
+        return Status(INVALID_ARGUMENT);
+      }
+      *status.mutable_stats() = server->Mark();
+      stream->Write(status);
+    }
+
+    return Status::OK;
+  }
+
+  const int server_port_;
+
+  std::mutex mu_;
+  bool acquired_;
+};
+
+QpsWorker::QpsWorker(int driver_port, int server_port) {
+  impl_.reset(new WorkerImpl(server_port));
+
+  char* server_address = NULL;
+  gpr_join_host_port(&server_address, "::", driver_port);
+
+  ServerBuilder builder;
+  builder.AddListeningPort(server_address, InsecureServerCredentials());
+  builder.RegisterService(impl_.get());
+
+  gpr_free(server_address);
+
+  server_ = std::move(builder.BuildAndStart());
+}
+
+QpsWorker::~QpsWorker() {
+}
+
+}  // namespace testing
+}  // namespace grpc
diff --git a/test/cpp/qps/qps_worker.h b/test/cpp/qps/qps_worker.h
new file mode 100644
index 0000000000..861588907e
--- /dev/null
+++ b/test/cpp/qps/qps_worker.h
@@ -0,0 +1,60 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *     * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef QPS_WORKER_H
+#define QPS_WORKER_H
+
+#include <memory>
+
+namespace grpc {
+
+class Server;
+
+namespace testing {
+
+class WorkerImpl;
+
+class QpsWorker {
+ public:
+  QpsWorker(int driver_port, int server_port);
+  ~QpsWorker();
+
+ private:
+  std::unique_ptr<WorkerImpl> impl_;
+  std::unique_ptr<Server> server_;
+};
+
+}  // namespace testing
+}  // namespace grpc
+
+#endif
diff --git a/test/cpp/qps/smoke_test.cc b/test/cpp/qps/smoke_test.cc
index 5cdabb88a0..b0cc0c3039 100644
--- a/test/cpp/qps/smoke_test.cc
+++ b/test/cpp/qps/smoke_test.cc
@@ -58,7 +58,7 @@ static void RunSynchronousUnaryPingPong() {
   server_config.set_enable_ssl(false);
   server_config.set_threads(1);
 
-  auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK);
+  auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
 
   gpr_log(GPR_INFO, "QPS: %.1f",
           result.latencies.Count() /
@@ -87,7 +87,7 @@ static void RunSynchronousStreamingPingPong() {
   server_config.set_enable_ssl(false);
   server_config.set_threads(1);
 
-  auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK);
+  auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
 
   gpr_log(GPR_INFO, "QPS: %.1f",
           result.latencies.Count() /
@@ -117,7 +117,7 @@ static void RunAsyncUnaryPingPong() {
   server_config.set_enable_ssl(false);
   server_config.set_threads(1);
 
-  auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK);
+  auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
 
   gpr_log(GPR_INFO, "QPS: %.1f",
           result.latencies.Count() /
@@ -147,7 +147,7 @@ static void RunQPS() {
   server_config.set_enable_ssl(false);
   server_config.set_threads(4);
 
-  auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK);
+  auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
 
   auto qps = 
       result.latencies.Count() /
diff --git a/test/cpp/qps/smoke_test.sh b/test/cpp/qps/smoke_test.sh
deleted file mode 100755
index ba7f0a4f27..0000000000
--- a/test/cpp/qps/smoke_test.sh
+++ /dev/null
@@ -1,28 +0,0 @@
-#!/bin/sh
-
-# performs a single qps run with one client and one server
-
-set -ex
-
-cd $(dirname $0)/../../..
-
-killall qps_worker || true
-
-config=opt
-
-NUMCPUS=`python2.7 -c 'import multiprocessing; print multiprocessing.cpu_count()'`
-
-make CONFIG=$config qps_worker qps_smoke_test -j$NUMCPUS
-
-bins/$config/qps_worker -driver_port 10000 -server_port 10001 &
-PID1=$!
-bins/$config/qps_worker -driver_port 10010 -server_port 10011 &
-PID2=$!
-
-export QPS_WORKERS="localhost:10000,localhost:10010"
-
-bins/$config/qps_smoke_test $*
-
-kill -2 $PID1 $PID2
-wait
-
diff --git a/test/cpp/qps/worker.cc b/test/cpp/qps/worker.cc
index b6830cc055..1ef5313b66 100644
--- a/test/cpp/qps/worker.cc
+++ b/test/cpp/qps/worker.cc
@@ -31,33 +31,15 @@
  *
  */
 
-#include <cassert>
-#include <memory>
-#include <mutex>
-#include <string>
-#include <thread>
-#include <vector>
-#include <sstream>
-
 #include <sys/signal.h>
 
+#include <chrono>
+#include <thread>
+
 #include <grpc/grpc.h>
-#include <grpc/support/alloc.h>
-#include <grpc/support/histogram.h>
-#include <grpc/support/log.h>
-#include <grpc/support/host_port.h>
 #include <gflags/gflags.h>
-#include <grpc++/client_context.h>
-#include <grpc++/status.h>
-#include <grpc++/server.h>
-#include <grpc++/server_builder.h>
-#include <grpc++/server_credentials.h>
-#include <grpc++/stream.h>
-#include "test/core/util/grpc_profiler.h"
-#include "test/cpp/util/create_test_channel.h"
-#include "test/cpp/qps/qpstest.pb.h"
-#include "test/cpp/qps/client.h"
-#include "test/cpp/qps/server.h"
+
+#include "qps_worker.h"
 
 DEFINE_int32(driver_port, 0, "Driver server port.");
 DEFINE_int32(server_port, 0, "Spawned server port.");
@@ -76,167 +58,8 @@ static void sigint_handler(int x) {got_sigint = true;}
 namespace grpc {
 namespace testing {
 
-std::unique_ptr<Client> CreateClient(const ClientConfig& config) {
-  switch (config.client_type()) {
-    case ClientType::SYNCHRONOUS_CLIENT:
-      return (config.rpc_type() == RpcType::UNARY) ?
-	CreateSynchronousUnaryClient(config) :
-	CreateSynchronousStreamingClient(config);
-    case ClientType::ASYNC_CLIENT:
-      return (config.rpc_type() == RpcType::UNARY) ?
-	CreateAsyncUnaryClient(config) : CreateAsyncStreamingClient(config);
-  }
-  abort();
-}
-
-std::unique_ptr<Server> CreateServer(const ServerConfig& config) {
-  switch (config.server_type()) {
-    case ServerType::SYNCHRONOUS_SERVER:
-      return CreateSynchronousServer(config, FLAGS_server_port);
-    case ServerType::ASYNC_SERVER:
-      return CreateAsyncServer(config, FLAGS_server_port);
-  }
-  abort();
-}
-
-class WorkerImpl GRPC_FINAL : public Worker::Service {
- public:
-  WorkerImpl() : acquired_(false) {}
-
-  Status RunTest(ServerContext* ctx,
-                 ServerReaderWriter<ClientStatus, ClientArgs>* stream)
-      GRPC_OVERRIDE {
-    InstanceGuard g(this);
-    if (!g.Acquired()) {
-      return Status(RESOURCE_EXHAUSTED);
-    }
-
-    grpc_profiler_start("qps_client.prof");
-    Status ret = RunTestBody(ctx,stream);
-    grpc_profiler_stop();
-    return ret;
-  }
-
-  Status RunServer(ServerContext* ctx,
-                   ServerReaderWriter<ServerStatus, ServerArgs>* stream)
-      GRPC_OVERRIDE {
-    InstanceGuard g(this);
-    if (!g.Acquired()) {
-      return Status(RESOURCE_EXHAUSTED);
-    }
-
-    grpc_profiler_start("qps_server.prof");
-    Status ret = RunServerBody(ctx,stream);
-    grpc_profiler_stop();
-    return ret;
-  }
-
- private:
-  // Protect against multiple clients using this worker at once.
-  class InstanceGuard {
-   public:
-    InstanceGuard(WorkerImpl* impl)
-        : impl_(impl), acquired_(impl->TryAcquireInstance()) {}
-    ~InstanceGuard() {
-      if (acquired_) {
-        impl_->ReleaseInstance();
-      }
-    }
-
-    bool Acquired() const { return acquired_; }
-
-   private:
-    WorkerImpl* const impl_;
-    const bool acquired_;
-  };
-
-  bool TryAcquireInstance() {
-    std::lock_guard<std::mutex> g(mu_);
-    if (acquired_) return false;
-    acquired_ = true;
-    return true;
-  }
-
-  void ReleaseInstance() {
-    std::lock_guard<std::mutex> g(mu_);
-    GPR_ASSERT(acquired_);
-    acquired_ = false;
-  }
-
-  Status RunTestBody(ServerContext* ctx,
-                     ServerReaderWriter<ClientStatus, ClientArgs>* stream) {
-    ClientArgs args;
-    if (!stream->Read(&args)) {
-      return Status(INVALID_ARGUMENT);
-    }
-    if (!args.has_setup()) {
-      return Status(INVALID_ARGUMENT);
-    }
-    auto client = CreateClient(args.setup());
-    if (!client) {
-      return Status(INVALID_ARGUMENT);
-    }
-    ClientStatus status;
-    if (!stream->Write(status)) {
-      return Status(UNKNOWN);
-    }
-    while (stream->Read(&args)) {
-      if (!args.has_mark()) {
-        return Status(INVALID_ARGUMENT);
-      }
-      *status.mutable_stats() = client->Mark();
-      stream->Write(status);
-    }
-
-    return Status::OK;
-  }
-
-  Status RunServerBody(ServerContext* ctx,
-                       ServerReaderWriter<ServerStatus, ServerArgs>* stream) {
-    ServerArgs args;
-    if (!stream->Read(&args)) {
-      return Status(INVALID_ARGUMENT);
-    }
-    if (!args.has_setup()) {
-      return Status(INVALID_ARGUMENT);
-    }
-    auto server = CreateServer(args.setup());
-    if (!server) {
-      return Status(INVALID_ARGUMENT);
-    }
-    ServerStatus status;
-    status.set_port(FLAGS_server_port);
-    if (!stream->Write(status)) {
-      return Status(UNKNOWN);
-    }
-    while (stream->Read(&args)) {
-      if (!args.has_mark()) {
-        return Status(INVALID_ARGUMENT);
-      }
-      *status.mutable_stats() = server->Mark();
-      stream->Write(status);
-    }
-
-    return Status::OK;
-  }
-
-  std::mutex mu_;
-  bool acquired_;
-};
-
 static void RunServer() {
-  char* server_address = NULL;
-  gpr_join_host_port(&server_address, "::", FLAGS_driver_port);
-
-  WorkerImpl service;
-
-  ServerBuilder builder;
-  builder.AddListeningPort(server_address, InsecureServerCredentials());
-  builder.RegisterService(&service);
-
-  gpr_free(server_address);
-
-  auto server = builder.BuildAndStart();
+  QpsWorker worker(FLAGS_driver_port, FLAGS_server_port);
 
   while (!got_sigint) {
     std::this_thread::sleep_for(std::chrono::seconds(5));
-- 
GitLab