Skip to content
Snippets Groups Projects
Commit d1e18fae authored by Craig Tiller's avatar Craig Tiller
Browse files

Async client progress

parent 88568759
No related branches found
No related tags found
No related merge requests found
...@@ -175,164 +175,3 @@ std::unique_ptr<Client> CreateAsyncClient(const ClientConfig& args) { ...@@ -175,164 +175,3 @@ std::unique_ptr<Client> CreateAsyncClient(const ClientConfig& args) {
} // namespace testing } // namespace testing
} // namespace grpc } // namespace grpc
#if 0
static void RunTest(const int client_threads, const int client_channels,
const int num_rpcs, const int payload_size) {
gpr_log(GPR_INFO,
"QPS test with parameters\n"
"enable_ssl = %d\n"
"client_channels = %d\n"
"client_threads = %d\n"
"num_rpcs = %d\n"
"payload_size = %d\n"
"server_host:server_port = %s:%d\n\n",
FLAGS_enable_ssl, client_channels, client_threads, num_rpcs,
payload_size, FLAGS_server_host.c_str(), FLAGS_server_port);
std::ostringstream oss;
oss << FLAGS_server_host << ":" << FLAGS_server_port;
std::vector<std::thread> threads; // Will add threads when ready to execute
std::vector< ::gpr_histogram *> thread_stats(client_threads);
TestService::Stub *stub_stats = channels[0].get_stub();
grpc::ClientContext context_stats_begin;
StatsRequest stats_request;
ServerStats server_stats_begin;
stats_request.set_test_num(0);
grpc::Status status_beg = stub_stats->CollectServerStats(
&context_stats_begin, stats_request, &server_stats_begin);
grpc_profiler_start("qps_client_async.prof");
for (int i = 0; i < client_threads; i++) {
gpr_histogram *hist = gpr_histogram_create(0.01, 60e9);
GPR_ASSERT(hist != NULL);
thread_stats[i] = hist;
threads.push_back(std::thread(
[hist, client_threads, client_channels, num_rpcs, payload_size,
&channels, &CheckDone](int channel_num) {
using namespace std::placeholders;
SimpleRequest request;
request.set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
request.set_response_size(payload_size);
grpc::CompletionQueue cli_cq;
int rpcs_sent = 0;
while (rpcs_sent < num_rpcs) {
rpcs_sent++;
TestService::Stub *stub = channels[channel_num].get_stub();
new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(stub,
request, start_req, CheckDone);
void *got_tag;
bool ok;
// Need to call 2 next for every 1 RPC (1 for req done, 1 for resp
// done)
cli_cq.Next(&got_tag, &ok);
if (!ok) break;
ClientRpcContext *ctx = ClientRpcContext::detag(got_tag);
if (ctx->RunNextState() == false) {
// call the callback and then delete it
ctx->report_stats(hist);
ctx->RunNextState();
delete ctx;
}
cli_cq.Next(&got_tag, &ok);
if (!ok) break;
ctx = ClientRpcContext::detag(got_tag);
if (ctx->RunNextState() == false) {
// call the callback and then delete it
ctx->report_stats(hist);
ctx->RunNextState();
delete ctx;
}
// Now do runtime round-robin assignment of the next
// channel number
channel_num += client_threads;
channel_num %= client_channels;
}
},
i % client_channels));
}
gpr_histogram *hist = gpr_histogram_create(0.01, 60e9);
GPR_ASSERT(hist != NULL);
for (auto &t : threads) {
t.join();
}
grpc_profiler_stop();
for (int i = 0; i < client_threads; i++) {
gpr_histogram *h = thread_stats[i];
gpr_log(GPR_INFO, "latency at thread %d (50/90/95/99/99.9): %f/%f/%f/%f/%f",
i, gpr_histogram_percentile(h, 50), gpr_histogram_percentile(h, 90),
gpr_histogram_percentile(h, 95), gpr_histogram_percentile(h, 99),
gpr_histogram_percentile(h, 99.9));
gpr_histogram_merge(hist, h);
gpr_histogram_destroy(h);
}
gpr_log(
GPR_INFO,
"latency across %d threads with %d channels and %d payload "
"(50/90/95/99/99.9): %f / %f / %f / %f / %f",
client_threads, client_channels, payload_size,
gpr_histogram_percentile(hist, 50), gpr_histogram_percentile(hist, 90),
gpr_histogram_percentile(hist, 95), gpr_histogram_percentile(hist, 99),
gpr_histogram_percentile(hist, 99.9));
gpr_histogram_destroy(hist);
grpc::ClientContext context_stats_end;
ServerStats server_stats_end;
grpc::Status status_end = stub_stats->CollectServerStats(
&context_stats_end, stats_request, &server_stats_end);
double elapsed = server_stats_end.time_now() - server_stats_begin.time_now();
int total_rpcs = client_threads * num_rpcs;
double utime = server_stats_end.time_user() - server_stats_begin.time_user();
double stime =
server_stats_end.time_system() - server_stats_begin.time_system();
gpr_log(GPR_INFO,
"Elapsed time: %.3f\n"
"RPC Count: %d\n"
"QPS: %.3f\n"
"System time: %.3f\n"
"User time: %.3f\n"
"Resource usage: %.1f%%\n",
elapsed, total_rpcs, total_rpcs / elapsed, stime, utime,
(stime + utime) / elapsed * 100.0);
}
int main(int argc, char **argv) {
grpc_init();
ParseCommandLineFlags(&argc, &argv, true);
GPR_ASSERT(FLAGS_server_port);
if (FLAGS_workload.length() == 0) {
RunTest(FLAGS_client_threads, FLAGS_client_channels, FLAGS_num_rpcs,
FLAGS_payload_size);
} else {
std::istringstream workload(FLAGS_workload);
int client_threads, client_channels, num_rpcs, payload_size;
workload >> client_threads;
while (!workload.eof()) {
workload >> client_channels >> num_rpcs >> payload_size;
RunTest(client_threads, client_channels, num_rpcs, payload_size);
workload >> client_threads;
}
gpr_log(GPR_INFO, "Done with specified workload.");
}
grpc_shutdown();
return 0;
}
#endif
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment