From 61ead3e061f685f87e284bf41f7ed1cb44f347b4 Mon Sep 17 00:00:00 2001
From: Craig Tiller <ctiller@google.com>
Date: Tue, 3 Nov 2015 11:03:48 -0800
Subject: [PATCH] Lower latency profiling

Current latency profiles have their tails dominated by writing latency
logs, which is hugely undesirable.

Now when a thread log fills up, push it to a background thread to write
to disk. At shutdown, wait for all latency traces to be flushed.
---
 .gitignore                                    |   1 +
 src/core/profiling/basic_timers.c             | 185 +++++++++++++++---
 src/core/profiling/timers.h                   |   2 +
 src/core/support/thd_posix.c                  |   9 +-
 src/core/transport/chttp2/writing.c           |   2 +
 test/core/fling/client.c                      |  11 +-
 test/core/fling/server.c                      |   7 +-
 .../latency_profile/profile_analyzer.py       |   3 +-
 8 files changed, 189 insertions(+), 31 deletions(-)

diff --git a/.gitignore b/.gitignore
index 40f4c4734c..f059eca239 100644
--- a/.gitignore
+++ b/.gitignore
@@ -38,6 +38,7 @@ cache.mk
 # Temporary test reports
 report.xml
 latency_trace.txt
+latency_trace.*.txt
 
 # port server log
 portlog.txt
diff --git a/src/core/profiling/basic_timers.c b/src/core/profiling/basic_timers.c
index 527a160101..f0fce7858d 100644
--- a/src/core/profiling/basic_timers.c
+++ b/src/core/profiling/basic_timers.c
@@ -53,50 +53,186 @@ typedef struct gpr_timer_entry {
   short line;
   char type;
   gpr_uint8 important;
+  int thd;
 } gpr_timer_entry;
 
-#define MAX_COUNT (5 * 1024 * 1024 / sizeof(gpr_timer_entry))
+#define MAX_COUNT 1000000
 
-static __thread gpr_timer_entry g_log[MAX_COUNT];
-static __thread int g_count;
+typedef struct gpr_timer_log {
+  size_t num_entries;
+  struct gpr_timer_log *next;
+  struct gpr_timer_log *prev;
+  gpr_timer_entry log[MAX_COUNT];
+} gpr_timer_log;
+
+typedef struct gpr_timer_log_list {
+  gpr_timer_log *head;
+  /* valid iff head!=NULL */
+  gpr_timer_log *tail;
+} gpr_timer_log_list;
+
+static __thread gpr_timer_log *g_thread_log;
 static gpr_once g_once_init = GPR_ONCE_INIT;
 static FILE *output_file;
+static const char *output_filename = "latency_trace.txt";
+static pthread_mutex_t g_mu;
+static pthread_cond_t g_cv;
+static gpr_timer_log_list g_in_progress_logs;
+static gpr_timer_log_list g_done_logs;
+static int g_shutdown;
+static gpr_thd_id g_writing_thread;
+static __thread int g_thread_id;
+static int g_next_thread_id;
 
-static void close_output() { fclose(output_file); }
+static int timer_log_push_back(gpr_timer_log_list *list, gpr_timer_log *log) {
+  if (list->head == NULL) {
+    list->head = list->tail = log;
+    log->next = log->prev = NULL;
+    return 1;
+  } else {
+    log->prev = list->tail;
+    log->next = NULL;
+    list->tail->next = log;
+    list->tail = log;
+    return 0;
+  }
+}
 
-static void init_output() {
-  output_file = fopen("latency_trace.txt", "w");
-  GPR_ASSERT(output_file);
-  atexit(close_output);
+static gpr_timer_log *timer_log_pop_front(gpr_timer_log_list *list) {
+  gpr_timer_log *out = list->head;
+  if (out != NULL) {
+    list->head = out->next;
+    if (list->head != NULL) {
+      list->head->prev = NULL;
+    } else {
+      list->tail = NULL;
+    }
+  }
+  return out;
 }
 
-static void log_report() {
-  int i;
-  gpr_once_init(&g_once_init, init_output);
-  for (i = 0; i < g_count; i++) {
-    gpr_timer_entry *entry = &(g_log[i]);
+static void timer_log_remove(gpr_timer_log_list *list, gpr_timer_log *log) {
+  if (log->prev == NULL) {
+    list->head = log->next;
+    if (list->head != NULL) {
+      list->head->prev = NULL;
+    }
+  } else {
+    log->prev->next = log->next;
+  }
+  if (log->next == NULL) {
+    list->tail = log->prev;
+    if (list->tail != NULL) {
+      list->tail->next = NULL;
+    }
+  } else {
+    log->next->prev = log->prev;
+  }
+}
+
+static void write_log(gpr_timer_log *log) {
+  size_t i;
+  if (output_file == NULL) {
+    output_file = fopen(output_filename, "w");
+  }
+  for (i = 0; i < log->num_entries; i++) {
+    gpr_timer_entry *entry = &(log->log[i]);
+    if (gpr_time_cmp(entry->tm, gpr_time_0(entry->tm.clock_type)) < 0) {
+      entry->tm = gpr_time_0(entry->tm.clock_type);
+    }
     fprintf(output_file,
-            "{\"t\": %ld.%09d, \"thd\": \"%p\", \"type\": \"%c\", \"tag\": "
+            "{\"t\": %ld.%09d, \"thd\": \"%d\", \"type\": \"%c\", \"tag\": "
             "\"%s\", \"file\": \"%s\", \"line\": %d, \"imp\": %d}\n",
-            entry->tm.tv_sec, entry->tm.tv_nsec,
-            (void *)(gpr_intptr)gpr_thd_currentid(), entry->type, entry->tagstr,
-            entry->file, entry->line, entry->important);
+            entry->tm.tv_sec, entry->tm.tv_nsec, entry->thd, entry->type,
+            entry->tagstr, entry->file, entry->line, entry->important);
+  }
+}
+
+static void writing_thread(void *unused) {
+  gpr_timer_log *log;
+  pthread_mutex_lock(&g_mu);
+  for (;;) {
+    while ((log = timer_log_pop_front(&g_done_logs)) == NULL && !g_shutdown) {
+      pthread_cond_wait(&g_cv, &g_mu);
+    }
+    if (log != NULL) {
+      pthread_mutex_unlock(&g_mu);
+      write_log(log);
+      free(log);
+      pthread_mutex_lock(&g_mu);
+    }
+    if (g_shutdown) {
+      pthread_mutex_unlock(&g_mu);
+      return;
+    }
   }
+}
 
-  /* Now clear out the log */
-  g_count = 0;
+static void flush_logs(gpr_timer_log_list *list) {
+  gpr_timer_log *log;
+  while ((log = timer_log_pop_front(list)) != NULL) {
+    write_log(log);
+    free(log);
+  }
+}
+
+static void finish_writing() {
+  pthread_mutex_lock(&g_mu);
+  g_shutdown = 1;
+  pthread_cond_signal(&g_cv);
+  pthread_mutex_unlock(&g_mu);
+  gpr_thd_join(g_writing_thread);
+
+  gpr_log(GPR_INFO, "flushing logs");
+
+  pthread_mutex_lock(&g_mu);
+  flush_logs(&g_done_logs);
+  flush_logs(&g_in_progress_logs);
+  pthread_mutex_unlock(&g_mu);
+
+  if (output_file) {
+    fclose(output_file);
+  }
+}
+
+void gpr_timers_set_log_filename(const char *filename) {
+  output_filename = filename;
+}
+
+static void init_output() {
+  gpr_thd_options options = gpr_thd_options_default();
+  gpr_thd_options_set_joinable(&options);
+  gpr_thd_new(&g_writing_thread, writing_thread, NULL, &options);
+  atexit(finish_writing);
+}
+
+static void rotate_log() {
+  gpr_timer_log *new = malloc(sizeof(*new));
+  gpr_once_init(&g_once_init, init_output);
+  new->num_entries = 0;
+  pthread_mutex_lock(&g_mu);
+  if (g_thread_log != NULL) {
+    timer_log_remove(&g_in_progress_logs, g_thread_log);
+    if (timer_log_push_back(&g_done_logs, g_thread_log)) {
+      pthread_cond_signal(&g_cv);
+    }
+  } else {
+    g_thread_id = g_next_thread_id++;
+  }
+  timer_log_push_back(&g_in_progress_logs, new);
+  pthread_mutex_unlock(&g_mu);
+  g_thread_log = new;
 }
 
 static void gpr_timers_log_add(const char *tagstr, marker_type type,
                                int important, const char *file, int line) {
   gpr_timer_entry *entry;
 
-  /* TODO (vpai) : Improve concurrency */
-  if (g_count == MAX_COUNT) {
-    log_report();
+  if (g_thread_log == NULL || g_thread_log->num_entries == MAX_COUNT) {
+    rotate_log();
   }
 
-  entry = &g_log[g_count++];
+  entry = &g_thread_log->log[g_thread_log->num_entries++];
 
   entry->tm = gpr_now(GPR_CLOCK_PRECISE);
   entry->tagstr = tagstr;
@@ -104,6 +240,7 @@ static void gpr_timers_log_add(const char *tagstr, marker_type type,
   entry->file = file;
   entry->line = (short)line;
   entry->important = important != 0;
+  entry->thd = g_thread_id;
 }
 
 /* Latency profiler API implementation. */
@@ -131,4 +268,6 @@ void gpr_timers_global_destroy(void) {}
 void gpr_timers_global_init(void) {}
 
 void gpr_timers_global_destroy(void) {}
+
+void gpr_timers_set_log_filename(const char *filename) {}
 #endif /* GRPC_BASIC_PROFILER */
diff --git a/src/core/profiling/timers.h b/src/core/profiling/timers.h
index 0d112e7248..6a188dc566 100644
--- a/src/core/profiling/timers.h
+++ b/src/core/profiling/timers.h
@@ -48,6 +48,8 @@ void gpr_timer_begin(const char *tagstr, int important, const char *file,
 void gpr_timer_end(const char *tagstr, int important, const char *file,
                    int line);
 
+void gpr_timers_set_log_filename(const char *filename);
+
 #if !(defined(GRPC_STAP_PROFILER) + defined(GRPC_BASIC_PROFILER))
 /* No profiling. No-op all the things. */
 #define GPR_TIMER_MARK(tag, important) \
diff --git a/src/core/support/thd_posix.c b/src/core/support/thd_posix.c
index c36d94d044..653a1c88c1 100644
--- a/src/core/support/thd_posix.c
+++ b/src/core/support/thd_posix.c
@@ -53,7 +53,7 @@ struct thd_arg {
 /* Body of every thread started via gpr_thd_new. */
 static void *thread_body(void *v) {
   struct thd_arg a = *(struct thd_arg *)v;
-  gpr_free(v);
+  free(v);
   (*a.body)(a.arg);
   return NULL;
 }
@@ -63,7 +63,10 @@ int gpr_thd_new(gpr_thd_id *t, void (*thd_body)(void *arg), void *arg,
   int thread_started;
   pthread_attr_t attr;
   pthread_t p;
-  struct thd_arg *a = gpr_malloc(sizeof(*a));
+  /* don't use gpr_malloc as we may cause an infinite recursion with
+   * the profiling code */
+  struct thd_arg *a = malloc(sizeof(*a));
+  GPR_ASSERT(a != NULL);
   a->body = thd_body;
   a->arg = arg;
 
@@ -78,7 +81,7 @@ int gpr_thd_new(gpr_thd_id *t, void (*thd_body)(void *arg), void *arg,
   thread_started = (pthread_create(&p, &attr, &thread_body, a) == 0);
   GPR_ASSERT(pthread_attr_destroy(&attr) == 0);
   if (!thread_started) {
-    gpr_free(a);
+    free(a);
   }
   *t = (gpr_thd_id)p;
   return thread_started;
diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c
index 353e476e40..7fe5649e5e 100644
--- a/src/core/transport/chttp2/writing.c
+++ b/src/core/transport/chttp2/writing.c
@@ -315,6 +315,8 @@ static void finalize_outbuf(grpc_exec_ctx *exec_ctx,
       grpc_chttp2_list_add_written_stream(transport_writing, stream_writing);
     }
   }
+
+  GPR_TIMER_END("finalize_outbuf", 0);
 }
 
 void grpc_chttp2_cleanup_writing(
diff --git a/test/core/fling/client.c b/test/core/fling/client.c
index a53411c2f5..99b30d6c4a 100644
--- a/test/core/fling/client.c
+++ b/test/core/fling/client.c
@@ -41,6 +41,7 @@
 #include <grpc/support/log.h>
 #include <grpc/support/time.h>
 #include <grpc/support/useful.h>
+#include "src/core/profiling/timers.h"
 #include "test/core/util/grpc_profiler.h"
 #include "test/core/util/test_config.h"
 
@@ -89,6 +90,7 @@ static void init_ping_pong_request(void) {
 }
 
 static void step_ping_pong_request(void) {
+  GPR_TIMER_BEGIN("ping_pong", 1);
   call = grpc_channel_create_call(channel, NULL, GRPC_PROPAGATE_DEFAULTS, cq,
                                   "/Reflector/reflectUnary", "localhost",
                                   gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
@@ -99,6 +101,7 @@ static void step_ping_pong_request(void) {
   grpc_call_destroy(call);
   grpc_byte_buffer_destroy(response_payload_recv);
   call = NULL;
+  GPR_TIMER_END("ping_pong", 1);
 }
 
 static void init_ping_pong_stream(void) {
@@ -122,10 +125,12 @@ static void init_ping_pong_stream(void) {
 
 static void step_ping_pong_stream(void) {
   grpc_call_error error;
+  GPR_TIMER_BEGIN("ping_pong", 1);
   error = grpc_call_start_batch(call, stream_step_ops, 2, (void *)1, NULL);
   GPR_ASSERT(GRPC_CALL_OK == error);
   grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
   grpc_byte_buffer_destroy(response_payload_recv);
+  GPR_TIMER_END("ping_pong", 1);
 }
 
 static double now(void) {
@@ -159,12 +164,14 @@ int main(int argc, char **argv) {
   char *scenario_name = "ping-pong-request";
   scenario sc = {NULL, NULL, NULL};
 
+  gpr_timers_set_log_filename("latency_trace.fling_client.txt");
+
+  grpc_init();
+
   GPR_ASSERT(argc >= 1);
   fake_argv[0] = argv[0];
   grpc_test_init(1, fake_argv);
 
-  grpc_init();
-
   cl = gpr_cmdline_create("fling client");
   gpr_cmdline_add_int(cl, "payload_size", "Size of the payload to send",
                       &payload_size);
diff --git a/test/core/fling/server.c b/test/core/fling/server.c
index 67631e5a07..14a1a815bc 100644
--- a/test/core/fling/server.c
+++ b/test/core/fling/server.c
@@ -44,15 +44,16 @@
 #include <unistd.h>
 #endif
 
-#include "test/core/util/grpc_profiler.h"
-#include "test/core/util/test_config.h"
 #include <grpc/support/alloc.h>
 #include <grpc/support/cmdline.h>
 #include <grpc/support/host_port.h>
 #include <grpc/support/log.h>
 #include <grpc/support/time.h>
+#include "src/core/profiling/timers.h"
 #include "test/core/util/port.h"
 #include "test/core/end2end/data/ssl_test_data.h"
+#include "test/core/util/grpc_profiler.h"
+#include "test/core/util/test_config.h"
 
 static grpc_completion_queue *cq;
 static grpc_server *server;
@@ -192,6 +193,8 @@ int main(int argc, char **argv) {
 
   char *fake_argv[1];
 
+  gpr_timers_set_log_filename("latency_trace.fling_server.txt");
+
   GPR_ASSERT(argc >= 1);
   fake_argv[0] = argv[0];
   grpc_test_init(1, fake_argv);
diff --git a/tools/profiling/latency_profile/profile_analyzer.py b/tools/profiling/latency_profile/profile_analyzer.py
index b12e1074e5..b2a38ea60a 100755
--- a/tools/profiling/latency_profile/profile_analyzer.py
+++ b/tools/profiling/latency_profile/profile_analyzer.py
@@ -49,7 +49,7 @@ class ScopeBuilder(object):
     self.call_stack_builder.lines.append(line_item)
 
   def finish(self, line):
-    assert line['tag'] == self.top_line.tag, 'expected %s, got %s' % (self.top_line.tag, line['tag'])
+    assert line['tag'] == self.top_line.tag, 'expected %s, got %s; thread=%s; t0=%f t1=%f' % (self.top_line.tag, line['tag'], line['thd'], self.top_line.start_time, line['t'])
     final_time_stamp = line['t']
     assert self.top_line.end_time is None
     self.top_line.end_time = final_time_stamp
@@ -84,6 +84,7 @@ class CallStackBuilder(object):
       self.stk.append(ScopeBuilder(self, line))
       return False
     elif line_type == '}':
+      assert self.stk, 'expected non-empty stack for closing %s; thread=%s; t=%f' % (line['tag'], line['thd'], line['t'])
       self.stk.pop().finish(line)
       if not self.stk:
         self.finish()
-- 
GitLab