From 3040cb7c434e83c0e70839ac20218f1c2d77e1eb Mon Sep 17 00:00:00 2001 From: ctiller <ctiller@google.com> Date: Wed, 7 Jan 2015 12:13:17 -0800 Subject: [PATCH] Add a --forever flag, to continuously run tests as things change. Change on 2015/01/07 by ctiller <ctiller@google.com> ------------- Created by MOE: http://code.google.com/p/moe-java MOE_MIGRATED_REVID=83451760 --- tools/run_tests/jobset.py | 155 ++++++++++++++++++++++------------ tools/run_tests/run_tests.py | 57 +++++++++---- tools/run_tests/watch_dirs.py | 46 ++++++++++ 3 files changed, 189 insertions(+), 69 deletions(-) create mode 100755 tools/run_tests/watch_dirs.py diff --git a/tools/run_tests/jobset.py b/tools/run_tests/jobset.py index 0890cc5d99..d3a46b63e1 100755 --- a/tools/run_tests/jobset.py +++ b/tools/run_tests/jobset.py @@ -4,14 +4,11 @@ import multiprocessing import random import subprocess import sys -import threading +import tempfile +import time -# multiplicative factor to over subscribe CPU cores -# (many tests sleep for a long time) -_OVERSUBSCRIBE = 32 -_active_jobs = threading.Semaphore( - multiprocessing.cpu_count() * _OVERSUBSCRIBE) -_output_lock = threading.Lock() + +_MAX_JOBS = 16 * multiprocessing.cpu_count() def shuffle_iteratable(it): @@ -25,7 +22,7 @@ def shuffle_iteratable(it): p = 1 for val in it: if random.randint(0, p) == 0: - p *= 2 + p = min(p*2, 100) yield val else: nextit.append(val) @@ -36,53 +33,107 @@ def shuffle_iteratable(it): yield val +_SUCCESS = object() +_FAILURE = object() +_RUNNING = object() +_KILLED = object() + + +class Job(object): + """Manages one job.""" + + def __init__(self, cmdline): + self._cmdline = ' '.join(cmdline) + self._tempfile = tempfile.TemporaryFile() + self._process = subprocess.Popen(args=cmdline, + stderr=subprocess.STDOUT, + stdout=self._tempfile) + self._state = _RUNNING + sys.stdout.write('\x1b[0G\x1b[2K\x1b[33mSTART\x1b[0m: %s' % + self._cmdline) + sys.stdout.flush() + + def state(self): + """Poll current state of the job. Prints messages at completion.""" + if self._state == _RUNNING and self._process.poll() is not None: + if self._process.returncode != 0: + self._state = _FAILURE + self._tempfile.seek(0) + stdout = self._tempfile.read() + sys.stdout.write('\x1b[0G\x1b[2K\x1b[31mFAILED\x1b[0m: %s' + ' [ret=%d]\n' + '%s\n' % ( + self._cmdline, self._process.returncode, stdout)) + sys.stdout.flush() + else: + self._state = _SUCCESS + sys.stdout.write('\x1b[0G\x1b[2K\x1b[32mPASSED\x1b[0m: %s' % + self._cmdline) + sys.stdout.flush() + return self._state + + def kill(self): + if self._state == _RUNNING: + self._state = _KILLED + self._process.terminate() + + class Jobset(object): """Manages one run of jobs.""" - def __init__(self, cmdlines): - self._cmdlines = shuffle_iteratable(cmdlines) + def __init__(self, check_cancelled): + self._running = set() + self._check_cancelled = check_cancelled + self._cancelled = False self._failures = 0 - def _run_thread(self, cmdline): - try: - # start the process - p = subprocess.Popen(args=cmdline, - stderr=subprocess.STDOUT, - stdout=subprocess.PIPE) - stdout, _ = p.communicate() - # log output (under a lock) - _output_lock.acquire() - try: - if p.returncode != 0: - sys.stdout.write('\x1b[0G\x1b[2K\x1b[31mFAILED\x1b[0m: %s' - ' [ret=%d]\n' - '%s\n' % ( - ' '.join(cmdline), p.returncode, - stdout)) - self._failures += 1 - else: - sys.stdout.write('\x1b[0G\x1b[2K\x1b[32mPASSED\x1b[0m: %s' % - ' '.join(cmdline)) - sys.stdout.flush() - finally: - _output_lock.release() - finally: - _active_jobs.release() - - def run(self): - threads = [] - for cmdline in self._cmdlines: - # cap number of active jobs - release in _run_thread - _active_jobs.acquire() - t = threading.Thread(target=self._run_thread, - args=[cmdline]) - t.start() - threads.append(t) - for thread in threads: - thread.join() - return self._failures == 0 - - -def run(cmdlines): - return Jobset(cmdlines).run() + def start(self, cmdline): + """Start a job. Return True on success, False on failure.""" + while len(self._running) >= _MAX_JOBS: + if self.cancelled(): return False + self.reap() + if self.cancelled(): return False + self._running.add(Job(cmdline)) + return True + + def reap(self): + """Collect the dead jobs.""" + while self._running: + dead = set() + for job in self._running: + st = job.state() + if st == _RUNNING: continue + if st == _FAILURE: self._failures += 1 + dead.add(job) + for job in dead: + self._running.remove(job) + if not dead: return + time.sleep(0.1) + + def cancelled(self): + """Poll for cancellation.""" + if self._cancelled: return True + if not self._check_cancelled(): return False + for job in self._running: + job.kill() + self._cancelled = True + return True + + def finish(self): + while self._running: + if self.cancelled(): pass # poll cancellation + self.reap() + return not self.cancelled() and self._failures == 0 + + +def _never_cancelled(): + return False + + +def run(cmdlines, check_cancelled=_never_cancelled): + js = Jobset(check_cancelled) + for cmdline in shuffle_iteratable(cmdlines): + if not js.start(cmdline): + break + return js.finish() diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py index ee61f33484..9234682bea 100755 --- a/tools/run_tests/run_tests.py +++ b/tools/run_tests/run_tests.py @@ -6,8 +6,10 @@ import glob import itertools import multiprocessing import sys +import time import jobset +import watch_dirs # flags required for make for each configuration _CONFIGS = ['dbg', 'opt', 'tsan', 'msan', 'asan'] @@ -20,6 +22,10 @@ argp.add_argument('-c', '--config', default=['all']) argp.add_argument('-t', '--test-filter', nargs='*', default=['*']) argp.add_argument('-n', '--runs_per_test', default=1, type=int) +argp.add_argument('-f', '--forever', + default=False, + action='store_const', + const=True) args = argp.parse_args() # grab config @@ -29,21 +35,38 @@ configs = [cfg for x in args.config)] filters = args.test_filter runs_per_test = args.runs_per_test +forever = args.forever + + +def _build_and_run(check_cancelled): + """Do one pass of building & running tests.""" + # build latest, sharing cpu between the various makes + if not jobset.run( + (['make', + '-j', '%d' % max(multiprocessing.cpu_count() / len(configs), 1), + 'buildtests_c', + 'CONFIG=%s' % cfg] + for cfg in configs), check_cancelled): + sys.exit(1) + + # run all the tests + jobset.run(([x] + for x in itertools.chain.from_iterable( + itertools.chain.from_iterable(itertools.repeat( + glob.glob('bins/%s/%s_test' % (config, filt)), + runs_per_test)) + for config in configs + for filt in filters)), check_cancelled) + + +if forever: + while True: + dw = watch_dirs.DirWatcher(['src', 'include', 'test']) + initial_time = dw.most_recent_change() + have_files_changed = lambda: dw.most_recent_change() != initial_time + _build_and_run(have_files_changed) + while not have_files_changed(): + time.sleep(1) +else: + _build_and_run(lambda: False) -# build latest, sharing cpu between the various makes -if not jobset.run( - ['make', - '-j', '%d' % max(multiprocessing.cpu_count() / len(configs), 1), - 'buildtests_c', - 'CONFIG=%s' % cfg] - for cfg in configs): - sys.exit(1) - -# run all the tests -jobset.run([x] - for x in itertools.chain.from_iterable( - itertools.chain.from_iterable(itertools.repeat( - glob.glob('bins/%s/%s_test' % (config, filt)), - runs_per_test)) - for config in configs - for filt in filters)) diff --git a/tools/run_tests/watch_dirs.py b/tools/run_tests/watch_dirs.py new file mode 100755 index 0000000000..8ebbb27180 --- /dev/null +++ b/tools/run_tests/watch_dirs.py @@ -0,0 +1,46 @@ +"""Helper to watch a (set) of directories for modifications.""" + +import os +import threading +import time + + +class DirWatcher(object): + """Helper to watch a (set) of directories for modifications.""" + + def __init__(self, paths): + if isinstance(paths, basestring): + paths = [paths] + self._mu = threading.Lock() + self._done = False + self.paths = list(paths) + self.lastrun = time.time() + self._cache = self._calculate() + + def _calculate(self): + """Walk over all subscribed paths, check most recent mtime.""" + most_recent_change = None + for path in self.paths: + if not os.path.exists(path): + continue + if not os.path.isdir(path): + continue + for root, _, files in os.walk(path): + for f in files: + st = os.stat(os.path.join(root, f)) + if most_recent_change is None: + most_recent_change = st.st_mtime + else: + most_recent_change = max(most_recent_change, st.st_mtime) + return most_recent_change + + def most_recent_change(self): + self._mu.acquire() + try: + if time.time() - self.lastrun > 1: + self._cache = self._calculate() + self.lastrun = time.time() + return self._cache + finally: + self._mu.release() + -- GitLab