Skip to content
Snippets Groups Projects
Commit aa57bab3 authored by Tim Emiola's avatar Tim Emiola
Browse files

Corrects the cancel_after_first_response behaviour

- introduces a #wait method on the call operation view
- invokes #wait on a Notifier that is created for all operations
- ensures the Notifier is invoked if necessary whenever a client request
  completes
- updates the interop_test to use op.wait before checking if the call
  was cancelled.
parent a15f08cc
No related branches found
No related tags found
No related merge requests found
...@@ -12,7 +12,7 @@ Metrics/AbcSize: ...@@ -12,7 +12,7 @@ Metrics/AbcSize:
# Offense count: 3 # Offense count: 3
# Configuration parameters: CountComments. # Configuration parameters: CountComments.
Metrics/ClassLength: Metrics/ClassLength:
Max: 192 Max: 200
# Offense count: 35 # Offense count: 35
# Configuration parameters: CountComments. # Configuration parameters: CountComments.
......
...@@ -284,7 +284,8 @@ class NamedTests ...@@ -284,7 +284,8 @@ class NamedTests
op = @stub.full_duplex_call(ppp.each_item, return_op: true) op = @stub.full_duplex_call(ppp.each_item, return_op: true)
ppp.canceller_op = op # causes ppp to cancel after the 1st message ppp.canceller_op = op # causes ppp to cancel after the 1st message
op.execute.each { |r| ppp.queue.push(r) } op.execute.each { |r| ppp.queue.push(r) }
assert(op.cancelled, 'call operation should be CANCELLED') op.wait
assert(op.cancelled, 'call operation was not CANCELLED')
p 'OK: cancel_after_first_response' p 'OK: cancel_after_first_response'
end end
......
...@@ -120,6 +120,7 @@ module GRPC ...@@ -120,6 +120,7 @@ module GRPC
@started = started @started = started
@unmarshal = unmarshal @unmarshal = unmarshal
@metadata_tag = metadata_tag @metadata_tag = metadata_tag
@op_notifier = nil
end end
# output_metadata are provides access to hash that can be used to # output_metadata are provides access to hash that can be used to
...@@ -148,6 +149,7 @@ module GRPC ...@@ -148,6 +149,7 @@ module GRPC
# operation provides a restricted view of this ActiveCall for use as # operation provides a restricted view of this ActiveCall for use as
# a Operation. # a Operation.
def operation def operation
@op_notifier = Notifier.new
Operation.new(self) Operation.new(self)
end end
...@@ -167,6 +169,7 @@ module GRPC ...@@ -167,6 +169,7 @@ module GRPC
batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, ops) batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, ops)
return unless assert_finished return unless assert_finished
@call.status = batch_result.status @call.status = batch_result.status
op_is_done
batch_result.check_status batch_result.check_status
end end
...@@ -184,6 +187,7 @@ module GRPC ...@@ -184,6 +187,7 @@ module GRPC
end end
end end
@call.status = batch_result.status @call.status = batch_result.status
op_is_done
batch_result.check_status batch_result.check_status
end end
...@@ -415,7 +419,7 @@ module GRPC ...@@ -415,7 +419,7 @@ module GRPC
def bidi_streamer(requests, **kw, &blk) def bidi_streamer(requests, **kw, &blk)
start_call(**kw) unless @started start_call(**kw) unless @started
bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline) bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline)
bd.run_on_client(requests, &blk) bd.run_on_client(requests, @op_notifier, &blk)
end end
# run_server_bidi orchestrates a BiDi stream processing on a server. # run_server_bidi orchestrates a BiDi stream processing on a server.
...@@ -434,6 +438,19 @@ module GRPC ...@@ -434,6 +438,19 @@ module GRPC
bd.run_on_server(gen_each_reply) bd.run_on_server(gen_each_reply)
end end
# Waits till an operation completes
def wait
return if @op_notifier.nil?
GRPC.logger.debug("active_call.wait: on #{@op_notifier}")
@op_notifier.wait
end
# Signals that an operation is done
def op_is_done
return if @op_notifier.nil?
@op_notifier.notify(self)
end
private private
# Starts the call if not already started # Starts the call if not already started
...@@ -468,6 +485,6 @@ module GRPC ...@@ -468,6 +485,6 @@ module GRPC
# Operation limits access to an ActiveCall's methods for use as # Operation limits access to an ActiveCall's methods for use as
# a Operation on the client. # a Operation on the client.
Operation = view_class(:cancel, :cancelled, :deadline, :execute, Operation = view_class(:cancel, :cancelled, :deadline, :execute,
:metadata, :status, :start_call) :metadata, :status, :start_call, :wait)
end end
end end
...@@ -66,6 +66,7 @@ module GRPC ...@@ -66,6 +66,7 @@ module GRPC
@cq = q @cq = q
@deadline = deadline @deadline = deadline
@marshal = marshal @marshal = marshal
@op_notifier = nil # signals completion on clients
@readq = Queue.new @readq = Queue.new
@unmarshal = unmarshal @unmarshal = unmarshal
end end
...@@ -76,8 +77,10 @@ module GRPC ...@@ -76,8 +77,10 @@ module GRPC
# block that can be invoked with each response. # block that can be invoked with each response.
# #
# @param requests the Enumerable of requests to send # @param requests the Enumerable of requests to send
# @op_notifier a Notifier used to signal completion
# @return an Enumerator of requests to yield # @return an Enumerator of requests to yield
def run_on_client(requests, &blk) def run_on_client(requests, op_notifier, &blk)
@op_notifier = op_notifier
@enq_th = Thread.new { write_loop(requests) } @enq_th = Thread.new { write_loop(requests) }
@loop_th = start_read_loop @loop_th = start_read_loop
each_queued_msg(&blk) each_queued_msg(&blk)
...@@ -105,6 +108,13 @@ module GRPC ...@@ -105,6 +108,13 @@ module GRPC
END_OF_READS = :end_of_reads END_OF_READS = :end_of_reads
END_OF_WRITES = :end_of_writes END_OF_WRITES = :end_of_writes
# signals that bidi operation is complete
def notify_done
return unless @op_notifier
GRPC.logger.debug("bidi-notify-done: notifying #{@op_notifier}")
@op_notifier.notify(self)
end
# each_queued_msg yields each message on this instances readq # each_queued_msg yields each message on this instances readq
# #
# - messages are added to the readq by #read_loop # - messages are added to the readq by #read_loop
...@@ -143,11 +153,13 @@ module GRPC ...@@ -143,11 +153,13 @@ module GRPC
@call.status = batch_result.status @call.status = batch_result.status
batch_result.check_status batch_result.check_status
GRPC.logger.debug("bidi-write-loop: done status #{@call.status}") GRPC.logger.debug("bidi-write-loop: done status #{@call.status}")
notify_done
end end
GRPC.logger.debug('bidi-write-loop: finished') GRPC.logger.debug('bidi-write-loop: finished')
rescue StandardError => e rescue StandardError => e
GRPC.logger.warn('bidi-write-loop: failed') GRPC.logger.warn('bidi-write-loop: failed')
GRPC.logger.warn(e) GRPC.logger.warn(e)
notify_done
raise e raise e
end end
......
...@@ -38,6 +38,6 @@ Logging.logger.root.appenders = Logging.appenders.stdout ...@@ -38,6 +38,6 @@ Logging.logger.root.appenders = Logging.appenders.stdout
Logging.logger.root.level = :info Logging.logger.root.level = :info
# TODO: provide command-line configuration for logging # TODO: provide command-line configuration for logging
Logging.logger['GRPC'].level = :debug Logging.logger['GRPC'].level = :info
Logging.logger['GRPC::ActiveCall'].level = :info Logging.logger['GRPC::ActiveCall'].level = :info
Logging.logger['GRPC::BidiCall'].level = :info Logging.logger['GRPC::BidiCall'].level = :info
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment