Commit b9f11c3f authored by Jan Tattermusch

Merge pull request #6625 from sreecha/stress_test_misc

Misc changes to the stress test framework (to reduce false positives and add tools for debugging)
parents 8c1d7969 4dd02fc6
@@ -42,13 +42,15 @@
 #include "test/cpp/util/metrics_server.h"
 #include "test/cpp/util/test_config.h"
 
-DEFINE_string(metrics_server_address, "",
+int kDeadlineSecs = 10;
+
+DEFINE_string(metrics_server_address, "localhost:8081",
               "The metrics server addresses in the format <hostname>:<port>");
+DEFINE_int32(deadline_secs, kDeadlineSecs,
+             "The deadline (in seconds) for the RPC call");
 DEFINE_bool(total_only, false,
             "If true, this prints only the total value of all gauges");
 
-int kDeadlineSecs = 10;
-
 using grpc::testing::EmptyMessage;
 using grpc::testing::GaugeResponse;
 using grpc::testing::MetricsService;
@@ -56,12 +58,13 @@ using grpc::testing::MetricsServiceImpl;
 
 // Prints the values of all Gauges (unless total_only is set to 'true' in which
 // case this only prints the sum of all gauge values).
-bool PrintMetrics(std::unique_ptr<MetricsService::Stub> stub, bool total_only) {
+bool PrintMetrics(std::unique_ptr<MetricsService::Stub> stub, bool total_only,
+                  int deadline_secs) {
   grpc::ClientContext context;
   EmptyMessage message;
 
   std::chrono::system_clock::time_point deadline =
-      std::chrono::system_clock::now() + std::chrono::seconds(kDeadlineSecs);
+      std::chrono::system_clock::now() + std::chrono::seconds(deadline_secs);
 
   context.set_deadline(deadline);
@@ -108,7 +111,8 @@ int main(int argc, char** argv) {
   std::shared_ptr<grpc::Channel> channel(grpc::CreateChannel(
       FLAGS_metrics_server_address, grpc::InsecureChannelCredentials()));
 
-  if (!PrintMetrics(MetricsService::NewStub(channel), FLAGS_total_only)) {
+  if (!PrintMetrics(MetricsService::NewStub(channel), FLAGS_total_only,
+                    FLAGS_deadline_secs)) {
     return 1;
   }
......
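With the deadline now a flag, the wrapper scripts (and the metricsArgs blocks in the configs below) can tell the metrics client to give up quickly instead of hanging when a stress server is wedged. A minimal sketch of such an invocation; the binary path is an assumed example, only the flag names are taken from the DEFINE_* declarations above:

import subprocess

# Assumed location of the compiled metrics client binary; the flags come from
# the C++ diff above.
metrics_cmd = ['bins/opt/metrics_client',
               '--metrics_server_address=localhost:8081',
               '--total_only=true',
               '--deadline_secs=60']  # bound the RPC instead of blocking forever
print subprocess.check_output(args=metrics_cmd)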
@@ -133,12 +133,15 @@ def run_client():
   details = 'Logfile: %s' % logfile_name
   logfile = open(logfile_name, 'w')
 
+  metrics_cmd = metrics_client_cmd + [x
+                                      for x in metrics_client_args_str.split()]
+  stress_cmd = stress_client_cmd + [x for x in args_str.split()]
+  details = '%s, Metrics command: %s, Stress client command: %s' % (
+      details, str(metrics_cmd), str(stress_cmd))
   # Update status that the test is starting (in the status table)
   bq_helper.insert_summary_row(EventType.STARTING, details)
 
-  metrics_cmd = metrics_client_cmd + [x for x in metrics_client_args_str.split()]
-  stress_cmd = stress_client_cmd + [x for x in args_str.split()]
-
   print 'Launching process %s ...' % stress_cmd
   stress_p = subprocess.Popen(args=stress_cmd,
                               stdout=logfile,
@@ -147,6 +150,7 @@ def run_client():
   qps_history = [1, 1, 1]  # Maintain the last 3 qps readings
   qps_history_idx = 0  # Index into the qps_history list
 
+  is_running_status_written = False
   is_error = False
   while True:
     # Check if stress_client is still running. If so, collect metrics and upload
@@ -165,6 +169,10 @@ def run_client():
       print details
       break
 
+    if not is_running_status_written:
+      bq_helper.insert_summary_row(EventType.RUNNING, '')
+      is_running_status_written = True
+
     # Stress client still running. Get metrics
     qps = _get_qps(metrics_cmd)
     qps_recorded_at = datetime.datetime.now().isoformat()
......
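Put together, the client wrapper's monitoring loop now follows roughly this shape. This is a condensed, approximate sketch rather than the literal file; stress_p, bq_helper, metrics_cmd and _get_qps all come from run_client's surrounding code:

is_running_status_written = False
while True:
  if stress_p.poll() is not None:
    # stress client exited; exit handling elided in this sketch
    break
  if not is_running_status_written:
    # Written at most once per run, so a run whose summary has a STARTING row
    # but no RUNNING row points at a client that never became healthy.
    bq_helper.insert_summary_row(EventType.RUNNING, '')
    is_running_status_written = True
  qps = _get_qps(metrics_cmd)  # now bounded by the metrics client's --deadline_secs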
@@ -106,16 +106,22 @@ def run_server():
   logfile = open(logfile_name, 'w')
   details = 'Logfile: %s' % logfile_name
 
+  stress_cmd = stress_server_cmd + [x for x in args_str.split()]
+
+  details = '%s, Stress server command: %s' % (details, str(stress_cmd))
   # Update status that the test is starting (in the status table)
   bq_helper.insert_summary_row(EventType.STARTING, details)
 
-  stress_cmd = stress_server_cmd + [x for x in args_str.split()]
-
   print 'Launching process %s ...' % stress_cmd
   stress_p = subprocess.Popen(args=stress_cmd,
                               stdout=logfile,
                               stderr=subprocess.STDOUT)
 
+  # Update the status to running if subprocess.Popen launched the server
+  if stress_p.poll() is None:
+    bq_helper.insert_summary_row(EventType.RUNNING, '')
+
+  # Wait for the server process to terminate
   returncode = stress_p.wait()
   if will_run_forever == '1' or returncode != 0:
......
@@ -46,6 +46,7 @@ import big_query_utils as bq_utils
 
 class EventType:
   STARTING = 'STARTING'
+  RUNNING = 'RUNNING'
   SUCCESS = 'SUCCESS'
   FAILURE = 'FAILURE'
 
@@ -195,11 +196,11 @@ class BigQueryHelper:
         ('image_type', 'STRING', 'Client or Server?'),
         ('pod_name', 'STRING', 'GKE pod hosting this image'),
         ('event_date', 'STRING', 'The date of this event'),
-        ('event_type', 'STRING', 'STARTED/SUCCESS/FAILURE'),
+        ('event_type', 'STRING', 'STARTING/RUNNING/SUCCESS/FAILURE'),
         ('details', 'STRING', 'Any other relevant details')
     ]
-    desc = ('The table that contains START/SUCCESS/FAILURE events for '
-            ' the stress test clients and servers')
+    desc = ('The table that contains STARTING/RUNNING/SUCCESS/FAILURE events '
+            'for the stress test clients and servers')
     return bq_utils.create_table(self.bq, self.project_id, self.dataset_id,
                                  self.summary_table_id, summary_table_schema,
                                  desc)
......
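Taken together, each pod's rows in the summary table now read as a simple lifecycle, and a gap in the sequence (for example STARTING with no RUNNING) is exactly the false-positive/debugging signal this change is after. A minimal sketch; the SUCCESS/FAILURE calls are an assumption that mirrors the STARTING/RUNNING calls shown in the wrapper diffs above:

bq_helper.insert_summary_row(EventType.STARTING, details)  # about to launch the process
# ... process launched and confirmed alive ...
bq_helper.insert_summary_row(EventType.RUNNING, '')
# ... process terminated ...
if returncode != 0:
  bq_helper.insert_summary_row(EventType.FAILURE, details)  # assumed failure path
else:
  bq_helper.insert_summary_row(EventType.SUCCESS, details)  # assumed success path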
@@ -10,7 +10,7 @@
   "baseTemplates": {
     "default": {
       "wrapperScriptPath": "/var/local/git/grpc/tools/gcp/stress_test/run_client.py",
-      "pollIntervalSecs": 60,
+      "pollIntervalSecs": 100,
       "clientArgs": {
         "num_channels_per_server":5,
         "num_stubs_per_channel":10,
@@ -20,7 +20,8 @@
       "metricsPort": 8081,
       "metricsArgs": {
         "metrics_server_address": "localhost:8081",
-        "total_only": "true"
+        "total_only": "true",
+        "deadline_secs": 60
       }
     }
   },
@@ -78,7 +79,7 @@
   "globalSettings": {
     "buildDockerImages": true,
-    "pollIntervalSecs": 60,
+    "pollIntervalSecs": 100,
     "testDurationSecs": 7200,
     "kubernetesProxyPort": 8009,
     "datasetIdNamePrefix": "stress_test_csharp",
......
@@ -10,7 +10,7 @@
   "baseTemplates": {
     "default": {
       "wrapperScriptPath": "/var/local/git/grpc/tools/gcp/stress_test/run_client.py",
-      "pollIntervalSecs": 60,
+      "pollIntervalSecs": 100,
       "clientArgs": {
         "num_channels_per_server":5,
         "num_stubs_per_channel":10,
@@ -20,7 +20,8 @@
       "metricsPort": 8081,
      "metricsArgs": {
         "metrics_server_address": "localhost:8081",
-        "total_only": "true"
+        "total_only": "true",
+        "deadline_secs": 60
       },
       "env": {
         "STRESSTEST_CLIENT_OPTS":"-Xmx3g -Xms3g -XX:NewSize=1500m -XX:MaxNewSize=1500m -XX:+UseConcMarkSweepGC"
@@ -85,7 +86,7 @@
   "globalSettings": {
     "buildDockerImages": true,
-    "pollIntervalSecs": 60,
+    "pollIntervalSecs": 100,
     "testDurationSecs": 7200,
     "kubernetesProxyPort": 8008,
     "datasetIdNamePrefix": "stress_test_java",
......
The commit also adds a new standalone helper script (shown in full below) that prints the summary and QPS tables recorded in BigQuery for a given run, one of the debugging tools mentioned in the description:

#!/usr/bin/env python2.7
# Copyright 2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import argparse
import os
import sys
stress_test_utils_dir = os.path.abspath(os.path.join(
os.path.dirname(__file__), '../../gcp/stress_test'))
sys.path.append(stress_test_utils_dir)
from stress_test_utils import BigQueryHelper
argp = argparse.ArgumentParser(
description='Print summary tables',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
argp.add_argument('--gcp_project_id',
required=True,
help='The Google Cloud Platform Project Id')
argp.add_argument('--dataset_id', type=str, required=True)
argp.add_argument('--run_id', type=str, required=True)
argp.add_argument('--summary_table_id', type=str, default='summary')
argp.add_argument('--qps_table_id', type=str, default='qps')
argp.add_argument('--summary_only', action='store_true', default=True)
if __name__ == '__main__':
args = argp.parse_args()
bq_helper = BigQueryHelper(args.run_id, '', '', args.gcp_project_id,
args.dataset_id, args.summary_table_id,
args.qps_table_id)
bq_helper.initialize()
if not args.summary_only:
bq_helper.print_qps_records()
bq_helper.print_summary_records()
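The script is driven entirely by argparse flags (--gcp_project_id, --dataset_id and --run_id are required). The same data can be pulled programmatically; a minimal sketch that mirrors the calls above, where the project, dataset and run id are placeholders and the two empty strings are passed exactly as the script passes them:

import sys
sys.path.append('tools/gcp/stress_test')  # assumed checkout-relative path to stress_test_utils
from stress_test_utils import BigQueryHelper

bq_helper = BigQueryHelper('<run_id>', '', '', '<gcp_project_id>',
                           '<dataset_id>', 'summary', 'qps')
bq_helper.initialize()
bq_helper.print_qps_records()      # skipped by the script when --summary_only is set
bq_helper.print_summary_records()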