diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index 4260d854376b56884e68a8ce9cbbd51ee5890adc..d43a9e7a4b714505d435098adef9d01c60b28952 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -58,7 +58,7 @@ module GRPC include Core::TimeConsts include Core::CallOps extend Forwardable - attr_reader(:deadline) + attr_reader :deadline, :metadata_sent, :metadata_to_send def_delegators :@call, :cancel, :metadata, :write_flag, :write_flag=, :peer, :peer_cert, :trailing_metadata @@ -101,7 +101,7 @@ module GRPC # @param metadata_received [true|false] indicates if metadata has already # been received. Should always be true for server calls def initialize(call, marshal, unmarshal, deadline, started: true, - metadata_received: false) + metadata_received: false, metadata_to_send: nil) fail(TypeError, '!Core::Call') unless call.is_a? Core::Call @call = call @deadline = deadline @@ -110,6 +110,14 @@ module GRPC @metadata_received = metadata_received @metadata_sent = started @op_notifier = nil + + fail(ArgumentError, 'Already sent md') if started && metadata_to_send + @metadata_to_send = metadata_to_send || {} unless started + end + + def send_initial_metadata + fail 'Already sent metadata' if @metadata_sent + start_call(@metadata_to_send) end # output_metadata are provides access to hash that can be used to @@ -187,7 +195,7 @@ module GRPC # @param marshalled [false, true] indicates if the object is already # marshalled. def remote_send(req, marshalled = false) - # TODO(murgatroid99): ensure metadata was sent + start_call(@metadata_to_send) unless @metadata_sent GRPC.logger.debug("sending #{req}, marshalled? #{marshalled}") payload = marshalled ? req : @marshal.call(req) @call.run_batch(SEND_MESSAGE => payload) @@ -203,6 +211,7 @@ module GRPC # list, mulitple metadata for its key are sent def send_status(code = OK, details = '', assert_finished = false, metadata: {}) + start_call unless @metadata_sent ops = { SEND_STATUS_FROM_SERVER => Struct::Status.new(code, details, metadata) } @@ -392,9 +401,12 @@ module GRPC # a list, multiple metadata for its key are sent # @return [Enumerator, nil] a response Enumerator def bidi_streamer(requests, metadata: {}, &blk) - start_call(metadata) - bd = BidiCall.new(@call, @marshal, @unmarshal, + start_call(metadata) unless @metadata_sent + bd = BidiCall.new(@call, + @marshal, + @unmarshal, metadata_received: @metadata_received) + bd.run_on_client(requests, @op_notifier, &blk) end @@ -410,8 +422,12 @@ module GRPC # # @param gen_each_reply [Proc] generates the BiDi stream replies def run_server_bidi(gen_each_reply) - bd = BidiCall.new(@call, @marshal, @unmarshal, - metadata_received: @metadata_received) + bd = BidiCall.new(@call, + @marshal, + @unmarshal, + metadata_received: @metadata_received, + req_view: MultiReqView.new(self)) + bd.run_on_server(gen_each_reply) end @@ -428,6 +444,11 @@ module GRPC @op_notifier.notify(self) end + def merge_metadata_to_send(new_metadata = {}) + fail('cant change metadata after already sent') if @metadata_sent + @metadata_to_send.merge!(new_metadata) + end + private # Starts the call if not already started @@ -454,12 +475,20 @@ module GRPC # SingleReqView limits access to an ActiveCall's methods for use in server # handlers that receive just one request. SingleReqView = view_class(:cancelled?, :deadline, :metadata, - :output_metadata, :peer, :peer_cert) + :output_metadata, :peer, :peer_cert, + :send_initial_metadata, + :metadata_to_send, + :merge_metadata_to_send, + :metadata_sent) # MultiReqView limits access to an ActiveCall's methods for use in # server client_streamer handlers. MultiReqView = view_class(:cancelled?, :deadline, :each_queued_msg, - :each_remote_read, :metadata, :output_metadata) + :each_remote_read, :metadata, :output_metadata, + :send_initial_metadata, + :metadata_to_send, + :merge_metadata_to_send, + :metadata_sent) # Operation limits access to an ActiveCall's methods for use as # a Operation on the client. diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb index 425dc3e5198756420b111c8f385208b2314fee4a..0b6ef4918c6621638a741855b668c808ea024107 100644 --- a/src/ruby/lib/grpc/generic/bidi_call.rb +++ b/src/ruby/lib/grpc/generic/bidi_call.rb @@ -56,7 +56,8 @@ module GRPC # @param unmarshal [Function] f(string)->obj that unmarshals responses # @param metadata_received [true|false] indicates if metadata has already # been received. Should always be true for server calls - def initialize(call, marshal, unmarshal, metadata_received: false) + def initialize(call, marshal, unmarshal, metadata_received: false, + req_view: nil) fail(ArgumentError, 'not a call') unless call.is_a? Core::Call @call = call @marshal = marshal @@ -68,6 +69,7 @@ module GRPC @writes_complete = false @complete = false @done_mutex = Mutex.new + @req_view = req_view end # Begins orchestration of the Bidi stream for a client sending requests. @@ -97,7 +99,15 @@ module GRPC # # @param gen_each_reply [Proc] generates the BiDi stream replies. def run_on_server(gen_each_reply) - replys = gen_each_reply.call(each_queued_msg) + # Pass in the optional call object parameter if possible + if gen_each_reply.arity == 1 + replys = gen_each_reply.call(each_queued_msg) + elsif gen_each_reply.arity == 2 + replys = gen_each_reply.call(each_queued_msg, @req_view) + else + fail 'Illegal arity of reply generator' + end + @loop_th = start_read_loop(is_client: false) write_loop(replys, is_client: false) end @@ -162,6 +172,8 @@ module GRPC payload = @marshal.call(req) # Fails if status already received begin + @req_view.send_initial_metadata unless + @req_view.nil? || @req_view.metadata_sent @call.run_batch(SEND_MESSAGE => payload) rescue GRPC::Core::CallError => e # This is almost definitely caused by a status arriving while still diff --git a/src/ruby/lib/grpc/generic/rpc_desc.rb b/src/ruby/lib/grpc/generic/rpc_desc.rb index 913f55d0d3ba56552ba712261fb32d2e7e12ce68..584fe781698388240f0811125fe4c9e1692d4848 100644 --- a/src/ruby/lib/grpc/generic/rpc_desc.rb +++ b/src/ruby/lib/grpc/generic/rpc_desc.rb @@ -104,7 +104,14 @@ module GRPC end def assert_arity_matches(mth) - if request_response? || server_streamer? + # A bidi handler function can optionally be passed a second + # call object parameter for access to metadata, cancelling, etc. + if bidi_streamer? + if mth.arity != 2 && mth.arity != 1 + fail arity_error(mth, 2, "should be #{mth.name}(req, call) or " \ + "#{mth.name}(req)") + end + elsif request_response? || server_streamer? if mth.arity != 2 fail arity_error(mth, 2, "should be #{mth.name}(req, call)") end diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index c92a532a50085ab03ec6d2c6bac66bc377d56937..b1d30b8e38c19e301f436f63020cbb5b25ac7095 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -339,8 +339,11 @@ module GRPC return an_rpc if @pool.jobs_waiting <= @max_waiting_requests GRPC.logger.warn("NOT AVAILABLE: too many jobs_waiting: #{an_rpc}") noop = proc { |x| x } + + # Create a new active call that knows that metadata hasn't been + # sent yet c = ActiveCall.new(an_rpc.call, noop, noop, an_rpc.deadline, - metadata_received: true) + metadata_received: true, started: false) c.send_status(GRPC::Core::StatusCodes::RESOURCE_EXHAUSTED, '') nil end @@ -351,8 +354,11 @@ module GRPC return an_rpc if rpc_descs.key?(mth) GRPC.logger.warn("UNIMPLEMENTED: #{an_rpc}") noop = proc { |x| x } + + # Create a new active call that knows that + # metadata hasn't been sent yet c = ActiveCall.new(an_rpc.call, noop, noop, an_rpc.deadline, - metadata_received: true) + metadata_received: true, started: false) c.send_status(GRPC::Core::StatusCodes::UNIMPLEMENTED, '') nil end @@ -400,17 +406,20 @@ module GRPC unless @connect_md_proc.nil? connect_md = @connect_md_proc.call(an_rpc.method, an_rpc.metadata) end - an_rpc.call.run_batch(SEND_INITIAL_METADATA => connect_md) return nil unless available?(an_rpc) return nil unless implemented?(an_rpc) - # Create the ActiveCall + # Create the ActiveCall. Indicate that metadata hasnt been sent yet. GRPC.logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})") rpc_desc = rpc_descs[an_rpc.method.to_sym] - c = ActiveCall.new(an_rpc.call, rpc_desc.marshal_proc, - rpc_desc.unmarshal_proc(:input), an_rpc.deadline, - metadata_received: true) + c = ActiveCall.new(an_rpc.call, + rpc_desc.marshal_proc, + rpc_desc.unmarshal_proc(:input), + an_rpc.deadline, + metadata_received: true, + started: false, + metadata_to_send: connect_md) mth = an_rpc.method.to_sym [c, mth] end diff --git a/src/ruby/spec/generic/active_call_spec.rb b/src/ruby/spec/generic/active_call_spec.rb index 018580e0dfa51e674e07803653e62dc534d67a54..0c72be9a98a0892aea5be01497a682a506144727 100644 --- a/src/ruby/spec/generic/active_call_spec.rb +++ b/src/ruby/spec/generic/active_call_spec.rb @@ -60,8 +60,10 @@ describe GRPC::ActiveCall do end describe '#multi_req_view' do - it 'exposes a fixed subset of the ActiveCall methods' do - want = %w(cancelled?, deadline, each_remote_read, metadata, shutdown) + it 'exposes a fixed subset of the ActiveCall.methods' do + want = %w(cancelled?, deadline, each_remote_read, metadata, \ + shutdown, peer, peer_cert, send_initial_metadata, \ + initial_metadata_sent) v = @client_call.multi_req_view want.each do |w| expect(v.methods.include?(w)) @@ -70,8 +72,10 @@ describe GRPC::ActiveCall do end describe '#single_req_view' do - it 'exposes a fixed subset of the ActiveCall methods' do - want = %w(cancelled?, deadline, metadata, shutdown) + it 'exposes a fixed subset of the ActiveCall.methods' do + want = %w(cancelled?, deadline, metadata, shutdown, \ + send_initial_metadata, metadata_to_send, \ + merge_metadata_to_send, initial_metadata_sent) v = @client_call.single_req_view want.each do |w| expect(v.methods.include?(w)) @@ -149,6 +153,158 @@ describe GRPC::ActiveCall do end end + describe 'sending initial metadata', send_initial_metadata: true do + it 'sends metadata before sending a message if it hasnt been sent yet' do + call = make_test_call + @client_call = ActiveCall.new( + call, + @pass_through, + @pass_through, + deadline, + started: false) + + metadata = { key: 'dummy_val', other: 'other_val' } + expect(@client_call.metadata_sent).to eq(false) + @client_call.merge_metadata_to_send(metadata) + + message = 'dummy message' + + expect(call).to( + receive(:run_batch) + .with( + hash_including( + CallOps::SEND_INITIAL_METADATA => metadata)).once) + + expect(call).to( + receive(:run_batch).with(hash_including( + CallOps::SEND_MESSAGE => message)).once) + @client_call.remote_send(message) + + expect(@client_call.metadata_sent).to eq(true) + end + + it 'doesnt send metadata if it thinks its already been sent' do + call = make_test_call + + @client_call = ActiveCall.new(call, + @pass_through, + @pass_through, + deadline) + + expect(@client_call.metadata_sent).to eql(true) + expect(call).to( + receive(:run_batch).with(hash_including( + CallOps::SEND_INITIAL_METADATA)).never) + + @client_call.remote_send('test message') + end + + it 'sends metadata if it is explicitly sent and ok to do so' do + call = make_test_call + + @client_call = ActiveCall.new(call, + @pass_through, + @pass_through, + deadline, + started: false) + + expect(@client_call.metadata_sent).to eql(false) + + metadata = { test_key: 'val' } + @client_call.merge_metadata_to_send(metadata) + expect(@client_call.metadata_to_send).to eq(metadata) + + expect(call).to( + receive(:run_batch).with(hash_including( + CallOps::SEND_INITIAL_METADATA => + metadata)).once) + @client_call.send_initial_metadata + end + + it 'explicit sending fails if metadata has already been sent' do + call = make_test_call + + @client_call = ActiveCall.new(call, + @pass_through, + @pass_through, + deadline) + + expect(@client_call.metadata_sent).to eql(true) + + blk = proc do + @client_call.send_initial_metadata + end + + expect { blk.call }.to raise_error + end + end + + describe '#merge_metadata_to_send', merge_metadata_to_send: true do + it 'adds to existing metadata when there is existing metadata to send' do + call = make_test_call + starting_metadata = { k1: 'key1_val', k2: 'key2_val' } + @client_call = ActiveCall.new( + call, + @pass_through, @pass_through, + deadline, + started: false, + metadata_to_send: starting_metadata) + + expect(@client_call.metadata_to_send).to eq(starting_metadata) + + @client_call.merge_metadata_to_send( + k3: 'key3_val', + k4: 'key4_val') + + expected_md_to_send = { + k1: 'key1_val', + k2: 'key2_val', + k3: 'key3_val', + k4: 'key4_val' } + + expect(@client_call.metadata_to_send).to eq(expected_md_to_send) + + @client_call.merge_metadata_to_send(k5: 'key5_val') + expected_md_to_send.merge!(k5: 'key5_val') + expect(@client_call.metadata_to_send).to eq(expected_md_to_send) + end + + it 'overrides existing metadata if adding metadata with an existing key' do + call = make_test_call + starting_metadata = { k1: 'key1_val', k2: 'key2_val' } + @client_call = ActiveCall.new( + call, + @pass_through, + @pass_through, + deadline, + started: false, + metadata_to_send: starting_metadata) + + expect(@client_call.metadata_to_send).to eq(starting_metadata) + @client_call.merge_metadata_to_send(k1: 'key1_new_val') + expect(@client_call.metadata_to_send).to eq(k1: 'key1_new_val', + k2: 'key2_val') + end + + it 'fails when initial metadata has already been sent' do + call = make_test_call + @client_call = ActiveCall.new( + call, + @pass_through, + @pass_through, + deadline, + started: true) + + expect(@client_call.metadata_sent).to eq(true) + + blk = proc do + @client_call.merge_metadata_to_send(k1: 'key1_val') + end + + expect { blk.call }.to raise_error + end + end + describe '#client_invoke' do it 'sends metadata to the server when present' do call = make_test_call @@ -163,7 +319,26 @@ describe GRPC::ActiveCall do end end - describe '#remote_read' do + describe '#send_status', send_status: true do + it 'works when no metadata or messages have been sent yet' do + call = make_test_call + ActiveCall.client_invoke(call) + + recvd_rpc = @server.request_call + server_call = ActiveCall.new( + recvd_rpc.call, + @pass_through, + @pass_through, + deadline, + started: false) + + expect(server_call.metadata_sent).to eq(false) + blk = proc { server_call.send_status(OK) } + expect { blk.call }.to_not raise_error + end + end + + describe '#remote_read', remote_read: true do it 'reads the response sent by a server' do call = make_test_call ActiveCall.client_invoke(call) @@ -205,6 +380,31 @@ describe GRPC::ActiveCall do expect(client_call.metadata).to eq(expected) end + it 'get a status from server when nothing else sent from server' do + client_call = make_test_call + ActiveCall.client_invoke(client_call) + + recvd_rpc = @server.request_call + recvd_call = recvd_rpc.call + + server_call = ActiveCall.new( + recvd_call, + @pass_through, + @pass_through, + deadline, + started: false) + + server_call.send_status(OK, 'OK') + + # Check that we can receive initial metadata and a status + client_call.run_batch( + CallOps::RECV_INITIAL_METADATA => nil) + batch_result = client_call.run_batch( + CallOps::RECV_STATUS_ON_CLIENT => nil) + + expect(batch_result.status.code).to eq(OK) + end + it 'get a nil msg before a status when an OK status is sent' do call = make_test_call ActiveCall.client_invoke(call) @@ -329,6 +529,125 @@ describe GRPC::ActiveCall do end end + # Test sending of the initial metadata in #run_server_bidi + # from the server handler both implicitly and explicitly, + # when the server handler function has one argument and two arguments + describe '#run_server_bidi sanity tests', run_server_bidi: true do + it 'sends the initial metadata implicitly if not already sent' do + requests = ['first message', 'second message'] + server_to_client_metadata = { 'test_key' => 'test_val' } + server_status = OK + + client_call = make_test_call + client_call.run_batch(CallOps::SEND_INITIAL_METADATA => {}) + + recvd_rpc = @server.request_call + recvd_call = recvd_rpc.call + server_call = ActiveCall.new(recvd_call, + @pass_through, + @pass_through, + deadline, + metadata_received: true, + started: false, + metadata_to_send: server_to_client_metadata) + + # Server handler that doesn't have access to a "call" + # It echoes the requests + fake_gen_each_reply_with_no_call_param = proc do |msgs| + msgs + end + + server_thread = Thread.new do + server_call.run_server_bidi( + fake_gen_each_reply_with_no_call_param) + server_call.send_status(server_status) + end + + # Send the requests and send a close so the server can send a status + requests.each do |message| + client_call.run_batch(CallOps::SEND_MESSAGE => message) + end + client_call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil) + + server_thread.join + + # Expect that initial metadata was sent, + # the requests were echoed, and a status was sent + batch_result = client_call.run_batch( + CallOps::RECV_INITIAL_METADATA => nil) + expect(batch_result.metadata).to eq(server_to_client_metadata) + + requests.each do |message| + batch_result = client_call.run_batch( + CallOps::RECV_MESSAGE => nil) + expect(batch_result.message).to eq(message) + end + + batch_result = client_call.run_batch( + CallOps::RECV_STATUS_ON_CLIENT => nil) + expect(batch_result.status.code).to eq(server_status) + end + + it 'sends the metadata when sent explicitly and not already sent' do + requests = ['first message', 'second message'] + server_to_client_metadata = { 'test_key' => 'test_val' } + server_status = OK + + client_call = make_test_call + client_call.run_batch(CallOps::SEND_INITIAL_METADATA => {}) + + recvd_rpc = @server.request_call + recvd_call = recvd_rpc.call + server_call = ActiveCall.new(recvd_call, + @pass_through, + @pass_through, + deadline, + metadata_received: true, + started: false) + + # Fake server handler that has access to a "call" object and + # uses it to explicitly update and sent the initial metadata + fake_gen_each_reply_with_call_param = proc do |msgs, call_param| + call_param.merge_metadata_to_send(server_to_client_metadata) + call_param.send_initial_metadata + msgs + end + + server_thread = Thread.new do + server_call.run_server_bidi( + fake_gen_each_reply_with_call_param) + server_call.send_status(server_status) + end + + # Send requests and a close from the client so the server + # can send a status + requests.each do |message| + client_call.run_batch( + CallOps::SEND_MESSAGE => message) + end + client_call.run_batch( + CallOps::SEND_CLOSE_FROM_CLIENT => nil) + + server_thread.join + + # Verify that the correct metadata was sent, the requests + # were echoed, and the correct status was sent + batch_result = client_call.run_batch( + CallOps::RECV_INITIAL_METADATA => nil) + expect(batch_result.metadata).to eq(server_to_client_metadata) + + requests.each do |message| + batch_result = client_call.run_batch( + CallOps::RECV_MESSAGE => nil) + expect(batch_result.message).to eq(message) + end + + batch_result = client_call.run_batch( + CallOps::RECV_STATUS_ON_CLIENT => nil) + expect(batch_result.status.code).to eq(server_status) + end + end + def expect_server_to_receive(sent_text, **kw) c = expect_server_to_be_invoked(**kw) expect(c.remote_read).to eq(sent_text) diff --git a/src/ruby/spec/generic/rpc_desc_spec.rb b/src/ruby/spec/generic/rpc_desc_spec.rb index d2080b7ca2d8b8721f57ce6d4a7520387246854e..1a895005bc32f13887f1658f8822830b09206156 100644 --- a/src/ruby/spec/generic/rpc_desc_spec.rb +++ b/src/ruby/spec/generic/rpc_desc_spec.rb @@ -196,6 +196,9 @@ describe GRPC::RpcDesc do def fake_svstream(_arg1, _arg2) end + def fake_three_args(_arg1, _arg2, _arg3) + end + it 'raises when a request_response does not have 2 args' do [:fake_clstream, :no_arg].each do |mth| blk = proc do @@ -244,8 +247,8 @@ describe GRPC::RpcDesc do expect(&blk).to_not raise_error end - it 'raises when a bidi streamer does not have 1 arg' do - [:fake_svstream, :no_arg].each do |mth| + it 'raises when a bidi streamer does not have 1 or 2 args' do + [:fake_three_args, :no_arg].each do |mth| blk = proc do @bidi_streamer.assert_arity_matches(method(mth)) end @@ -259,6 +262,13 @@ describe GRPC::RpcDesc do end expect(&blk).to_not raise_error end + + it 'passes when a bidi streamer has 2 args' do + blk = proc do + @bidi_streamer.assert_arity_matches(method(:fake_svstream)) + end + expect(&blk).to_not raise_error + end end describe '#request_response?' do