Skip to content
Snippets Groups Projects
Commit 56c6b6ab authored by Craig Tiller's avatar Craig Tiller
Browse files

Use CPU cost modelling to increase parallelism

parent a5a5cab3
No related branches found
No related tags found
No related merge requests found
...@@ -1005,6 +1005,7 @@ targets: ...@@ -1005,6 +1005,7 @@ targets:
- grpc - grpc
- gpr_test_util - gpr_test_util
- gpr - gpr
cpu_cost: 2
platforms: platforms:
- mac - mac
- linux - linux
...@@ -1019,6 +1020,7 @@ targets: ...@@ -1019,6 +1020,7 @@ targets:
- grpc - grpc
- gpr_test_util - gpr_test_util
- gpr - gpr
cpu_cost: 2
platforms: platforms:
- mac - mac
- linux - linux
...@@ -1141,6 +1143,7 @@ targets: ...@@ -1141,6 +1143,7 @@ targets:
deps: deps:
- gpr_test_util - gpr_test_util
- gpr - gpr
cpu_cost: 10
- name: gpr_thd_test - name: gpr_thd_test
build: test build: test
language: c language: c
...@@ -1149,6 +1152,7 @@ targets: ...@@ -1149,6 +1152,7 @@ targets:
deps: deps:
- gpr_test_util - gpr_test_util
- gpr - gpr
cpu_cost: 10
- name: gpr_time_test - name: gpr_time_test
build: test build: test
language: c language: c
......
...@@ -10,7 +10,8 @@ ...@@ -10,7 +10,8 @@
"ci_platforms": tgt.ci_platforms, "ci_platforms": tgt.ci_platforms,
"exclude_configs": tgt.get("exclude_configs", []), "exclude_configs": tgt.get("exclude_configs", []),
"args": [], "args": [],
"flaky": tgt.flaky} "flaky": tgt.flaky,
"cpu_cost": tgt.get("cpu_cost", 1.0)}
for tgt in targets for tgt in targets
if tgt.get('run', True) and tgt.build == 'test'] + if tgt.get('run', True) and tgt.build == 'test'] +
tests, tests,
......
...@@ -77,21 +77,23 @@ END2END_FIXTURES = { ...@@ -77,21 +77,23 @@ END2END_FIXTURES = {
} }
TestOptions = collections.namedtuple( TestOptions = collections.namedtuple(
'TestOptions', 'needs_fullstack needs_dns proxyable secure traceable') 'TestOptions', 'needs_fullstack needs_dns proxyable secure traceable cpu_cost')
default_test_options = TestOptions(False, False, True, False, True) default_test_options = TestOptions(False, False, True, False, True, 1.0)
connectivity_test_options = default_test_options._replace(needs_fullstack=True) connectivity_test_options = default_test_options._replace(needs_fullstack=True)
LOWCPU = 0.01
# maps test names to options # maps test names to options
END2END_TESTS = { END2END_TESTS = {
'bad_hostname': default_test_options, 'bad_hostname': default_test_options,
'binary_metadata': default_test_options, 'binary_metadata': default_test_options,
'call_creds': default_test_options._replace(secure=True), 'call_creds': default_test_options._replace(secure=True),
'cancel_after_accept': default_test_options, 'cancel_after_accept': default_test_options._replace(cpu_cost=LOWCPU),
'cancel_after_client_done': default_test_options, 'cancel_after_client_done': default_test_options._replace(cpu_cost=LOWCPU),
'cancel_after_invoke': default_test_options, 'cancel_after_invoke': default_test_options._replace(cpu_cost=LOWCPU),
'cancel_before_invoke': default_test_options, 'cancel_before_invoke': default_test_options._replace(cpu_cost=LOWCPU),
'cancel_in_a_vacuum': default_test_options, 'cancel_in_a_vacuum': default_test_options._replace(cpu_cost=LOWCPU),
'cancel_with_status': default_test_options, 'cancel_with_status': default_test_options._replace(cpu_cost=LOWCPU),
'channel_connectivity': connectivity_test_options._replace(proxyable=False), 'channel_connectivity': connectivity_test_options._replace(proxyable=False),
'channel_ping': connectivity_test_options._replace(proxyable=False), 'channel_ping': connectivity_test_options._replace(proxyable=False),
'compressed_payload': default_test_options._replace(proxyable=False), 'compressed_payload': default_test_options._replace(proxyable=False),
...@@ -101,7 +103,8 @@ END2END_TESTS = { ...@@ -101,7 +103,8 @@ END2END_TESTS = {
'empty_batch': default_test_options, 'empty_batch': default_test_options,
'graceful_server_shutdown': default_test_options, 'graceful_server_shutdown': default_test_options,
'hpack_size': default_test_options._replace(proxyable=False, 'hpack_size': default_test_options._replace(proxyable=False,
traceable=False), traceable=False,
cpu_cost=2.0),
'high_initial_seqno': default_test_options, 'high_initial_seqno': default_test_options,
'invoke_large_request': default_test_options, 'invoke_large_request': default_test_options,
'large_metadata': default_test_options, 'large_metadata': default_test_options,
...@@ -252,6 +255,7 @@ def main(): ...@@ -252,6 +255,7 @@ def main():
END2END_FIXTURES[f].platforms, 'mac')), END2END_FIXTURES[f].platforms, 'mac')),
'flaky': False, 'flaky': False,
'language': 'c', 'language': 'c',
'cpu_cost': END2END_TESTS[t].cpu_cost,
} }
for f in sorted(END2END_FIXTURES.keys()) for f in sorted(END2END_FIXTURES.keys())
for t in sorted(END2END_TESTS.keys()) if compatible(f, t) for t in sorted(END2END_TESTS.keys()) if compatible(f, t)
...@@ -266,6 +270,7 @@ def main(): ...@@ -266,6 +270,7 @@ def main():
END2END_FIXTURES[f].platforms, 'mac')), END2END_FIXTURES[f].platforms, 'mac')),
'flaky': False, 'flaky': False,
'language': 'c', 'language': 'c',
'cpu_cost': END2END_TESTS[t].cpu_cost,
} }
for f in sorted(END2END_FIXTURES.keys()) for f in sorted(END2END_FIXTURES.keys())
if not END2END_FIXTURES[f].secure if not END2END_FIXTURES[f].secure
......
...@@ -146,7 +146,7 @@ class JobSpec(object): ...@@ -146,7 +146,7 @@ class JobSpec(object):
def __init__(self, cmdline, shortname=None, environ=None, hash_targets=None, def __init__(self, cmdline, shortname=None, environ=None, hash_targets=None,
cwd=None, shell=False, timeout_seconds=5*60, flake_retries=0, cwd=None, shell=False, timeout_seconds=5*60, flake_retries=0,
timeout_retries=0, kill_handler=None): timeout_retries=0, kill_handler=None, cpu_cost=1.0):
""" """
Arguments: Arguments:
cmdline: a list of arguments to pass as the command line cmdline: a list of arguments to pass as the command line
...@@ -154,6 +154,7 @@ class JobSpec(object): ...@@ -154,6 +154,7 @@ class JobSpec(object):
hash_targets: which files to include in the hash representing the jobs version hash_targets: which files to include in the hash representing the jobs version
(or empty, indicating the job should not be hashed) (or empty, indicating the job should not be hashed)
kill_handler: a handler that will be called whenever job.kill() is invoked kill_handler: a handler that will be called whenever job.kill() is invoked
cpu_cost: number of cores per second this job needs
""" """
if environ is None: if environ is None:
environ = {} environ = {}
...@@ -169,6 +170,7 @@ class JobSpec(object): ...@@ -169,6 +170,7 @@ class JobSpec(object):
self.flake_retries = flake_retries self.flake_retries = flake_retries
self.timeout_retries = timeout_retries self.timeout_retries = timeout_retries
self.kill_handler = kill_handler self.kill_handler = kill_handler
self.cpu_cost = cpu_cost
def identity(self): def identity(self):
return '%r %r %r' % (self.cmdline, self.environ, self.hash_targets) return '%r %r %r' % (self.cmdline, self.environ, self.hash_targets)
...@@ -329,10 +331,19 @@ class Jobset(object): ...@@ -329,10 +331,19 @@ class Jobset(object):
def get_num_failures(self): def get_num_failures(self):
return self._failures return self._failures
def cpu_cost(self):
c = 0
for job in self._running:
c += job._spec.cpu_cost
return c
def start(self, spec): def start(self, spec):
"""Start a job. Return True on success, False on failure.""" """Start a job. Return True on success, False on failure."""
while len(self._running) >= self._maxjobs: while True:
if self.cancelled(): return False if self.cancelled(): return False
current_cpu_cost = self.cpu_cost()
if current_cpu_cost == 0: break
if current_cpu_cost + spec.cpu_cost < self._maxjobs: break
self.reap() self.reap()
if self.cancelled(): return False if self.cancelled(): return False
if spec.hash_targets: if spec.hash_targets:
......
...@@ -78,7 +78,7 @@ class SimpleConfig(object): ...@@ -78,7 +78,7 @@ class SimpleConfig(object):
self.timeout_multiplier = timeout_multiplier self.timeout_multiplier = timeout_multiplier
def job_spec(self, cmdline, hash_targets, timeout_seconds=5*60, def job_spec(self, cmdline, hash_targets, timeout_seconds=5*60,
shortname=None, environ={}): shortname=None, environ={}, cpu_cost=1.0):
"""Construct a jobset.JobSpec for a test under this config """Construct a jobset.JobSpec for a test under this config
Args: Args:
...@@ -96,6 +96,7 @@ class SimpleConfig(object): ...@@ -96,6 +96,7 @@ class SimpleConfig(object):
return jobset.JobSpec(cmdline=cmdline, return jobset.JobSpec(cmdline=cmdline,
shortname=shortname, shortname=shortname,
environ=actual_environ, environ=actual_environ,
cpu_cost=cpu_cost,
timeout_seconds=self.timeout_multiplier * timeout_seconds, timeout_seconds=self.timeout_multiplier * timeout_seconds,
hash_targets=hash_targets hash_targets=hash_targets
if self.allow_hashing else None, if self.allow_hashing else None,
...@@ -114,11 +115,12 @@ class ValgrindConfig(object): ...@@ -114,11 +115,12 @@ class ValgrindConfig(object):
self.args = args self.args = args
self.allow_hashing = False self.allow_hashing = False
def job_spec(self, cmdline, hash_targets): def job_spec(self, cmdline, hash_targets, cpu_cost=1.0):
return jobset.JobSpec(cmdline=['valgrind', '--tool=%s' % self.tool] + return jobset.JobSpec(cmdline=['valgrind', '--tool=%s' % self.tool] +
self.args + cmdline, self.args + cmdline,
shortname='valgrind %s' % cmdline[0], shortname='valgrind %s' % cmdline[0],
hash_targets=None, hash_targets=None,
cpu_cost=cpu_cost,
flake_retries=5 if args.allow_flakes else 0, flake_retries=5 if args.allow_flakes else 0,
timeout_retries=3 if args.allow_flakes else 0) timeout_retries=3 if args.allow_flakes else 0)
...@@ -157,6 +159,7 @@ class CLanguage(object): ...@@ -157,6 +159,7 @@ class CLanguage(object):
cmdline = [binary] + target['args'] cmdline = [binary] + target['args']
out.append(config.job_spec(cmdline, [binary], out.append(config.job_spec(cmdline, [binary],
shortname=' '.join(cmdline), shortname=' '.join(cmdline),
cpu_cost=target['cpu_cost'],
environ={'GRPC_DEFAULT_SSL_ROOTS_FILE_PATH': environ={'GRPC_DEFAULT_SSL_ROOTS_FILE_PATH':
os.path.abspath(os.path.dirname( os.path.abspath(os.path.dirname(
sys.argv[0]) + '/../../src/core/tsi/test_creds/ca.pem')})) sys.argv[0]) + '/../../src/core/tsi/test_creds/ca.pem')}))
......
This diff is collapsed.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment