Skip to content
Snippets Groups Projects
Commit 847f9ecc authored by Alexander Polcyn's avatar Alexander Polcyn
Browse files

fail benchmarks with errors in a child rpc thread

parent 3bc78cd0
No related branches found
No related tags found
No related merge requests found
...@@ -89,12 +89,14 @@ class BenchmarkClient ...@@ -89,12 +89,14 @@ class BenchmarkClient
payload: gtp.new(type: gtpt::COMPRESSABLE, payload: gtp.new(type: gtpt::COMPRESSABLE,
body: nulls(simple_params.req_size))) body: nulls(simple_params.req_size)))
@child_threads = []
(0..config.client_channels-1).each do |chan| (0..config.client_channels-1).each do |chan|
gtbss = Grpc::Testing::BenchmarkService::Stub gtbss = Grpc::Testing::BenchmarkService::Stub
st = config.server_targets st = config.server_targets
stub = gtbss.new(st[chan % st.length], cred, **opts) stub = gtbss.new(st[chan % st.length], cred, **opts)
(0..config.outstanding_rpcs_per_channel-1).each do |r| (0..config.outstanding_rpcs_per_channel-1).each do |r|
Thread.new { @child_threads << Thread.new {
case config.load_params.load.to_s case config.load_params.load.to_s
when 'closed_loop' when 'closed_loop'
waiter = nil waiter = nil
...@@ -162,5 +164,8 @@ class BenchmarkClient ...@@ -162,5 +164,8 @@ class BenchmarkClient
end end
def shutdown def shutdown
@done = true @done = true
@child_threads.each do |thread|
thread.join
end
end end
end end
...@@ -71,7 +71,8 @@ class BenchmarkServer ...@@ -71,7 +71,8 @@ class BenchmarkServer
else else
cred = :this_port_is_insecure cred = :this_port_is_insecure
end end
@server = GRPC::RpcServer.new # Make sure server can handle the large number of calls in benchmarks
@server = GRPC::RpcServer.new(pool_size: 100, max_waiting_requests: 100)
@port = @server.add_http2_port("0.0.0.0:" + port.to_s, cred) @port = @server.add_http2_port("0.0.0.0:" + port.to_s, cred)
@server.handle(BenchmarkServiceImpl.new) @server.handle(BenchmarkServiceImpl.new)
@start_time = Time.now @start_time = Time.now
......
...@@ -64,8 +64,8 @@ class WorkerServiceImpl < Grpc::Testing::WorkerService::Service ...@@ -64,8 +64,8 @@ class WorkerServiceImpl < Grpc::Testing::WorkerService::Service
q.push(gtss.new(stats: bms.mark(req.mark.reset), cores: cpu_cores)) q.push(gtss.new(stats: bms.mark(req.mark.reset), cores: cpu_cores))
end end
end end
q.push(self)
bms.stop bms.stop
q.push(self)
} }
q.each_item q.each_item
end end
...@@ -83,8 +83,8 @@ class WorkerServiceImpl < Grpc::Testing::WorkerService::Service ...@@ -83,8 +83,8 @@ class WorkerServiceImpl < Grpc::Testing::WorkerService::Service
client.mark(req.mark.reset))) client.mark(req.mark.reset)))
end end
end end
q.push(self)
client.shutdown client.shutdown
q.push(self)
} }
q.each_item q.each_item
end end
...@@ -118,6 +118,10 @@ def main ...@@ -118,6 +118,10 @@ def main
options['server_port'] = v options['server_port'] = v
end end
end.parse! end.parse!
# Configure any errors with client or server child threads to surface
Thread.abort_on_exception = true
s = GRPC::RpcServer.new s = GRPC::RpcServer.new
s.add_http2_port("0.0.0.0:" + options['driver_port'].to_s, s.add_http2_port("0.0.0.0:" + options['driver_port'].to_s,
:this_port_is_insecure) :this_port_is_insecure)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment