diff --git a/src/ruby/qps/client.rb b/src/ruby/qps/client.rb index 7ed648acefc5682a4180dff42ed8149cf73ca67d..8aed866da5d98c1dbd88b8728792879142d6acf9 100644 --- a/src/ruby/qps/client.rb +++ b/src/ruby/qps/client.rb @@ -89,12 +89,14 @@ class BenchmarkClient payload: gtp.new(type: gtpt::COMPRESSABLE, body: nulls(simple_params.req_size))) + @child_threads = [] + (0..config.client_channels-1).each do |chan| gtbss = Grpc::Testing::BenchmarkService::Stub st = config.server_targets stub = gtbss.new(st[chan % st.length], cred, **opts) (0..config.outstanding_rpcs_per_channel-1).each do |r| - Thread.new { + @child_threads << Thread.new { case config.load_params.load.to_s when 'closed_loop' waiter = nil @@ -162,5 +164,8 @@ class BenchmarkClient end def shutdown @done = true + @child_threads.each do |thread| + thread.join + end end end diff --git a/src/ruby/qps/server.rb b/src/ruby/qps/server.rb index cd98ee1fd9439c9066a59bda06cc0772212f76fc..f51f86d9a963fe954d2df19ed555ff3314b786df 100644 --- a/src/ruby/qps/server.rb +++ b/src/ruby/qps/server.rb @@ -71,7 +71,8 @@ class BenchmarkServer else cred = :this_port_is_insecure end - @server = GRPC::RpcServer.new + # Make sure server can handle the large number of calls in benchmarks + @server = GRPC::RpcServer.new(pool_size: 100, max_waiting_requests: 100) @port = @server.add_http2_port("0.0.0.0:" + port.to_s, cred) @server.handle(BenchmarkServiceImpl.new) @start_time = Time.now diff --git a/src/ruby/qps/worker.rb b/src/ruby/qps/worker.rb index 12b8087ca0590dc27c419403f3fe7276369cc97f..61a0b723a3fd5ca1c8908827441e7c4cc43fc2b9 100755 --- a/src/ruby/qps/worker.rb +++ b/src/ruby/qps/worker.rb @@ -64,8 +64,8 @@ class WorkerServiceImpl < Grpc::Testing::WorkerService::Service q.push(gtss.new(stats: bms.mark(req.mark.reset), cores: cpu_cores)) end end - q.push(self) bms.stop + q.push(self) } q.each_item end @@ -83,8 +83,8 @@ class WorkerServiceImpl < Grpc::Testing::WorkerService::Service client.mark(req.mark.reset))) end end - q.push(self) client.shutdown + q.push(self) } q.each_item end @@ -118,6 +118,10 @@ def main options['server_port'] = v end end.parse! + + # Configure any errors with client or server child threads to surface + Thread.abort_on_exception = true + s = GRPC::RpcServer.new s.add_http2_port("0.0.0.0:" + options['driver_port'].to_s, :this_port_is_insecure)