Skip to content
Snippets Groups Projects
Commit dc3d561f authored by Alexander Polcyn's avatar Alexander Polcyn
Browse files

remove dedicated thread for read loop in ruby bidi calls

parent 97f6994b
Branches
Tags
No related merge requests found
...@@ -61,7 +61,6 @@ module GRPC ...@@ -61,7 +61,6 @@ module GRPC
@call = call @call = call
@marshal = marshal @marshal = marshal
@op_notifier = nil # signals completion on clients @op_notifier = nil # signals completion on clients
@readq = Queue.new
@unmarshal = unmarshal @unmarshal = unmarshal
@metadata_received = metadata_received @metadata_received = metadata_received
@reads_complete = false @reads_complete = false
...@@ -81,8 +80,7 @@ module GRPC ...@@ -81,8 +80,7 @@ module GRPC
def run_on_client(requests, op_notifier, &blk) def run_on_client(requests, op_notifier, &blk)
@op_notifier = op_notifier @op_notifier = op_notifier
@enq_th = Thread.new { write_loop(requests) } @enq_th = Thread.new { write_loop(requests) }
@loop_th = start_read_loop read_loop(&blk)
each_queued_msg(&blk)
end end
# Begins orchestration of the Bidi stream for a server generating replies. # Begins orchestration of the Bidi stream for a server generating replies.
...@@ -97,8 +95,7 @@ module GRPC ...@@ -97,8 +95,7 @@ module GRPC
# #
# @param gen_each_reply [Proc] generates the BiDi stream replies. # @param gen_each_reply [Proc] generates the BiDi stream replies.
def run_on_server(gen_each_reply) def run_on_server(gen_each_reply)
replys = gen_each_reply.call(each_queued_msg) replys = gen_each_reply.call(read_loop(is_client: false))
@loop_th = start_read_loop(is_client: false)
write_loop(replys, is_client: false) write_loop(replys, is_client: false)
end end
...@@ -135,24 +132,6 @@ module GRPC ...@@ -135,24 +132,6 @@ module GRPC
batch_result batch_result
end end
# each_queued_msg yields each message on this instances readq
#
# - messages are added to the readq by #read_loop
# - iteration ends when the instance itself is added
def each_queued_msg
return enum_for(:each_queued_msg) unless block_given?
count = 0
loop do
GRPC.logger.debug("each_queued_msg: waiting##{count}")
count += 1
req = @readq.pop
GRPC.logger.debug("each_queued_msg: req = #{req}")
fail req if req.is_a? StandardError
break if req.equal?(END_OF_READS)
yield req
end
end
def write_loop(requests, is_client: true) def write_loop(requests, is_client: true)
GRPC.logger.debug('bidi-write-loop: starting') GRPC.logger.debug('bidi-write-loop: starting')
count = 0 count = 0
...@@ -190,9 +169,10 @@ module GRPC ...@@ -190,9 +169,10 @@ module GRPC
raise e raise e
end end
# starts the read loop # Provides an enumerator that yields results of remote reads
def start_read_loop(is_client: true) def read_loop(is_client: true)
Thread.new do return enum_for(:read_loop,
is_client: is_client) unless block_given?
GRPC.logger.debug('bidi-read-loop: starting') GRPC.logger.debug('bidi-read-loop: starting')
begin begin
count = 0 count = 0
...@@ -213,19 +193,17 @@ module GRPC ...@@ -213,19 +193,17 @@ module GRPC
GRPC.logger.debug("bidi-read-loop: done status #{@call.status}") GRPC.logger.debug("bidi-read-loop: done status #{@call.status}")
end end
@readq.push(END_OF_READS)
GRPC.logger.debug('bidi-read-loop: done reading!') GRPC.logger.debug('bidi-read-loop: done reading!')
break break
end end
# push the latest read onto the queue and continue reading
res = @unmarshal.call(batch_result.message) res = @unmarshal.call(batch_result.message)
@readq.push(res) yield res
end end
rescue StandardError => e rescue StandardError => e
GRPC.logger.warn('bidi: read-loop failed') GRPC.logger.warn('bidi: read-loop failed')
GRPC.logger.warn(e) GRPC.logger.warn(e)
@readq.push(e) # let each_queued_msg terminate with this error raise e
end end
GRPC.logger.debug('bidi-read-loop: finished') GRPC.logger.debug('bidi-read-loop: finished')
@reads_complete = true @reads_complete = true
...@@ -233,4 +211,3 @@ module GRPC ...@@ -233,4 +211,3 @@ module GRPC
end end
end end
end end
end
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment