Skip to content
Snippets Groups Projects
Commit f4a745a6 authored by apolcyn's avatar apolcyn Committed by GitHub
Browse files

Merge pull request #8151 from apolcyn/dont_create_thread_on_bidi_ping_pong

dont create extra threads in server handlers for ruby bidi streams
parents d23f8166 b06d5341
No related branches found
No related tags found
No related merge requests found
...@@ -140,10 +140,24 @@ ROUTE_CHAT_NOTES = [ ...@@ -140,10 +140,24 @@ ROUTE_CHAT_NOTES = [
def run_route_chat(stub) def run_route_chat(stub)
p 'Route Chat' p 'Route Chat'
p '----------' p '----------'
# TODO: decouple sending and receiving, i.e have the response enumerator run sleeping_enumerator = SleepingEnumerator.new(ROUTE_CHAT_NOTES, 1)
# on its own thread. stub.route_chat(sleeping_enumerator.each_item) { |r| p "received #{r.inspect}" }
resps = stub.route_chat(ROUTE_CHAT_NOTES) end
resps.each { |r| p "received #{r.inspect}" }
# SleepingEnumerator yields through items, and sleeps between each one
class SleepingEnumerator
def initialize(items, delay)
@items = items
@delay = delay
end
def each_item
return enum_for(:each_item) unless block_given?
@items.each do |item|
sleep @delay
p "next item to send is #{item.inspect}"
yield item
end
end
end end
def main def main
......
...@@ -100,28 +100,6 @@ class RectangleEnum ...@@ -100,28 +100,6 @@ class RectangleEnum
end end
end end
# A EnumeratorQueue wraps a Queue to yield the items added to it.
class EnumeratorQueue
extend Forwardable
def_delegators :@q, :push
def initialize(sentinel)
@q = Queue.new
@sentinel = sentinel
@received_notes = {}
end
def each_item
return enum_for(:each_item) unless block_given?
loop do
r = @q.pop
break if r.equal?(@sentinel)
fail r if r.is_a? Exception
yield r
end
end
end
# ServerImpl provides an implementation of the RouteGuide service. # ServerImpl provides an implementation of the RouteGuide service.
class ServerImpl < RouteGuide::Service class ServerImpl < RouteGuide::Service
# @param [Hash] feature_db {location => name} # @param [Hash] feature_db {location => name}
...@@ -166,28 +144,33 @@ class ServerImpl < RouteGuide::Service ...@@ -166,28 +144,33 @@ class ServerImpl < RouteGuide::Service
end end
def route_chat(notes) def route_chat(notes)
q = EnumeratorQueue.new(self) RouteChatEnumerator.new(notes, @received_notes).each_item
# run a separate thread that processes the incoming requests end
t = Thread.new do end
begin
notes.each do |n| class RouteChatEnumerator
key = { def initialize(notes, received_notes)
'latitude' => n.location.latitude, @notes = notes
'longitude' => n.location.longitude @received_notes = received_notes
} end
earlier_msgs = @received_notes[key] def each_item
@received_notes[key] << n.message return enum_for(:each_item) unless block_given?
# send back the earlier messages at this point begin
earlier_msgs.each do |r| @notes.each do |n|
q.push(RouteNote.new(location: n.location, message: r)) key = {
end 'latitude' => n.location.latitude,
'longitude' => n.location.longitude
}
earlier_msgs = @received_notes[key]
@received_notes[key] << n.message
# send back the earlier messages at this point
earlier_msgs.each do |r|
yield RouteNote.new(location: n.location, message: r)
end end
q.push(self) # signal completion
rescue StandardError => e
q.push(e) # signal completion via an error
end end
rescue StandardError => e
fail e # signal completion via an error
end end
q.each_item
end end
end end
......
...@@ -129,27 +129,36 @@ def nulls(l) ...@@ -129,27 +129,36 @@ def nulls(l)
[].pack('x' * l).force_encoding('ascii-8bit') [].pack('x' * l).force_encoding('ascii-8bit')
end end
# A EnumeratorQueue wraps a Queue yielding the items added to it via each_item. # A FullDuplexEnumerator passes requests to a block and yields generated responses
class EnumeratorQueue class FullDuplexEnumerator
extend Forwardable include Grpc::Testing
def_delegators :@q, :push include Grpc::Testing::PayloadType
def initialize(sentinel)
@q = Queue.new
@sentinel = sentinel
end
def initialize(requests)
@requests = requests
end
def each_item def each_item
return enum_for(:each_item) unless block_given? return enum_for(:each_item) unless block_given?
loop do GRPC.logger.info('interop-server: started receiving')
r = @q.pop begin
break if r.equal?(@sentinel) cls = StreamingOutputCallResponse
fail r if r.is_a? Exception @requests.each do |req|
yield r req.response_parameters.each do |params|
resp_size = params.size
GRPC.logger.info("read a req, response size is #{resp_size}")
yield cls.new(payload: Payload.new(type: req.response_type,
body: nulls(resp_size)))
end
end
GRPC.logger.info('interop-server: finished receiving')
rescue StandardError => e
GRPC.logger.info('interop-server: failed')
GRPC.logger.warn(e)
fail e
end end
end end
end end
# A runnable implementation of the schema-specified testing service, with each # A runnable implementation of the schema-specified testing service, with each
# service method implemented as required by the interop testing spec. # service method implemented as required by the interop testing spec.
class TestTarget < Grpc::Testing::TestService::Service class TestTarget < Grpc::Testing::TestService::Service
...@@ -182,31 +191,9 @@ class TestTarget < Grpc::Testing::TestService::Service ...@@ -182,31 +191,9 @@ class TestTarget < Grpc::Testing::TestService::Service
def full_duplex_call(reqs) def full_duplex_call(reqs)
# reqs is a lazy Enumerator of the requests sent by the client. # reqs is a lazy Enumerator of the requests sent by the client.
q = EnumeratorQueue.new(self) FullDuplexEnumerator.new(reqs).each_item
cls = StreamingOutputCallResponse
Thread.new do
begin
GRPC.logger.info('interop-server: started receiving')
reqs.each do |req|
req.response_parameters.each do |params|
resp_size = params.size
GRPC.logger.info("read a req, response size is #{resp_size}")
resp = cls.new(payload: Payload.new(type: req.response_type,
body: nulls(resp_size)))
q.push(resp)
end
end
GRPC.logger.info('interop-server: finished receiving')
q.push(self)
rescue StandardError => e
GRPC.logger.info('interop-server: failed')
GRPC.logger.warn(e)
q.push(e) # share the exception with the enumerator
end
end
q.each_item
end end
def half_duplex_call(reqs) def half_duplex_call(reqs)
# TODO: update with unique behaviour of the half_duplex_call if that's # TODO: update with unique behaviour of the half_duplex_call if that's
# ever required by any of the tests. # ever required by any of the tests.
......
...@@ -52,6 +52,7 @@ def load_test_certs ...@@ -52,6 +52,7 @@ def load_test_certs
files.map { |f| File.open(File.join(data_dir, f)).read } files.map { |f| File.open(File.join(data_dir, f)).read }
end end
# A EnumeratorQueue wraps a Queue yielding the items added to it via each_item. # A EnumeratorQueue wraps a Queue yielding the items added to it via each_item.
class EnumeratorQueue class EnumeratorQueue
extend Forwardable extend Forwardable
...@@ -73,4 +74,19 @@ class EnumeratorQueue ...@@ -73,4 +74,19 @@ class EnumeratorQueue
end end
end end
# A PingPongEnumerator reads requests and responds one-by-one when enumerated
# via #each_item
class PingPongEnumerator
def initialize(reqs)
@reqs = reqs
end
def each_item
return enum_for(:each_item) unless block_given?
sr = Grpc::Testing::SimpleResponse
pl = Grpc::Testing::Payload
@reqs.each do |req|
yield sr.new(payload: pl.new(body: nulls(req.response_size)))
end
end
end
...@@ -49,16 +49,7 @@ class BenchmarkServiceImpl < Grpc::Testing::BenchmarkService::Service ...@@ -49,16 +49,7 @@ class BenchmarkServiceImpl < Grpc::Testing::BenchmarkService::Service
sr.new(payload: pl.new(body: nulls(req.response_size))) sr.new(payload: pl.new(body: nulls(req.response_size)))
end end
def streaming_call(reqs) def streaming_call(reqs)
q = EnumeratorQueue.new(self) PingPongEnumerator.new(reqs).each_item
Thread.new {
sr = Grpc::Testing::SimpleResponse
pl = Grpc::Testing::Payload
reqs.each do |req|
q.push(sr.new(payload: pl.new(body: nulls(req.response_size))))
end
q.push(self)
}
q.each_item
end end
end end
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment