Commit 61ead3e0 authored by Craig Tiller

Lower latency profiling

Current latency profiles have their tails dominated by writing latency
logs, which is hugely undesirable.

Now when a thread log fills up, push it to a background thread to write
to disk. At shutdown, wait for all latency traces to be flushed.
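
The fix is a hand-off to a dedicated writer: each thread fills a private log buffer, a full buffer is queued for a background thread to write, and shutdown drains whatever remains. A minimal standalone sketch of that pattern using plain pthreads (names, the tiny capacity, and the single LIFO queue are illustrative simplifications; the commit itself keeps doubly-linked FIFO lists of in-progress and done logs):

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

#define LOG_CAPACITY 4 /* tiny, to force rotation in this demo */

typedef struct log_buf {
  size_t num_entries;
  int entries[LOG_CAPACITY];
  struct log_buf *next;
} log_buf;

static pthread_mutex_t g_mu = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t g_cv = PTHREAD_COND_INITIALIZER;
static log_buf *g_done_head; /* filled buffers awaiting the writer */
static int g_shutdown;
static __thread log_buf *g_thread_log; /* buffer this thread is filling */

static void write_buf(log_buf *buf) {
  size_t i;
  for (i = 0; i < buf->num_entries; i++) printf("entry %d\n", buf->entries[i]);
}

/* Background writer: sleep until a filled buffer (or shutdown) arrives. */
static void *writer_thread(void *unused) {
  (void)unused;
  pthread_mutex_lock(&g_mu);
  for (;;) {
    while (g_done_head == NULL && !g_shutdown) pthread_cond_wait(&g_cv, &g_mu);
    while (g_done_head != NULL) {
      log_buf *buf = g_done_head;
      g_done_head = buf->next;
      pthread_mutex_unlock(&g_mu); /* do the slow I/O outside the lock */
      write_buf(buf);
      free(buf);
      pthread_mutex_lock(&g_mu);
    }
    if (g_shutdown) break;
  }
  pthread_mutex_unlock(&g_mu);
  return NULL;
}

/* Hot path: append to thread-local state; take the lock only on rotation. */
static void add_entry(int value) {
  if (g_thread_log == NULL || g_thread_log->num_entries == LOG_CAPACITY) {
    log_buf *fresh = malloc(sizeof(*fresh));
    if (fresh == NULL) abort();
    fresh->num_entries = 0;
    pthread_mutex_lock(&g_mu);
    if (g_thread_log != NULL) { /* hand the full buffer to the writer */
      g_thread_log->next = g_done_head;
      g_done_head = g_thread_log;
      pthread_cond_signal(&g_cv);
    }
    pthread_mutex_unlock(&g_mu);
    g_thread_log = fresh;
  }
  g_thread_log->entries[g_thread_log->num_entries++] = value;
}

int main(void) {
  pthread_t writer;
  int i;
  pthread_create(&writer, NULL, writer_thread, NULL);
  for (i = 0; i < 10; i++) add_entry(i);
  /* Shutdown: queue the partially filled buffer, then wait for the writer. */
  pthread_mutex_lock(&g_mu);
  if (g_thread_log != NULL) {
    g_thread_log->next = g_done_head;
    g_done_head = g_thread_log;
    g_thread_log = NULL;
  }
  g_shutdown = 1;
  pthread_cond_signal(&g_cv);
  pthread_mutex_unlock(&g_mu);
  pthread_join(writer, NULL);
  return 0;
}

The point of this shape is that the hot path touches only thread-local state until a buffer fills, so the lock and the disk I/O are paid once per buffer rather than once per entry.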
parent 4b65b1de
@@ -38,6 +38,7 @@ cache.mk
 # Temporary test reports
 report.xml
 latency_trace.txt
+latency_trace.*.txt
 
 # port server log
 portlog.txt
......
@@ -53,50 +53,186 @@ typedef struct gpr_timer_entry {
   short line;
   char type;
   gpr_uint8 important;
+  int thd;
 } gpr_timer_entry;
 
-#define MAX_COUNT (5 * 1024 * 1024 / sizeof(gpr_timer_entry))
+#define MAX_COUNT 1000000
 
-static __thread gpr_timer_entry g_log[MAX_COUNT];
-static __thread int g_count;
+typedef struct gpr_timer_log {
+  size_t num_entries;
+  struct gpr_timer_log *next;
+  struct gpr_timer_log *prev;
+  gpr_timer_entry log[MAX_COUNT];
+} gpr_timer_log;
+
+typedef struct gpr_timer_log_list {
+  gpr_timer_log *head;
+  /* valid iff head!=NULL */
+  gpr_timer_log *tail;
+} gpr_timer_log_list;
+
+static __thread gpr_timer_log *g_thread_log;
 static gpr_once g_once_init = GPR_ONCE_INIT;
 static FILE *output_file;
+static const char *output_filename = "latency_trace.txt";
+static pthread_mutex_t g_mu;
+static pthread_cond_t g_cv;
+static gpr_timer_log_list g_in_progress_logs;
+static gpr_timer_log_list g_done_logs;
+static int g_shutdown;
+static gpr_thd_id g_writing_thread;
+static __thread int g_thread_id;
+static int g_next_thread_id;
 
-static void close_output() { fclose(output_file); }
+static int timer_log_push_back(gpr_timer_log_list *list, gpr_timer_log *log) {
+  if (list->head == NULL) {
+    list->head = list->tail = log;
+    log->next = log->prev = NULL;
+    return 1;
+  } else {
+    log->prev = list->tail;
+    log->next = NULL;
+    list->tail->next = log;
+    list->tail = log;
+    return 0;
+  }
+}
 
-static void init_output() {
-  output_file = fopen("latency_trace.txt", "w");
-  GPR_ASSERT(output_file);
-  atexit(close_output);
+static gpr_timer_log *timer_log_pop_front(gpr_timer_log_list *list) {
+  gpr_timer_log *out = list->head;
+  if (out != NULL) {
+    list->head = out->next;
+    if (list->head != NULL) {
+      list->head->prev = NULL;
+    } else {
+      list->tail = NULL;
+    }
+  }
+  return out;
 }
 
-static void log_report() {
-  int i;
-  gpr_once_init(&g_once_init, init_output);
-  for (i = 0; i < g_count; i++) {
-    gpr_timer_entry *entry = &(g_log[i]);
+static void timer_log_remove(gpr_timer_log_list *list, gpr_timer_log *log) {
+  if (log->prev == NULL) {
+    list->head = log->next;
+    if (list->head != NULL) {
+      list->head->prev = NULL;
+    }
+  } else {
+    log->prev->next = log->next;
+  }
+  if (log->next == NULL) {
+    list->tail = log->prev;
+    if (list->tail != NULL) {
+      list->tail->next = NULL;
+    }
+  } else {
+    log->next->prev = log->prev;
+  }
+}
+
+static void write_log(gpr_timer_log *log) {
+  size_t i;
+  if (output_file == NULL) {
+    output_file = fopen(output_filename, "w");
+  }
+  for (i = 0; i < log->num_entries; i++) {
+    gpr_timer_entry *entry = &(log->log[i]);
+    if (gpr_time_cmp(entry->tm, gpr_time_0(entry->tm.clock_type)) < 0) {
+      entry->tm = gpr_time_0(entry->tm.clock_type);
+    }
     fprintf(output_file,
-            "{\"t\": %ld.%09d, \"thd\": \"%p\", \"type\": \"%c\", \"tag\": "
+            "{\"t\": %ld.%09d, \"thd\": \"%d\", \"type\": \"%c\", \"tag\": "
            "\"%s\", \"file\": \"%s\", \"line\": %d, \"imp\": %d}\n",
-            entry->tm.tv_sec, entry->tm.tv_nsec,
-            (void *)(gpr_intptr)gpr_thd_currentid(), entry->type, entry->tagstr,
-            entry->file, entry->line, entry->important);
+            entry->tm.tv_sec, entry->tm.tv_nsec, entry->thd, entry->type,
+            entry->tagstr, entry->file, entry->line, entry->important);
   }
+}
+
+static void writing_thread(void *unused) {
+  gpr_timer_log *log;
+  pthread_mutex_lock(&g_mu);
+  for (;;) {
+    while ((log = timer_log_pop_front(&g_done_logs)) == NULL && !g_shutdown) {
+      pthread_cond_wait(&g_cv, &g_mu);
+    }
+    if (log != NULL) {
+      pthread_mutex_unlock(&g_mu);
+      write_log(log);
+      free(log);
+      pthread_mutex_lock(&g_mu);
+    }
+    if (g_shutdown) {
+      pthread_mutex_unlock(&g_mu);
+      return;
+    }
+  }
+}
 
-  /* Now clear out the log */
-  g_count = 0;
+static void flush_logs(gpr_timer_log_list *list) {
+  gpr_timer_log *log;
+  while ((log = timer_log_pop_front(list)) != NULL) {
+    write_log(log);
+    free(log);
+  }
 }
+
+static void finish_writing() {
+  pthread_mutex_lock(&g_mu);
+  g_shutdown = 1;
+  pthread_cond_signal(&g_cv);
+  pthread_mutex_unlock(&g_mu);
+  gpr_thd_join(g_writing_thread);
+  gpr_log(GPR_INFO, "flushing logs");
+  pthread_mutex_lock(&g_mu);
+  flush_logs(&g_done_logs);
+  flush_logs(&g_in_progress_logs);
+  pthread_mutex_unlock(&g_mu);
+  if (output_file) {
+    fclose(output_file);
+  }
+}
+
+void gpr_timers_set_log_filename(const char *filename) {
+  output_filename = filename;
+}
+
+static void init_output() {
+  gpr_thd_options options = gpr_thd_options_default();
+  gpr_thd_options_set_joinable(&options);
+  gpr_thd_new(&g_writing_thread, writing_thread, NULL, &options);
+  atexit(finish_writing);
+}
+
+static void rotate_log() {
+  gpr_timer_log *new = malloc(sizeof(*new));
+  gpr_once_init(&g_once_init, init_output);
+  new->num_entries = 0;
+  pthread_mutex_lock(&g_mu);
+  if (g_thread_log != NULL) {
+    timer_log_remove(&g_in_progress_logs, g_thread_log);
+    if (timer_log_push_back(&g_done_logs, g_thread_log)) {
+      pthread_cond_signal(&g_cv);
+    }
+  } else {
+    g_thread_id = g_next_thread_id++;
+  }
+  timer_log_push_back(&g_in_progress_logs, new);
+  pthread_mutex_unlock(&g_mu);
+  g_thread_log = new;
+}
 
 static void gpr_timers_log_add(const char *tagstr, marker_type type,
                                int important, const char *file, int line) {
   gpr_timer_entry *entry;
 
   /* TODO (vpai) : Improve concurrency */
-  if (g_count == MAX_COUNT) {
-    log_report();
+  if (g_thread_log == NULL || g_thread_log->num_entries == MAX_COUNT) {
+    rotate_log();
   }
 
-  entry = &g_log[g_count++];
+  entry = &g_thread_log->log[g_thread_log->num_entries++];
   entry->tm = gpr_now(GPR_CLOCK_PRECISE);
   entry->tagstr = tagstr;
@@ -104,6 +240,7 @@ static void gpr_timers_log_add(const char *tagstr, marker_type type,
   entry->file = file;
   entry->line = (short)line;
   entry->important = important != 0;
+  entry->thd = g_thread_id;
 }
 
 /* Latency profiler API implementation. */
@@ -131,4 +268,6 @@ void gpr_timers_global_destroy(void) {}
 void gpr_timers_global_init(void) {}
 
 void gpr_timers_global_destroy(void) {}
+
+void gpr_timers_set_log_filename(const char *filename) {}
 #endif /* GRPC_BASIC_PROFILER */
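
With the new code in place, write_log emits the trace as one JSON object per line, and gpr_timers_set_log_filename lets each binary choose its own output file (the fling client and server below do exactly that). A representative line, with illustrative values; the begin/end marker types surface as '{' and '}', which is what the analyzer script at the bottom of this commit pushes and pops:

{"t": 1437400000.000012345, "thd": "2", "type": "{", "tag": "ping_pong", "file": "test/core/fling/client.c", "line": 95, "imp": 1}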
@@ -48,6 +48,8 @@ void gpr_timer_begin(const char *tagstr, int important, const char *file,
 void gpr_timer_end(const char *tagstr, int important, const char *file,
                    int line);
 
+void gpr_timers_set_log_filename(const char *filename);
+
 #if !(defined(GRPC_STAP_PROFILER) + defined(GRPC_BASIC_PROFILER))
 /* No profiling. No-op all the things. */
 #define GPR_TIMER_MARK(tag, important) \
......
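
The header's contract is that each GPR_TIMER_END pairs with the GPR_TIMER_BEGIN for the same tag on the same thread, and that marked regions nest, since the analyzer matches every closing record against the top of a per-thread stack. A minimal usage sketch (the function and tag are hypothetical, not part of this commit):

#include "src/core/profiling/timers.h"

static void do_work(void) {
  GPR_TIMER_BEGIN("do_work", 0); /* tag, importance flag */
  /* ... region whose latency is being profiled ... */
  GPR_TIMER_END("do_work", 0); /* must pair with the BEGIN above */
}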
@@ -53,7 +53,7 @@ struct thd_arg {
 /* Body of every thread started via gpr_thd_new. */
 static void *thread_body(void *v) {
   struct thd_arg a = *(struct thd_arg *)v;
-  gpr_free(v);
+  free(v);
   (*a.body)(a.arg);
   return NULL;
 }
@@ -63,7 +63,10 @@ int gpr_thd_new(gpr_thd_id *t, void (*thd_body)(void *arg), void *arg,
   int thread_started;
   pthread_attr_t attr;
   pthread_t p;
-  struct thd_arg *a = gpr_malloc(sizeof(*a));
+  /* don't use gpr_malloc as we may cause an infinite recursion with
+   * the profiling code */
+  struct thd_arg *a = malloc(sizeof(*a));
+  GPR_ASSERT(a != NULL);
   a->body = thd_body;
   a->arg = arg;
@@ -78,7 +81,7 @@
   thread_started = (pthread_create(&p, &attr, &thread_body, a) == 0);
   GPR_ASSERT(pthread_attr_destroy(&attr) == 0);
   if (!thread_started) {
-    gpr_free(a);
+    free(a);
   }
   *t = (gpr_thd_id)p;
   return thread_started;
......
@@ -315,6 +315,8 @@ static void finalize_outbuf(grpc_exec_ctx *exec_ctx,
       grpc_chttp2_list_add_written_stream(transport_writing, stream_writing);
     }
   }
+  GPR_TIMER_END("finalize_outbuf", 0);
 }
 
 void grpc_chttp2_cleanup_writing(
......
@@ -41,6 +41,7 @@
 #include <grpc/support/log.h>
 #include <grpc/support/time.h>
 #include <grpc/support/useful.h>
+#include "src/core/profiling/timers.h"
 #include "test/core/util/grpc_profiler.h"
 #include "test/core/util/test_config.h"
@@ -89,6 +90,7 @@ static void init_ping_pong_request(void) {
 }
 
 static void step_ping_pong_request(void) {
+  GPR_TIMER_BEGIN("ping_pong", 1);
   call = grpc_channel_create_call(channel, NULL, GRPC_PROPAGATE_DEFAULTS, cq,
                                   "/Reflector/reflectUnary", "localhost",
                                   gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
@@ -99,6 +101,7 @@ static void step_ping_pong_request(void) {
   grpc_call_destroy(call);
   grpc_byte_buffer_destroy(response_payload_recv);
   call = NULL;
+  GPR_TIMER_END("ping_pong", 1);
 }
 
 static void init_ping_pong_stream(void) {
@@ -122,10 +125,12 @@ static void init_ping_pong_stream(void) {
 static void step_ping_pong_stream(void) {
   grpc_call_error error;
+  GPR_TIMER_BEGIN("ping_pong", 1);
   error = grpc_call_start_batch(call, stream_step_ops, 2, (void *)1, NULL);
   GPR_ASSERT(GRPC_CALL_OK == error);
   grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
   grpc_byte_buffer_destroy(response_payload_recv);
+  GPR_TIMER_END("ping_pong", 1);
 }
 
 static double now(void) {
@@ -159,12 +164,14 @@ int main(int argc, char **argv) {
   char *scenario_name = "ping-pong-request";
   scenario sc = {NULL, NULL, NULL};
 
+  gpr_timers_set_log_filename("latency_trace.fling_client.txt");
+
+  grpc_init();
+
   GPR_ASSERT(argc >= 1);
   fake_argv[0] = argv[0];
   grpc_test_init(1, fake_argv);
-  grpc_init();
 
   cl = gpr_cmdline_create("fling client");
   gpr_cmdline_add_int(cl, "payload_size", "Size of the payload to send",
                       &payload_size);
......
@@ -44,15 +44,16 @@
 #include <unistd.h>
 #endif
 
-#include "test/core/util/grpc_profiler.h"
-#include "test/core/util/test_config.h"
 #include <grpc/support/alloc.h>
 #include <grpc/support/cmdline.h>
 #include <grpc/support/host_port.h>
 #include <grpc/support/log.h>
 #include <grpc/support/time.h>
+#include "src/core/profiling/timers.h"
 #include "test/core/util/port.h"
 #include "test/core/end2end/data/ssl_test_data.h"
+#include "test/core/util/grpc_profiler.h"
+#include "test/core/util/test_config.h"
 
 static grpc_completion_queue *cq;
 static grpc_server *server;
@@ -192,6 +193,8 @@ int main(int argc, char **argv) {
   char *fake_argv[1];
 
+  gpr_timers_set_log_filename("latency_trace.fling_server.txt");
+
   GPR_ASSERT(argc >= 1);
   fake_argv[0] = argv[0];
   grpc_test_init(1, fake_argv);
......
@@ -49,7 +49,7 @@ class ScopeBuilder(object):
     self.call_stack_builder.lines.append(line_item)
 
   def finish(self, line):
-    assert line['tag'] == self.top_line.tag, 'expected %s, got %s' % (self.top_line.tag, line['tag'])
+    assert line['tag'] == self.top_line.tag, 'expected %s, got %s; thread=%s; t0=%f t1=%f' % (self.top_line.tag, line['tag'], line['thd'], self.top_line.start_time, line['t'])
     final_time_stamp = line['t']
     assert self.top_line.end_time is None
     self.top_line.end_time = final_time_stamp
@@ -84,6 +84,7 @@ class CallStackBuilder(object):
       self.stk.append(ScopeBuilder(self, line))
       return False
     elif line_type == '}':
+      assert self.stk, 'expected non-empty stack for closing %s; thread=%s; t=%f' % (line['tag'], line['thd'], line['t'])
      self.stk.pop().finish(line)
       if not self.stk:
         self.finish()
......