diff --git a/examples/ruby/route_guide/route_guide_client.rb b/examples/ruby/route_guide/route_guide_client.rb index 330725ece0d202826f6871997e13d2c88479e335..01ea42c186f530c21d569a2d2ff3bdbecebf33e7 100755 --- a/examples/ruby/route_guide/route_guide_client.rb +++ b/examples/ruby/route_guide/route_guide_client.rb @@ -140,10 +140,24 @@ ROUTE_CHAT_NOTES = [ def run_route_chat(stub) p 'Route Chat' p '----------' - # TODO: decouple sending and receiving, i.e have the response enumerator run - # on its own thread. - resps = stub.route_chat(ROUTE_CHAT_NOTES) - resps.each { |r| p "received #{r.inspect}" } + sleeping_enumerator = SleepingEnumerator.new(ROUTE_CHAT_NOTES, 1) + stub.route_chat(sleeping_enumerator.each_item) { |r| p "received #{r.inspect}" } +end + +# SleepingEnumerator yields through items, and sleeps between each one +class SleepingEnumerator + def initialize(items, delay) + @items = items + @delay = delay + end + def each_item + return enum_for(:each_item) unless block_given? + @items.each do |item| + sleep @delay + p "next item to send is #{item.inspect}" + yield item + end + end end def main diff --git a/examples/ruby/route_guide/route_guide_server.rb b/examples/ruby/route_guide/route_guide_server.rb index a5a73a8bac1e188dc1dace3cb2689874f38c802d..41b9174b1d2086e1bf17b7f07a6739137a6a6714 100755 --- a/examples/ruby/route_guide/route_guide_server.rb +++ b/examples/ruby/route_guide/route_guide_server.rb @@ -100,28 +100,6 @@ class RectangleEnum end end -# A EnumeratorQueue wraps a Queue to yield the items added to it. -class EnumeratorQueue - extend Forwardable - def_delegators :@q, :push - - def initialize(sentinel) - @q = Queue.new - @sentinel = sentinel - @received_notes = {} - end - - def each_item - return enum_for(:each_item) unless block_given? - loop do - r = @q.pop - break if r.equal?(@sentinel) - fail r if r.is_a? Exception - yield r - end - end -end - # ServerImpl provides an implementation of the RouteGuide service. class ServerImpl < RouteGuide::Service # @param [Hash] feature_db {location => name} @@ -166,28 +144,33 @@ class ServerImpl < RouteGuide::Service end def route_chat(notes) - q = EnumeratorQueue.new(self) - # run a separate thread that processes the incoming requests - t = Thread.new do - begin - notes.each do |n| - key = { - 'latitude' => n.location.latitude, - 'longitude' => n.location.longitude - } - earlier_msgs = @received_notes[key] - @received_notes[key] << n.message - # send back the earlier messages at this point - earlier_msgs.each do |r| - q.push(RouteNote.new(location: n.location, message: r)) - end + RouteChatEnumerator.new(notes, @received_notes).each_item + end +end + +class RouteChatEnumerator + def initialize(notes, received_notes) + @notes = notes + @received_notes = received_notes + end + def each_item + return enum_for(:each_item) unless block_given? + begin + @notes.each do |n| + key = { + 'latitude' => n.location.latitude, + 'longitude' => n.location.longitude + } + earlier_msgs = @received_notes[key] + @received_notes[key] << n.message + # send back the earlier messages at this point + earlier_msgs.each do |r| + yield RouteNote.new(location: n.location, message: r) end - q.push(self) # signal completion - rescue StandardError => e - q.push(e) # signal completion via an error end + rescue StandardError => e + fail e # signal completion via an error end - q.each_item end end diff --git a/src/ruby/pb/test/server.rb b/src/ruby/pb/test/server.rb index 0808121661c5bb24693b29ed7de78399eb4e72ad..3f1e0a1ccfc00f94bd21879265af866936bd7f88 100755 --- a/src/ruby/pb/test/server.rb +++ b/src/ruby/pb/test/server.rb @@ -129,27 +129,36 @@ def nulls(l) [].pack('x' * l).force_encoding('ascii-8bit') end -# A EnumeratorQueue wraps a Queue yielding the items added to it via each_item. -class EnumeratorQueue - extend Forwardable - def_delegators :@q, :push - - def initialize(sentinel) - @q = Queue.new - @sentinel = sentinel - end +# A FullDuplexEnumerator passes requests to a block and yields generated responses +class FullDuplexEnumerator + include Grpc::Testing + include Grpc::Testing::PayloadType + def initialize(requests) + @requests = requests + end def each_item return enum_for(:each_item) unless block_given? - loop do - r = @q.pop - break if r.equal?(@sentinel) - fail r if r.is_a? Exception - yield r + GRPC.logger.info('interop-server: started receiving') + begin + cls = StreamingOutputCallResponse + @requests.each do |req| + req.response_parameters.each do |params| + resp_size = params.size + GRPC.logger.info("read a req, response size is #{resp_size}") + yield cls.new(payload: Payload.new(type: req.response_type, + body: nulls(resp_size))) + end + end + GRPC.logger.info('interop-server: finished receiving') + rescue StandardError => e + GRPC.logger.info('interop-server: failed') + GRPC.logger.warn(e) + fail e end end end - + # A runnable implementation of the schema-specified testing service, with each # service method implemented as required by the interop testing spec. class TestTarget < Grpc::Testing::TestService::Service @@ -182,31 +191,9 @@ class TestTarget < Grpc::Testing::TestService::Service def full_duplex_call(reqs) # reqs is a lazy Enumerator of the requests sent by the client. - q = EnumeratorQueue.new(self) - cls = StreamingOutputCallResponse - Thread.new do - begin - GRPC.logger.info('interop-server: started receiving') - reqs.each do |req| - req.response_parameters.each do |params| - resp_size = params.size - GRPC.logger.info("read a req, response size is #{resp_size}") - resp = cls.new(payload: Payload.new(type: req.response_type, - body: nulls(resp_size))) - q.push(resp) - end - end - GRPC.logger.info('interop-server: finished receiving') - q.push(self) - rescue StandardError => e - GRPC.logger.info('interop-server: failed') - GRPC.logger.warn(e) - q.push(e) # share the exception with the enumerator - end - end - q.each_item + FullDuplexEnumerator.new(reqs).each_item end - + def half_duplex_call(reqs) # TODO: update with unique behaviour of the half_duplex_call if that's # ever required by any of the tests. diff --git a/src/ruby/qps/qps-common.rb b/src/ruby/qps/qps-common.rb index 4119d600b197e47592b9f1916be99af926a1f20e..4714ccfdb7c3c05ed845d3d5acc4d0373cc92e41 100644 --- a/src/ruby/qps/qps-common.rb +++ b/src/ruby/qps/qps-common.rb @@ -52,6 +52,7 @@ def load_test_certs files.map { |f| File.open(File.join(data_dir, f)).read } end + # A EnumeratorQueue wraps a Queue yielding the items added to it via each_item. class EnumeratorQueue extend Forwardable @@ -73,4 +74,19 @@ class EnumeratorQueue end end +# A PingPongEnumerator reads requests and responds one-by-one when enumerated +# via #each_item +class PingPongEnumerator + def initialize(reqs) + @reqs = reqs + end + def each_item + return enum_for(:each_item) unless block_given? + sr = Grpc::Testing::SimpleResponse + pl = Grpc::Testing::Payload + @reqs.each do |req| + yield sr.new(payload: pl.new(body: nulls(req.response_size))) + end + end +end diff --git a/src/ruby/qps/server.rb b/src/ruby/qps/server.rb index f51f86d9a963fe954d2df19ed555ff3314b786df..d0c2073dd1e1ca4f2a23dd269c443b236fe1e9b2 100644 --- a/src/ruby/qps/server.rb +++ b/src/ruby/qps/server.rb @@ -49,16 +49,7 @@ class BenchmarkServiceImpl < Grpc::Testing::BenchmarkService::Service sr.new(payload: pl.new(body: nulls(req.response_size))) end def streaming_call(reqs) - q = EnumeratorQueue.new(self) - Thread.new { - sr = Grpc::Testing::SimpleResponse - pl = Grpc::Testing::Payload - reqs.each do |req| - q.push(sr.new(payload: pl.new(body: nulls(req.response_size)))) - end - q.push(self) - } - q.each_item + PingPongEnumerator.new(reqs).each_item end end