Skip to content
Snippets Groups Projects
Commit 45be26ef authored by vjpai's avatar vjpai
Browse files

Working ruby server implementation

parent 691387af
No related branches found
No related tags found
No related merge requests found
......@@ -40,6 +40,7 @@ require 'grpc'
require 'qps-common'
require 'src/proto/grpc/testing/messages'
require 'src/proto/grpc/testing/services_services'
require 'src/proto/grpc/testing/stats'
class BenchmarkServiceImpl < Grpc::Testing::BenchmarkService::Service
def unary_call(req, _call)
......@@ -61,8 +62,38 @@ class BenchmarkServiceImpl < Grpc::Testing::BenchmarkService::Service
end
end
def benchmark_server(config)
def load_test_certs
this_dir = File.expand_path(File.dirname(__FILE__))
data_dir = File.join(File.dirname(this_dir), 'spec/testdata')
files = ['ca.pem', 'server1.key', 'server1.pem']
files.map { |f| File.open(File.join(data_dir, f)).read }
end
class BenchmarkServer
def initialize(config, port)
if config.security_params
certs = load_test_certs
cred = GRPC::Core::Credentials.new(certs[0])
else
cred = :this_port_is_insecure
end
@server = GRPC::RpcServer.new
@port = @server.add_http2_port("0.0.0.0:" + port.to_s, cred)
@server.handle(BenchmarkServiceImpl.new)
@start_time = Time.now
Thread.new {
@server.run
}
end
def mark(reset)
s = Grpc::Testing::ServerStats.new(time_elapsed: (Time.now-@start_time).to_f)
if reset
@start_time = Time.now
end
s
end
def get_port
@port
end
end
......@@ -54,15 +54,16 @@ class WorkerServiceImpl < Grpc::Testing::WorkerService::Service
q = EnumeratorQueue.new(self)
Thread.new {
reqs.each do |req|
case req.argtype
when "setup"
server = BenchmarkServer.new(req.setup)
q.push(Grpc::Testing::ServerStatus.new(stats: server.mark(false), port: server.get_port))
when "mark"
q.push(Grpc::Testing::ServerStatus.new(stats: server.mark(req.mark.reset), cores: cpu_cores))
case req.argtype.to_s
when 'setup'
@bms = BenchmarkServer.new(req.setup, @server_port)
q.push(Grpc::Testing::ServerStatus.new(stats: @bms.mark(false), port: @bms.get_port))
when 'mark'
q.push(Grpc::Testing::ServerStatus.new(stats: @bms.mark(req.mark.reset), cores: cpu_cores))
end
end
q.push(self)
@bms.stop
}
q.each_item
end
......@@ -71,10 +72,10 @@ class WorkerServiceImpl < Grpc::Testing::WorkerService::Service
Thread.new {
reqs.each do |req|
case req.argtype
when "setup"
server = BenchmarkClient.new(req.setup)
when 'setup'
client = BenchmarkClient.new(req.setup)
q.push(Grpc::Testing::ClientStatus.new(stats: client.mark(false)))
when "mark"
when 'mark'
q.push(Grpc::Testing::ClientStatus.new(stats: client.mark(req.mark.reset)))
end
end
......@@ -92,25 +93,30 @@ class WorkerServiceImpl < Grpc::Testing::WorkerService::Service
}
Grpc::Testing::Void.new
end
def initialize(s)
def initialize(s, sp)
@server = s
@server_port = sp
end
end
def main
options = {
'driver_port' => 0
'driver_port' => 0,
'server_port' => 0
}
OptionParser.new do |opts|
opts.banner = 'Usage: [--driver_port <port>]'
opts.banner = 'Usage: [--driver_port <port>] [--server_port <port>]'
opts.on('--driver_port PORT', '<port>') do |v|
options['driver_port'] = v
end
opts.on('--server_port PORT', '<port>') do |v|
options['server_port'] = v
end
end.parse!
s = GRPC::RpcServer.new
s.add_http2_port("0.0.0.0:" + options['driver_port'].to_s,
:this_port_is_insecure)
s.handle(WorkerServiceImpl.new(s))
s.handle(WorkerServiceImpl.new(s, options['server_port'].to_i))
s.run
end
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment