Skip to content
Snippets Groups Projects
Commit bcfe99a5 authored by Michael Lumish's avatar Michael Lumish
Browse files

Merge pull request #5008 from vjpai/worker_quit

Provide an RPC to quit the performance benchmarking workers
parents 3c7c76e7 ed53c082
No related branches found
No related tags found
No related merge requests found
...@@ -37,6 +37,11 @@ var os = require('os'); ...@@ -37,6 +37,11 @@ var os = require('os');
var BenchmarkClient = require('./benchmark_client'); var BenchmarkClient = require('./benchmark_client');
var BenchmarkServer = require('./benchmark_server'); var BenchmarkServer = require('./benchmark_server');
exports.quitWorker = function quitWorker(call, callback) {
callback(null, {});
process.exit(0);
}
exports.runClient = function runClient(call) { exports.runClient = function runClient(call) {
var client; var client;
call.on('data', function(request) { call.on('data', function(request) {
......
...@@ -166,3 +166,6 @@ message CoreResponse { ...@@ -166,3 +166,6 @@ message CoreResponse {
// Number of cores available on the server // Number of cores available on the server
int32 cores = 1; int32 cores = 1;
} }
message Void {
}
...@@ -65,4 +65,7 @@ service WorkerService { ...@@ -65,4 +65,7 @@ service WorkerService {
// Just return the core count - unary call // Just return the core count - unary call
rpc CoreCount(CoreRequest) returns (CoreResponse); rpc CoreCount(CoreRequest) returns (CoreResponse);
// Quit this worker
rpc QuitWorker(Void) returns (Void);
} }
...@@ -384,5 +384,18 @@ std::unique_ptr<ScenarioResult> RunScenario( ...@@ -384,5 +384,18 @@ std::unique_ptr<ScenarioResult> RunScenario(
delete[] servers; delete[] servers;
return result; return result;
} }
void RunQuit() {
// Get client, server lists
auto workers = get_workers("QPS_WORKERS");
for (size_t i = 0; i < workers.size(); i++) {
auto stub = WorkerService::NewStub(
CreateChannel(workers[i], InsecureChannelCredentials()));
Void dummy;
grpc::ClientContext ctx;
GPR_ASSERT(stub->QuitWorker(&ctx, dummy, &dummy).ok());
}
}
} // namespace testing } // namespace testing
} // namespace grpc } // namespace grpc
...@@ -70,6 +70,7 @@ std::unique_ptr<ScenarioResult> RunScenario( ...@@ -70,6 +70,7 @@ std::unique_ptr<ScenarioResult> RunScenario(
const grpc::testing::ServerConfig& server_config, size_t num_servers, const grpc::testing::ServerConfig& server_config, size_t num_servers,
int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count); int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count);
void RunQuit();
} // namespace testing } // namespace testing
} // namespace grpc } // namespace grpc
......
...@@ -78,6 +78,8 @@ DEFINE_int32(client_core_limit, -1, "Limit on client cores to use"); ...@@ -78,6 +78,8 @@ DEFINE_int32(client_core_limit, -1, "Limit on client cores to use");
DEFINE_bool(secure_test, false, "Run a secure test"); DEFINE_bool(secure_test, false, "Run a secure test");
DEFINE_bool(quit, false, "Quit the workers");
using grpc::testing::ClientConfig; using grpc::testing::ClientConfig;
using grpc::testing::ServerConfig; using grpc::testing::ServerConfig;
using grpc::testing::ClientType; using grpc::testing::ClientType;
...@@ -90,6 +92,11 @@ namespace grpc { ...@@ -90,6 +92,11 @@ namespace grpc {
namespace testing { namespace testing {
static void QpsDriver() { static void QpsDriver() {
if (FLAGS_quit) {
RunQuit();
return;
}
RpcType rpc_type; RpcType rpc_type;
GPR_ASSERT(RpcType_Parse(FLAGS_rpc_type, &rpc_type)); GPR_ASSERT(RpcType_Parse(FLAGS_rpc_type, &rpc_type));
......
...@@ -103,8 +103,8 @@ static std::unique_ptr<Server> CreateServer(const ServerConfig& config) { ...@@ -103,8 +103,8 @@ static std::unique_ptr<Server> CreateServer(const ServerConfig& config) {
class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service { class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
public: public:
explicit WorkerServiceImpl(int server_port) WorkerServiceImpl(int server_port, QpsWorker* worker)
: acquired_(false), server_port_(server_port) {} : acquired_(false), server_port_(server_port), worker_(worker) {}
Status RunClient(ServerContext* ctx, Status RunClient(ServerContext* ctx,
ServerReaderWriter<ClientStatus, ClientArgs>* stream) ServerReaderWriter<ClientStatus, ClientArgs>* stream)
...@@ -140,6 +140,16 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service { ...@@ -140,6 +140,16 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
return Status::OK; return Status::OK;
} }
Status QuitWorker(ServerContext* ctx, const Void*, Void*) GRPC_OVERRIDE {
InstanceGuard g(this);
if (!g.Acquired()) {
return Status(StatusCode::RESOURCE_EXHAUSTED, "");
}
worker_->MarkDone();
return Status::OK;
}
private: private:
// Protect against multiple clients using this worker at once. // Protect against multiple clients using this worker at once.
class InstanceGuard { class InstanceGuard {
...@@ -250,10 +260,12 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service { ...@@ -250,10 +260,12 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
std::mutex mu_; std::mutex mu_;
bool acquired_; bool acquired_;
int server_port_; int server_port_;
QpsWorker* worker_;
}; };
QpsWorker::QpsWorker(int driver_port, int server_port) { QpsWorker::QpsWorker(int driver_port, int server_port) {
impl_.reset(new WorkerServiceImpl(server_port)); impl_.reset(new WorkerServiceImpl(server_port, this));
gpr_atm_rel_store(&done_, static_cast<gpr_atm>(0));
char* server_address = NULL; char* server_address = NULL;
gpr_join_host_port(&server_address, "::", driver_port); gpr_join_host_port(&server_address, "::", driver_port);
...@@ -269,5 +281,11 @@ QpsWorker::QpsWorker(int driver_port, int server_port) { ...@@ -269,5 +281,11 @@ QpsWorker::QpsWorker(int driver_port, int server_port) {
QpsWorker::~QpsWorker() {} QpsWorker::~QpsWorker() {}
bool QpsWorker::Done() const {
return (gpr_atm_acq_load(&done_) != static_cast<gpr_atm>(0));
}
void QpsWorker::MarkDone() {
gpr_atm_rel_store(&done_, static_cast<gpr_atm>(1));
}
} // namespace testing } // namespace testing
} // namespace grpc } // namespace grpc
...@@ -36,6 +36,8 @@ ...@@ -36,6 +36,8 @@
#include <memory> #include <memory>
#include <grpc/support/atm.h>
namespace grpc { namespace grpc {
class Server; class Server;
...@@ -49,9 +51,14 @@ class QpsWorker { ...@@ -49,9 +51,14 @@ class QpsWorker {
explicit QpsWorker(int driver_port, int server_port = 0); explicit QpsWorker(int driver_port, int server_port = 0);
~QpsWorker(); ~QpsWorker();
bool Done() const;
void MarkDone();
private: private:
std::unique_ptr<WorkerServiceImpl> impl_; std::unique_ptr<WorkerServiceImpl> impl_;
std::unique_ptr<Server> server_; std::unique_ptr<Server> server_;
gpr_atm done_;
}; };
} // namespace testing } // namespace testing
......
...@@ -56,7 +56,7 @@ namespace testing { ...@@ -56,7 +56,7 @@ namespace testing {
static void RunServer() { static void RunServer() {
QpsWorker worker(FLAGS_driver_port, FLAGS_server_port); QpsWorker worker(FLAGS_driver_port, FLAGS_server_port);
while (!got_sigint) { while (!got_sigint && !worker.Done()) {
gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_seconds(5, GPR_TIMESPAN))); gpr_time_from_seconds(5, GPR_TIMESPAN)));
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment