Skip to content
Snippets Groups Projects
Commit c1b54f2f authored by Craig Tiller's avatar Craig Tiller
Browse files

Run QPS workers just for the scenario that we want, to avoid interference

parent 8e6d8715
No related branches found
No related tags found
No related merge requests found
...@@ -63,14 +63,19 @@ class QpsWorkerJob: ...@@ -63,14 +63,19 @@ class QpsWorkerJob:
self._spec = spec self._spec = spec
self.language = language self.language = language
self.host_and_port = host_and_port self.host_and_port = host_and_port
self._job = None
def start(self):
self._job = jobset.Job(spec, newline_on_success=True, travis=True, add_env={}) self._job = jobset.Job(spec, newline_on_success=True, travis=True, add_env={})
def is_running(self): def is_running(self):
"""Polls a job and returns True if given job is still running.""" """Polls a job and returns True if given job is still running."""
return self._job.state() == jobset._RUNNING return self._job and self._job.state() == jobset._RUNNING
def kill(self): def kill(self):
return self._job.kill() if self._job:
self._job.kill()
self._job = None
def create_qpsworker_job(language, shortname=None, def create_qpsworker_job(language, shortname=None,
...@@ -253,8 +258,8 @@ def build_on_remote_hosts(hosts, languages=scenario_config.LANGUAGES.keys(), bui ...@@ -253,8 +258,8 @@ def build_on_remote_hosts(hosts, languages=scenario_config.LANGUAGES.keys(), bui
sys.exit(1) sys.exit(1)
def start_qpsworkers(languages, worker_hosts): def create_qpsworkers(languages, worker_hosts):
"""Starts QPS workers as background jobs.""" """Creates QPS workers (but does not start them)."""
if not worker_hosts: if not worker_hosts:
# run two workers locally (for each language) # run two workers locally (for each language)
workers=[(None, 10000), (None, 10010)] workers=[(None, 10000), (None, 10010)]
...@@ -274,6 +279,9 @@ def start_qpsworkers(languages, worker_hosts): ...@@ -274,6 +279,9 @@ def start_qpsworkers(languages, worker_hosts):
for worker_idx, worker in enumerate(workers)] for worker_idx, worker in enumerate(workers)]
Scenario = collections.namedtuple('Scenario', 'jobspec workers')
def create_scenarios(languages, workers_by_lang, remote_host=None, regex='.*', def create_scenarios(languages, workers_by_lang, remote_host=None, regex='.*',
category='all', bq_result_table=None, category='all', bq_result_table=None,
netperf=False, netperf_hosts=[]): netperf=False, netperf_hosts=[]):
...@@ -282,6 +290,7 @@ def create_scenarios(languages, workers_by_lang, remote_host=None, regex='.*', ...@@ -282,6 +290,7 @@ def create_scenarios(languages, workers_by_lang, remote_host=None, regex='.*',
for workers in workers_by_lang.values() for workers in workers_by_lang.values()
for worker in workers] for worker in workers]
scenarios = [] scenarios = []
_NO_WORKERS = []
if netperf: if netperf:
if not netperf_hosts: if not netperf_hosts:
...@@ -293,16 +302,18 @@ def create_scenarios(languages, workers_by_lang, remote_host=None, regex='.*', ...@@ -293,16 +302,18 @@ def create_scenarios(languages, workers_by_lang, remote_host=None, regex='.*',
else: else:
netperf_server=netperf_hosts[0] netperf_server=netperf_hosts[0]
netperf_client=netperf_hosts[1] netperf_client=netperf_hosts[1]
scenarios.append(create_netperf_jobspec(server_host=netperf_server, scenarios.append(Scenario(
client_host=netperf_client, create_netperf_jobspec(server_host=netperf_server,
bq_result_table=bq_result_table)) client_host=netperf_client,
bq_result_table=bq_result_table),
_NO_WORKERS))
for language in languages: for language in languages:
for scenario_json in language.scenarios(): for scenario_json in language.scenarios():
if re.search(args.regex, scenario_json['name']): if re.search(args.regex, scenario_json['name']):
categories = scenario_json.get('CATEGORIES', []) categories = scenario_json.get('CATEGORIES', [])
if category in categories or (category == 'all' and categories != ['sweep']): if category in categories or (category == 'all' and categories != ['sweep']):
workers = workers_by_lang[str(language)] workers = workers_by_lang[str(language)][:]
# 'SERVER_LANGUAGE' is an indicator for this script to pick # 'SERVER_LANGUAGE' is an indicator for this script to pick
# a server in different language. # a server in different language.
custom_server_lang = scenario_json.get('SERVER_LANGUAGE', None) custom_server_lang = scenario_json.get('SERVER_LANGUAGE', None)
...@@ -330,14 +341,14 @@ def create_scenarios(languages, workers_by_lang, remote_host=None, regex='.*', ...@@ -330,14 +341,14 @@ def create_scenarios(languages, workers_by_lang, remote_host=None, regex='.*',
# replace all client workers by workers of a different language, # replace all client workers by workers of a different language,
# leave num_server workers as they are server workers. # leave num_server workers as they are server workers.
workers[idx] = workers_by_lang[custom_client_lang][idx] workers[idx] = workers_by_lang[custom_client_lang][idx]
scenario = create_scenario_jobspec(scenario_json, scenario = Scenario(
workers, create_scenario_jobspec(scenario_json,
remote_host=remote_host, [w.host_and_port for w in workers],
bq_result_table=bq_result_table) remote_host=remote_host,
bq_result_table=bq_result_table),
workers)
scenarios.append(scenario) scenarios.append(scenario)
# the very last scenario requests shutting down the workers.
scenarios.append(create_quit_jobspec(all_workers, remote_host=remote_host))
return scenarios return scenarios
...@@ -411,42 +422,31 @@ if not args.remote_driver_host: ...@@ -411,42 +422,31 @@ if not args.remote_driver_host:
build_local = True build_local = True
build_on_remote_hosts(remote_hosts, languages=[str(l) for l in languages], build_local=build_local) build_on_remote_hosts(remote_hosts, languages=[str(l) for l in languages], build_local=build_local)
qpsworker_jobs = start_qpsworkers(languages, args.remote_worker_host) qpsworker_jobs = create_qpsworkers(languages, args.remote_worker_host)
# TODO(jtattermusch): see https://github.com/grpc/grpc/issues/6174
time.sleep(5)
# get list of worker addresses for each language. # get list of worker addresses for each language.
worker_addresses = dict([(str(language), []) for language in languages]) workers_by_lang = dict([(str(language), []) for language in languages])
for job in qpsworker_jobs: for job in qpsworker_jobs:
worker_addresses[str(job.language)].append(job.host_and_port) workers_by_lang[str(job.language)].append(job)
try: scenarios = create_scenarios(languages,
scenarios = create_scenarios(languages, workers_by_lang=worker_addresses,
workers_by_lang=worker_addresses, remote_host=args.remote_driver_host,
remote_host=args.remote_driver_host, regex=args.regex,
regex=args.regex, category=args.category,
category=args.category, bq_result_table=args.bq_result_table,
bq_result_table=args.bq_result_table, netperf=args.netperf,
netperf=args.netperf, netperf_hosts=args.remote_worker_host)
netperf_hosts=args.remote_worker_host)
if not scenarios:
if not scenarios: raise Exception('No scenarios to run')
raise Exception('No scenarios to run')
for scenario in scenarios:
jobset.message('START', 'Running scenarios.', do_newline=True) try:
num_failures, _ = jobset.run( for worker in scenario.workers:
scenarios, newline_on_success=True, maxjobs=1) worker.start()
if num_failures == 0: jobset.run([scenario.jobspec,
jobset.message('SUCCESS', create_quit_jobspec(scenario.workers, remote_host=remote_host)],
'All scenarios finished successfully.', newline_on_success=True, maxjobs=1)
do_newline=True) finally:
else: finish_qps_workers(scenario.workers)
jobset.message('FAILED', 'Some of the scenarios failed.',
do_newline=True)
sys.exit(1)
except:
traceback.print_exc()
raise
finally:
finish_qps_workers(qpsworker_jobs)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment