Skip to content
Snippets Groups Projects
Commit 40fde140 authored by Craig Tiller's avatar Craig Tiller
Browse files

Add logging of flow control variables

parent c2d9f1e2
Branches
Tags
No related merge requests found
......@@ -193,3 +193,11 @@ size_t grpc_trickle_endpoint_trickle(grpc_exec_ctx *exec_ctx,
gpr_mu_unlock(&te->mu);
return backlog;
}
size_t grpc_trickle_get_backlog(grpc_endpoint *ep) {
trickle_endpoint *te = (trickle_endpoint *)ep;
gpr_mu_lock(&te->mu);
size_t backlog = te->write_buffer.length;
gpr_mu_unlock(&te->mu);
return backlog;
}
......@@ -43,4 +43,6 @@ grpc_endpoint *grpc_trickle_endpoint_create(grpc_endpoint *wrap,
size_t grpc_trickle_endpoint_trickle(grpc_exec_ctx *exec_ctx,
grpc_endpoint *endpoint);
size_t grpc_trickle_get_backlog(grpc_endpoint *endpoint);
#endif
......@@ -34,6 +34,8 @@
/* Benchmark gRPC end2end in various configurations */
#include <benchmark/benchmark.h>
#include <gflags/gflags.h>
#include <fstream>
#include "src/core/lib/profiling/timers.h"
#include "src/cpp/client/create_channel_internal.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
......@@ -45,16 +47,48 @@ extern "C" {
#include "test/core/util/trickle_endpoint.h"
}
DEFINE_bool(log, false, "Log state to CSV files");
namespace grpc {
namespace testing {
static void* tag(intptr_t x) { return reinterpret_cast<void*>(x); }
template <class A0>
static void write_csv(std::ostream* out, A0&& a0) {
if (!out) return;
(*out) << a0 << "\n";
}
template <class A0, class... Arg>
static void write_csv(std::ostream* out, A0&& a0, Arg&&... arg) {
if (!out) return;
(*out) << a0 << ",";
write_csv(out, std::forward<Arg>(arg)...);
}
class TrickledCHTTP2 : public EndpointPairFixture {
public:
TrickledCHTTP2(Service* service, size_t megabits_per_second)
TrickledCHTTP2(Service* service, size_t message_size,
size_t megabits_per_second)
: EndpointPairFixture(service, MakeEndpoints(megabits_per_second),
FixtureConfiguration()) {}
FixtureConfiguration()) {
if (FLAGS_log) {
std::ostringstream fn;
fn << "trickle." << message_size << "." << megabits_per_second << ".csv";
log_.reset(new std::ofstream(fn.str().c_str()));
write_csv(log_.get(), "t", "iteration", "client_backlog",
"server_backlog", "client_t_stall", "client_s_stall",
"server_t_stall", "server_s_stall", "client_t_outgoing",
"server_t_outgoing", "client_t_incoming", "server_t_incoming",
"client_s_outgoing_delta", "server_s_outgoing_delta",
"client_s_incoming_delta", "server_s_incoming_delta",
"client_s_announce_window", "server_s_announce_window",
"client_peer_iws", "client_local_iws", "client_sent_iws",
"client_acked_iws", "server_peer_iws", "server_local_iws",
"server_sent_iws", "server_acked_iws");
}
}
void AddToLabel(std::ostream& out, benchmark::State& state) {
out << " writes/iter:"
......@@ -75,6 +109,55 @@ class TrickledCHTTP2 : public EndpointPairFixture {
(double)state.iterations());
}
void Log(int64_t iteration) {
auto now = gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), start_);
grpc_chttp2_transport* client =
reinterpret_cast<grpc_chttp2_transport*>(client_transport_);
grpc_chttp2_transport* server =
reinterpret_cast<grpc_chttp2_transport*>(server_transport_);
grpc_chttp2_stream* client_stream =
client->stream_map.count == 1
? static_cast<grpc_chttp2_stream*>(client->stream_map.values[0])
: nullptr;
grpc_chttp2_stream* server_stream =
server->stream_map.count == 1
? static_cast<grpc_chttp2_stream*>(server->stream_map.values[0])
: nullptr;
write_csv(
log_.get(), static_cast<double>(now.tv_sec) +
1e-9 * static_cast<double>(now.tv_nsec),
iteration, grpc_trickle_get_backlog(endpoint_pair_.client),
grpc_trickle_get_backlog(endpoint_pair_.server),
client->lists[GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT].head != nullptr,
client->lists[GRPC_CHTTP2_LIST_STALLED_BY_STREAM].head != nullptr,
server->lists[GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT].head != nullptr,
server->lists[GRPC_CHTTP2_LIST_STALLED_BY_STREAM].head != nullptr,
client->outgoing_window, server->outgoing_window,
client->incoming_window, server->incoming_window,
client_stream ? client_stream->outgoing_window_delta : -1,
server_stream ? server_stream->outgoing_window_delta : -1,
client_stream ? client_stream->incoming_window_delta : -1,
server_stream ? server_stream->incoming_window_delta : -1,
client_stream ? client_stream->announce_window : -1,
server_stream ? server_stream->announce_window : -1,
client->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
client->settings[GRPC_LOCAL_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
client->settings[GRPC_SENT_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
client->settings[GRPC_ACKED_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
server->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
server->settings[GRPC_LOCAL_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
server->settings[GRPC_SENT_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
server->settings[GRPC_ACKED_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]);
}
void Step() {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
size_t client_backlog =
......@@ -97,6 +180,8 @@ class TrickledCHTTP2 : public EndpointPairFixture {
};
Stats client_stats_;
Stats server_stats_;
std::unique_ptr<std::ofstream> log_;
gpr_timespec start_ = gpr_now(GPR_CLOCK_MONOTONIC);
grpc_endpoint_pair MakeEndpoints(size_t kilobits) {
grpc_endpoint_pair p;
......@@ -123,8 +208,10 @@ class TrickledCHTTP2 : public EndpointPairFixture {
// force library initialization
auto& force_library_initialization = Library::get();
static void TrickleCQNext(TrickledCHTTP2* fixture, void** t, bool* ok) {
static void TrickleCQNext(TrickledCHTTP2* fixture, void** t, bool* ok,
int64_t iteration) {
while (true) {
fixture->Log(iteration);
switch (fixture->cq()->AsyncNext(
t, ok, gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
gpr_time_from_micros(100, GPR_TIMESPAN)))) {
......@@ -143,7 +230,7 @@ static void TrickleCQNext(TrickledCHTTP2* fixture, void** t, bool* ok) {
static void BM_PumpStreamServerToClient_Trickle(benchmark::State& state) {
EchoTestService::AsyncService service;
std::unique_ptr<TrickledCHTTP2> fixture(
new TrickledCHTTP2(&service, state.range(1)));
new TrickledCHTTP2(&service, state.range(0), state.range(1)));
{
EchoResponse send_response;
EchoResponse recv_response;
......@@ -163,7 +250,7 @@ static void BM_PumpStreamServerToClient_Trickle(benchmark::State& state) {
void* t;
bool ok;
while (need_tags) {
TrickleCQNext(fixture.get(), &t, &ok);
TrickleCQNext(fixture.get(), &t, &ok, -1);
GPR_ASSERT(ok);
int i = (int)(intptr_t)t;
GPR_ASSERT(need_tags & (1 << i));
......@@ -174,7 +261,7 @@ static void BM_PumpStreamServerToClient_Trickle(benchmark::State& state) {
GPR_TIMER_SCOPE("BenchmarkCycle", 0);
response_rw.Write(send_response, tag(1));
while (true) {
TrickleCQNext(fixture.get(), &t, &ok);
TrickleCQNext(fixture.get(), &t, &ok, state.iterations());
if (t == tag(0)) {
request_rw->Read(&recv_response, tag(0));
} else if (t == tag(1)) {
......@@ -187,7 +274,7 @@ static void BM_PumpStreamServerToClient_Trickle(benchmark::State& state) {
response_rw.Finish(Status::OK, tag(1));
need_tags = (1 << 0) | (1 << 1);
while (need_tags) {
TrickleCQNext(fixture.get(), &t, &ok);
TrickleCQNext(fixture.get(), &t, &ok, -1);
int i = (int)(intptr_t)t;
GPR_ASSERT(need_tags & (1 << i));
need_tags &= ~(1 << i);
......@@ -217,4 +304,8 @@ BENCHMARK(BM_PumpStreamServerToClient_Trickle)->Apply(TrickleArgs);
}
}
BENCHMARK_MAIN();
int main(int argc, char** argv) {
::benchmark::Initialize(&argc, argv);
::google::ParseCommandLineFlags(&argc, &argv, false);
::benchmark::RunSpecifiedBenchmarks();
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment