diff --git a/src/node/performance/worker_service_impl.js b/src/node/performance/worker_service_impl.js index 79baa1ce690ac2cfe2f0e74cc36581f4c6fc2cd0..14392498784050bda66945f72317b7380a6487a4 100644 --- a/src/node/performance/worker_service_impl.js +++ b/src/node/performance/worker_service_impl.js @@ -37,6 +37,11 @@ var os = require('os'); var BenchmarkClient = require('./benchmark_client'); var BenchmarkServer = require('./benchmark_server'); +exports.quitWorker = function quitWorker(call, callback) { + callback(null, {}); + process.exit(0); +} + exports.runClient = function runClient(call) { var client; call.on('data', function(request) { diff --git a/src/proto/grpc/testing/control.proto b/src/proto/grpc/testing/control.proto index 8278836468fd8bf2c8878acba7d9d79d68bfc4de..cc365cafe1af69073d728093efb625ae03eb0b17 100644 --- a/src/proto/grpc/testing/control.proto +++ b/src/proto/grpc/testing/control.proto @@ -166,3 +166,6 @@ message CoreResponse { // Number of cores available on the server int32 cores = 1; } + +message Void { +} diff --git a/src/proto/grpc/testing/services.proto b/src/proto/grpc/testing/services.proto index 4c8e32bb8f36ef0a3d0d003322af6cecdd75eca9..a2c5fda47efb264c825a20f7d63a5f592edc8e63 100644 --- a/src/proto/grpc/testing/services.proto +++ b/src/proto/grpc/testing/services.proto @@ -65,4 +65,7 @@ service WorkerService { // Just return the core count - unary call rpc CoreCount(CoreRequest) returns (CoreResponse); + + // Quit this worker + rpc QuitWorker(Void) returns (Void); } diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc index c70b0303b8d3dc505551e64bdbecc2f60cca4800..80f6ada409150df24f7eabdbe6cddeea82dae2c3 100644 --- a/test/cpp/qps/driver.cc +++ b/test/cpp/qps/driver.cc @@ -384,5 +384,18 @@ std::unique_ptr<ScenarioResult> RunScenario( delete[] servers; return result; } + +void RunQuit() { + // Get client, server lists + auto workers = get_workers("QPS_WORKERS"); + for (size_t i = 0; i < workers.size(); i++) { + auto stub = WorkerService::NewStub( + CreateChannel(workers[i], InsecureChannelCredentials())); + Void dummy; + grpc::ClientContext ctx; + GPR_ASSERT(stub->QuitWorker(&ctx, dummy, &dummy).ok()); + } +} + } // namespace testing } // namespace grpc diff --git a/test/cpp/qps/driver.h b/test/cpp/qps/driver.h index 2a7cf805e57d70263b816a6b8c6cc761dfb309ca..4b2b400c0ce9ec3aa10e30ce0b47f83cbb478b48 100644 --- a/test/cpp/qps/driver.h +++ b/test/cpp/qps/driver.h @@ -70,6 +70,7 @@ std::unique_ptr<ScenarioResult> RunScenario( const grpc::testing::ServerConfig& server_config, size_t num_servers, int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count); +void RunQuit(); } // namespace testing } // namespace grpc diff --git a/test/cpp/qps/qps_driver.cc b/test/cpp/qps/qps_driver.cc index ffc8a83fc53a6c530cea56722951e090efaf49b4..69fb4d75e8d83dc998f1222a441c8e6e519ff158 100644 --- a/test/cpp/qps/qps_driver.cc +++ b/test/cpp/qps/qps_driver.cc @@ -78,6 +78,8 @@ DEFINE_int32(client_core_limit, -1, "Limit on client cores to use"); DEFINE_bool(secure_test, false, "Run a secure test"); +DEFINE_bool(quit, false, "Quit the workers"); + using grpc::testing::ClientConfig; using grpc::testing::ServerConfig; using grpc::testing::ClientType; @@ -90,6 +92,11 @@ namespace grpc { namespace testing { static void QpsDriver() { + if (FLAGS_quit) { + RunQuit(); + return; + } + RpcType rpc_type; GPR_ASSERT(RpcType_Parse(FLAGS_rpc_type, &rpc_type)); diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc index 7e9e05f7ecc5c1c28a0d65c5609c42b3a987643a..9442017ddf525bb3ecb886a90925dd2fa9587c2c 100644 --- a/test/cpp/qps/qps_worker.cc +++ b/test/cpp/qps/qps_worker.cc @@ -103,8 +103,8 @@ static std::unique_ptr<Server> CreateServer(const ServerConfig& config) { class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service { public: - explicit WorkerServiceImpl(int server_port) - : acquired_(false), server_port_(server_port) {} + WorkerServiceImpl(int server_port, QpsWorker* worker) + : acquired_(false), server_port_(server_port), worker_(worker) {} Status RunClient(ServerContext* ctx, ServerReaderWriter<ClientStatus, ClientArgs>* stream) @@ -140,6 +140,16 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service { return Status::OK; } + Status QuitWorker(ServerContext* ctx, const Void*, Void*) GRPC_OVERRIDE { + InstanceGuard g(this); + if (!g.Acquired()) { + return Status(StatusCode::RESOURCE_EXHAUSTED, ""); + } + + worker_->MarkDone(); + return Status::OK; + } + private: // Protect against multiple clients using this worker at once. class InstanceGuard { @@ -250,10 +260,12 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service { std::mutex mu_; bool acquired_; int server_port_; + QpsWorker* worker_; }; QpsWorker::QpsWorker(int driver_port, int server_port) { - impl_.reset(new WorkerServiceImpl(server_port)); + impl_.reset(new WorkerServiceImpl(server_port, this)); + gpr_atm_rel_store(&done_, static_cast<gpr_atm>(0)); char* server_address = NULL; gpr_join_host_port(&server_address, "::", driver_port); @@ -269,5 +281,11 @@ QpsWorker::QpsWorker(int driver_port, int server_port) { QpsWorker::~QpsWorker() {} +bool QpsWorker::Done() const { + return (gpr_atm_acq_load(&done_) != static_cast<gpr_atm>(0)); +} +void QpsWorker::MarkDone() { + gpr_atm_rel_store(&done_, static_cast<gpr_atm>(1)); +} } // namespace testing } // namespace grpc diff --git a/test/cpp/qps/qps_worker.h b/test/cpp/qps/qps_worker.h index 27de69fa65a3896ac544f3998fdbe15eb667d408..624c182100e9c59dad85c24e1a9b7fdd80c9f327 100644 --- a/test/cpp/qps/qps_worker.h +++ b/test/cpp/qps/qps_worker.h @@ -36,6 +36,8 @@ #include <memory> +#include <grpc/support/atm.h> + namespace grpc { class Server; @@ -49,9 +51,14 @@ class QpsWorker { explicit QpsWorker(int driver_port, int server_port = 0); ~QpsWorker(); + bool Done() const; + void MarkDone(); + private: std::unique_ptr<WorkerServiceImpl> impl_; std::unique_ptr<Server> server_; + + gpr_atm done_; }; } // namespace testing diff --git a/test/cpp/qps/worker.cc b/test/cpp/qps/worker.cc index a1e73e9abe921a8ce197e866b6643c77342fa510..f42cfe3255a399c9a67d8b8c56445826b8894962 100644 --- a/test/cpp/qps/worker.cc +++ b/test/cpp/qps/worker.cc @@ -56,7 +56,7 @@ namespace testing { static void RunServer() { QpsWorker worker(FLAGS_driver_port, FLAGS_server_port); - while (!got_sigint) { + while (!got_sigint && !worker.Done()) { gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(5, GPR_TIMESPAN))); }