From ddaa69f15d8b3bb1a6bf9aff231950406fe5e961 Mon Sep 17 00:00:00 2001 From: murgatroid99 <mlumish@google.com> Date: Fri, 22 Apr 2016 13:37:26 -0700 Subject: [PATCH] Got Ruby stress client working, with some modifications to interop tests --- src/ruby/pb/grpc/testing/metrics.rb | 28 ++++ src/ruby/pb/grpc/testing/metrics_services.rb | 27 ++++ src/ruby/pb/test/client.rb | 28 +--- src/ruby/pb/test/server.rb | 2 +- src/ruby/stress/metrics_server.rb | 83 ++++++++++ src/ruby/stress/stress_client.rb | 155 +++++++++++++++++++ 6 files changed, 301 insertions(+), 22 deletions(-) create mode 100644 src/ruby/pb/grpc/testing/metrics.rb create mode 100644 src/ruby/pb/grpc/testing/metrics_services.rb create mode 100644 src/ruby/stress/metrics_server.rb create mode 100755 src/ruby/stress/stress_client.rb diff --git a/src/ruby/pb/grpc/testing/metrics.rb b/src/ruby/pb/grpc/testing/metrics.rb new file mode 100644 index 0000000000..3b3c8cd61b --- /dev/null +++ b/src/ruby/pb/grpc/testing/metrics.rb @@ -0,0 +1,28 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: grpc/testing/metrics.proto + +require 'google/protobuf' + +Google::Protobuf::DescriptorPool.generated_pool.build do + add_message "grpc.testing.GaugeResponse" do + optional :name, :string, 1 + oneof :value do + optional :long_value, :int64, 2 + optional :double_value, :double, 3 + optional :string_value, :string, 4 + end + end + add_message "grpc.testing.GaugeRequest" do + optional :name, :string, 1 + end + add_message "grpc.testing.EmptyMessage" do + end +end + +module Grpc + module Testing + GaugeResponse = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.GaugeResponse").msgclass + GaugeRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.GaugeRequest").msgclass + EmptyMessage = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.EmptyMessage").msgclass + end +end diff --git a/src/ruby/pb/grpc/testing/metrics_services.rb b/src/ruby/pb/grpc/testing/metrics_services.rb new file mode 100644 index 0000000000..f5778bbbb1 --- /dev/null +++ b/src/ruby/pb/grpc/testing/metrics_services.rb @@ -0,0 +1,27 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# Source: grpc/testing/metrics.proto for package 'grpc.testing' + +require 'grpc' +require 'grpc/testing/metrics' + +module Grpc + module Testing + module MetricsService + + # TODO: add proto service documentation here + class Service + + include GRPC::GenericService + + self.marshal_class_method = :encode + self.unmarshal_class_method = :decode + self.service_name = 'grpc.testing.MetricsService' + + rpc :GetAllGauges, EmptyMessage, stream(GaugeResponse) + rpc :GetGauge, GaugeRequest, GaugeResponse + end + + Stub = Service.rpc_stub_class + end + end +end diff --git a/src/ruby/pb/test/client.rb b/src/ruby/pb/test/client.rb index 695a5c4ea2..95b059a18e 100755 --- a/src/ruby/pb/test/client.rb +++ b/src/ruby/pb/test/client.rb @@ -38,23 +38,23 @@ # --server_port=<port> \ # --test_case=<testcase_name> +# These lines are required for the generated files to load grpc this_dir = File.expand_path(File.dirname(__FILE__)) lib_dir = File.join(File.dirname(File.dirname(this_dir)), 'lib') -pb_dir = File.dirname(File.dirname(this_dir)) +pb_dir = File.dirname(this_dir) $LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir) $LOAD_PATH.unshift(pb_dir) unless $LOAD_PATH.include?(pb_dir) -$LOAD_PATH.unshift(this_dir) unless $LOAD_PATH.include?(this_dir) require 'optparse' require 'logger' -require 'grpc' +require_relative '../../lib/grpc' require 'googleauth' require 'google/protobuf' -require 'test/proto/empty' -require 'test/proto/messages' -require 'test/proto/test_services' +require_relative 'proto/empty' +require_relative 'proto/messages' +require_relative 'proto/test_services' AUTH_ENV = Google::Auth::CredentialsLoader::ENV_VAR @@ -208,12 +208,10 @@ class NamedTests def empty_unary resp = @stub.empty_call(Empty.new) assert('empty_unary: invalid response') { resp.is_a?(Empty) } - p 'OK: empty_unary' end def large_unary perform_large_unary - p 'OK: large_unary' end def service_account_creds @@ -230,7 +228,6 @@ class NamedTests assert("#{__callee__}: bad oauth scope") do @args.oauth_scope.include?(resp.oauth_scope) end - p "OK: #{__callee__}" end def jwt_token_creds @@ -238,7 +235,6 @@ class NamedTests wanted_email = MultiJson.load(json_key)['client_email'] resp = perform_large_unary(fill_username: true) assert("#{__callee__}: bad username") { wanted_email == resp.username } - p "OK: #{__callee__}" end def compute_engine_creds @@ -247,7 +243,6 @@ class NamedTests assert("#{__callee__}: bad username") do @args.default_service_account == resp.username end - p "OK: #{__callee__}" end def oauth2_auth_token @@ -259,7 +254,6 @@ class NamedTests assert("#{__callee__}: bad oauth scope") do @args.oauth_scope.include?(resp.oauth_scope) end - p "OK: #{__callee__}" end def per_rpc_creds @@ -279,7 +273,6 @@ class NamedTests assert("#{__callee__}: bad oauth scope") do @args.oauth_scope.include?(resp.oauth_scope) end - p "OK: #{__callee__}" end def client_streaming @@ -293,7 +286,6 @@ class NamedTests assert("#{__callee__}: aggregate payload size is incorrect") do wanted_aggregate_size == resp.aggregated_payload_size end - p "OK: #{__callee__}" end def server_streaming @@ -311,7 +303,6 @@ class NamedTests :COMPRESSABLE == r.payload.type end end - p "OK: #{__callee__}" end def ping_pong @@ -319,7 +310,6 @@ class NamedTests ppp = PingPongPlayer.new(msg_sizes) resps = @stub.full_duplex_call(ppp.each_item) resps.each { |r| ppp.queue.push(r) } - p "OK: #{__callee__}" end def timeout_on_sleeping_server @@ -332,7 +322,6 @@ class NamedTests assert("#{__callee__}: status was wrong") do e.code == GRPC::Core::StatusCodes::DEADLINE_EXCEEDED end - p "OK: #{__callee__}" end def empty_stream @@ -346,7 +335,6 @@ class NamedTests assert("#{__callee__}: too many responses expected 0") do count == 0 end - p "OK: #{__callee__}" end def cancel_after_begin @@ -361,7 +349,6 @@ class NamedTests fail 'Should have raised GRPC:Cancelled' rescue GRPC::Cancelled assert("#{__callee__}: call operation should be CANCELLED") { op.cancelled } - p "OK: #{__callee__}" end def cancel_after_first_response @@ -374,7 +361,6 @@ class NamedTests rescue GRPC::Cancelled assert("#{__callee__}: call operation should be CANCELLED") { op.cancelled } op.wait - p "OK: #{__callee__}" end def all @@ -442,7 +428,7 @@ def parse_args opts.on('--use_tls USE_TLS', ['false', 'true'], 'require a secure connection?') do |v| args['secure'] = v == 'true' - end +p end opts.on('--use_test_ca USE_TEST_CA', ['false', 'true'], 'if secure, use the test certificate?') do |v| args['use_test_ca'] = v == 'true' diff --git a/src/ruby/pb/test/server.rb b/src/ruby/pb/test/server.rb index 851e815222..914c7cc79d 100755 --- a/src/ruby/pb/test/server.rb +++ b/src/ruby/pb/test/server.rb @@ -39,7 +39,7 @@ this_dir = File.expand_path(File.dirname(__FILE__)) lib_dir = File.join(File.dirname(File.dirname(this_dir)), 'lib') -pb_dir = File.dirname(File.dirname(this_dir)) +pb_dir = File.dirname(this_dir) $LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir) $LOAD_PATH.unshift(pb_dir) unless $LOAD_PATH.include?(pb_dir) $LOAD_PATH.unshift(this_dir) unless $LOAD_PATH.include?(this_dir) diff --git a/src/ruby/stress/metrics_server.rb b/src/ruby/stress/metrics_server.rb new file mode 100644 index 0000000000..13638c4d21 --- /dev/null +++ b/src/ruby/stress/metrics_server.rb @@ -0,0 +1,83 @@ +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +require_relative '../pb/grpc/testing/metrics.rb' +require_relative '../pb/grpc/testing/metrics_services.rb' + +class Gauge + def get_name + raise NoMethodError.new + end + + def get_type + raise NoMethodError.new + end + + def get_value + raise NoMethodError.new + end +end + +class MetricsServiceImpl < Grpc::Testing::MetricsService::Service + include Grpc::Testing + @gauges + + def initialize + @gauges = {} + end + + def register_gauge(gauge) + @gauges[gauge.get_name] = gauge + end + + def make_gauge_response(gauge) + response = GaugeResponse.new(:name => gauge.get_name) + value = gauge.get_value + case gauge.get_type + when 'long' + response.long_value = value + when 'double' + response.double_value = value + when 'string' + response.string_value = value + end + response + end + + def get_all_gauges(_empty, _call) + @gauges.values.map do |gauge| + make_gauge_response gauge + end + end + + def get_gauge(gauge_req, _call) + gauge = @gauges[gauge_req.name] + make_gauge_response gauge + end +end diff --git a/src/ruby/stress/stress_client.rb b/src/ruby/stress/stress_client.rb new file mode 100755 index 0000000000..698f9f1b87 --- /dev/null +++ b/src/ruby/stress/stress_client.rb @@ -0,0 +1,155 @@ +#!/usr/bin/env ruby + +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +require 'optparse' +require 'thread' +require_relative '../pb/test/client' +require_relative './metrics_server' +require_relative '../lib/grpc' + +class QpsGauge < Gauge + @query_count + @query_mutex + @start_time + + def initialize + @query_count = 0 + @query_mutex = Mutex.new + @start_time = Time.now + end + + def increment_queries + @query_mutex.synchronize { @query_count += 1} + end + + def get_name + 'qps' + end + + def get_type + 'long' + end + + def get_value + (@query_mutex.synchronize { @query_count / (Time.now - @start_time) }).to_i + end +end + +def start_metrics_server(port) + host = "0.0.0.0:#{port}" + server = GRPC::RpcServer.new + server.add_http2_port(host, :this_port_is_insecure) + service = MetricsServiceImpl.new + server.handle(service) + server_thread = Thread.new { server.run_till_terminated } + [server, service, server_thread] +end + +StressArgs = Struct.new(:server_addresses, :test_cases, :duration, + :channels_per_server, :concurrent_calls, :metrics_port) + +def start(stress_args) + running = true + threads = [] + qps_gauge = QpsGauge.new + metrics_server, metrics_service, metrics_thread = + start_metrics_server(stress_args.metrics_port) + metrics_service.register_gauge(qps_gauge) + stress_args.server_addresses.each do |address| + stress_args.channels_per_server.times do + client_args = Args.new + client_args.host, client_args.port = address.split(':') + client_args.secure = false + client_args.test_case = '' + stub = create_stub(client_args) + named_tests = NamedTests.new(stub, client_args) + stress_args.concurrent_calls.times do + threads << Thread.new do + while running + named_tests.method(stress_args.test_cases.sample).call + qps_gauge.increment_queries + end + end + end + end + end + if stress_args.duration >= 0 + sleep stress_args.duration + running = false + metrics_server.stop + p "QPS: #{qps_gauge.get_value}" + threads.each { |thd| thd.join; } + end + metrics_thread.join +end + +def parse_stress_args + stress_args = StressArgs.new + stress_args.server_addresses = ['localhost:8080'] + stress_args.test_cases = [] + stress_args.duration = -1 + stress_args.channels_per_server = 1 + stress_args.concurrent_calls = 1 + stress_args.metrics_port = '8081' + OptionParser.new do |opts| + opts.on('--server_addresses [LIST]', Array) do |addrs| + stress_args.server_addresses = addrs + end + opts.on('--test_cases cases', Array) do |cases| + stress_args.test_cases = (cases.map do |item| + split = item.split(':') + [split[0]] * split[1].to_i + end).reduce([], :+) + end + opts.on('--test_duration_secs [INT]', OptionParser::DecimalInteger) do |time| + stress_args.duration = time + end + opts.on('--num_channels_per_server [INT]', OptionParser::DecimalInteger) do |channels| + stress_args.channels_per_server = channels + end + opts.on('--num_stubs_per_channel [INT]', OptionParser::DecimalInteger) do |stubs| + stress_args.concurrent_calls = stubs + end + opts.on('--metrics_port [port]') do |port| + stress_args.metrics_port = port + end + end.parse! + stress_args +end + +def main + opts = parse_stress_args + start(opts) +end + +if __FILE__ == $0 + main +end -- GitLab