From d08a738166e7f293d4590bc852e102ebcbd2239d Mon Sep 17 00:00:00 2001 From: vjpai <vpai@google.com> Date: Mon, 2 Nov 2015 16:45:08 -0800 Subject: [PATCH] Cleanup some names Remove some unused options and make server_threads relevant Start stubbing out better qps/core calculations --- .../cpp/qps/async_streaming_ping_pong_test.cc | 4 ++-- test/cpp/qps/async_unary_ping_pong_test.cc | 4 ++-- test/cpp/qps/client.h | 4 ++-- test/cpp/qps/client_async.cc | 2 +- test/cpp/qps/client_sync.cc | 2 +- test/cpp/qps/driver.cc | 15 ++++++++------- test/cpp/qps/driver.h | 8 +++++--- test/cpp/qps/qps-sweep.sh | 4 ++-- test/cpp/qps/qps_driver.cc | 19 +++++-------------- test/cpp/qps/qps_openloop_test.cc | 2 +- test/cpp/qps/qps_test.cc | 4 ++-- test/cpp/qps/qps_worker.cc | 17 ++++++++++++----- test/cpp/qps/report.cc | 7 ++++--- test/cpp/qps/server.h | 5 ++++- test/cpp/qps/server_async.cc | 12 ++++++------ test/cpp/qps/server_sync.cc | 2 +- test/cpp/qps/sync_streaming_ping_pong_test.cc | 7 +++---- test/cpp/qps/sync_unary_ping_pong_test.cc | 7 +++---- test/proto/benchmarks/control.proto | 14 ++++++++------ 19 files changed, 72 insertions(+), 67 deletions(-) diff --git a/test/cpp/qps/async_streaming_ping_pong_test.cc b/test/cpp/qps/async_streaming_ping_pong_test.cc index e3a614e743..c66aa19e18 100644 --- a/test/cpp/qps/async_streaming_ping_pong_test.cc +++ b/test/cpp/qps/async_streaming_ping_pong_test.cc @@ -58,12 +58,12 @@ static void RunAsyncStreamingPingPong() { client_config.set_payload_size(1); client_config.set_async_client_threads(1); client_config.set_rpc_type(STREAMING); - client_config.mutable_load_params()->mutable_closed(); + client_config.mutable_load_params()->mutable_closed_loop(); ServerConfig server_config; server_config.set_server_type(ASYNC_SERVER); server_config.set_use_tls(false); - server_config.set_threads(1); + server_config.set_async_server_threads(1); const auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2); diff --git a/test/cpp/qps/async_unary_ping_pong_test.cc b/test/cpp/qps/async_unary_ping_pong_test.cc index caed835325..efc599d698 100644 --- a/test/cpp/qps/async_unary_ping_pong_test.cc +++ b/test/cpp/qps/async_unary_ping_pong_test.cc @@ -58,12 +58,12 @@ static void RunAsyncUnaryPingPong() { client_config.set_payload_size(1); client_config.set_async_client_threads(1); client_config.set_rpc_type(UNARY); - client_config.mutable_load_params()->mutable_closed(); + client_config.mutable_load_params()->mutable_closed_loop(); ServerConfig server_config; server_config.set_server_type(ASYNC_SERVER); server_config.set_use_tls(false); - server_config.set_threads(1); + server_config.set_async_server_threads(1); const auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2); diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index 110249bd25..961e714fa8 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -41,7 +41,7 @@ #include "test/cpp/qps/interarrival.h" #include "test/cpp/qps/timer.h" #include "test/cpp/util/create_test_channel.h" -#include "test/proto/perf_tests/perf_services.grpc.pb.h" +#include "test/proto/benchmarks/services.grpc.pb.h" namespace grpc { @@ -171,7 +171,7 @@ class Client { } else if (load.has_pareto()) { random_dist.reset(new ParetoDist(load.pareto().interarrival_base() * num_threads, load.pareto().alpha())); - } else if (load.has_closed()) { + } else if (load.has_closed_loop()) { // Closed-loop doesn't use random dist at all } else { // invalid load type GPR_ASSERT(false); diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 41db6151c5..b376f8501b 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -51,7 +51,7 @@ #include "test/cpp/qps/timer.h" #include "test/cpp/qps/client.h" #include "test/cpp/util/create_test_channel.h" -#include "test/proto/perf_tests/perf_services.grpc.pb.h" +#include "test/proto/benchmarks/services.grpc.pb.h" namespace grpc { namespace testing { diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index 44d525b196..10d680860a 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -57,7 +57,7 @@ #include "test/cpp/qps/histogram.h" #include "test/cpp/qps/interarrival.h" #include "test/cpp/qps/timer.h" -#include "test/proto/perf_tests/perf_services.grpc.pb.h" +#include "test/proto/benchmarks/services.grpc.pb.h" #include "src/core/profiling/timers.h" diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc index 6c852769a5..2803991b42 100644 --- a/test/cpp/qps/driver.cc +++ b/test/cpp/qps/driver.cc @@ -48,7 +48,7 @@ #include "test/cpp/qps/driver.h" #include "test/cpp/qps/histogram.h" #include "test/cpp/qps/qps_worker.h" -#include "test/proto/perf_tests/perf_services.grpc.pb.h" +#include "test/proto/benchmarks/services.grpc.pb.h" using std::list; using std::thread; @@ -165,7 +165,6 @@ std::unique_ptr<ScenarioResult> RunScenario( WorkerService::NewStub(CreateChannel(workers[i], InsecureCredentials())); ServerArgs args; result_server_config = server_config; - result_server_config.set_host(workers[i]); *args.mutable_setup() = server_config; servers[i].stream = servers[i].stub->RunServer(runsc::AllocContext(&contexts, deadline)); @@ -193,7 +192,6 @@ std::unique_ptr<ScenarioResult> RunScenario( CreateChannel(workers[i + num_servers], InsecureCredentials())); ClientArgs args; result_client_config = client_config; - result_client_config.set_host(workers[i + num_servers]); *args.mutable_setup() = client_config; clients[i].stream = clients[i].stub->RunClient(runsc::AllocContext(&contexts, deadline)); @@ -250,15 +248,18 @@ std::unique_ptr<ScenarioResult> RunScenario( for (auto server = &servers[0]; server != &servers[num_servers]; server++) { GPR_ASSERT(server->stream->Read(&server_status)); const auto& stats = server_status.stats(); - result->server_resources.emplace_back( - stats.time_elapsed(), stats.time_user(), stats.time_system()); + result->server_resources.emplace_back(stats.time_elapsed(), + stats.time_user(), + stats.time_system(), + server_status.cores()); } for (auto client = &clients[0]; client != &clients[num_clients]; client++) { GPR_ASSERT(client->stream->Read(&client_status)); const auto& stats = client_status.stats(); result->latencies.MergeProto(stats.latencies()); - result->client_resources.emplace_back( - stats.time_elapsed(), stats.time_user(), stats.time_system()); + result->client_resources.emplace_back(stats.time_elapsed(), + stats.time_user(), + stats.time_system(), -1); } for (auto client = &clients[0]; client != &clients[num_clients]; client++) { diff --git a/test/cpp/qps/driver.h b/test/cpp/qps/driver.h index 7f69c013b7..50c0ba63a1 100644 --- a/test/cpp/qps/driver.h +++ b/test/cpp/qps/driver.h @@ -37,22 +37,24 @@ #include <memory> #include "test/cpp/qps/histogram.h" -#include "test/proto/perf_tests/perf_control.grpc.pb.h" +#include "test/proto/benchmarks/control.grpc.pb.h" namespace grpc { namespace testing { class ResourceUsage { public: - ResourceUsage(double w, double u, double s) - : wall_time_(w), user_time_(u), system_time_(s) {} + ResourceUsage(double w, double u, double s, int c) + : wall_time_(w), user_time_(u), system_time_(s), cores_(c) {} double wall_time() const { return wall_time_; } double user_time() const { return user_time_; } double system_time() const { return system_time_; } + int cores() const { return cores_; } private: double wall_time_; double user_time_; double system_time_; + int cores_; }; struct ScenarioResult { diff --git a/test/cpp/qps/qps-sweep.sh b/test/cpp/qps/qps-sweep.sh index cb93201933..216e40820c 100755 --- a/test/cpp/qps/qps-sweep.sh +++ b/test/cpp/qps/qps-sweep.sh @@ -39,9 +39,9 @@ bins=`find . .. ../.. ../../.. -name bins | head -1` for channels in 1 2 4 8 do - for client in SYNCHRONOUS_CLIENT ASYNC_CLIENT + for client in SYNC_CLIENT ASYNC_CLIENT do - for server in SYNCHRONOUS_SERVER ASYNC_SERVER + for server in SYNC_SERVER ASYNC_SERVER do for rpc in UNARY STREAMING do diff --git a/test/cpp/qps/qps_driver.cc b/test/cpp/qps/qps_driver.cc index 658cf873e8..a314be6f69 100644 --- a/test/cpp/qps/qps_driver.cc +++ b/test/cpp/qps/qps_driver.cc @@ -54,15 +54,15 @@ DEFINE_bool(use_tls, false, "Use TLS"); DEFINE_string(rpc_type, "UNARY", "Type of RPC: UNARY or STREAMING"); // Server config -DEFINE_int32(server_threads, 1, "Number of server threads"); -DEFINE_string(server_type, "SYNCHRONOUS_SERVER", "Server type"); +DEFINE_int32(async_server_threads, 1, "Number of threads for async servers"); +DEFINE_string(server_type, "SYNC_SERVER", "Server type"); // Client config DEFINE_int32(outstanding_rpcs_per_channel, 1, "Number of outstanding rpcs per channel"); DEFINE_int32(client_channels, 1, "Number of client channels"); DEFINE_int32(payload_size, 1, "Payload size"); -DEFINE_string(client_type, "SYNCHRONOUS_CLIENT", "Client type"); +DEFINE_string(client_type, "SYNC_CLIENT", "Client type"); DEFINE_int32(async_client_threads, 1, "Async client threads"); DEFINE_double(poisson_load, -1.0, "Poisson offered load (qps)"); @@ -117,23 +117,14 @@ static void QpsDriver() { pareto->set_interarrival_base(FLAGS_pareto_base / 1e6); pareto->set_alpha(FLAGS_pareto_alpha); } else { - client_config.mutable_load_params()->mutable_closed(); + client_config.mutable_load_params()->mutable_closed_loop(); // No further load parameters to set up for closed loop } ServerConfig server_config; server_config.set_server_type(server_type); - server_config.set_threads(FLAGS_server_threads); server_config.set_use_tls(FLAGS_use_tls); - - // If we're running a sync-server streaming test, make sure - // that we have at least as many threads as the active streams - // or else threads will be blocked from forward progress and the - // client will deadlock on a timer. - GPR_ASSERT(!(server_type == grpc::testing::SYNCHRONOUS_SERVER && - rpc_type == grpc::testing::STREAMING && - FLAGS_server_threads < - FLAGS_client_channels * FLAGS_outstanding_rpcs_per_channel)); + server_config.set_async_server_threads(FLAGS_async_server_threads); const auto result = RunScenario( client_config, FLAGS_num_clients, server_config, FLAGS_num_servers, diff --git a/test/cpp/qps/qps_openloop_test.cc b/test/cpp/qps/qps_openloop_test.cc index 918381b850..d9f5ffc2ef 100644 --- a/test/cpp/qps/qps_openloop_test.cc +++ b/test/cpp/qps/qps_openloop_test.cc @@ -64,7 +64,7 @@ static void RunQPS() { ServerConfig server_config; server_config.set_server_type(ASYNC_SERVER); server_config.set_use_tls(false); - server_config.set_threads(4); + server_config.set_async_server_threads(4); const auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2); diff --git a/test/cpp/qps/qps_test.cc b/test/cpp/qps/qps_test.cc index 82850e5dbe..770a0d4ebf 100644 --- a/test/cpp/qps/qps_test.cc +++ b/test/cpp/qps/qps_test.cc @@ -58,12 +58,12 @@ static void RunQPS() { client_config.set_payload_size(1); client_config.set_async_client_threads(8); client_config.set_rpc_type(UNARY); - client_config.mutable_load_params()->mutable_closed(); + client_config.mutable_load_params()->mutable_closed_loop(); ServerConfig server_config; server_config.set_server_type(ASYNC_SERVER); server_config.set_use_tls(false); - server_config.set_threads(8); + server_config.set_async_server_threads(8); const auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2); diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc index cad2a9e064..0b34daec63 100644 --- a/test/cpp/qps/qps_worker.cc +++ b/test/cpp/qps/qps_worker.cc @@ -55,14 +55,14 @@ #include "test/cpp/qps/client.h" #include "test/cpp/qps/server.h" #include "test/cpp/util/create_test_channel.h" -#include "test/proto/perf_tests/perf_services.pb.h" +#include "test/proto/benchmarks/services.pb.h" namespace grpc { namespace testing { -std::unique_ptr<Client> CreateClient(const ClientConfig& config) { +static std::unique_ptr<Client> CreateClient(const ClientConfig& config) { switch (config.client_type()) { - case ClientType::SYNCHRONOUS_CLIENT: + case ClientType::SYNC_CLIENT: return (config.rpc_type() == RpcType::UNARY) ? CreateSynchronousUnaryClient(config) : CreateSynchronousStreamingClient(config); @@ -76,9 +76,15 @@ std::unique_ptr<Client> CreateClient(const ClientConfig& config) { abort(); } -std::unique_ptr<Server> CreateServer(const ServerConfig& config) { +static void LimitCores(int cores) { +} + +static std::unique_ptr<Server> CreateServer(const ServerConfig& config) { + if (config.core_limit() > 0) { + LimitCores(config.core_limit()); + } switch (config.server_type()) { - case ServerType::SYNCHRONOUS_SERVER: + case ServerType::SYNC_SERVER: return CreateSynchronousServer(config); case ServerType::ASYNC_SERVER: return CreateAsyncServer(config); @@ -195,6 +201,7 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service { } ServerStatus status; status.set_port(server->Port()); + status.set_cores(server->Cores()); if (!stream->Write(status)) { return Status(StatusCode::UNKNOWN, ""); } diff --git a/test/cpp/qps/report.cc b/test/cpp/qps/report.cc index e03e8e1fb0..b230eb441e 100644 --- a/test/cpp/qps/report.cc +++ b/test/cpp/qps/report.cc @@ -43,6 +43,7 @@ namespace testing { static double WallTime(ResourceUsage u) { return u.wall_time(); } static double UserTime(ResourceUsage u) { return u.user_time(); } static double SystemTime(ResourceUsage u) { return u.system_time(); } +static int Cores(ResourceUsage u) { return u.cores(); } void CompositeReporter::add(std::unique_ptr<Reporter> reporter) { reporters_.emplace_back(std::move(reporter)); @@ -83,7 +84,7 @@ void GprLogReporter::ReportQPSPerCore(const ScenarioResult& result) { result.latencies.Count() / average(result.client_resources, WallTime); gpr_log(GPR_INFO, "QPS: %.1f (%.1f/server core)", qps, - qps / result.server_config.threads()); + qps / sum(result.server_resources, Cores)); } void GprLogReporter::ReportLatency(const ScenarioResult& result) { @@ -123,10 +124,10 @@ void PerfDbReporter::ReportQPSPerCore(const ScenarioResult& result) { auto qps = result.latencies.Count() / average(result.client_resources, WallTime); - auto qpsPerCore = qps / result.server_config.threads(); + auto qps_per_core = qps / sum(result.server_resources, Cores); perf_db_client_.setQps(qps); - perf_db_client_.setQpsPerCore(qpsPerCore); + perf_db_client_.setQpsPerCore(qps_per_core); perf_db_client_.setConfigs(result.client_config, result.server_config); } diff --git a/test/cpp/qps/server.h b/test/cpp/qps/server.h index 3ea9382e50..12bbf1fef6 100644 --- a/test/cpp/qps/server.h +++ b/test/cpp/qps/server.h @@ -34,10 +34,12 @@ #ifndef TEST_QPS_SERVER_H #define TEST_QPS_SERVER_H +#include <grpc/support/cpu.h> + #include "test/core/util/port.h" #include "test/cpp/qps/timer.h" #include "test/proto/messages.grpc.pb.h" -#include "test/proto/perf_tests/perf_control.grpc.pb.h" +#include "test/proto/benchmarks/control.grpc.pb.h" namespace grpc { namespace testing { @@ -83,6 +85,7 @@ class Server { } int Port() const {return port_;} + int Cores() const {return gpr_cpu_num_cores();} private: int port_; std::unique_ptr<Timer> timer_; diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index 2b3f7a38fb..b4b397afa8 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -50,7 +50,7 @@ #include <gtest/gtest.h> #include "test/cpp/qps/server.h" -#include "test/proto/perf_tests/perf_services.grpc.pb.h" +#include "test/proto/benchmarks/services.grpc.pb.h" namespace grpc { namespace testing { @@ -67,15 +67,15 @@ class AsyncQpsServerTest : public Server { gpr_free(server_address); builder.RegisterAsyncService(&async_service_); - for (int i = 0; i < config.threads(); i++) { + for (int i = 0; i < config.async_server_threads(); i++) { srv_cqs_.emplace_back(builder.AddCompletionQueue()); } server_ = builder.BuildAndStart(); using namespace std::placeholders; - for (int i = 0; i < 10000 / config.threads(); i++) { - for (int j = 0; j < config.threads(); j++) { + for (int i = 0; i < 10000 / config.async_server_threads(); i++) { + for (int j = 0; j < config.async_server_threads(); j++) { auto request_unary = std::bind( &BenchmarkService::AsyncService::RequestUnaryCall, &async_service_, _1, _2, _3, srv_cqs_[j].get(), srv_cqs_[j].get(), _4); @@ -90,10 +90,10 @@ class AsyncQpsServerTest : public Server { request_streaming, ProcessRPC)); } } - for (int i = 0; i < config.threads(); i++) { + for (int i = 0; i < config.async_server_threads(); i++) { shutdown_state_.emplace_back(new PerThreadShutdownState()); } - for (int i = 0; i < config.threads(); i++) { + for (int i = 0; i < config.async_server_threads(); i++) { threads_.emplace_back(&AsyncQpsServerTest::ThreadFunc, this, i); } } diff --git a/test/cpp/qps/server_sync.cc b/test/cpp/qps/server_sync.cc index 3e7cf1d4cb..feca7e2ac2 100644 --- a/test/cpp/qps/server_sync.cc +++ b/test/cpp/qps/server_sync.cc @@ -45,7 +45,7 @@ #include "test/cpp/qps/server.h" #include "test/cpp/qps/timer.h" -#include "test/proto/perf_tests/perf_services.grpc.pb.h" +#include "test/proto/benchmarks/services.grpc.pb.h" namespace grpc { namespace testing { diff --git a/test/cpp/qps/sync_streaming_ping_pong_test.cc b/test/cpp/qps/sync_streaming_ping_pong_test.cc index ce10a87ab3..dd3be73685 100644 --- a/test/cpp/qps/sync_streaming_ping_pong_test.cc +++ b/test/cpp/qps/sync_streaming_ping_pong_test.cc @@ -51,18 +51,17 @@ static void RunSynchronousStreamingPingPong() { gpr_log(GPR_INFO, "Running Synchronous Streaming Ping Pong"); ClientConfig client_config; - client_config.set_client_type(SYNCHRONOUS_CLIENT); + client_config.set_client_type(SYNC_CLIENT); client_config.set_use_tls(false); client_config.set_outstanding_rpcs_per_channel(1); client_config.set_client_channels(1); client_config.set_payload_size(1); client_config.set_rpc_type(STREAMING); - client_config.mutable_load_params()->mutable_closed(); + client_config.mutable_load_params()->mutable_closed_loop(); ServerConfig server_config; - server_config.set_server_type(SYNCHRONOUS_SERVER); + server_config.set_server_type(SYNC_SERVER); server_config.set_use_tls(false); - server_config.set_threads(1); const auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2); diff --git a/test/cpp/qps/sync_unary_ping_pong_test.cc b/test/cpp/qps/sync_unary_ping_pong_test.cc index c20e2c5ff0..2c1d2aa764 100644 --- a/test/cpp/qps/sync_unary_ping_pong_test.cc +++ b/test/cpp/qps/sync_unary_ping_pong_test.cc @@ -51,18 +51,17 @@ static void RunSynchronousUnaryPingPong() { gpr_log(GPR_INFO, "Running Synchronous Unary Ping Pong"); ClientConfig client_config; - client_config.set_client_type(SYNCHRONOUS_CLIENT); + client_config.set_client_type(SYNC_CLIENT); client_config.set_use_tls(false); client_config.set_outstanding_rpcs_per_channel(1); client_config.set_client_channels(1); client_config.set_payload_size(1); client_config.set_rpc_type(UNARY); - client_config.mutable_load_params()->mutable_closed(); + client_config.mutable_load_params()->mutable_closed_loop(); ServerConfig server_config; - server_config.set_server_type(SYNCHRONOUS_SERVER); + server_config.set_server_type(SYNC_SERVER); server_config.set_use_tls(false); - server_config.set_threads(1); const auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2); diff --git a/test/proto/benchmarks/control.proto b/test/proto/benchmarks/control.proto index e0fe8c0d26..50c2a31633 100644 --- a/test/proto/benchmarks/control.proto +++ b/test/proto/benchmarks/control.proto @@ -91,8 +91,7 @@ message ClientConfig { // only for async client: int32 async_client_threads = 7; RpcType rpc_type = 8; - string host = 9; - LoadParams load_params = 11; + LoadParams load_params = 10; } message ClientStatus { @@ -113,10 +112,12 @@ message ClientArgs { message ServerConfig { ServerType server_type = 1; - int32 threads = 2; - bool use_tls = 3; - string host = 4; - int32 port = 5; + bool use_tls = 2; + int32 port = 4; + // only for async server + int32 async_server_threads = 7; + // restrict core usage + int32 core_limit = 8; } message ServerArgs { @@ -129,4 +130,5 @@ message ServerArgs { message ServerStatus { ServerStats stats = 1; int32 port = 2; + int32 cores = 3; } -- GitLab