Skip to content
Snippets Groups Projects
Commit 59a19a9d authored by Alexander Polcyn's avatar Alexander Polcyn
Browse files

make sure that client-side view of calls is robust

parent fb1e164c
No related branches found
No related tags found
No related merge requests found
...@@ -46,7 +46,7 @@ module GRPC ...@@ -46,7 +46,7 @@ module GRPC
extend Forwardable extend Forwardable
attr_reader :deadline, :metadata_sent, :metadata_to_send attr_reader :deadline, :metadata_sent, :metadata_to_send
def_delegators :@call, :cancel, :metadata, :write_flag, :write_flag=, def_delegators :@call, :cancel, :metadata, :write_flag, :write_flag=,
:peer, :peer_cert, :trailing_metadata :peer, :peer_cert, :trailing_metadata, :status
# client_invoke begins a client invocation. # client_invoke begins a client invocation.
# #
...@@ -105,6 +105,8 @@ module GRPC ...@@ -105,6 +105,8 @@ module GRPC
@input_stream_done = false @input_stream_done = false
@call_finished = false @call_finished = false
@call_finished_mu = Mutex.new @call_finished_mu = Mutex.new
@client_call_executed = false
@client_call_executed_mu = Mutex.new
end end
# Sends the initial metadata that has yet to be sent. # Sends the initial metadata that has yet to be sent.
...@@ -327,6 +329,7 @@ module GRPC ...@@ -327,6 +329,7 @@ module GRPC
# a list, multiple metadata for its key are sent # a list, multiple metadata for its key are sent
# @return [Object] the response received from the server # @return [Object] the response received from the server
def request_response(req, metadata: {}) def request_response(req, metadata: {})
raise_error_if_already_executed
ops = { ops = {
SEND_MESSAGE => @marshal.call(req), SEND_MESSAGE => @marshal.call(req),
SEND_CLOSE_FROM_CLIENT => nil, SEND_CLOSE_FROM_CLIENT => nil,
...@@ -369,6 +372,7 @@ module GRPC ...@@ -369,6 +372,7 @@ module GRPC
# a list, multiple metadata for its key are sent # a list, multiple metadata for its key are sent
# @return [Object] the response received from the server # @return [Object] the response received from the server
def client_streamer(requests, metadata: {}) def client_streamer(requests, metadata: {})
raise_error_if_already_executed
begin begin
merge_metadata_and_send_if_not_already_sent(metadata) merge_metadata_and_send_if_not_already_sent(metadata)
requests.each { |r| @call.run_batch(SEND_MESSAGE => @marshal.call(r)) } requests.each { |r| @call.run_batch(SEND_MESSAGE => @marshal.call(r)) }
...@@ -411,6 +415,7 @@ module GRPC ...@@ -411,6 +415,7 @@ module GRPC
# a list, multiple metadata for its key are sent # a list, multiple metadata for its key are sent
# @return [Enumerator|nil] a response Enumerator # @return [Enumerator|nil] a response Enumerator
def server_streamer(req, metadata: {}) def server_streamer(req, metadata: {})
raise_error_if_already_executed
ops = { ops = {
SEND_MESSAGE => @marshal.call(req), SEND_MESSAGE => @marshal.call(req),
SEND_CLOSE_FROM_CLIENT => nil SEND_CLOSE_FROM_CLIENT => nil
...@@ -468,6 +473,7 @@ module GRPC ...@@ -468,6 +473,7 @@ module GRPC
# a list, multiple metadata for its key are sent # a list, multiple metadata for its key are sent
# @return [Enumerator, nil] a response Enumerator # @return [Enumerator, nil] a response Enumerator
def bidi_streamer(requests, metadata: {}, &blk) def bidi_streamer(requests, metadata: {}, &blk)
raise_error_if_already_executed
# Metadata might have already been sent if this is an operation view # Metadata might have already been sent if this is an operation view
merge_metadata_and_send_if_not_already_sent(metadata) merge_metadata_and_send_if_not_already_sent(metadata)
bd = BidiCall.new(@call, bd = BidiCall.new(@call,
...@@ -572,6 +578,15 @@ module GRPC ...@@ -572,6 +578,15 @@ module GRPC
merge_metadata_to_send(metadata) && send_initial_metadata merge_metadata_to_send(metadata) && send_initial_metadata
end end
def raise_error_if_already_executed
@client_call_executed_mu.synchronize do
if @client_call_executed
fail GRPC::Core::CallError, 'attempting to re-run a call'
end
@client_call_executed = true
end
end
def self.view_class(*visible_methods) def self.view_class(*visible_methods)
Class.new do Class.new do
extend ::Forwardable extend ::Forwardable
......
...@@ -36,6 +36,33 @@ include GRPC::Core::StatusCodes ...@@ -36,6 +36,33 @@ include GRPC::Core::StatusCodes
include GRPC::Core::TimeConsts include GRPC::Core::TimeConsts
include GRPC::Core::CallOps include GRPC::Core::CallOps
# check that methods on a finished/closed call t crash
def check_op_view_of_finished_client_call_is_robust(op_view)
# use read_response_stream to try to iterate through
# possible response stream
fail('need something to attempt reads') unless block_given?
expect do
resp = op_view.execute
yield resp
end.to raise_error(GRPC::Core::CallError)
expect { op_view.start_call }.to raise_error(RuntimeError)
expect do
op_view.wait
op_view.cancel
op_view.metadata
op_view.trailing_metadata
op_view.status
op_view.cancelled?
op_view.deadline
op_view.write_flag
op_view.write_flag = 1
end.to_not raise_error
end
describe 'ClientStub' do describe 'ClientStub' do
let(:noop) { proc { |x| x } } let(:noop) { proc { |x| x } }
...@@ -231,15 +258,27 @@ describe 'ClientStub' do ...@@ -231,15 +258,27 @@ describe 'ClientStub' do
it_behaves_like 'request response' it_behaves_like 'request response'
it 'sends metadata to the server ok when running start_call first' do def run_op_view_metadata_test(run_start_call_first)
server_port = create_test_server server_port = create_test_server
host = "localhost:#{server_port}" host = "localhost:#{server_port}"
th = run_request_response(@sent_msg, @resp, @pass, th = run_request_response(@sent_msg, @resp, @pass,
k1: 'v1', k2: 'v2') k1: 'v1', k2: 'v2')
stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
expect(get_response(stub)).to eq(@resp) expect(
get_response(stub,
run_start_call_first: run_start_call_first)).to eq(@resp)
th.join th.join
end end
it 'sends metadata to the server ok when running start_call first' do
run_op_view_metadata_test(true)
check_op_view_of_finished_client_call_is_robust(@op) { |r| p r }
end
it 'does not crash when used after the call has been finished' do
run_op_view_metadata_test(false)
check_op_view_of_finished_client_call_is_robust(@op) { |r| p r }
end
end end
end end
...@@ -307,11 +346,23 @@ describe 'ClientStub' do ...@@ -307,11 +346,23 @@ describe 'ClientStub' do
it_behaves_like 'client streaming' it_behaves_like 'client streaming'
it 'sends metadata to the server ok when running start_call first' do def run_op_view_metadata_test(run_start_call_first)
th = run_client_streamer(@sent_msgs, @resp, @pass, **@metadata) th = run_client_streamer(@sent_msgs, @resp, @pass, **@metadata)
expect(get_response(@stub, run_start_call_first: true)).to eq(@resp) expect(
get_response(@stub,
run_start_call_first: run_start_call_first)).to eq(@resp)
th.join th.join
end end
it 'sends metadata to the server ok when running start_call first' do
run_op_view_metadata_test(true)
check_op_view_of_finished_client_call_is_robust(@op) { |r| p r }
end
it 'does not crash when used after the call has been finished' do
run_op_view_metadata_test(false)
check_op_view_of_finished_client_call_is_robust(@op) { |r| p r }
end
end end
end end
...@@ -377,7 +428,7 @@ describe 'ClientStub' do ...@@ -377,7 +428,7 @@ describe 'ClientStub' do
end end
end end
describe 'without a call operation', test2: true do describe 'without a call operation' do
def get_responses(stub, unmarshal: noop) def get_responses(stub, unmarshal: noop)
e = stub.server_streamer(@method, @sent_msg, noop, unmarshal, e = stub.server_streamer(@method, @sent_msg, noop, unmarshal,
metadata: @metadata) metadata: @metadata)
...@@ -405,16 +456,30 @@ describe 'ClientStub' do ...@@ -405,16 +456,30 @@ describe 'ClientStub' do
it_behaves_like 'server streaming' it_behaves_like 'server streaming'
it 'should send metadata to the server ok when start_call is run first' do def run_op_view_metadata_test(run_start_call_first)
server_port = create_test_server server_port = create_test_server
host = "localhost:#{server_port}" host = "localhost:#{server_port}"
th = run_server_streamer(@sent_msg, @replys, @fail, th = run_server_streamer(@sent_msg, @replys, @fail,
k1: 'v1', k2: 'v2') k1: 'v1', k2: 'v2')
stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
e = get_responses(stub, run_start_call_first: true) e = get_responses(stub, run_start_call_first: run_start_call_first)
expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus) expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
th.join th.join
end end
it 'should send metadata to the server ok when start_call is run first' do
run_op_view_metadata_test(true)
check_op_view_of_finished_client_call_is_robust(@op) do |responses|
responses.each { |r| p r }
end
end
it 'does not crash when used after the call has been finished' do
run_op_view_metadata_test(false)
check_op_view_of_finished_client_call_is_robust(@op) do |responses|
responses.each { |r| p r }
end
end
end end
end end
...@@ -501,14 +566,28 @@ describe 'ClientStub' do ...@@ -501,14 +566,28 @@ describe 'ClientStub' do
it_behaves_like 'bidi streaming' it_behaves_like 'bidi streaming'
it 'can run start_call before executing the call' do def run_op_view_metadata_test(run_start_call_first)
th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys, th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys,
@pass) @pass)
stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
e = get_responses(stub, run_start_call_first: true) e = get_responses(stub, run_start_call_first: run_start_call_first)
expect(e.collect { |r| r }).to eq(@replys) expect(e.collect { |r| r }).to eq(@replys)
th.join th.join
end end
it 'can run start_call before executing the call' do
run_op_view_metadata_test(true)
check_op_view_of_finished_client_call_is_robust(@op) do |responses|
responses.each { |r| p r }
end
end
it 'doesnt crash when op_view used after call has finished' do
run_op_view_metadata_test(false)
check_op_view_of_finished_client_call_is_robust(@op) do |responses|
responses.each { |r| p r }
end
end
end end
end end
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment