Skip to content
Snippets Groups Projects
Commit 24fdc179 authored by Tim Emiola's avatar Tim Emiola
Browse files

Ensures that bidi calls obtain metadata.

Fixes an omission from earlier PRs that adds support metadata.
parent b48236a2
No related branches found
No related tags found
No related merge requests found
...@@ -199,11 +199,7 @@ module GRPC ...@@ -199,11 +199,7 @@ module GRPC
# marshalled. # marshalled.
def remote_send(req, marshalled = false) def remote_send(req, marshalled = false)
GRPC.logger.debug("sending #{req}, marshalled? #{marshalled}") GRPC.logger.debug("sending #{req}, marshalled? #{marshalled}")
if marshalled payload = marshalled ? req : @marshal.call(req)
payload = req
else
payload = @marshal.call(req)
end
@call.run_batch(@cq, self, INFINITE_FUTURE, SEND_MESSAGE => payload) @call.run_batch(@cq, self, INFINITE_FUTURE, SEND_MESSAGE => payload)
end end
...@@ -417,7 +413,9 @@ module GRPC ...@@ -417,7 +413,9 @@ module GRPC
# @return [Enumerator, nil] a response Enumerator # @return [Enumerator, nil] a response Enumerator
def bidi_streamer(requests, **kw, &blk) def bidi_streamer(requests, **kw, &blk)
start_call(**kw) unless @started start_call(**kw) unless @started
bd = BidiCall.new(@call, @cq, @marshal, @unmarshal) bd = BidiCall.new(@call, @cq, @marshal, @unmarshal,
metadata_tag: @metadata_tag)
@metadata_tag = nil # run_on_client ensures metadata is read
bd.run_on_client(requests, @op_notifier, &blk) bd.run_on_client(requests, @op_notifier, &blk)
end end
......
...@@ -56,7 +56,8 @@ module GRPC ...@@ -56,7 +56,8 @@ module GRPC
# the call # the call
# @param marshal [Function] f(obj)->string that marshal requests # @param marshal [Function] f(obj)->string that marshal requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses # @param unmarshal [Function] f(string)->obj that unmarshals responses
def initialize(call, q, marshal, unmarshal) # @param metadata_tag [Object] tag object used to collect metadata
def initialize(call, q, marshal, unmarshal, metadata_tag: nil)
fail(ArgumentError, 'not a call') unless call.is_a? Core::Call fail(ArgumentError, 'not a call') unless call.is_a? Core::Call
unless q.is_a? Core::CompletionQueue unless q.is_a? Core::CompletionQueue
fail(ArgumentError, 'not a CompletionQueue') fail(ArgumentError, 'not a CompletionQueue')
...@@ -67,6 +68,7 @@ module GRPC ...@@ -67,6 +68,7 @@ module GRPC
@op_notifier = nil # signals completion on clients @op_notifier = nil # signals completion on clients
@readq = Queue.new @readq = Queue.new
@unmarshal = unmarshal @unmarshal = unmarshal
@metadata_tag = metadata_tag
end end
# Begins orchestration of the Bidi stream for a client sending requests. # Begins orchestration of the Bidi stream for a client sending requests.
...@@ -113,6 +115,18 @@ module GRPC ...@@ -113,6 +115,18 @@ module GRPC
@op_notifier.notify(self) @op_notifier.notify(self)
end end
# performs a read using @call.run_batch, ensures metadata is set up
def read_using_run_batch
ops = { RECV_MESSAGE => nil }
ops[RECV_INITIAL_METADATA] = nil unless @metadata_tag.nil?
batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, ops)
unless @metadata_tag.nil?
@call.metadata = batch_result.metadata
@metadata_tag = nil
end
batch_result
end
# each_queued_msg yields each message on this instances readq # each_queued_msg yields each message on this instances readq
# #
# - messages are added to the readq by #read_loop # - messages are added to the readq by #read_loop
...@@ -169,9 +183,7 @@ module GRPC ...@@ -169,9 +183,7 @@ module GRPC
loop do loop do
GRPC.logger.debug("bidi-read-loop: #{count}") GRPC.logger.debug("bidi-read-loop: #{count}")
count += 1 count += 1
# TODO: ensure metadata is read if available, currently it's not batch_result = read_using_run_batch
batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE,
RECV_MESSAGE => nil)
# handle the next message # handle the next message
if batch_result.message.nil? if batch_result.message.nil?
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment