Skip to content
Snippets Groups Projects
Commit 7dc4ea66 authored by Craig Tiller's avatar Craig Tiller
Browse files

Make the microbenchmark profile gatherer run some in parallel

parent 95ca017c
No related branches found
No related tags found
No related merge requests found
......@@ -44,6 +44,8 @@
#include <grpc/support/time.h>
#include <stdio.h>
#include "src/core/lib/support/env.h"
typedef enum { BEGIN = '{', END = '}', MARK = '.' } marker_type;
typedef struct gpr_timer_entry {
......@@ -74,7 +76,7 @@ typedef struct gpr_timer_log_list {
static __thread gpr_timer_log *g_thread_log;
static gpr_once g_once_init = GPR_ONCE_INIT;
static FILE *output_file;
static const char *output_filename = "latency_trace.txt";
static const char *output_filename_or_null = NULL;
static pthread_mutex_t g_mu;
static pthread_cond_t g_cv;
static gpr_timer_log_list g_in_progress_logs;
......@@ -85,6 +87,16 @@ static __thread int g_thread_id;
static int g_next_thread_id;
static int g_writing_enabled = 1;
static const char *output_filename() {
if (output_filename_or_null == NULL) {
output_filename_or_null = gpr_getenv("LATENCY_TRACE");
if (output_filename_or_null == NULL) {
output_filename_or_null = "latency_trace.txt";
}
}
return output_filename_or_null;
}
static int timer_log_push_back(gpr_timer_log_list *list, gpr_timer_log *log) {
if (list->head == NULL) {
list->head = list->tail = log;
......@@ -134,7 +146,7 @@ static void timer_log_remove(gpr_timer_log_list *list, gpr_timer_log *log) {
static void write_log(gpr_timer_log *log) {
size_t i;
if (output_file == NULL) {
output_file = fopen(output_filename, "w");
output_file = fopen(output_filename(), "w");
}
for (i = 0; i < log->num_entries; i++) {
gpr_timer_entry *entry = &(log->log[i]);
......@@ -198,7 +210,7 @@ static void finish_writing(void) {
}
void gpr_timers_set_log_filename(const char *filename) {
output_filename = filename;
output_filename_or_null = filename;
}
static void init_output() {
......
......@@ -34,6 +34,7 @@ import hashlib
import itertools
import json
import math
import sys
import tabulate
import time
......@@ -49,6 +50,7 @@ TIME_FROM_LAST_IMPORTANT = object()
argp = argparse.ArgumentParser(description='Process output of basic_prof builds')
argp.add_argument('--source', default='latency_trace.txt', type=str)
argp.add_argument('--fmt', choices=tabulate.tabulate_formats, default='simple')
argp.add_argument('--out', default='-', type=str)
args = argp.parse_args()
class LineItem(object):
......@@ -246,16 +248,20 @@ FORMAT = [
('TO_SCOPE_END', time_format(TIME_TO_SCOPE_END)),
]
out = sys.stdout
if args.out != '-':
out = open(args.out, 'w')
if args.fmt == 'html':
print '<html>'
print '<head>'
print '<title>Profile Report</title>'
print '</head>'
print >>out, '<html>'
print >>out, '<head>'
print >>out, '<title>Profile Report</title>'
print >>out, '</head>'
accounted_for = 0
for cs in call_stacks:
if args.fmt in BANNER:
print BANNER[args.fmt] % {
print >>out, BANNER[args.fmt] % {
'count': cs.count,
}
header, _ = zip(*FORMAT)
......@@ -265,7 +271,7 @@ for cs in call_stacks:
for _, fn in FORMAT:
fields.append(fn(line))
table.append(fields)
print tabulate.tabulate(table, header, tablefmt=args.fmt)
print >>out, tabulate.tabulate(table, header, tablefmt=args.fmt)
accounted_for += cs.count
if accounted_for > .99 * total_stacks:
break
......
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
from __future__ import print_function
from six.moves import urllib
import os
import subprocess
import tempfile
import sys
import time
def start_port_server(port_server_port):
# check if a compatible port server is running
# if incompatible (version mismatch) ==> start a new one
# if not running ==> start a new one
# otherwise, leave it up
try:
version = int(urllib.request.urlopen(
'http://localhost:%d/version_number' % port_server_port,
timeout=10).read())
print('detected port server running version %d' % version)
running = True
except Exception as e:
print('failed to detect port server: %s' % sys.exc_info()[0])
print(e.strerror)
running = False
if running:
current_version = int(subprocess.check_output(
[sys.executable, os.path.abspath('tools/run_tests/python_utils/port_server.py'),
'dump_version']))
print('my port server is version %d' % current_version)
running = (version >= current_version)
if not running:
print('port_server version mismatch: killing the old one')
urllib.request.urlopen('http://localhost:%d/quitquitquit' % port_server_port).read()
time.sleep(1)
if not running:
fd, logfile = tempfile.mkstemp()
os.close(fd)
print('starting port_server, with log file %s' % logfile)
args = [sys.executable, os.path.abspath('tools/run_tests/python_utils/port_server.py'),
'-p', '%d' % port_server_port, '-l', logfile]
env = dict(os.environ)
env['BUILD_ID'] = 'pleaseDontKillMeJenkins'
if platform_string() == 'windows':
# Working directory of port server needs to be outside of Jenkins
# workspace to prevent file lock issues.
tempdir = tempfile.mkdtemp()
port_server = subprocess.Popen(
args,
env=env,
cwd=tempdir,
creationflags = 0x00000008, # detached process
close_fds=True)
else:
port_server = subprocess.Popen(
args,
env=env,
preexec_fn=os.setsid,
close_fds=True)
time.sleep(1)
# ensure port server is up
waits = 0
while True:
if waits > 10:
print('killing port server due to excessive start up waits')
port_server.kill()
if port_server.poll() is not None:
print('port_server failed to start')
# try one final time: maybe another build managed to start one
time.sleep(1)
try:
urllib.request.urlopen('http://localhost:%d/get' % port_server_port,
timeout=1).read()
print('last ditch attempt to contact port server succeeded')
break
except:
traceback.print_exc()
port_log = open(logfile, 'r').read()
print(port_log)
sys.exit(1)
try:
urllib.request.urlopen('http://localhost:%d/get' % port_server_port,
timeout=1).read()
print('port server is up and ready')
break
except socket.timeout:
print('waiting for port_server: timeout')
traceback.print_exc();
time.sleep(1)
waits += 1
except urllib.error.URLError:
print('waiting for port_server: urlerror')
traceback.print_exc();
time.sleep(1)
waits += 1
except:
traceback.print_exc()
port_server.kill()
raise
......@@ -33,8 +33,18 @@ import os
import subprocess
import sys
import python_utils.jobset as jobset
import python_utils.start_port_server as start_port_server
flamegraph_dir = os.path.join(os.path.expanduser('~'), 'FlameGraph')
os.chdir(os.path.join(os.path.dirname(sys.argv[0]), '../..'))
if not os.path.exists('reports'):
os.makedirs('reports')
port_server_port = 32766
start_port_server.start_port_server(port_server_port)
def fnize(s):
out = ''
for c in s:
......@@ -45,10 +55,6 @@ def fnize(s):
out += c
return out
os.chdir(os.path.join(os.path.dirname(sys.argv[0]), '../../..'))
if not os.path.exists('reports'):
os.makedirs('reports')
# index html
index_html = """
<html>
......@@ -66,6 +72,9 @@ def link(txt, tgt):
global index_html
index_html += "<p><a href=\"%s\">%s</a></p>\n" % (tgt, txt)
benchmarks = []
profile_analysis = []
for bm_name in sys.argv[1:]:
# generate latency profiles
heading('Latency Profiles: %s' % bm_name)
......@@ -75,13 +84,18 @@ for bm_name in sys.argv[1:]:
for line in subprocess.check_output(['bins/basicprof/%s' % bm_name,
'--benchmark_list_tests']).splitlines():
link(line, '%s.txt' % fnize(line))
with open('reports/%s.txt' % fnize(line), 'w') as f:
f.write(subprocess.check_output(['bins/basicprof/%s' % bm_name,
'--benchmark_filter=^%s$' % line]))
f.write('\n***********************************************************\n')
f.write(subprocess.check_output([
sys.executable, 'tools/profiling/latency_profile/profile_analyzer.py',
'--source', 'latency_trace.txt', '--fmt', 'simple']))
benchmarks.append(
jobset.JobSpec(['bins/basicprof/%s' % bm_name, '--benchmark_filter=^%s$' % line],
environ={'LATENCY_TRACE': '%s.trace' % fnize(line)}))
profile_analysis.append(
jobset.JobSpec([sys.executable,
'tools/profiling/latency_profile/profile_analyzer.py',
'--source', '%s.trace' % fnize(line), '--fmt', 'simple',
'--out', 'reports/%s.txt' % fnize(line)], timeout_seconds=None))
jobset.run(benchmarks, maxjobs=multiprocessing.cpu_count()/2,
add_env={'GRPC_TEST_PORT_SERVER': 'localhost:%d' % port_server_port})
jobset.run(profile_analysis, maxjobs=multiprocessing.cpu_count())
# generate flamegraphs
heading('Flamegraphs: %s' % bm_name)
......
......@@ -57,6 +57,7 @@ import uuid
import python_utils.jobset as jobset
import python_utils.report_utils as report_utils
import python_utils.watch_dirs as watch_dirs
import python_utils.start_port_server as start_port_server
_ROOT = os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]), '../..'))
......@@ -1322,97 +1323,6 @@ def _shut_down_legacy_server(legacy_server_port):
'http://localhost:%d/quitquitquit' % legacy_server_port).read()
def _start_port_server(port_server_port):
# check if a compatible port server is running
# if incompatible (version mismatch) ==> start a new one
# if not running ==> start a new one
# otherwise, leave it up
try:
version = int(urllib.request.urlopen(
'http://localhost:%d/version_number' % port_server_port,
timeout=10).read())
print('detected port server running version %d' % version)
running = True
except Exception as e:
print('failed to detect port server: %s' % sys.exc_info()[0])
print(e.strerror)
running = False
if running:
current_version = int(subprocess.check_output(
[sys.executable, os.path.abspath('tools/run_tests/python_utils/port_server.py'),
'dump_version']))
print('my port server is version %d' % current_version)
running = (version >= current_version)
if not running:
print('port_server version mismatch: killing the old one')
urllib.request.urlopen('http://localhost:%d/quitquitquit' % port_server_port).read()
time.sleep(1)
if not running:
fd, logfile = tempfile.mkstemp()
os.close(fd)
print('starting port_server, with log file %s' % logfile)
args = [sys.executable, os.path.abspath('tools/run_tests/python_utils/port_server.py'),
'-p', '%d' % port_server_port, '-l', logfile]
env = dict(os.environ)
env['BUILD_ID'] = 'pleaseDontKillMeJenkins'
if platform_string() == 'windows':
# Working directory of port server needs to be outside of Jenkins
# workspace to prevent file lock issues.
tempdir = tempfile.mkdtemp()
port_server = subprocess.Popen(
args,
env=env,
cwd=tempdir,
creationflags = 0x00000008, # detached process
close_fds=True)
else:
port_server = subprocess.Popen(
args,
env=env,
preexec_fn=os.setsid,
close_fds=True)
time.sleep(1)
# ensure port server is up
waits = 0
while True:
if waits > 10:
print('killing port server due to excessive start up waits')
port_server.kill()
if port_server.poll() is not None:
print('port_server failed to start')
# try one final time: maybe another build managed to start one
time.sleep(1)
try:
urllib.request.urlopen('http://localhost:%d/get' % port_server_port,
timeout=1).read()
print('last ditch attempt to contact port server succeeded')
break
except:
traceback.print_exc()
port_log = open(logfile, 'r').read()
print(port_log)
sys.exit(1)
try:
urllib.request.urlopen('http://localhost:%d/get' % port_server_port,
timeout=1).read()
print('port server is up and ready')
break
except socket.timeout:
print('waiting for port_server: timeout')
traceback.print_exc();
time.sleep(1)
waits += 1
except urllib.error.URLError:
print('waiting for port_server: urlerror')
traceback.print_exc();
time.sleep(1)
waits += 1
except:
traceback.print_exc()
port_server.kill()
raise
def _calculate_num_runs_failures(list_of_results):
"""Caculate number of runs and failures for a particular test.
......@@ -1460,7 +1370,7 @@ def _build_and_run(
antagonists = [subprocess.Popen(['tools/run_tests/python_utils/antagonist.py'])
for _ in range(0, args.antagonists)]
port_server_port = 32766
_start_port_server(port_server_port)
start_port_server.start_port_server(port_server_port)
resultset = None
num_test_failures = 0
try:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment