Skip to content
Snippets Groups Projects
stress_test_utils.py 7.74 KiB
#!/usr/bin/env python2.7
# Copyright 2015-2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
#     * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import datetime
import json
import os
import re
import select
import subprocess
import sys
import time

# Import big_query_utils module
bq_utils_dir = os.path.abspath(os.path.join(
    os.path.dirname(__file__), '../utils'))
sys.path.append(bq_utils_dir)
import big_query_utils as bq_utils


class EventType:
  STARTING = 'STARTING'
  SUCCESS = 'SUCCESS'
  FAILURE = 'FAILURE'


class BigQueryHelper:
  """Helper class for the stress test wrappers to interact with BigQuery.
  """

  def __init__(self, run_id, image_type, pod_name, project_id, dataset_id,
               summary_table_id, qps_table_id):
    self.run_id = run_id
    self.image_type = image_type
    self.pod_name = pod_name
    self.project_id = project_id
    self.dataset_id = dataset_id
    self.summary_table_id = summary_table_id
    self.qps_table_id = qps_table_id

  def initialize(self):
    self.bq = bq_utils.create_big_query()

  def setup_tables(self):
    return bq_utils.create_dataset(self.bq, self.project_id, self.dataset_id) \
        and self.__create_summary_table() \
        and self.__create_qps_table()

  def insert_summary_row(self, event_type, details):
    row_values_dict = {
        'run_id': self.run_id,
        'image_type': self.image_type,
        'pod_name': self.pod_name,
        'event_date': datetime.datetime.now().isoformat(),
        'event_type': event_type,
        'details': details
    }
    # row_unique_id is something that uniquely identifies the row (BigQuery uses
    # it for duplicate detection).
    row_unique_id = '%s_%s_%s' % (self.run_id, self.pod_name, event_type)
    row = bq_utils.make_row(row_unique_id, row_values_dict)
    return bq_utils.insert_rows(self.bq, self.project_id, self.dataset_id,
                                self.summary_table_id, [row])

  def insert_qps_row(self, qps, recorded_at):
    row_values_dict = {
        'run_id': self.run_id,
        'pod_name': self.pod_name,
        'recorded_at': recorded_at,
        'qps': qps
    }

    # row_unique_id is something that uniquely identifies the row (BigQuery uses
    # it for duplicate detection).
    row_unique_id = '%s_%s_%s' % (self.run_id, self.pod_name, recorded_at)
    row = bq_utils.make_row(row_unique_id, row_values_dict)
    return bq_utils.insert_rows(self.bq, self.project_id, self.dataset_id,
                                self.qps_table_id, [row])

  def check_if_any_tests_failed(self, num_query_retries=3):
    query = ('SELECT event_type FROM %s.%s WHERE run_id = \'%s\' AND '
             'event_type="%s"') % (self.dataset_id, self.summary_table_id,
                                   self.run_id, EventType.FAILURE)
    query_job = bq_utils.sync_query_job(self.bq, self.project_id, query)
    page = self.bq.jobs().getQueryResults(**query_job['jobReference']).execute(
        num_retries=num_query_retries)
    num_failures = int(page['totalRows'])
    print 'num rows: ', num_failures
    return num_failures > 0

  def print_summary_records(self, num_query_retries=3):
    line = '-' * 120
    print line
    print 'Summary records'
    print 'Run Id: ', self.run_id
    print 'Dataset Id: ', self.dataset_id
    print line
    query = ('SELECT pod_name, image_type, event_type, event_date, details'
             ' FROM %s.%s WHERE run_id = \'%s\' ORDER by event_date;') % (
                 self.dataset_id, self.summary_table_id, self.run_id)
    query_job = bq_utils.sync_query_job(self.bq, self.project_id, query)

    print '{:<25} {:<12} {:<12} {:<30} {}'.format(
        'Pod name', 'Image type', 'Event type', 'Date', 'Details')
    print line
    page_token = None
    while True:
      page = self.bq.jobs().getQueryResults(
          pageToken=page_token,
          **query_job['jobReference']).execute(num_retries=num_query_retries)
      rows = page.get('rows', [])
      for row in rows:
        print '{:<25} {:<12} {:<12} {:<30} {}'.format(
            row['f'][0]['v'], row['f'][1]['v'], row['f'][2]['v'],
            row['f'][3]['v'], row['f'][4]['v'])
      page_token = page.get('pageToken')
      if not page_token:
        break

  def print_qps_records(self, num_query_retries=3):
    line = '-' * 80
    print line
    print 'QPS Summary'
    print 'Run Id: ', self.run_id
    print 'Dataset Id: ', self.dataset_id
    print line
    query = (
        'SELECT pod_name, recorded_at, qps FROM %s.%s WHERE run_id = \'%s\' '
        'ORDER by recorded_at;') % (self.dataset_id, self.qps_table_id,
                                    self.run_id)
    query_job = bq_utils.sync_query_job(self.bq, self.project_id, query)
    print '{:<25} {:30} {}'.format('Pod name', 'Recorded at', 'Qps')
    print line
    page_token = None
    while True:
      page = self.bq.jobs().getQueryResults(
          pageToken=page_token,
          **query_job['jobReference']).execute(num_retries=num_query_retries)
      rows = page.get('rows', [])
      for row in rows:
        print '{:<25} {:30} {}'.format(row['f'][0]['v'], row['f'][1]['v'],
                                       row['f'][2]['v'])
      page_token = page.get('pageToken')
      if not page_token:
        break

  def __create_summary_table(self):
    summary_table_schema = [
        ('run_id', 'STRING', 'Test run id'),
        ('image_type', 'STRING', 'Client or Server?'),
        ('pod_name', 'STRING', 'GKE pod hosting this image'),
        ('event_date', 'STRING', 'The date of this event'),
        ('event_type', 'STRING', 'STARTED/SUCCESS/FAILURE'),
        ('details', 'STRING', 'Any other relevant details')
    ]
    desc = ('The table that contains START/SUCCESS/FAILURE events for '
            ' the stress test clients and servers')
    return bq_utils.create_table(self.bq, self.project_id, self.dataset_id,
                                 self.summary_table_id, summary_table_schema,
                                 desc)

  def __create_qps_table(self):
    qps_table_schema = [
        ('run_id', 'STRING', 'Test run id'),
        ('pod_name', 'STRING', 'GKE pod hosting this image'),
        ('recorded_at', 'STRING', 'Metrics recorded at time'),
        ('qps', 'INTEGER', 'Queries per second')
    ]
    desc = 'The table that cointains the qps recorded at various intervals'
    return bq_utils.create_table(self.bq, self.project_id, self.dataset_id,
                                 self.qps_table_id, qps_table_schema, desc)