diff --git a/src/ruby/qps/server.rb b/src/ruby/qps/server.rb index edc06806aa0e258985df80af1a8cea38ad0e9ba8..44a80988d75abd8039eab80a497755eee1b8dec5 100644 --- a/src/ruby/qps/server.rb +++ b/src/ruby/qps/server.rb @@ -40,6 +40,7 @@ require 'grpc' require 'qps-common' require 'src/proto/grpc/testing/messages' require 'src/proto/grpc/testing/services_services' +require 'src/proto/grpc/testing/stats' class BenchmarkServiceImpl < Grpc::Testing::BenchmarkService::Service def unary_call(req, _call) @@ -61,8 +62,38 @@ class BenchmarkServiceImpl < Grpc::Testing::BenchmarkService::Service end end -def benchmark_server(config) - +def load_test_certs + this_dir = File.expand_path(File.dirname(__FILE__)) + data_dir = File.join(File.dirname(this_dir), 'spec/testdata') + files = ['ca.pem', 'server1.key', 'server1.pem'] + files.map { |f| File.open(File.join(data_dir, f)).read } end +class BenchmarkServer + def initialize(config, port) + if config.security_params + certs = load_test_certs + cred = GRPC::Core::Credentials.new(certs[0]) + else + cred = :this_port_is_insecure + end + @server = GRPC::RpcServer.new + @port = @server.add_http2_port("0.0.0.0:" + port.to_s, cred) + @server.handle(BenchmarkServiceImpl.new) + @start_time = Time.now + Thread.new { + @server.run + } + end + def mark(reset) + s = Grpc::Testing::ServerStats.new(time_elapsed: (Time.now-@start_time).to_f) + if reset + @start_time = Time.now + end + s + end + def get_port + @port + end +end diff --git a/src/ruby/qps/worker.rb b/src/ruby/qps/worker.rb index ca60f5192560282be55d8cdbef4f4a704cdad58c..1ebf129a13b724b9f1dff265cb6ce95c28211f10 100755 --- a/src/ruby/qps/worker.rb +++ b/src/ruby/qps/worker.rb @@ -54,15 +54,16 @@ class WorkerServiceImpl < Grpc::Testing::WorkerService::Service q = EnumeratorQueue.new(self) Thread.new { reqs.each do |req| - case req.argtype - when "setup" - server = BenchmarkServer.new(req.setup) - q.push(Grpc::Testing::ServerStatus.new(stats: server.mark(false), port: server.get_port)) - when "mark" - q.push(Grpc::Testing::ServerStatus.new(stats: server.mark(req.mark.reset), cores: cpu_cores)) + case req.argtype.to_s + when 'setup' + @bms = BenchmarkServer.new(req.setup, @server_port) + q.push(Grpc::Testing::ServerStatus.new(stats: @bms.mark(false), port: @bms.get_port)) + when 'mark' + q.push(Grpc::Testing::ServerStatus.new(stats: @bms.mark(req.mark.reset), cores: cpu_cores)) end end q.push(self) + @bms.stop } q.each_item end @@ -71,10 +72,10 @@ class WorkerServiceImpl < Grpc::Testing::WorkerService::Service Thread.new { reqs.each do |req| case req.argtype - when "setup" - server = BenchmarkClient.new(req.setup) + when 'setup' + client = BenchmarkClient.new(req.setup) q.push(Grpc::Testing::ClientStatus.new(stats: client.mark(false))) - when "mark" + when 'mark' q.push(Grpc::Testing::ClientStatus.new(stats: client.mark(req.mark.reset))) end end @@ -92,25 +93,30 @@ class WorkerServiceImpl < Grpc::Testing::WorkerService::Service } Grpc::Testing::Void.new end - def initialize(s) + def initialize(s, sp) @server = s + @server_port = sp end end def main options = { - 'driver_port' => 0 + 'driver_port' => 0, + 'server_port' => 0 } OptionParser.new do |opts| - opts.banner = 'Usage: [--driver_port <port>]' + opts.banner = 'Usage: [--driver_port <port>] [--server_port <port>]' opts.on('--driver_port PORT', '<port>') do |v| options['driver_port'] = v end + opts.on('--server_port PORT', '<port>') do |v| + options['server_port'] = v + end end.parse! s = GRPC::RpcServer.new s.add_http2_port("0.0.0.0:" + options['driver_port'].to_s, :this_port_is_insecure) - s.handle(WorkerServiceImpl.new(s)) + s.handle(WorkerServiceImpl.new(s, options['server_port'].to_i)) s.run end