diff --git a/test/cpp/interop/metrics_client.cc b/test/cpp/interop/metrics_client.cc
index 0c140ffd85ea42c90cc0db97f1bf55f4ce07ec94..bd48c7d4ef2b9d564cc295ae8ad08139a3fba213 100644
--- a/test/cpp/interop/metrics_client.cc
+++ b/test/cpp/interop/metrics_client.cc
@@ -1,6 +1,6 @@
 /*
  *
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -37,39 +37,45 @@
 #include <gflags/gflags.h>
 #include <grpc++/grpc++.h>
 
-#include "test/cpp/util/metrics_server.h"
-#include "test/cpp/util/test_config.h"
 #include "src/proto/grpc/testing/metrics.grpc.pb.h"
 #include "src/proto/grpc/testing/metrics.pb.h"
+#include "test/cpp/util/metrics_server.h"
+#include "test/cpp/util/test_config.h"
 
 DEFINE_string(metrics_server_address, "",
               "The metrics server addresses in the fomrat <hostname>:<port>");
+DEFINE_bool(total_only, false,
+            "If true, this prints only the total value of all gauges");
+
+int kDeadlineSecs = 10;
 
 using grpc::testing::EmptyMessage;
 using grpc::testing::GaugeResponse;
 using grpc::testing::MetricsService;
 using grpc::testing::MetricsServiceImpl;
 
-void PrintMetrics(const grpc::string& server_address) {
-  gpr_log(GPR_INFO, "creating a channel to %s", server_address.c_str());
-  std::shared_ptr<grpc::Channel> channel(
-      grpc::CreateChannel(server_address, grpc::InsecureChannelCredentials()));
-
-  std::unique_ptr<MetricsService::Stub> stub(MetricsService::NewStub(channel));
-
+// Prints the values of all Gauges (unless total_only is set to 'true' in which
+// case this only prints the sum of all gauge values).
+bool PrintMetrics(std::unique_ptr<MetricsService::Stub> stub, bool total_only) {
   grpc::ClientContext context;
   EmptyMessage message;
 
+  std::chrono::system_clock::time_point deadline =
+      std::chrono::system_clock::now() + std::chrono::seconds(kDeadlineSecs);
+
+  context.set_deadline(deadline);
+
   std::unique_ptr<grpc::ClientReader<GaugeResponse>> reader(
       stub->GetAllGauges(&context, message));
 
   GaugeResponse gauge_response;
   long overall_qps = 0;
-  int idx = 0;
   while (reader->Read(&gauge_response)) {
     if (gauge_response.value_case() == GaugeResponse::kLongValue) {
-      gpr_log(GPR_INFO, "Gauge: %d (%s: %ld)", ++idx,
-              gauge_response.name().c_str(), gauge_response.long_value());
+      if (!total_only) {
+        gpr_log(GPR_INFO, "%s: %ld", gauge_response.name().c_str(),
+                gauge_response.long_value());
+      }
       overall_qps += gauge_response.long_value();
     } else {
       gpr_log(GPR_INFO, "Gauge %s is not a long value",
@@ -77,12 +83,14 @@ void PrintMetrics(const grpc::string& server_address) {
     }
   }
 
-  gpr_log(GPR_INFO, "OVERALL: %ld", overall_qps);
+  gpr_log(GPR_INFO, "%ld", overall_qps);
 
   const grpc::Status status = reader->Finish();
   if (!status.ok()) {
     gpr_log(GPR_ERROR, "Error in getting metrics from the client");
   }
+
+  return status.ok();
 }
 
 int main(int argc, char** argv) {
@@ -97,7 +105,12 @@ int main(int argc, char** argv) {
     return 1;
   }
 
-  PrintMetrics(FLAGS_metrics_server_address);
+  std::shared_ptr<grpc::Channel> channel(grpc::CreateChannel(
+      FLAGS_metrics_server_address, grpc::InsecureChannelCredentials()));
+
+  if (!PrintMetrics(MetricsService::NewStub(channel), FLAGS_total_only)) {
+    return 1;
+  }
 
   return 0;
 }
diff --git a/test/cpp/util/metrics_server.cc b/test/cpp/util/metrics_server.cc
index 07978d0bdbfb9e237bc73c8f2fab4b0d625ab9fd..34d51eb3169e4e5c2416dda753d33c1c9c94fc6b 100644
--- a/test/cpp/util/metrics_server.cc
+++ b/test/cpp/util/metrics_server.cc
@@ -57,7 +57,7 @@ long Gauge::Get() {
 grpc::Status MetricsServiceImpl::GetAllGauges(
     ServerContext* context, const EmptyMessage* request,
     ServerWriter<GaugeResponse>* writer) {
-  gpr_log(GPR_INFO, "GetAllGauges called");
+  gpr_log(GPR_DEBUG, "GetAllGauges called");
 
   std::lock_guard<std::mutex> lock(mu_);
   for (auto it = gauges_.begin(); it != gauges_.end(); it++) {
diff --git a/tools/dockerfile/grpc_interop_stress_cxx/Dockerfile b/tools/dockerfile/grpc_interop_stress_cxx/Dockerfile
index 58a8c32e344fa4944c62e233eeb05151279be998..4123cc1a26ad4580c429caa1e8bceb0423e476e3 100644
--- a/tools/dockerfile/grpc_interop_stress_cxx/Dockerfile
+++ b/tools/dockerfile/grpc_interop_stress_cxx/Dockerfile
@@ -59,6 +59,8 @@ RUN apt-get update && apt-get install -y \
   wget \
   zip && apt-get clean
 
+RUN easy_install -U pip
+
 # Prepare ccache
 RUN ln -s /usr/bin/ccache /usr/local/bin/gcc
 RUN ln -s /usr/bin/ccache /usr/local/bin/g++
@@ -71,5 +73,8 @@ RUN ln -s /usr/bin/ccache /usr/local/bin/clang++
 # C++ dependencies
 RUN apt-get update && apt-get -y install libgflags-dev libgtest-dev libc++-dev clang
 
+# Google Cloud platform API libraries (for BigQuery)
+RUN pip install --upgrade google-api-python-client
+
 # Define the default command.
 CMD ["bash"]
diff --git a/tools/dockerfile/grpc_interop_stress_cxx/build_interop_stress.sh b/tools/dockerfile/grpc_interop_stress_cxx/build_interop_stress.sh
index 6ed3ccb3fa08e175020edbb8e7f8bfbcedf036c8..392bdfccda07b7f8966ce674dce176c53d4c0a3c 100755
--- a/tools/dockerfile/grpc_interop_stress_cxx/build_interop_stress.sh
+++ b/tools/dockerfile/grpc_interop_stress_cxx/build_interop_stress.sh
@@ -42,4 +42,4 @@ cd /var/local/git/grpc
 make install-certs
 
 # build C++ interop stress client, interop client and server
-make stress_test interop_client interop_server
+make stress_test metrics_client interop_client interop_server
diff --git a/tools/gcp/stress_test/run_client.py b/tools/gcp/stress_test/run_client.py
new file mode 100755
index 0000000000000000000000000000000000000000..0fa1bf1cb975d11cd75578ffeaab3142857aca14
--- /dev/null
+++ b/tools/gcp/stress_test/run_client.py
@@ -0,0 +1,187 @@
+#!/usr/bin/env python2.7
+# Copyright 2015-2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import datetime
+import os
+import re
+import select
+import subprocess
+import sys
+import time
+
+from stress_test_utils import EventType
+from stress_test_utils import BigQueryHelper
+
+
+# TODO (sree): Write a python grpc client to directly query the metrics instead
+# of calling metrics_client
+def _get_qps(metrics_cmd):
+  qps = 0
+  try:
+    # Note: gpr_log() writes even non-error messages to stderr stream. So it is 
+    # important that we set stderr=subprocess.STDOUT
+    p = subprocess.Popen(args=metrics_cmd,
+                         stdout=subprocess.PIPE,
+                         stderr=subprocess.STDOUT)
+    retcode = p.wait()
+    (out_str, err_str) = p.communicate()
+    if retcode != 0:
+      print 'Error in reading metrics information'
+      print 'Output: ', out_str
+    else:
+      # The overall qps is printed at the end of the line
+      m = re.search('\d+$', out_str)
+      qps = int(m.group()) if m else 0
+  except Exception as ex:
+    print 'Exception while reading metrics information: ' + str(ex)
+  return qps
+
+
+def run_client():
+  """This is a wrapper around the stress test client and performs the following:
+      1) Create the following two tables in Big Query:
+         (i) Summary table: To record events like the test started, completed
+                            successfully or failed
+        (ii) Qps table: To periodically record the QPS sent by this client
+      2) Start the stress test client and add a row in the Big Query summary
+         table
+      3) Once every few seconds (as specificed by the poll_interval_secs) poll
+         the status of the stress test client process and perform the
+         following:
+          3.1) If the process is still running, get the current qps by invoking
+               the metrics client program and add a row in the Big Query
+               Qps table. Sleep for a duration specified by poll_interval_secs
+          3.2) If the process exited successfully, add a row in the Big Query
+               Summary table and exit
+          3.3) If the process failed, add a row in Big Query summary table and
+               wait forever.
+               NOTE: This script typically runs inside a GKE pod which means
+               that the pod gets destroyed when the script exits. However, in
+               case the stress test client fails, we would not want the pod to
+               be destroyed (since we might want to connect to the pod for
+               examining logs). This is the reason why the script waits forever
+               in case of failures
+  """
+  env = dict(os.environ)
+  image_type = env['STRESS_TEST_IMAGE_TYPE']
+  image_name = env['STRESS_TEST_IMAGE']
+  args_str = env['STRESS_TEST_ARGS_STR']
+  metrics_client_image = env['METRICS_CLIENT_IMAGE']
+  metrics_client_args_str = env['METRICS_CLIENT_ARGS_STR']
+  run_id = env['RUN_ID']
+  pod_name = env['POD_NAME']
+  logfile_name = env.get('LOGFILE_NAME')
+  poll_interval_secs = float(env['POLL_INTERVAL_SECS'])
+  project_id = env['GCP_PROJECT_ID']
+  dataset_id = env['DATASET_ID']
+  summary_table_id = env['SUMMARY_TABLE_ID']
+  qps_table_id = env['QPS_TABLE_ID']
+
+  bq_helper = BigQueryHelper(run_id, image_type, pod_name, project_id,
+                             dataset_id, summary_table_id, qps_table_id)
+  bq_helper.initialize()
+
+  # Create BigQuery Dataset and Tables: Summary Table and Metrics Table
+  if not bq_helper.setup_tables():
+    print 'Error in creating BigQuery tables'
+    return
+
+  start_time = datetime.datetime.now()
+
+  logfile = None
+  details = 'Logging to stdout'
+  if logfile_name is not None:
+    print 'Opening logfile: %s ...' % logfile_name
+    details = 'Logfile: %s' % logfile_name
+    logfile = open(logfile_name, 'w')
+
+  # Update status that the test is starting (in the status table)
+  bq_helper.insert_summary_row(EventType.STARTING, details)
+
+  metrics_cmd = [metrics_client_image
+                ] + [x for x in metrics_client_args_str.split()]
+  stress_cmd = [image_name] + [x for x in args_str.split()]
+
+  print 'Launching process %s ...' % stress_cmd
+  stress_p = subprocess.Popen(args=stress_cmd,
+                              stdout=logfile,
+                              stderr=subprocess.STDOUT)
+
+  qps_history = [1, 1, 1]  # Maintain the last 3 qps readings
+  qps_history_idx = 0  # Index into the qps_history list
+
+  is_error = False
+  while True:
+    # Check if stress_client is still running. If so, collect metrics and upload
+    # to BigQuery status table
+    if stress_p.poll() is not None:
+      end_time = datetime.datetime.now().isoformat()
+      event_type = EventType.SUCCESS
+      details = 'End time: %s' % end_time
+      if stress_p.returncode != 0:
+        event_type = EventType.FAILURE
+        details = 'Return code = %d. End time: %s' % (stress_p.returncode,
+                                                      end_time)
+        is_error = True
+      bq_helper.insert_summary_row(event_type, details)
+      print details
+      break
+
+    # Stress client still running. Get metrics
+    qps = _get_qps(metrics_cmd)
+    qps_recorded_at = datetime.datetime.now().isoformat()
+    print 'qps: %d at %s' % (qps, qps_recorded_at)
+
+    # If QPS has been zero for the last 3 iterations, flag it as error and exit
+    qps_history[qps_history_idx] = qps
+    qps_history_idx = (qps_history_idx + 1) % len(qps_history)
+    if sum(qps_history) == 0:
+      details = 'QPS has been zero for the last %d seconds - as of : %s' % (
+          poll_interval_secs * 3, qps_recorded_at)
+      is_error = True
+      bq_helper.insert_summary_row(EventType.FAILURE, details)
+      print details
+      break
+
+    # Upload qps metrics to BiqQuery
+    bq_helper.insert_qps_row(qps, qps_recorded_at)
+
+    time.sleep(poll_interval_secs)
+
+  if is_error:
+    print 'Waiting indefinitely..'
+    select.select([], [], [])
+
+  print 'Completed'
+  return
+
+
+if __name__ == '__main__':
+  run_client()
diff --git a/tools/gcp/stress_test/run_server.py b/tools/gcp/stress_test/run_server.py
new file mode 100755
index 0000000000000000000000000000000000000000..64322f61004f22349c95dc25a8f22105159a0b95
--- /dev/null
+++ b/tools/gcp/stress_test/run_server.py
@@ -0,0 +1,120 @@
+#!/usr/bin/env python2.7
+# Copyright 2015-2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import datetime
+import os
+import select
+import subprocess
+import sys
+import time
+
+from stress_test_utils import BigQueryHelper
+from stress_test_utils import EventType
+
+
+def run_server():
+  """This is a wrapper around the interop server and performs the following:
+      1) Create a 'Summary table' in Big Query to record events like the server
+         started, completed successfully or failed. NOTE: This also creates
+         another table called the QPS table which is currently NOT needed on the
+         server (it is needed on the stress test clients)
+      2) Start the server process and add a row in Big Query summary table
+      3) Wait for the server process to terminate. The server process does not
+         terminate unless there is an error.
+         If the server process terminated with a failure, add a row in Big Query
+         and wait forever.
+         NOTE: This script typically runs inside a GKE pod which means that the
+         pod gets destroyed when the script exits. However, in case the server
+         process fails, we would not want the pod to be destroyed (since we
+         might want to connect to the pod for examining logs). This is the
+         reason why the script waits forever in case of failures.
+  """
+
+  # Read the parameters from environment variables
+  env = dict(os.environ)
+
+  run_id = env['RUN_ID']  # The unique run id for this test
+  image_type = env['STRESS_TEST_IMAGE_TYPE']
+  image_name = env['STRESS_TEST_IMAGE']
+  args_str = env['STRESS_TEST_ARGS_STR']
+  pod_name = env['POD_NAME']
+  project_id = env['GCP_PROJECT_ID']
+  dataset_id = env['DATASET_ID']
+  summary_table_id = env['SUMMARY_TABLE_ID']
+  qps_table_id = env['QPS_TABLE_ID']
+
+  logfile_name = env.get('LOGFILE_NAME')
+
+  print('pod_name: %s, project_id: %s, run_id: %s, dataset_id: %s, '
+        'summary_table_id: %s, qps_table_id: %s') % (
+            pod_name, project_id, run_id, dataset_id, summary_table_id,
+            qps_table_id)
+
+  bq_helper = BigQueryHelper(run_id, image_type, pod_name, project_id,
+                             dataset_id, summary_table_id, qps_table_id)
+  bq_helper.initialize()
+
+  # Create BigQuery Dataset and Tables: Summary Table and Metrics Table
+  if not bq_helper.setup_tables():
+    print 'Error in creating BigQuery tables'
+    return
+
+  start_time = datetime.datetime.now()
+
+  logfile = None
+  details = 'Logging to stdout'
+  if logfile_name is not None:
+    print 'Opening log file: ', logfile_name
+    logfile = open(logfile_name, 'w')
+    details = 'Logfile: %s' % logfile_name
+
+  # Update status that the test is starting (in the status table)
+  bq_helper.insert_summary_row(EventType.STARTING, details)
+
+  stress_cmd = [image_name] + [x for x in args_str.split()]
+
+  print 'Launching process %s ...' % stress_cmd
+  stress_p = subprocess.Popen(args=stress_cmd,
+                              stdout=logfile,
+                              stderr=subprocess.STDOUT)
+
+  returncode = stress_p.wait()
+  if returncode != 0:
+    end_time = datetime.datetime.now().isoformat()
+    event_type = EventType.FAILURE
+    details = 'Returncode: %d; End time: %s' % (returncode, end_time)
+    bq_helper.insert_summary_row(event_type, details)
+    print 'Waiting indefinitely..'
+    select.select([], [], [])
+  return returncode
+
+
+if __name__ == '__main__':
+  run_server()
diff --git a/tools/gcp/stress_test/stress_test_utils.py b/tools/gcp/stress_test/stress_test_utils.py
new file mode 100755
index 0000000000000000000000000000000000000000..c4b437e3459daf0d5314f21ac26caf4d37095e6f
--- /dev/null
+++ b/tools/gcp/stress_test/stress_test_utils.py
@@ -0,0 +1,197 @@
+#!/usr/bin/env python2.7
+# Copyright 2015-2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import datetime
+import json
+import os
+import re
+import select
+import subprocess
+import sys
+import time
+
+# Import big_query_utils module
+bq_utils_dir = os.path.abspath(os.path.join(
+    os.path.dirname(__file__), '../utils'))
+sys.path.append(bq_utils_dir)
+import big_query_utils as bq_utils
+
+
+class EventType:
+  STARTING = 'STARTING'
+  SUCCESS = 'SUCCESS'
+  FAILURE = 'FAILURE'
+
+
+class BigQueryHelper:
+  """Helper class for the stress test wrappers to interact with BigQuery.
+  """
+
+  def __init__(self, run_id, image_type, pod_name, project_id, dataset_id,
+               summary_table_id, qps_table_id):
+    self.run_id = run_id
+    self.image_type = image_type
+    self.pod_name = pod_name
+    self.project_id = project_id
+    self.dataset_id = dataset_id
+    self.summary_table_id = summary_table_id
+    self.qps_table_id = qps_table_id
+
+  def initialize(self):
+    self.bq = bq_utils.create_big_query()
+
+  def setup_tables(self):
+    return bq_utils.create_dataset(self.bq, self.project_id, self.dataset_id) \
+        and self.__create_summary_table() \
+        and self.__create_qps_table()
+
+  def insert_summary_row(self, event_type, details):
+    row_values_dict = {
+        'run_id': self.run_id,
+        'image_type': self.image_type,
+        'pod_name': self.pod_name,
+        'event_date': datetime.datetime.now().isoformat(),
+        'event_type': event_type,
+        'details': details
+    }
+    # row_unique_id is something that uniquely identifies the row (BigQuery uses
+    # it for duplicate detection).
+    row_unique_id = '%s_%s_%s' % (self.run_id, self.pod_name, event_type)
+    row = bq_utils.make_row(row_unique_id, row_values_dict)
+    return bq_utils.insert_rows(self.bq, self.project_id, self.dataset_id,
+                                self.summary_table_id, [row])
+
+  def insert_qps_row(self, qps, recorded_at):
+    row_values_dict = {
+        'run_id': self.run_id,
+        'pod_name': self.pod_name,
+        'recorded_at': recorded_at,
+        'qps': qps
+    }
+
+    # row_unique_id is something that uniquely identifies the row (BigQuery uses
+    # it for duplicate detection).
+    row_unique_id = '%s_%s_%s' % (self.run_id, self.pod_name, recorded_at)
+    row = bq_utils.make_row(row_unique_id, row_values_dict)
+    return bq_utils.insert_rows(self.bq, self.project_id, self.dataset_id,
+                                self.qps_table_id, [row])
+
+  def check_if_any_tests_failed(self, num_query_retries=3):
+    query = ('SELECT event_type FROM %s.%s WHERE run_id = \'%s\' AND '
+             'event_type="%s"') % (self.dataset_id, self.summary_table_id,
+                                   self.run_id, EventType.FAILURE)
+    query_job = bq_utils.sync_query_job(self.bq, self.project_id, query)
+    page = self.bq.jobs().getQueryResults(**query_job['jobReference']).execute(
+        num_retries=num_query_retries)
+    num_failures = int(page['totalRows'])
+    print 'num rows: ', num_failures
+    return num_failures > 0
+
+  def print_summary_records(self, num_query_retries=3):
+    line = '-' * 120
+    print line
+    print 'Summary records'
+    print 'Run Id: ', self.run_id
+    print 'Dataset Id: ', self.dataset_id
+    print line
+    query = ('SELECT pod_name, image_type, event_type, event_date, details'
+             ' FROM %s.%s WHERE run_id = \'%s\' ORDER by event_date;') % (
+                 self.dataset_id, self.summary_table_id, self.run_id)
+    query_job = bq_utils.sync_query_job(self.bq, self.project_id, query)
+
+    print '{:<25} {:<12} {:<12} {:<30} {}'.format(
+        'Pod name', 'Image type', 'Event type', 'Date', 'Details')
+    print line
+    page_token = None
+    while True:
+      page = self.bq.jobs().getQueryResults(
+          pageToken=page_token,
+          **query_job['jobReference']).execute(num_retries=num_query_retries)
+      rows = page.get('rows', [])
+      for row in rows:
+        print '{:<25} {:<12} {:<12} {:<30} {}'.format(
+            row['f'][0]['v'], row['f'][1]['v'], row['f'][2]['v'],
+            row['f'][3]['v'], row['f'][4]['v'])
+      page_token = page.get('pageToken')
+      if not page_token:
+        break
+
+  def print_qps_records(self, num_query_retries=3):
+    line = '-' * 80
+    print line
+    print 'QPS Summary'
+    print 'Run Id: ', self.run_id
+    print 'Dataset Id: ', self.dataset_id
+    print line
+    query = (
+        'SELECT pod_name, recorded_at, qps FROM %s.%s WHERE run_id = \'%s\' '
+        'ORDER by recorded_at;') % (self.dataset_id, self.qps_table_id,
+                                    self.run_id)
+    query_job = bq_utils.sync_query_job(self.bq, self.project_id, query)
+    print '{:<25} {:30} {}'.format('Pod name', 'Recorded at', 'Qps')
+    print line
+    page_token = None
+    while True:
+      page = self.bq.jobs().getQueryResults(
+          pageToken=page_token,
+          **query_job['jobReference']).execute(num_retries=num_query_retries)
+      rows = page.get('rows', [])
+      for row in rows:
+        print '{:<25} {:30} {}'.format(row['f'][0]['v'], row['f'][1]['v'],
+                                       row['f'][2]['v'])
+      page_token = page.get('pageToken')
+      if not page_token:
+        break
+
+  def __create_summary_table(self):
+    summary_table_schema = [
+        ('run_id', 'STRING', 'Test run id'),
+        ('image_type', 'STRING', 'Client or Server?'),
+        ('pod_name', 'STRING', 'GKE pod hosting this image'),
+        ('event_date', 'STRING', 'The date of this event'),
+        ('event_type', 'STRING', 'STARTED/SUCCESS/FAILURE'),
+        ('details', 'STRING', 'Any other relevant details')
+    ]
+    desc = ('The table that contains START/SUCCESS/FAILURE events for '
+            ' the stress test clients and servers')
+    return bq_utils.create_table(self.bq, self.project_id, self.dataset_id,
+                                 self.summary_table_id, summary_table_schema,
+                                 desc)
+
+  def __create_qps_table(self):
+    qps_table_schema = [
+        ('run_id', 'STRING', 'Test run id'),
+        ('pod_name', 'STRING', 'GKE pod hosting this image'),
+        ('recorded_at', 'STRING', 'Metrics recorded at time'),
+        ('qps', 'INTEGER', 'Queries per second')
+    ]
+    desc = 'The table that cointains the qps recorded at various intervals'
+    return bq_utils.create_table(self.bq, self.project_id, self.dataset_id,
+                                 self.qps_table_id, qps_table_schema, desc)
diff --git a/tools/gcp/utils/big_query_utils.py b/tools/gcp/utils/big_query_utils.py
new file mode 100755
index 0000000000000000000000000000000000000000..7bb1e143549154279e3e90b8b5d93871303ec0b6
--- /dev/null
+++ b/tools/gcp/utils/big_query_utils.py
@@ -0,0 +1,140 @@
+#!/usr/bin/env python2.7
+# Copyright 2015-2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import argparse
+import json
+import uuid
+import httplib2
+
+from apiclient import discovery
+from apiclient.errors import HttpError
+from oauth2client.client import GoogleCredentials
+
+NUM_RETRIES = 3
+
+
+def create_big_query():
+  """Authenticates with cloud platform and gets a BiqQuery service object
+  """
+  creds = GoogleCredentials.get_application_default()
+  return discovery.build('bigquery', 'v2', credentials=creds)
+
+
+def create_dataset(biq_query, project_id, dataset_id):
+  is_success = True
+  body = {
+      'datasetReference': {
+          'projectId': project_id,
+          'datasetId': dataset_id
+      }
+  }
+
+  try:
+    dataset_req = biq_query.datasets().insert(projectId=project_id, body=body)
+    dataset_req.execute(num_retries=NUM_RETRIES)
+  except HttpError as http_error:
+    if http_error.resp.status == 409:
+      print 'Warning: The dataset %s already exists' % dataset_id
+    else:
+      # Note: For more debugging info, print "http_error.content"
+      print 'Error in creating dataset: %s. Err: %s' % (dataset_id, http_error)
+      is_success = False
+  return is_success
+
+
+def create_table(big_query, project_id, dataset_id, table_id, table_schema,
+                 description):
+  is_success = True
+
+  body = {
+      'description': description,
+      'schema': {
+          'fields': [{
+              'name': field_name,
+              'type': field_type,
+              'description': field_description
+          } for (field_name, field_type, field_description) in table_schema]
+      },
+      'tableReference': {
+          'datasetId': dataset_id,
+          'projectId': project_id,
+          'tableId': table_id
+      }
+  }
+
+  try:
+    table_req = big_query.tables().insert(projectId=project_id,
+                                          datasetId=dataset_id,
+                                          body=body)
+    res = table_req.execute(num_retries=NUM_RETRIES)
+    print 'Successfully created %s "%s"' % (res['kind'], res['id'])
+  except HttpError as http_error:
+    if http_error.resp.status == 409:
+      print 'Warning: Table %s already exists' % table_id
+    else:
+      print 'Error in creating table: %s. Err: %s' % (table_id, http_error)
+      is_success = False
+  return is_success
+
+
+def insert_rows(big_query, project_id, dataset_id, table_id, rows_list):
+  is_success = True
+  body = {'rows': rows_list}
+  try:
+    insert_req = big_query.tabledata().insertAll(projectId=project_id,
+                                                 datasetId=dataset_id,
+                                                 tableId=table_id,
+                                                 body=body)
+    print body
+    res = insert_req.execute(num_retries=NUM_RETRIES)
+    print res
+  except HttpError as http_error:
+    print 'Error in inserting rows in the table %s' % table_id
+    is_success = False
+  return is_success
+
+
+def sync_query_job(big_query, project_id, query, timeout=5000):
+  query_data = {'query': query, 'timeoutMs': timeout}
+  query_job = None
+  try:
+    query_job = big_query.jobs().query(
+        projectId=project_id,
+        body=query_data).execute(num_retries=NUM_RETRIES)
+  except HttpError as http_error:
+    print 'Query execute job failed with error: %s' % http_error
+    print http_error.content
+  return query_job
+
+  # List of (column name, column type, description) tuples
+def make_row(unique_row_id, row_values_dict):
+  """row_values_dict is a dictionary of column name and column value.
+  """
+  return {'insertId': unique_row_id, 'json': row_values_dict}
diff --git a/tools/gke/kubernetes_api.py b/tools/gcp/utils/kubernetes_api.py
similarity index 74%
rename from tools/gke/kubernetes_api.py
rename to tools/gcp/utils/kubernetes_api.py
index 7dd30153650383a5669e07cb82283f226b7ec968..e8ddd2f1b3502dff664f61a1933833d6fc271108 100755
--- a/tools/gke/kubernetes_api.py
+++ b/tools/gcp/utils/kubernetes_api.py
@@ -1,5 +1,5 @@
 #!/usr/bin/env python2.7
-# Copyright 2015, Google Inc.
+# Copyright 2015-2016, Google Inc.
 # All rights reserved.
 #
 # Redistribution and use in source and binary forms, with or without
@@ -33,8 +33,9 @@ import json
 
 _REQUEST_TIMEOUT_SECS = 10
 
+
 def _make_pod_config(pod_name, image_name, container_port_list, cmd_list,
-                    arg_list):
+                     arg_list, env_dict):
   """Creates a string containing the Pod defintion as required by the Kubernetes API"""
   body = {
       'kind': 'Pod',
@@ -48,20 +49,23 @@ def _make_pod_config(pod_name, image_name, container_port_list, cmd_list,
               {
                   'name': pod_name,
                   'image': image_name,
-                  'ports': []
+                  'ports': [{'containerPort': port,
+                             'protocol': 'TCP'}
+                            for port in container_port_list],
+                  'imagePullPolicy': 'Always'
               }
           ]
       }
   }
-  # Populate the 'ports' list
-  for port in container_port_list:
-    port_entry = {'containerPort': port, 'protocol': 'TCP'}
-    body['spec']['containers'][0]['ports'].append(port_entry)
+
+  env_list = [{'name': k, 'value': v} for (k, v) in env_dict.iteritems()]
+  if len(env_list) > 0:
+    body['spec']['containers'][0]['env'] = env_list
 
   # Add the 'Command' and 'Args' attributes if they are passed.
   # Note:
   #  - 'Command' overrides the ENTRYPOINT in the Docker Image
-  #  - 'Args' override the COMMAND in Docker image (yes, it is confusing!)
+  #  - 'Args' override the CMD in Docker image (yes, it is confusing!)
   if len(cmd_list) > 0:
     body['spec']['containers'][0]['command'] = cmd_list
   if len(arg_list) > 0:
@@ -70,7 +74,7 @@ def _make_pod_config(pod_name, image_name, container_port_list, cmd_list,
 
 
 def _make_service_config(service_name, pod_name, service_port_list,
-                        container_port_list, is_headless):
+                         container_port_list, is_headless):
   """Creates a string containing the Service definition as required by the Kubernetes API.
 
   NOTE:
@@ -124,6 +128,7 @@ def _print_connection_error(msg):
   print('ERROR: Connection failed. Did you remember to run Kubenetes proxy on '
         'localhost (i.e kubectl proxy --port=<proxy_port>) ?. Error: %s' % msg)
 
+
 def _do_post(post_url, api_name, request_body):
   """Helper to do HTTP POST.
 
@@ -135,7 +140,9 @@ def _do_post(post_url, api_name, request_body):
   """
   is_success = True
   try:
-    r = requests.post(post_url, data=request_body, timeout=_REQUEST_TIMEOUT_SECS)
+    r = requests.post(post_url,
+                      data=request_body,
+                      timeout=_REQUEST_TIMEOUT_SECS)
     if r.status_code == requests.codes.conflict:
       print('WARN: Looks like the resource already exists. Api: %s, url: %s' %
             (api_name, post_url))
@@ -143,7 +150,8 @@ def _do_post(post_url, api_name, request_body):
       print('ERROR: %s API returned error. HTTP response: (%d) %s' %
             (api_name, r.status_code, r.text))
       is_success = False
-  except(requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
+  except (requests.exceptions.Timeout,
+          requests.exceptions.ConnectionError) as e:
     is_success = False
     _print_connection_error(str(e))
   return is_success
@@ -165,7 +173,8 @@ def _do_delete(del_url, api_name):
       print('ERROR: %s API returned error. HTTP response: %s' %
             (api_name, r.text))
       is_success = False
-  except(requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
+  except (requests.exceptions.Timeout,
+          requests.exceptions.ConnectionError) as e:
     is_success = False
     _print_connection_error(str(e))
   return is_success
@@ -179,12 +188,12 @@ def create_service(kube_host, kube_port, namespace, service_name, pod_name,
   post_url = 'http://%s:%d/api/v1/namespaces/%s/services' % (
       kube_host, kube_port, namespace)
   request_body = _make_service_config(service_name, pod_name, service_port_list,
-                                     container_port_list, is_headless)
+                                      container_port_list, is_headless)
   return _do_post(post_url, 'Create Service', request_body)
 
 
 def create_pod(kube_host, kube_port, namespace, pod_name, image_name,
-               container_port_list, cmd_list, arg_list):
+               container_port_list, cmd_list, arg_list, env_dict):
   """Creates a Kubernetes Pod.
 
   Note that it is generally NOT considered a good practice to directly create
@@ -200,7 +209,7 @@ def create_pod(kube_host, kube_port, namespace, pod_name, image_name,
   post_url = 'http://%s:%d/api/v1/namespaces/%s/pods' % (kube_host, kube_port,
                                                          namespace)
   request_body = _make_pod_config(pod_name, image_name, container_port_list,
-                                 cmd_list, arg_list)
+                                  cmd_list, arg_list, env_dict)
   return _do_post(post_url, 'Create Pod', request_body)
 
 
@@ -214,3 +223,47 @@ def delete_pod(kube_host, kube_port, namespace, pod_name):
   del_url = 'http://%s:%d/api/v1/namespaces/%s/pods/%s' % (kube_host, kube_port,
                                                            namespace, pod_name)
   return _do_delete(del_url, 'Delete Pod')
+
+
+def create_pod_and_service(kube_host, kube_port, namespace, pod_name,
+                           image_name, container_port_list, cmd_list, arg_list,
+                           env_dict, is_headless_service):
+  """A helper function that creates a pod and a service (if pod creation was successful)."""
+  is_success = create_pod(kube_host, kube_port, namespace, pod_name, image_name,
+                          container_port_list, cmd_list, arg_list, env_dict)
+  if not is_success:
+    print 'Error in creating Pod'
+    return False
+
+  is_success = create_service(
+      kube_host,
+      kube_port,
+      namespace,
+      pod_name,  # Use pod_name for service
+      pod_name,
+      container_port_list,  # Service port list same as container port list
+      container_port_list,
+      is_headless_service)
+  if not is_success:
+    print 'Error in creating Service'
+    return False
+
+  print 'Successfully created the pod/service %s' % pod_name
+  return True
+
+
+def delete_pod_and_service(kube_host, kube_port, namespace, pod_name):
+  """ A helper function that calls delete_pod and delete_service """
+  is_success = delete_pod(kube_host, kube_port, namespace, pod_name)
+  if not is_success:
+    print 'Error in deleting pod %s' % pod_name
+    return False
+
+  # Note: service name assumed to the the same as pod name
+  is_success = delete_service(kube_host, kube_port, namespace, pod_name)
+  if not is_success:
+    print 'Error in deleting service %s' % pod_name
+    return False
+
+  print 'Successfully deleted the Pod/Service: %s' % pod_name
+  return True
diff --git a/tools/jenkins/build_interop_stress_image.sh b/tools/jenkins/build_interop_stress_image.sh
index 92f2dab5e34be0bc8012137e6d00527aeaffb1ef..501dc5b7ca4a41ff801c5bf6550426b652adb8e2 100755
--- a/tools/jenkins/build_interop_stress_image.sh
+++ b/tools/jenkins/build_interop_stress_image.sh
@@ -35,6 +35,8 @@ set -x
 
 # Params:
 #  INTEROP_IMAGE - name of tag of the final interop image
+#  INTEROP_IMAGE_TAG - Optional. If set, the created image will be tagged using
+#    the command: 'docker tag $INTEROP_IMAGE $INTEROP_IMAGE_REPOSITORY_TAG'
 #  BASE_NAME - base name used to locate the base Dockerfile and build script
 #  TTY_FLAG - optional -t flag to make docker allocate tty
 #  BUILD_INTEROP_DOCKER_EXTRA_ARGS - optional args to be passed to the
@@ -77,6 +79,7 @@ CONTAINER_NAME="build_${BASE_NAME}_$(uuidgen)"
   $BASE_IMAGE \
   bash -l /var/local/jenkins/grpc/tools/dockerfile/$BASE_NAME/build_interop_stress.sh \
   && docker commit $CONTAINER_NAME $INTEROP_IMAGE \
+  && ( if [ -n "$INTEROP_IMAGE_REPOSITORY_TAG" ]; then docker tag -f $INTEROP_IMAGE $INTEROP_IMAGE_REPOSITORY_TAG ; fi ) \
   && echo "Successfully built image $INTEROP_IMAGE")
 EXITCODE=$?
 
diff --git a/tools/run_tests/stress_test/run_stress_tests_on_gke.py b/tools/run_tests/stress_test/run_stress_tests_on_gke.py
new file mode 100755
index 0000000000000000000000000000000000000000..634eb1aca53265dc4fe6f2842f089d86c21b71bb
--- /dev/null
+++ b/tools/run_tests/stress_test/run_stress_tests_on_gke.py
@@ -0,0 +1,556 @@
+#!/usr/bin/env python2.7
+# Copyright 2015-2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+import argparse
+import datetime
+import os
+import subprocess
+import sys
+import time
+
+stress_test_utils_dir = os.path.abspath(os.path.join(
+    os.path.dirname(__file__), '../../gcp/stress_test'))
+sys.path.append(stress_test_utils_dir)
+from stress_test_utils import BigQueryHelper
+
+kubernetes_api_dir = os.path.abspath(os.path.join(
+    os.path.dirname(__file__), '../../gcp/utils'))
+sys.path.append(kubernetes_api_dir)
+
+import kubernetes_api
+
+_GRPC_ROOT = os.path.abspath(os.path.join(
+    os.path.dirname(sys.argv[0]), '../../..'))
+os.chdir(_GRPC_ROOT)
+
+# num of seconds to wait for the GKE image to start and warmup
+_GKE_IMAGE_WARMUP_WAIT_SECS = 60
+
+_SERVER_POD_NAME = 'stress-server'
+_CLIENT_POD_NAME_PREFIX = 'stress-client'
+_DATASET_ID_PREFIX = 'stress_test'
+_SUMMARY_TABLE_ID = 'summary'
+_QPS_TABLE_ID = 'qps'
+
+_DEFAULT_DOCKER_IMAGE_NAME = 'grpc_stress_test'
+
+# The default port on which the kubernetes proxy server is started on localhost
+# (i.e kubectl proxy --port=<port>)
+_DEFAULT_KUBERNETES_PROXY_PORT = 8001
+
+# How frequently should the stress client wrapper script (running inside a GKE
+# container) poll the health of the stress client (also running inside the GKE
+# container) and upload metrics to BigQuery
+_DEFAULT_STRESS_CLIENT_POLL_INTERVAL_SECS = 60
+
+# The default setting for stress test server and client
+_DEFAULT_STRESS_SERVER_PORT = 8080
+_DEFAULT_METRICS_PORT = 8081
+_DEFAULT_TEST_CASES_STR = 'empty_unary:1,large_unary:1,client_streaming:1,server_streaming:1,empty_stream:1'
+_DEFAULT_NUM_CHANNELS_PER_SERVER = 5
+_DEFAULT_NUM_STUBS_PER_CHANNEL = 10
+_DEFAULT_METRICS_COLLECTION_INTERVAL_SECS = 30
+
+# Number of stress client instances to launch
+_DEFAULT_NUM_CLIENTS = 3
+
+# How frequently should this test monitor the health of Stress clients and
+# Servers running in GKE
+_DEFAULT_TEST_POLL_INTERVAL_SECS = 60
+
+# Default run time for this test (2 hour)
+_DEFAULT_TEST_DURATION_SECS = 7200
+
+# The number of seconds it would take a GKE pod to warm up (i.e get to 'Running'
+# state from the time of creation). Ideally this is something the test should
+# automatically determine by using Kubernetes API to poll the pods status.
+_DEFAULT_GKE_WARMUP_SECS = 60
+
+
+class KubernetesProxy:
+  """ Class to start a proxy on localhost to the Kubernetes API server """
+
+  def __init__(self, api_port):
+    self.port = api_port
+    self.p = None
+    self.started = False
+
+  def start(self):
+    cmd = ['kubectl', 'proxy', '--port=%d' % self.port]
+    self.p = subprocess.Popen(args=cmd)
+    self.started = True
+    time.sleep(2)
+    print '..Started'
+
+  def get_port(self):
+    return self.port
+
+  def is_started(self):
+    return self.started
+
+  def __del__(self):
+    if self.p is not None:
+      print 'Shutting down Kubernetes proxy..'
+      self.p.kill()
+
+
+class TestSettings:
+
+  def __init__(self, build_docker_image, test_poll_interval_secs,
+               test_duration_secs, kubernetes_proxy_port):
+    self.build_docker_image = build_docker_image
+    self.test_poll_interval_secs = test_poll_interval_secs
+    self.test_duration_secs = test_duration_secs
+    self.kubernetes_proxy_port = kubernetes_proxy_port
+
+
+class GkeSettings:
+
+  def __init__(self, project_id, docker_image_name):
+    self.project_id = project_id
+    self.docker_image_name = docker_image_name
+    self.tag_name = 'gcr.io/%s/%s' % (project_id, docker_image_name)
+
+
+class BigQuerySettings:
+
+  def __init__(self, run_id, dataset_id, summary_table_id, qps_table_id):
+    self.run_id = run_id
+    self.dataset_id = dataset_id
+    self.summary_table_id = summary_table_id
+    self.qps_table_id = qps_table_id
+
+
+class StressServerSettings:
+
+  def __init__(self, server_pod_name, server_port):
+    self.server_pod_name = server_pod_name
+    self.server_port = server_port
+
+
+class StressClientSettings:
+
+  def __init__(self, num_clients, client_pod_name_prefix, server_pod_name,
+               server_port, metrics_port, metrics_collection_interval_secs,
+               stress_client_poll_interval_secs, num_channels_per_server,
+               num_stubs_per_channel, test_cases_str):
+    self.num_clients = num_clients
+    self.client_pod_name_prefix = client_pod_name_prefix
+    self.server_pod_name = server_pod_name
+    self.server_port = server_port
+    self.metrics_port = metrics_port
+    self.metrics_collection_interval_secs = metrics_collection_interval_secs
+    self.stress_client_poll_interval_secs = stress_client_poll_interval_secs
+    self.num_channels_per_server = num_channels_per_server
+    self.num_stubs_per_channel = num_stubs_per_channel
+    self.test_cases_str = test_cases_str
+
+    # == Derived properties ==
+    # Note: Client can accept a list of server addresses (a comma separated list
+    # of 'server_name:server_port'). In this case, we only have one server
+    # address to pass
+    self.server_addresses = '%s.default.svc.cluster.local:%d' % (
+        server_pod_name, server_port)
+    self.client_pod_names_list = ['%s-%d' % (client_pod_name_prefix, i)
+                                  for i in range(1, num_clients + 1)]
+
+
+def _build_docker_image(image_name, tag_name):
+  """ Build the docker image and add tag it to the GKE repository """
+  print 'Building docker image: %s' % image_name
+  os.environ['INTEROP_IMAGE'] = image_name
+  os.environ['INTEROP_IMAGE_REPOSITORY_TAG'] = tag_name
+  # Note that 'BASE_NAME' HAS to be 'grpc_interop_stress_cxx' since the script
+  # build_interop_stress_image.sh invokes the following script:
+  #   tools/dockerfile/$BASE_NAME/build_interop_stress.sh
+  os.environ['BASE_NAME'] = 'grpc_interop_stress_cxx'
+  cmd = ['tools/jenkins/build_interop_stress_image.sh']
+  retcode = subprocess.call(args=cmd)
+  if retcode != 0:
+    print 'Error in building docker image'
+    return False
+  return True
+
+
+def _push_docker_image_to_gke_registry(docker_tag_name):
+  """Executes 'gcloud docker push <docker_tag_name>' to push the image to GKE registry"""
+  cmd = ['gcloud', 'docker', 'push', docker_tag_name]
+  print 'Pushing %s to GKE registry..' % docker_tag_name
+  retcode = subprocess.call(args=cmd)
+  if retcode != 0:
+    print 'Error in pushing docker image %s to the GKE registry' % docker_tag_name
+    return False
+  return True
+
+
+def _launch_server(gke_settings, stress_server_settings, bq_settings,
+                   kubernetes_proxy):
+  """ Launches a stress test server instance in GKE cluster """
+  if not kubernetes_proxy.is_started:
+    print 'Kubernetes proxy must be started before calling this function'
+    return False
+
+  # This is the wrapper script that is run in the container. This script runs
+  # the actual stress test server
+  server_cmd_list = ['/var/local/git/grpc/tools/gcp/stress_test/run_server.py']
+
+  # run_server.py does not take any args from the command line. The args are
+  # instead passed via environment variables (see server_env below)
+  server_arg_list = []
+
+  # The parameters to the script run_server.py are injected into the container
+  # via environment variables
+  server_env = {
+      'STRESS_TEST_IMAGE_TYPE': 'SERVER',
+      'STRESS_TEST_IMAGE': '/var/local/git/grpc/bins/opt/interop_server',
+      'STRESS_TEST_ARGS_STR': '--port=%s' % stress_server_settings.server_port,
+      'RUN_ID': bq_settings.run_id,
+      'POD_NAME': stress_server_settings.server_pod_name,
+      'GCP_PROJECT_ID': gke_settings.project_id,
+      'DATASET_ID': bq_settings.dataset_id,
+      'SUMMARY_TABLE_ID': bq_settings.summary_table_id,
+      'QPS_TABLE_ID': bq_settings.qps_table_id
+  }
+
+  # Launch Server
+  is_success = kubernetes_api.create_pod_and_service(
+      'localhost',
+      kubernetes_proxy.get_port(),
+      'default',  # Use 'default' namespace
+      stress_server_settings.server_pod_name,
+      gke_settings.tag_name,
+      [stress_server_settings.server_port],  # Port that should be exposed
+      server_cmd_list,
+      server_arg_list,
+      server_env,
+      True  # Headless = True for server. Since we want DNS records to be created by GKE
+  )
+
+  return is_success
+
+
+def _launch_client(gke_settings, stress_server_settings, stress_client_settings,
+                   bq_settings, kubernetes_proxy):
+  """ Launches a configurable number of stress test clients on GKE cluster """
+  if not kubernetes_proxy.is_started:
+    print 'Kubernetes proxy must be started before calling this function'
+    return False
+
+  stress_client_arg_list = [
+      '--server_addresses=%s' % stress_client_settings.server_addresses,
+      '--test_cases=%s' % stress_client_settings.test_cases_str,
+      '--num_stubs_per_channel=%d' %
+      stress_client_settings.num_stubs_per_channel
+  ]
+
+  # This is the wrapper script that is run in the container. This script runs
+  # the actual stress client
+  client_cmd_list = ['/var/local/git/grpc/tools/gcp/stress_test/run_client.py']
+
+  # run_client.py takes no args. All args are passed as env variables (see
+  # client_env)
+  client_arg_list = []
+
+  metrics_server_address = 'localhost:%d' % stress_client_settings.metrics_port
+  metrics_client_arg_list = [
+      '--metrics_server_address=%s' % metrics_server_address,
+      '--total_only=true'
+  ]
+
+  # The parameters to the script run_client.py are injected into the container
+  # via environment variables
+  client_env = {
+      'STRESS_TEST_IMAGE_TYPE': 'CLIENT',
+      'STRESS_TEST_IMAGE': '/var/local/git/grpc/bins/opt/stress_test',
+      'STRESS_TEST_ARGS_STR': ' '.join(stress_client_arg_list),
+      'METRICS_CLIENT_IMAGE': '/var/local/git/grpc/bins/opt/metrics_client',
+      'METRICS_CLIENT_ARGS_STR': ' '.join(metrics_client_arg_list),
+      'RUN_ID': bq_settings.run_id,
+      'POLL_INTERVAL_SECS':
+          str(stress_client_settings.stress_client_poll_interval_secs),
+      'GCP_PROJECT_ID': gke_settings.project_id,
+      'DATASET_ID': bq_settings.dataset_id,
+      'SUMMARY_TABLE_ID': bq_settings.summary_table_id,
+      'QPS_TABLE_ID': bq_settings.qps_table_id
+  }
+
+  for pod_name in stress_client_settings.client_pod_names_list:
+    client_env['POD_NAME'] = pod_name
+    is_success = kubernetes_api.create_pod_and_service(
+        'localhost',  # Since proxy is running on localhost
+        kubernetes_proxy.get_port(),
+        'default',  # default namespace
+        pod_name,
+        gke_settings.tag_name,
+        [stress_client_settings.metrics_port
+        ],  # Client pods expose metrics port
+        client_cmd_list,
+        client_arg_list,
+        client_env,
+        False  # Client is not a headless service
+    )
+    if not is_success:
+      print 'Error in launching client %s' % pod_name
+      return False
+
+  return True
+
+
+def _launch_server_and_client(gke_settings, stress_server_settings,
+                              stress_client_settings, bq_settings,
+                              kubernetes_proxy_port):
+  # Start kubernetes proxy
+  print 'Kubernetes proxy'
+  kubernetes_proxy = KubernetesProxy(kubernetes_proxy_port)
+  kubernetes_proxy.start()
+
+  print 'Launching server..'
+  is_success = _launch_server(gke_settings, stress_server_settings, bq_settings,
+                              kubernetes_proxy)
+  if not is_success:
+    print 'Error in launching server'
+    return False
+
+  # Server takes a while to start.
+  # TODO(sree) Use Kubernetes API to query the status of the server instead of
+  # sleeping
+  print 'Waiting for %s seconds for the server to start...' % _GKE_IMAGE_WARMUP_WAIT_SECS
+  time.sleep(_GKE_IMAGE_WARMUP_WAIT_SECS)
+
+  # Launch client
+  client_pod_name_prefix = 'stress-client'
+  is_success = _launch_client(gke_settings, stress_server_settings,
+                              stress_client_settings, bq_settings,
+                              kubernetes_proxy)
+
+  if not is_success:
+    print 'Error in launching client(s)'
+    return False
+
+  print 'Waiting for %s seconds for the client images to start...' % _GKE_IMAGE_WARMUP_WAIT_SECS
+  time.sleep(_GKE_IMAGE_WARMUP_WAIT_SECS)
+  return True
+
+
+def _delete_server_and_client(stress_server_settings, stress_client_settings,
+                              kubernetes_proxy_port):
+  kubernetes_proxy = KubernetesProxy(kubernetes_proxy_port)
+  kubernetes_proxy.start()
+
+  # Delete clients first
+  is_success = True
+  for pod_name in stress_client_settings.client_pod_names_list:
+    is_success = kubernetes_api.delete_pod_and_service(
+        'localhost', kubernetes_proxy_port, 'default', pod_name)
+    if not is_success:
+      return False
+
+  # Delete server
+  is_success = kubernetes_api.delete_pod_and_service(
+      'localhost', kubernetes_proxy_port, 'default',
+      stress_server_settings.server_pod_name)
+  return is_success
+
+
+def run_test_main(test_settings, gke_settings, stress_server_settings,
+                  stress_client_clients):
+  is_success = True
+
+  if test_settings.build_docker_image:
+    is_success = _build_docker_image(gke_settings.docker_image_name,
+                                     gke_settings.tag_name)
+    if not is_success:
+      return False
+
+    is_success = _push_docker_image_to_gke_registry(gke_settings.tag_name)
+    if not is_success:
+      return False
+
+  # Create a unique id for this run (Note: Using timestamp instead of UUID to
+  # make it easier to deduce the date/time of the run just by looking at the run
+  # run id. This is useful in debugging when looking at records in Biq query)
+  run_id = datetime.datetime.now().strftime('%Y_%m_%d_%H_%M_%S')
+  dataset_id = '%s_%s' % (_DATASET_ID_PREFIX, run_id)
+
+  # Big Query settings (common for both Stress Server and Client)
+  bq_settings = BigQuerySettings(run_id, dataset_id, _SUMMARY_TABLE_ID,
+                                 _QPS_TABLE_ID)
+
+  bq_helper = BigQueryHelper(run_id, '', '', args.project_id, dataset_id,
+                             _SUMMARY_TABLE_ID, _QPS_TABLE_ID)
+  bq_helper.initialize()
+
+  try:
+    is_success = _launch_server_and_client(gke_settings, stress_server_settings,
+                                           stress_client_settings, bq_settings,
+                                           test_settings.kubernetes_proxy_port)
+    if not is_success:
+      return False
+
+    start_time = datetime.datetime.now()
+    end_time = start_time + datetime.timedelta(
+        seconds=test_settings.test_duration_secs)
+    print 'Running the test until %s' % end_time.isoformat()
+
+    while True:
+      if datetime.datetime.now() > end_time:
+        print 'Test was run for %d seconds' % test_settings.test_duration_secs
+        break
+
+      # Check if either stress server or clients have failed
+      if bq_helper.check_if_any_tests_failed():
+        is_success = False
+        print 'Some tests failed.'
+        break
+
+      # Things seem to be running fine. Wait until next poll time to check the
+      # status
+      print 'Sleeping for %d seconds..' % test_settings.test_poll_interval_secs
+      time.sleep(test_settings.test_poll_interval_secs)
+
+    # Print BiqQuery tables
+    bq_helper.print_summary_records()
+    bq_helper.print_qps_records()
+
+  finally:
+    # If is_success is False at this point, it means that the stress tests were
+    # started successfully but failed while running the tests. In this case we
+    # do should not delete the pods (since they contain all the failure
+    # information)
+    if is_success:
+      _delete_server_and_client(stress_server_settings, stress_client_settings,
+                                test_settings.kubernetes_proxy_port)
+
+  return is_success
+
+
+argp = argparse.ArgumentParser(
+    description='Launch stress tests in GKE',
+    formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+argp.add_argument('--project_id',
+                  required=True,
+                  help='The Google Cloud Platform Project Id')
+argp.add_argument('--num_clients',
+                  default=1,
+                  type=int,
+                  help='Number of client instances to start')
+argp.add_argument('--docker_image_name',
+                  default=_DEFAULT_DOCKER_IMAGE_NAME,
+                  help='The name of the docker image containing stress client '
+                  'and stress servers')
+argp.add_argument('--build_docker_image',
+                  dest='build_docker_image',
+                  action='store_true',
+                  help='Build a docker image and push to Google Container '
+                  'Registry')
+argp.add_argument('--do_not_build_docker_image',
+                  dest='build_docker_image',
+                  action='store_false',
+                  help='Do not build and push docker image to Google Container '
+                  'Registry')
+argp.set_defaults(build_docker_image=True)
+
+argp.add_argument('--test_poll_interval_secs',
+                  default=_DEFAULT_TEST_POLL_INTERVAL_SECS,
+                  type=int,
+                  help='How frequently should this script should monitor the '
+                  'health of stress clients and servers running in the GKE '
+                  'cluster')
+argp.add_argument('--test_duration_secs',
+                  default=_DEFAULT_TEST_DURATION_SECS,
+                  type=int,
+                  help='How long should this test be run')
+argp.add_argument('--kubernetes_proxy_port',
+                  default=_DEFAULT_KUBERNETES_PROXY_PORT,
+                  type=int,
+                  help='The port on which the kubernetes proxy (on localhost)'
+                  ' is started')
+argp.add_argument('--stress_server_port',
+                  default=_DEFAULT_STRESS_SERVER_PORT,
+                  type=int,
+                  help='The port on which the stress server (in GKE '
+                  'containers) listens')
+argp.add_argument('--stress_client_metrics_port',
+                  default=_DEFAULT_METRICS_PORT,
+                  type=int,
+                  help='The port on which the stress clients (in GKE '
+                  'containers) expose metrics')
+argp.add_argument('--stress_client_poll_interval_secs',
+                  default=_DEFAULT_STRESS_CLIENT_POLL_INTERVAL_SECS,
+                  type=int,
+                  help='How frequently should the stress client wrapper script'
+                  ' running inside GKE should monitor health of the actual '
+                  ' stress client process and upload the metrics to BigQuery')
+argp.add_argument('--stress_client_metrics_collection_interval_secs',
+                  default=_DEFAULT_METRICS_COLLECTION_INTERVAL_SECS,
+                  type=int,
+                  help='How frequently should metrics be collected in-memory on'
+                  ' the stress clients (running inside GKE containers). Note '
+                  'that this is NOT the same as the upload-to-BigQuery '
+                  'frequency. The metrics upload frequency is controlled by the'
+                  ' --stress_client_poll_interval_secs flag')
+argp.add_argument('--stress_client_num_channels_per_server',
+                  default=_DEFAULT_NUM_CHANNELS_PER_SERVER,
+                  type=int,
+                  help='The number of channels created to each server from a '
+                  'stress client')
+argp.add_argument('--stress_client_num_stubs_per_channel',
+                  default=_DEFAULT_NUM_STUBS_PER_CHANNEL,
+                  type=int,
+                  help='The number of stubs created per channel. This number '
+                  'indicates the max number of RPCs that can be made in '
+                  'parallel on each channel at any given time')
+argp.add_argument('--stress_client_test_cases',
+                  default=_DEFAULT_TEST_CASES_STR,
+                  help='List of test cases (with weights) to be executed by the'
+                  ' stress test client. The list is in the following format:\n'
+                  '  <testcase_1:w_1,<test_case2:w_2>..<testcase_n:w_n>\n'
+                  ' (Note: The weights do not have to add up to 100)')
+
+if __name__ == '__main__':
+  args = argp.parse_args()
+
+  test_settings = TestSettings(
+      args.build_docker_image, args.test_poll_interval_secs,
+      args.test_duration_secs, args.kubernetes_proxy_port)
+
+  gke_settings = GkeSettings(args.project_id, args.docker_image_name)
+
+  stress_server_settings = StressServerSettings(_SERVER_POD_NAME,
+                                                args.stress_server_port)
+  stress_client_settings = StressClientSettings(
+      args.num_clients, _CLIENT_POD_NAME_PREFIX, _SERVER_POD_NAME,
+      args.stress_server_port, args.stress_client_metrics_port,
+      args.stress_client_metrics_collection_interval_secs,
+      args.stress_client_poll_interval_secs,
+      args.stress_client_num_channels_per_server,
+      args.stress_client_num_stubs_per_channel, args.stress_client_test_cases)
+
+  run_test_main(test_settings, gke_settings, stress_server_settings,
+                stress_client_settings)