From 86253ca1da5349488e7098fcb794a1bf26d88415 Mon Sep 17 00:00:00 2001 From: Craig Tiller <ctiller@google.com> Date: Thu, 8 Oct 2015 13:31:02 -0700 Subject: [PATCH] Latency traces --- src/core/iomgr/exec_ctx.c | 4 + src/core/iomgr/fd_posix.c | 2 + .../iomgr/pollset_multipoller_with_epoll.c | 7 +- src/core/iomgr/pollset_posix.c | 7 +- src/core/iomgr/tcp_posix.c | 4 +- src/core/profiling/basic_timers.c | 32 +-- src/core/profiling/timers.h | 59 ++--- src/core/surface/call.c | 7 + src/core/surface/completion_queue.c | 11 + src/core/transport/chttp2_transport.c | 4 + src/cpp/client/channel.cc | 3 - src/cpp/server/server.cc | 4 + test/cpp/qps/client_sync.cc | 7 + tools/profile_analyzer/profile_analyzer.py | 233 ++++++++++++------ 14 files changed, 259 insertions(+), 125 deletions(-) diff --git a/src/core/iomgr/exec_ctx.c b/src/core/iomgr/exec_ctx.c index f2914d376e..10786b44c2 100644 --- a/src/core/iomgr/exec_ctx.c +++ b/src/core/iomgr/exec_ctx.c @@ -35,8 +35,11 @@ #include <grpc/support/log.h> +#include "src/core/profiling/timers.h" + int grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) { int did_something = 0; + GRPC_TIMER_BEGIN(GRPC_PTAG_EXEC_CTX_FLUSH, 0); while (!grpc_closure_list_empty(exec_ctx->closure_list)) { grpc_closure *c = exec_ctx->closure_list.head; exec_ctx->closure_list.head = exec_ctx->closure_list.tail = NULL; @@ -47,6 +50,7 @@ int grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) { c = next; } } + GRPC_TIMER_END(GRPC_PTAG_EXEC_CTX_FLUSH, 0); return did_something; } diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index b48b7f050a..a9a58904d2 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -45,6 +45,8 @@ #include <grpc/support/log.h> #include <grpc/support/useful.h> +#include "src/core/profiling/timers.h" + enum descriptor_state { NOT_READY = 0, READY = 1 diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c index faf0a6362b..c3feba3ccf 100644 --- a/src/core/iomgr/pollset_multipoller_with_epoll.c +++ b/src/core/iomgr/pollset_multipoller_with_epoll.c @@ -41,10 +41,11 @@ #include <sys/epoll.h> #include <unistd.h> -#include "src/core/iomgr/fd_posix.h" -#include "src/core/support/block_annotate.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include "src/core/iomgr/fd_posix.h" +#include "src/core/support/block_annotate.h" +#include "src/core/profiling/timers.h" typedef struct wakeup_fd_hdl { grpc_wakeup_fd wakeup_fd; @@ -182,9 +183,11 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock( /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid even going into the blocking annotation if possible */ + GRPC_TIMER_BEGIN(GRPC_PTAG_POLL, 0); GRPC_SCHEDULING_START_BLOCKING_REGION; poll_rv = grpc_poll_function(pfds, 2, timeout_ms); GRPC_SCHEDULING_END_BLOCKING_REGION; + GRPC_TIMER_END(GRPC_PTAG_POLL, 0); if (poll_rv < 0) { if (errno != EINTR) { diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index b663780a02..fceeb69192 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -195,6 +195,7 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, /* pollset->mu already held */ int added_worker = 0; int locked = 1; + GRPC_TIMER_BEGIN(GRPC_PTAG_POLLSET_WORK, 0); /* this must happen before we (potentially) drop pollset->mu */ worker->next = worker->prev = NULL; /* TODO(ctiller): pool these */ @@ -223,8 +224,10 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, added_worker = 1; gpr_tls_set(&g_current_thread_poller, (gpr_intptr)pollset); gpr_tls_set(&g_current_thread_worker, (gpr_intptr)worker); + GRPC_TIMER_BEGIN(GRPC_PTAG_POLLSET_WORK, 0); pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, worker, deadline, now); + GRPC_TIMER_END(GRPC_PTAG_POLLSET_WORK, 0); locked = 0; gpr_tls_set(&g_current_thread_poller, 0); gpr_tls_set(&g_current_thread_worker, 0); @@ -261,6 +264,7 @@ done: gpr_mu_lock(&pollset->mu); } } + GRPC_TIMER_END(GRPC_PTAG_POLLSET_WORK, 0); } void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, @@ -492,10 +496,11 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, even going into the blocking annotation if possible */ /* poll fd count (argument 2) is shortened by one if we have no events to poll on - such that it only includes the kicker */ + GRPC_TIMER_BEGIN(GRPC_PTAG_POLL, 0); GRPC_SCHEDULING_START_BLOCKING_REGION; r = grpc_poll_function(pfd, nfds, timeout); GRPC_SCHEDULING_END_BLOCKING_REGION; - GRPC_TIMER_MARK(GRPC_PTAG_POLL_FINISHED, r); + GRPC_TIMER_END(GRPC_PTAG_POLL, 0); if (fd) { grpc_fd_end_poll(exec_ctx, &fd_watcher, pfd[2].revents & POLLIN, diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index 4a57037a72..540ebd612c 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -199,7 +199,7 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { msg.msg_controllen = 0; msg.msg_flags = 0; - GRPC_TIMER_BEGIN(GRPC_PTAG_RECVMSG, 0); + GRPC_TIMER_BEGIN(GRPC_PTAG_RECVMSG, 1); do { read_bytes = recvmsg(tcp->fd, &msg, 0); } while (read_bytes < 0 && errno == EINTR); @@ -316,7 +316,7 @@ static flush_result tcp_flush(grpc_tcp *tcp) { msg.msg_controllen = 0; msg.msg_flags = 0; - GRPC_TIMER_BEGIN(GRPC_PTAG_SENDMSG, 0); + GRPC_TIMER_BEGIN(GRPC_PTAG_SENDMSG, 1); do { /* TODO(klempner): Cork if this is a partial write */ sent_length = sendmsg(tcp->fd, &msg, SENDMSG_FLAGS); diff --git a/src/core/profiling/basic_timers.c b/src/core/profiling/basic_timers.c index b7614375b3..944ed5f478 100644 --- a/src/core/profiling/basic_timers.c +++ b/src/core/profiling/basic_timers.c @@ -47,16 +47,16 @@ typedef enum { BEGIN = '{', END = '}', - MARK = '.', - IMPORTANT = '!' + MARK = '.' } marker_type; typedef struct grpc_timer_entry { gpr_timespec tm; const char *tagstr; - marker_type type; const char *file; int line; + char type; + gpr_uint8 important; } grpc_timer_entry; #define MAX_COUNT (1024 * 1024 / sizeof(grpc_timer_entry)) @@ -81,10 +81,10 @@ static void log_report() { grpc_timer_entry *entry = &(g_log[i]); fprintf(output_file, "{\"t\": %ld.%09d, \"thd\": \"%p\", \"type\": \"%c\", \"tag\": " - "\"%s\", \"file\": \"%s\", \"line\": %d}\n", + "\"%s\", \"file\": \"%s\", \"line\": %d, \"imp\": %d}\n", entry->tm.tv_sec, entry->tm.tv_nsec, (void *)(gpr_intptr)gpr_thd_currentid(), entry->type, entry->tagstr, - entry->file, entry->line); + entry->file, entry->line, entry->important); } /* Now clear out the log */ @@ -92,7 +92,7 @@ static void log_report() { } static void grpc_timers_log_add(int tag, const char *tagstr, marker_type type, - void *id, const char *file, int line) { + int important, const char *file, int line) { grpc_timer_entry *entry; /* TODO (vpai) : Improve concurrency */ @@ -107,34 +107,28 @@ static void grpc_timers_log_add(int tag, const char *tagstr, marker_type type, entry->type = type; entry->file = file; entry->line = line; + entry->important = important != 0; } /* Latency profiler API implementation. */ -void grpc_timer_add_mark(int tag, const char *tagstr, void *id, +void grpc_timer_add_mark(int tag, const char *tagstr, int important, const char *file, int line) { if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { - grpc_timers_log_add(tag, tagstr, MARK, id, file, line); + grpc_timers_log_add(tag, tagstr, MARK, important, file, line); } } -void grpc_timer_add_important_mark(int tag, const char *tagstr, void *id, - const char *file, int line) { - if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { - grpc_timers_log_add(tag, tagstr, IMPORTANT, id, file, line); - } -} - -void grpc_timer_begin(int tag, const char *tagstr, void *id, const char *file, +void grpc_timer_begin(int tag, const char *tagstr, int important, const char *file, int line) { if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { - grpc_timers_log_add(tag, tagstr, BEGIN, id, file, line); + grpc_timers_log_add(tag, tagstr, BEGIN, important, file, line); } } -void grpc_timer_end(int tag, const char *tagstr, void *id, const char *file, +void grpc_timer_end(int tag, const char *tagstr, int important, const char *file, int line) { if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { - grpc_timers_log_add(tag, tagstr, END, id, file, line); + grpc_timers_log_add(tag, tagstr, END, important, file, line); } } diff --git a/src/core/profiling/timers.h b/src/core/profiling/timers.h index c7cbf2bc2e..326c1997c3 100644 --- a/src/core/profiling/timers.h +++ b/src/core/profiling/timers.h @@ -41,13 +41,11 @@ extern "C" { void grpc_timers_global_init(void); void grpc_timers_global_destroy(void); -void grpc_timer_add_mark(int tag, const char *tagstr, void *id, +void grpc_timer_add_mark(int tag, const char *tagstr, int important, const char *file, int line); -void grpc_timer_add_important_mark(int tag, const char *tagstr, void *id, - const char *file, int line); -void grpc_timer_begin(int tag, const char *tagstr, void *id, const char *file, +void grpc_timer_begin(int tag, const char *tagstr, int important, const char *file, int line); -void grpc_timer_end(int tag, const char *tagstr, void *id, const char *file, +void grpc_timer_end(int tag, const char *tagstr, int important, const char *file, int line); enum grpc_profiling_tags { @@ -60,21 +58,36 @@ enum grpc_profiling_tags { /* Re. sockets. */ GRPC_PTAG_HANDLE_READ = 200 + GRPC_PTAG_IGNORE_THRESHOLD, - GRPC_PTAG_SENDMSG = 201 + GRPC_PTAG_IGNORE_THRESHOLD, - GRPC_PTAG_RECVMSG = 202 + GRPC_PTAG_IGNORE_THRESHOLD, - GRPC_PTAG_POLL_FINISHED = 203 + GRPC_PTAG_IGNORE_THRESHOLD, + GRPC_PTAG_SENDMSG = 201, + GRPC_PTAG_RECVMSG = 202, + GRPC_PTAG_POLL = 203, GRPC_PTAG_TCP_CB_WRITE = 204 + GRPC_PTAG_IGNORE_THRESHOLD, GRPC_PTAG_TCP_WRITE = 205 + GRPC_PTAG_IGNORE_THRESHOLD, - GRPC_PTAG_CALL_ON_DONE_RECV = 206, + GRPC_PTAG_BECOME_READABLE = 207, /* C++ */ GRPC_PTAG_CPP_CALL_CREATED = 300 + GRPC_PTAG_IGNORE_THRESHOLD, GRPC_PTAG_CPP_PERFORM_OPS = 301 + GRPC_PTAG_IGNORE_THRESHOLD, + GRPC_PTAG_CLIENT_UNARY_CALL = 302, + GRPC_PTAG_SERVER_CALL = 303, + GRPC_PTAG_SERVER_CALLBACK = 304, /* Transports */ + GRPC_PTAG_HTTP2_RECV_DATA = 400, GRPC_PTAG_HTTP2_UNLOCK = 401 + GRPC_PTAG_IGNORE_THRESHOLD, GRPC_PTAG_HTTP2_UNLOCK_CLEANUP = 402 + GRPC_PTAG_IGNORE_THRESHOLD, + /* Completion queue */ + GRPC_PTAG_CQ_NEXT = 501, + GRPC_PTAG_CQ_PLUCK = 502, + GRPC_PTAG_POLLSET_WORK = 503, + GRPC_PTAG_EXEC_CTX_FLUSH = 504, + + /* Surface */ + GRPC_PTAG_CALL_START_BATCH = 600, + GRPC_PTAG_CALL_ON_DONE_RECV = 601, + GRPC_PTAG_CALL_UNLOCK = 602, + /* > 1024 Unassigned reserved. For any miscellaneous use. * Use addition to generate tags from this base or take advantage of the 10 * zero'd bits for OR-ing. */ @@ -83,19 +96,15 @@ enum grpc_profiling_tags { #if !(defined(GRPC_STAP_PROFILER) + defined(GRPC_BASIC_PROFILER)) /* No profiling. No-op all the things. */ -#define GRPC_TIMER_MARK(tag, id) \ +#define GRPC_TIMER_MARK(tag, important) \ do { \ } while (0) -#define GRPC_TIMER_IMPORTANT_MARK(tag, id) \ - do { \ - } while (0) - -#define GRPC_TIMER_BEGIN(tag, id) \ +#define GRPC_TIMER_BEGIN(tag, important) \ do { \ } while (0) -#define GRPC_TIMER_END(tag, id) \ +#define GRPC_TIMER_END(tag, important) \ do { \ } while (0) @@ -106,27 +115,21 @@ enum grpc_profiling_tags { #endif /* Generic profiling interface. */ -#define GRPC_TIMER_MARK(tag, id) \ +#define GRPC_TIMER_MARK(tag, important) \ if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { \ - grpc_timer_add_mark(tag, #tag, ((void *)(gpr_intptr)(id)), __FILE__, \ + grpc_timer_add_mark(tag, #tag, important, __FILE__, \ __LINE__); \ } -#define GRPC_TIMER_IMPORTANT_MARK(tag, id) \ - if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { \ - grpc_timer_add_important_mark(tag, #tag, ((void *)(gpr_intptr)(id)), \ - __FILE__, __LINE__); \ - } - -#define GRPC_TIMER_BEGIN(tag, id) \ +#define GRPC_TIMER_BEGIN(tag, important) \ if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { \ - grpc_timer_begin(tag, #tag, ((void *)(gpr_intptr)(id)), __FILE__, \ + grpc_timer_begin(tag, #tag, important, __FILE__, \ __LINE__); \ } -#define GRPC_TIMER_END(tag, id) \ +#define GRPC_TIMER_END(tag, important) \ if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { \ - grpc_timer_end(tag, #tag, ((void *)(gpr_intptr)(id)), __FILE__, __LINE__); \ + grpc_timer_end(tag, #tag, important, __FILE__, __LINE__); \ } #ifdef GRPC_STAP_PROFILER diff --git a/src/core/surface/call.c b/src/core/surface/call.c index d15a3bcbad..90df15ab39 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -607,6 +607,8 @@ static void unlock(grpc_exec_ctx *exec_ctx, grpc_call *call) { const size_t MAX_RECV_PEEK_AHEAD = 65536; size_t buffered_bytes; + GRPC_TIMER_BEGIN(GRPC_PTAG_CALL_UNLOCK, 0); + memset(&op, 0, sizeof(op)); op.cancel_with_status = call->cancel_with_status; @@ -677,6 +679,8 @@ static void unlock(grpc_exec_ctx *exec_ctx, grpc_call *call) { unlock(exec_ctx, call); GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completing"); } + + GRPC_TIMER_END(GRPC_PTAG_CALL_UNLOCK, 0); } static void get_final_status(grpc_call *call, grpc_ioreq_data out) { @@ -1589,6 +1593,8 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, grpc_call_error error; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GRPC_TIMER_BEGIN(GRPC_PTAG_CALL_START_BATCH, 0); + GRPC_API_TRACE( "grpc_call_start_batch(call=%p, ops=%p, nops=%lu, tag=%p, reserved=%p)", 5, (call, ops, (unsigned long)nops, tag, reserved)); @@ -1826,6 +1832,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, finish_func, tag); done: grpc_exec_ctx_finish(&exec_ctx); + GRPC_TIMER_END(GRPC_PTAG_CALL_START_BATCH, 0); return error; } diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index e818ccba48..5ca8f837df 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -42,6 +42,7 @@ #include "src/core/surface/call.h" #include "src/core/surface/event_string.h" #include "src/core/surface/surface_trace.h" +#include "src/core/profiling/timers.h" #include <grpc/support/alloc.h> #include <grpc/support/atm.h> #include <grpc/support/log.h> @@ -184,6 +185,8 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, gpr_timespec now; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GRPC_TIMER_BEGIN(GRPC_PTAG_CQ_NEXT, 0); + GRPC_API_TRACE( "grpc_completion_queue_next(" "cc=%p, " @@ -230,6 +233,9 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); GRPC_CQ_INTERNAL_UNREF(cc, "next"); grpc_exec_ctx_finish(&exec_ctx); + + GRPC_TIMER_END(GRPC_PTAG_CQ_NEXT, 0); + return ret; } @@ -268,6 +274,8 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, int first_loop = 1; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GRPC_TIMER_BEGIN(GRPC_PTAG_CQ_PLUCK, 0); + GRPC_API_TRACE( "grpc_completion_queue_pluck(" "cc=%p, tag=%p, " @@ -333,6 +341,9 @@ done: GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); GRPC_CQ_INTERNAL_UNREF(cc, "pluck"); grpc_exec_ctx_finish(&exec_ctx); + + GRPC_TIMER_END(GRPC_PTAG_CQ_PLUCK, 0); + return ret; } diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index de74379546..b4508b42bc 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -1103,6 +1103,8 @@ static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, int success) { int keep_reading = 0; grpc_chttp2_transport *t = tp; + GRPC_TIMER_BEGIN(GRPC_PTAG_HTTP2_RECV_DATA, 0); + lock(t); i = 0; GPR_ASSERT(!t->parsing_active); @@ -1154,6 +1156,8 @@ static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, int success) { } else { UNREF_TRANSPORT(exec_ctx, t, "recv_data"); } + + GRPC_TIMER_END(GRPC_PTAG_HTTP2_RECV_DATA, 0); } /* diff --git a/src/cpp/client/channel.cc b/src/cpp/client/channel.cc index dc8e304664..c7974d655b 100644 --- a/src/cpp/client/channel.cc +++ b/src/cpp/client/channel.cc @@ -78,7 +78,6 @@ Call Channel::CreateCall(const RpcMethod& method, ClientContext* context, context->raw_deadline(), nullptr); } grpc_census_call_set_context(c_call, context->census_context()); - GRPC_TIMER_MARK(GRPC_PTAG_CPP_CALL_CREATED, c_call); context->set_call(c_call, shared_from_this()); return Call(c_call, this, cq); } @@ -87,11 +86,9 @@ void Channel::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) { static const size_t MAX_OPS = 8; size_t nops = 0; grpc_op cops[MAX_OPS]; - GRPC_TIMER_BEGIN(GRPC_PTAG_CPP_PERFORM_OPS, call->call()); ops->FillOps(cops, &nops); GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call->call(), cops, nops, ops, nullptr)); - GRPC_TIMER_END(GRPC_PTAG_CPP_PERFORM_OPS, call->call()); } void* Channel::RegisterMethod(const char* method) { diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index a44e1d2025..f271973506 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -541,6 +541,7 @@ void Server::ScheduleCallback() { void Server::RunRpc() { // Wait for one more incoming rpc. bool ok; + GRPC_TIMER_BEGIN(GRPC_PTAG_SERVER_CALL, 0); auto* mrd = SyncRequest::Wait(&cq_, &ok); if (mrd) { ScheduleCallback(); @@ -556,9 +557,12 @@ void Server::RunRpc() { mrd->TeardownRequest(); } } + GRPC_TIMER_BEGIN(GRPC_PTAG_SERVER_CALLBACK, 0); cd.Run(); + GRPC_TIMER_END(GRPC_PTAG_SERVER_CALLBACK, 0); } } + GRPC_TIMER_END(GRPC_PTAG_SERVER_CALL, 0); { grpc::unique_lock<grpc::mutex> lock(mu_); diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index beef604856..0523371013 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -59,6 +59,8 @@ #include "test/cpp/qps/interarrival.h" #include "test/cpp/qps/timer.h" +#include "src/core/profiling/timers.h" + namespace grpc { namespace testing { @@ -100,8 +102,10 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); double start = Timer::Now(); grpc::ClientContext context; + GRPC_TIMER_BEGIN(GRPC_PTAG_CLIENT_UNARY_CALL, 0); grpc::Status s = stub->UnaryCall(&context, request_, &responses_[thread_idx]); + GRPC_TIMER_END(GRPC_PTAG_CLIENT_UNARY_CALL, 0); histogram->Add((Timer::Now() - start) * 1e9); return s.ok(); } @@ -136,11 +140,14 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient { bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE { WaitToIssue(thread_idx); double start = Timer::Now(); + GRPC_TIMER_BEGIN(GRPC_PTAG_CLIENT_UNARY_CALL, 0); if (stream_[thread_idx]->Write(request_) && stream_[thread_idx]->Read(&responses_[thread_idx])) { + GRPC_TIMER_END(GRPC_PTAG_CLIENT_UNARY_CALL, 0); histogram->Add((Timer::Now() - start) * 1e9); return true; } + GRPC_TIMER_END(GRPC_PTAG_CLIENT_UNARY_CALL, 0); return false; } diff --git a/tools/profile_analyzer/profile_analyzer.py b/tools/profile_analyzer/profile_analyzer.py index 8db0b68f89..259bbb08a3 100755 --- a/tools/profile_analyzer/profile_analyzer.py +++ b/tools/profile_analyzer/profile_analyzer.py @@ -1,7 +1,11 @@ #!/usr/bin/env python2.7 -import json import collections +import hashlib import itertools +import json +import math +import tabulate +import time SELF_TIME = object() @@ -13,98 +17,187 @@ TIME_TO_STACK_END = object() class LineItem(object): - def __init__(self, line, indent): - self.tag = line['tag'] - self.indent = indent - self.time_stamp = line['t'] - self.important = line['type'] == '!' - self.times = {} + def __init__(self, line, indent): + self.tag = line['tag'] + self.indent = indent + self.start_time = line['t'] + self.end_time = None + self.important = line['imp'] + self.times = {} class ScopeBuilder(object): - def __init__(self, call_stack_builder, line): - self.call_stack_builder = call_stack_builder - self.indent = len(call_stack_builder.stk) - self.top_line = LineItem(line, self.indent) - call_stack_builder.lines.append(self.top_line) - self.first_child_pos = len(call_stack_builder.lines) + def __init__(self, call_stack_builder, line): + self.call_stack_builder = call_stack_builder + self.indent = len(call_stack_builder.stk) + self.top_line = LineItem(line, self.indent) + call_stack_builder.lines.append(self.top_line) + self.first_child_pos = len(call_stack_builder.lines) - def mark(self, line): - pass + def mark(self, line): + line_item = LineItem(line, self.indent + 1) + line_item.end_time = line_item.start_time + self.call_stack_builder.lines.append(line_item) - def finish(self, line): - assert line['tag'] == self.top_line.tag - final_time_stamp = line['t'] - assert SELF_TIME not in self.top_line.times - self.top_line.tims[SELF_TIME] = final_time_stamp - self.top_line.time_stamp - for line in self.call_stack_builder.lines[self.first_child_pos:]: - if TIME_FROM_SCOPE_START not in line.times: - line[TIME_FROM_SCOPE_START] = line.time_stamp - self.top_line.time_stamp - line[TIME_TO_SCOPE_END] = final_time_stamp - line.time_stamp + + def finish(self, line): + assert line['tag'] == self.top_line.tag + final_time_stamp = line['t'] + assert self.top_line.end_time is None + self.top_line.end_time = final_time_stamp + assert SELF_TIME not in self.top_line.times + self.top_line.times[SELF_TIME] = final_time_stamp - self.top_line.start_time + for line in self.call_stack_builder.lines[self.first_child_pos:]: + if TIME_FROM_SCOPE_START not in line.times: + line.times[TIME_FROM_SCOPE_START] = line.start_time - self.top_line.start_time + line.times[TIME_TO_SCOPE_END] = final_time_stamp - line.end_time class CallStackBuilder(object): - - def __init__(self): - self.stk = [] - self.signature = '' - self.lines = [] - - def add(self, line): - line_type = line['type'] - self.signature = '%s%s%s' % (self.signature, line_type, line['tag']) - if line_type == '{': - self.stk.append(ScopeBuilder(self, line)) - return False - elif line_type == '}': - self.stk.pop().finish(line) - return not self.stk - elif line_type == '.' or line_type == '!': - self.stk[-1].mark(line, True) - return False - else: - raise Exception('Unknown line type: \'%s\'' % line_type) + def __init__(self): + self.stk = [] + self.signature = hashlib.md5() + self.lines = [] + + def finish(self): + start_time = self.lines[0].start_time + end_time = self.lines[0].end_time + self.signature = self.signature.hexdigest() + for line in self.lines: + line.times[TIME_FROM_STACK_START] = line.start_time - start_time + line.times[TIME_TO_STACK_END] = end_time - line.end_time + + def add(self, line): + line_type = line['type'] + self.signature.update(line_type) + self.signature.update(line['tag']) + if line_type == '{': + self.stk.append(ScopeBuilder(self, line)) + return False + elif line_type == '}': + self.stk.pop().finish(line) + if not self.stk: + self.finish() + return True + return False + elif line_type == '.' or line_type == '!': + self.stk[-1].mark(line) + return False + else: + raise Exception('Unknown line type: \'%s\'' % line_type) -class CallStack(object): - def __init__(self, initial_call_stack_builder): - self.count = 1 - self.signature = initial_call_stack_builder.signature - self.lines = initial_call_stack_builder.lines - for line in lines: - for key, val in line.times.items(): - line.times[key] = [val] - - def add(self, call_stack_builder): - assert self.signature == call_stack_builder.signature - self.count += 1 - assert len(self.lines) == len(call_stack_builder.lines) - for lsum, line in itertools.izip(self.lines, call_stack_builder.lines): - assert lsum.tag == line.tag - assert lsum.times.keys() == line.times.keys() - for k, lst in lsum.times.iterkeys(): - lst.append(line.times[k]) +class CallStack(object): + def __init__(self, initial_call_stack_builder): + self.count = 1 + self.signature = initial_call_stack_builder.signature + self.lines = initial_call_stack_builder.lines + for line in self.lines: + for key, val in line.times.items(): + line.times[key] = [val] + + def add(self, call_stack_builder): + assert self.signature == call_stack_builder.signature + self.count += 1 + assert len(self.lines) == len(call_stack_builder.lines) + for lsum, line in itertools.izip(self.lines, call_stack_builder.lines): + assert lsum.tag == line.tag + assert lsum.times.keys() == line.times.keys() + for k, lst in lsum.times.iteritems(): + lst.append(line.times[k]) + + def finish(self): + for line in self.lines: + for lst in line.times.itervalues(): + lst.sort() builder = collections.defaultdict(CallStackBuilder) call_stacks = collections.defaultdict(CallStack) +print 'Loading...' +lines = 0 +start = time.time() with open('latency_trace.txt') as f: for line in f: + lines += 1 inf = json.loads(line) thd = inf['thd'] cs = builder[thd] if cs.add(inf): - if cs.signature in call_stacks: - call_stacks[cs.signature].add(cs) - else: - call_stacks[cs.signature] = CallStack(cs) - del builder[thd] - + if cs.signature in call_stacks: + call_stacks[cs.signature].add(cs) + else: + call_stacks[cs.signature] = CallStack(cs) + del builder[thd] +time_taken = time.time() - start +print 'Read %d lines in %f seconds (%f lines/sec)' % (lines, time_taken, lines / time_taken) + +print 'Analyzing...' call_stacks = sorted(call_stacks.values(), key=lambda cs: cs.count, reverse=True) +for cs in call_stacks: + cs.finish() + +print 'Writing report...' +def percentile(N, percent, key=lambda x:x): + """ + Find the percentile of a list of values. + + @parameter N - is a list of values. Note N MUST BE already sorted. + @parameter percent - a float value from 0.0 to 1.0. + @parameter key - optional key function to compute value from each element of N. + + @return - the percentile of the values + """ + if not N: + return None + k = (len(N)-1) * percent + f = math.floor(k) + c = math.ceil(k) + if f == c: + return key(N[int(k)]) + d0 = key(N[int(f)]) * (c-k) + d1 = key(N[int(c)]) * (k-f) + return d0+d1 + +def tidy_tag(tag): + if tag[0:10] == 'GRPC_PTAG_': + return tag[10:] + return tag + +def time_string(values): + num_values = len(values) + return '%.1f/%.1f/%.1f' % ( + 1e6 * percentile(values, 0.5), + 1e6 * percentile(values, 0.9), + 1e6 * percentile(values, 0.99)) + +def time_format(idx): + def ent(line, idx=idx): + if idx in line.times: + return time_string(line.times[idx]) + return '' + return ent + +FORMAT = [ + ('TAG', lambda line: '..'*line.indent + tidy_tag(line.tag)), + ('FROM_STACK_START', time_format(TIME_FROM_STACK_START)), + ('SELF', time_format(SELF_TIME)), + ('TO_STACK_END', time_format(TIME_TO_STACK_END)), + ('FROM_SCOPE_START', time_format(TIME_FROM_SCOPE_START)), + ('SELF', time_format(SELF_TIME)), + ('TO_SCOPE_END', time_format(TIME_TO_SCOPE_END)), +] for cs in call_stacks: - print cs.signature - print cs.count + print cs.count + header, _ = zip(*FORMAT) + table = [] + for line in cs.lines: + fields = [] + for _, fn in FORMAT: + fields.append(fn(line)) + table.append(fields) + print tabulate.tabulate(table, header, tablefmt="simple") -- GitLab