diff --git a/src/core/lib/profiling/basic_timers.c b/src/core/lib/profiling/basic_timers.c index bdf9af2339cc2c2dad11263036793254bf9b543e..c4f2772822763f0d19f41a5917aa9e3c216fa485 100644 --- a/src/core/lib/profiling/basic_timers.c +++ b/src/core/lib/profiling/basic_timers.c @@ -44,6 +44,8 @@ #include <grpc/support/time.h> #include <stdio.h> +#include "src/core/lib/support/env.h" + typedef enum { BEGIN = '{', END = '}', MARK = '.' } marker_type; typedef struct gpr_timer_entry { @@ -74,7 +76,7 @@ typedef struct gpr_timer_log_list { static __thread gpr_timer_log *g_thread_log; static gpr_once g_once_init = GPR_ONCE_INIT; static FILE *output_file; -static const char *output_filename = "latency_trace.txt"; +static const char *output_filename_or_null = NULL; static pthread_mutex_t g_mu; static pthread_cond_t g_cv; static gpr_timer_log_list g_in_progress_logs; @@ -85,6 +87,16 @@ static __thread int g_thread_id; static int g_next_thread_id; static int g_writing_enabled = 1; +static const char *output_filename() { + if (output_filename_or_null == NULL) { + output_filename_or_null = gpr_getenv("LATENCY_TRACE"); + if (output_filename_or_null == NULL) { + output_filename_or_null = "latency_trace.txt"; + } + } + return output_filename_or_null; +} + static int timer_log_push_back(gpr_timer_log_list *list, gpr_timer_log *log) { if (list->head == NULL) { list->head = list->tail = log; @@ -134,7 +146,7 @@ static void timer_log_remove(gpr_timer_log_list *list, gpr_timer_log *log) { static void write_log(gpr_timer_log *log) { size_t i; if (output_file == NULL) { - output_file = fopen(output_filename, "w"); + output_file = fopen(output_filename(), "w"); } for (i = 0; i < log->num_entries; i++) { gpr_timer_entry *entry = &(log->log[i]); @@ -198,7 +210,7 @@ static void finish_writing(void) { } void gpr_timers_set_log_filename(const char *filename) { - output_filename = filename; + output_filename_or_null = filename; } static void init_output() { diff --git a/tools/profiling/latency_profile/profile_analyzer.py b/tools/profiling/latency_profile/profile_analyzer.py index 48b8e9b950f1e18862817518f25b143e5ed75a3b..2087cd2793e63bf8a99011fbdd713949b20860ea 100755 --- a/tools/profiling/latency_profile/profile_analyzer.py +++ b/tools/profiling/latency_profile/profile_analyzer.py @@ -34,6 +34,7 @@ import hashlib import itertools import json import math +import sys import tabulate import time @@ -49,6 +50,7 @@ TIME_FROM_LAST_IMPORTANT = object() argp = argparse.ArgumentParser(description='Process output of basic_prof builds') argp.add_argument('--source', default='latency_trace.txt', type=str) argp.add_argument('--fmt', choices=tabulate.tabulate_formats, default='simple') +argp.add_argument('--out', default='-', type=str) args = argp.parse_args() class LineItem(object): @@ -246,16 +248,20 @@ FORMAT = [ ('TO_SCOPE_END', time_format(TIME_TO_SCOPE_END)), ] +out = sys.stdout +if args.out != '-': + out = open(args.out, 'w') + if args.fmt == 'html': - print '<html>' - print '<head>' - print '<title>Profile Report</title>' - print '</head>' + print >>out, '<html>' + print >>out, '<head>' + print >>out, '<title>Profile Report</title>' + print >>out, '</head>' accounted_for = 0 for cs in call_stacks: if args.fmt in BANNER: - print BANNER[args.fmt] % { + print >>out, BANNER[args.fmt] % { 'count': cs.count, } header, _ = zip(*FORMAT) @@ -265,7 +271,7 @@ for cs in call_stacks: for _, fn in FORMAT: fields.append(fn(line)) table.append(fields) - print tabulate.tabulate(table, header, tablefmt=args.fmt) + print >>out, tabulate.tabulate(table, header, tablefmt=args.fmt) accounted_for += cs.count if accounted_for > .99 * total_stacks: break diff --git a/tools/run_tests/python_utils/start_port_server.py b/tools/run_tests/python_utils/start_port_server.py new file mode 100644 index 0000000000000000000000000000000000000000..4103eb0534babe65aaf8788c75331b350e287047 --- /dev/null +++ b/tools/run_tests/python_utils/start_port_server.py @@ -0,0 +1,128 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +from __future__ import print_function + +from six.moves import urllib +import os +import subprocess +import tempfile +import sys +import time + +def start_port_server(port_server_port): + # check if a compatible port server is running + # if incompatible (version mismatch) ==> start a new one + # if not running ==> start a new one + # otherwise, leave it up + try: + version = int(urllib.request.urlopen( + 'http://localhost:%d/version_number' % port_server_port, + timeout=10).read()) + print('detected port server running version %d' % version) + running = True + except Exception as e: + print('failed to detect port server: %s' % sys.exc_info()[0]) + print(e.strerror) + running = False + if running: + current_version = int(subprocess.check_output( + [sys.executable, os.path.abspath('tools/run_tests/python_utils/port_server.py'), + 'dump_version'])) + print('my port server is version %d' % current_version) + running = (version >= current_version) + if not running: + print('port_server version mismatch: killing the old one') + urllib.request.urlopen('http://localhost:%d/quitquitquit' % port_server_port).read() + time.sleep(1) + if not running: + fd, logfile = tempfile.mkstemp() + os.close(fd) + print('starting port_server, with log file %s' % logfile) + args = [sys.executable, os.path.abspath('tools/run_tests/python_utils/port_server.py'), + '-p', '%d' % port_server_port, '-l', logfile] + env = dict(os.environ) + env['BUILD_ID'] = 'pleaseDontKillMeJenkins' + if platform_string() == 'windows': + # Working directory of port server needs to be outside of Jenkins + # workspace to prevent file lock issues. + tempdir = tempfile.mkdtemp() + port_server = subprocess.Popen( + args, + env=env, + cwd=tempdir, + creationflags = 0x00000008, # detached process + close_fds=True) + else: + port_server = subprocess.Popen( + args, + env=env, + preexec_fn=os.setsid, + close_fds=True) + time.sleep(1) + # ensure port server is up + waits = 0 + while True: + if waits > 10: + print('killing port server due to excessive start up waits') + port_server.kill() + if port_server.poll() is not None: + print('port_server failed to start') + # try one final time: maybe another build managed to start one + time.sleep(1) + try: + urllib.request.urlopen('http://localhost:%d/get' % port_server_port, + timeout=1).read() + print('last ditch attempt to contact port server succeeded') + break + except: + traceback.print_exc() + port_log = open(logfile, 'r').read() + print(port_log) + sys.exit(1) + try: + urllib.request.urlopen('http://localhost:%d/get' % port_server_port, + timeout=1).read() + print('port server is up and ready') + break + except socket.timeout: + print('waiting for port_server: timeout') + traceback.print_exc(); + time.sleep(1) + waits += 1 + except urllib.error.URLError: + print('waiting for port_server: urlerror') + traceback.print_exc(); + time.sleep(1) + waits += 1 + except: + traceback.print_exc() + port_server.kill() + raise + diff --git a/tools/profiling/microbenchmark/bm.py b/tools/run_tests/run_microbenchmark.py similarity index 78% rename from tools/profiling/microbenchmark/bm.py rename to tools/run_tests/run_microbenchmark.py index b4f0de841fd06013ed06f696ebe2b89b61c6a1ec..42a31a622f74dca90fd5eb4f2cd19731b19ca3cc 100755 --- a/tools/profiling/microbenchmark/bm.py +++ b/tools/run_tests/run_microbenchmark.py @@ -33,8 +33,18 @@ import os import subprocess import sys +import python_utils.jobset as jobset +import python_utils.start_port_server as start_port_server + flamegraph_dir = os.path.join(os.path.expanduser('~'), 'FlameGraph') +os.chdir(os.path.join(os.path.dirname(sys.argv[0]), '../..')) +if not os.path.exists('reports'): + os.makedirs('reports') + +port_server_port = 32766 +start_port_server.start_port_server(port_server_port) + def fnize(s): out = '' for c in s: @@ -45,10 +55,6 @@ def fnize(s): out += c return out -os.chdir(os.path.join(os.path.dirname(sys.argv[0]), '../../..')) -if not os.path.exists('reports'): - os.makedirs('reports') - # index html index_html = """ <html> @@ -66,6 +72,9 @@ def link(txt, tgt): global index_html index_html += "<p><a href=\"%s\">%s</a></p>\n" % (tgt, txt) +benchmarks = [] +profile_analysis = [] + for bm_name in sys.argv[1:]: # generate latency profiles heading('Latency Profiles: %s' % bm_name) @@ -75,13 +84,18 @@ for bm_name in sys.argv[1:]: for line in subprocess.check_output(['bins/basicprof/%s' % bm_name, '--benchmark_list_tests']).splitlines(): link(line, '%s.txt' % fnize(line)) - with open('reports/%s.txt' % fnize(line), 'w') as f: - f.write(subprocess.check_output(['bins/basicprof/%s' % bm_name, - '--benchmark_filter=^%s$' % line])) - f.write('\n***********************************************************\n') - f.write(subprocess.check_output([ - sys.executable, 'tools/profiling/latency_profile/profile_analyzer.py', - '--source', 'latency_trace.txt', '--fmt', 'simple'])) + benchmarks.append( + jobset.JobSpec(['bins/basicprof/%s' % bm_name, '--benchmark_filter=^%s$' % line], + environ={'LATENCY_TRACE': '%s.trace' % fnize(line)})) + profile_analysis.append( + jobset.JobSpec([sys.executable, + 'tools/profiling/latency_profile/profile_analyzer.py', + '--source', '%s.trace' % fnize(line), '--fmt', 'simple', + '--out', 'reports/%s.txt' % fnize(line)], timeout_seconds=None)) + + jobset.run(benchmarks, maxjobs=multiprocessing.cpu_count()/2, + add_env={'GRPC_TEST_PORT_SERVER': 'localhost:%d' % port_server_port}) + jobset.run(profile_analysis, maxjobs=multiprocessing.cpu_count()) # generate flamegraphs heading('Flamegraphs: %s' % bm_name) diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py index 9d767258102670a85492bf60302322104d22677c..999ebf87eb7f5880c833db2b8b94c10f5582cc55 100755 --- a/tools/run_tests/run_tests.py +++ b/tools/run_tests/run_tests.py @@ -57,6 +57,7 @@ import uuid import python_utils.jobset as jobset import python_utils.report_utils as report_utils import python_utils.watch_dirs as watch_dirs +import python_utils.start_port_server as start_port_server _ROOT = os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]), '../..')) @@ -1322,97 +1323,6 @@ def _shut_down_legacy_server(legacy_server_port): 'http://localhost:%d/quitquitquit' % legacy_server_port).read() -def _start_port_server(port_server_port): - # check if a compatible port server is running - # if incompatible (version mismatch) ==> start a new one - # if not running ==> start a new one - # otherwise, leave it up - try: - version = int(urllib.request.urlopen( - 'http://localhost:%d/version_number' % port_server_port, - timeout=10).read()) - print('detected port server running version %d' % version) - running = True - except Exception as e: - print('failed to detect port server: %s' % sys.exc_info()[0]) - print(e.strerror) - running = False - if running: - current_version = int(subprocess.check_output( - [sys.executable, os.path.abspath('tools/run_tests/python_utils/port_server.py'), - 'dump_version'])) - print('my port server is version %d' % current_version) - running = (version >= current_version) - if not running: - print('port_server version mismatch: killing the old one') - urllib.request.urlopen('http://localhost:%d/quitquitquit' % port_server_port).read() - time.sleep(1) - if not running: - fd, logfile = tempfile.mkstemp() - os.close(fd) - print('starting port_server, with log file %s' % logfile) - args = [sys.executable, os.path.abspath('tools/run_tests/python_utils/port_server.py'), - '-p', '%d' % port_server_port, '-l', logfile] - env = dict(os.environ) - env['BUILD_ID'] = 'pleaseDontKillMeJenkins' - if platform_string() == 'windows': - # Working directory of port server needs to be outside of Jenkins - # workspace to prevent file lock issues. - tempdir = tempfile.mkdtemp() - port_server = subprocess.Popen( - args, - env=env, - cwd=tempdir, - creationflags = 0x00000008, # detached process - close_fds=True) - else: - port_server = subprocess.Popen( - args, - env=env, - preexec_fn=os.setsid, - close_fds=True) - time.sleep(1) - # ensure port server is up - waits = 0 - while True: - if waits > 10: - print('killing port server due to excessive start up waits') - port_server.kill() - if port_server.poll() is not None: - print('port_server failed to start') - # try one final time: maybe another build managed to start one - time.sleep(1) - try: - urllib.request.urlopen('http://localhost:%d/get' % port_server_port, - timeout=1).read() - print('last ditch attempt to contact port server succeeded') - break - except: - traceback.print_exc() - port_log = open(logfile, 'r').read() - print(port_log) - sys.exit(1) - try: - urllib.request.urlopen('http://localhost:%d/get' % port_server_port, - timeout=1).read() - print('port server is up and ready') - break - except socket.timeout: - print('waiting for port_server: timeout') - traceback.print_exc(); - time.sleep(1) - waits += 1 - except urllib.error.URLError: - print('waiting for port_server: urlerror') - traceback.print_exc(); - time.sleep(1) - waits += 1 - except: - traceback.print_exc() - port_server.kill() - raise - - def _calculate_num_runs_failures(list_of_results): """Caculate number of runs and failures for a particular test. @@ -1460,7 +1370,7 @@ def _build_and_run( antagonists = [subprocess.Popen(['tools/run_tests/python_utils/antagonist.py']) for _ in range(0, args.antagonists)] port_server_port = 32766 - _start_port_server(port_server_port) + start_port_server.start_port_server(port_server_port) resultset = None num_test_failures = 0 try: