Newer
Older
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
require 'grpc'
murgatroid99
committed
Thread.abort_on_exception = true
murgatroid99
committed
t.abort_on_exception = true
# Reads the PEM fixtures used by the secure-channel tests.
#
# The fixtures are looked up in a `testdata` directory located in the
# parent of the directory that contains this file.
#
# @return [Array<String>] the contents of ca.pem, server1.key and
#   server1.pem, in that order
def load_test_certs
  test_root = File.join(File.dirname(File.dirname(__FILE__)), 'testdata')
  files = ['ca.pem', 'server1.key', 'server1.pem']
  # File.read opens, reads and closes in one call; the previous
  # File.open(...).read left each handle open until garbage collection.
  files.map { |f| File.read(File.join(test_root, f)) }
end
include GRPC::Core::StatusCodes
include GRPC::Core::TimeConsts
let(:noop) { proc { |x| x } }
# Resets the state shared by every example: the RPC method name, the
# status codes used for passing and failing replies, and the (not yet
# created) server.
before(:each) do
  @method = 'an_rpc_method'
  @pass = OK
  @fail = INTERNAL
  @server = nil
  # An exception in any thread should abort the run instead of being lost.
  Thread.abort_on_exception = true
end
# Closes the test server when an example created one.
# NOTE(review): from_relative_time(2) is presumably a two-second deadline
# for the shutdown — confirm against GRPC::Core::TimeConsts.
after(:each) do
  @server.close(from_relative_time(2)) unless @server.nil?
end
# Covers construction of GRPC::ClientStub: valid host/args, channel
# overrides, and rejection of bad overrides or bad credentials.
describe '#new' do
  let(:fake_host) { 'localhost:0' }

  it 'can be created from a host and args' do
    opts = { channel_args: { a_channel_arg: 'an_arg' } }
    blk = proc do
      GRPC::ClientStub.new(fake_host, :this_channel_is_insecure, **opts)
    end
    expect(&blk).not_to raise_error
  end

  it 'can be created with an channel override' do
    opts = {
      channel_args: { a_channel_arg: 'an_arg' },
      # NOTE(review): @ch is not assigned in the visible code, so this is
      # nil here — confirm that a nil override is what the test intends.
      channel_override: @ch
    }
    blk = proc do
      GRPC::ClientStub.new(fake_host, :this_channel_is_insecure, **opts)
    end
    expect(&blk).not_to raise_error
  end

  it 'cannot be created with a bad channel override' do
    opts = {
      channel_args: { a_channel_arg: 'an_arg' },
      channel_override: Object.new
    }
    blk = proc do
      GRPC::ClientStub.new(fake_host, :this_channel_is_insecure, **opts)
    end
    # NOTE(review): a bare raise_error accepts any exception; naming the
    # expected class would make this stricter.
    expect(&blk).to raise_error
  end

  it 'cannot be created with bad credentials' do
    opts = { channel_args: { a_channel_arg: 'an_arg' } }
    blk = proc do
      GRPC::ClientStub.new(fake_host, Object.new, **opts)
    end
    expect(&blk).to raise_error
  end

  it 'can be created with test test credentials' do
    certs = load_test_certs
    opts = {
      channel_args: {
        GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.fr',
        a_channel_arg: 'an_arg'
      }
    }
    # certs[0] is the contents of ca.pem (see load_test_certs).
    creds = GRPC::Core::ChannelCredentials.new(certs[0], nil, nil)
    blk = proc do
      GRPC::ClientStub.new(fake_host, creds, **opts)
    end
    expect(&blk).not_to raise_error
  end
end
describe '#request_response' do
# Fixes the request payload and the reply passed to the fake server helper.
before(:each) do
  @sent_msg = 'a_msg'
  @resp = 'a_reply'
end
shared_examples 'request response' do
# Happy path: the fake server is told to answer with @pass and the stub
# is expected to hand back @resp.
it 'should send a request to/receive a reply from a server' do
  server_port = create_test_server
  th = run_request_response(@sent_msg, @resp, @pass)
  stub = GRPC::ClientStub.new("localhost:#{server_port}",
                              :this_channel_is_insecure)
  expect(get_response(stub)).to eq(@resp)
  # Wait for the fake server's thread to finish before the example ends.
  th.join
end
# Same as the happy path, but the fake server additionally checks that
# the k1/k2 metadata sent by get_response arrived.
it 'should send metadata to the server ok' do
  server_port = create_test_server
  host = "localhost:#{server_port}"
  th = run_request_response(@sent_msg, @resp, @pass,
                            k1: 'v1', k2: 'v2')
  stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
  expect(get_response(stub)).to eq(@resp)
  # Wait for the fake server's thread to finish before the example ends.
  th.join
end
# The stub is given a host that does not point at the test server; the
# call is expected to succeed through the channel passed as
# channel_override, which does.
it 'should send a request when configured using an override channel' do
  server_port = create_test_server
  alt_host = "localhost:#{server_port}"
  th = run_request_response(@sent_msg, @resp, @pass)
  ch = GRPC::Core::Channel.new(alt_host, nil, :this_channel_is_insecure)
  stub = GRPC::ClientStub.new('ignored-host',
                              :this_channel_is_insecure,
                              channel_override: ch)
  expect(get_response(stub)).to eq(@resp)
  # Wait for the fake server's thread to finish before the example ends.
  th.join
end
it 'should raise an error if the status is not OK' do
server_port = create_test_server
host = "localhost:#{server_port}"
th = run_request_response(@sent_msg, @resp, @fail)
murgatroid99
committed
stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
blk = proc { get_response(stub) }
expect(&blk).to raise_error(GRPC::BadStatus)
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
# A call-credentials proc that raises must reach the caller as a
# GRPC::BadStatus with code UNAUTHENTICATED whose details contain the
# message the proc raised with.
it 'should receive UNAUTHENTICATED if call credentials plugin fails' do
  server_port = create_secure_test_server
  th = run_request_response(@sent_msg, @resp, @pass)
  begin
    certs = load_test_certs
    secure_channel_creds = GRPC::Core::ChannelCredentials.new(
      certs[0], nil, nil)
    secure_stub_opts = {
      channel_args: {
        GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.fr'
      }
    }
    stub = GRPC::ClientStub.new("localhost:#{server_port}",
                                secure_channel_creds, **secure_stub_opts)

    error_message = 'Failing call credentials callback'
    failing_auth = proc do
      fail error_message
    end
    creds = GRPC::Core::CallCredentials.new(failing_auth)

    # The matcher fails the example when no BadStatus is raised, so no
    # separate "did it raise" flag is needed.
    expect { get_response(stub, credentials: creds) }.to(
      raise_error(GRPC::BadStatus) do |e|
        expect(e.code).to eq(GRPC::Core::StatusCodes::UNAUTHENTICATED)
        expect(e.details).to include(error_message)
      end
    )
  ensure
    # Kill the server thread so tests can complete, even when one of the
    # expectations above fails.
    th.kill
  end
end
# Runs the shared 'request response' examples with get_response issuing
# the call directly on the stub.
describe 'without a call operation' do
  # Sends @sent_msg to @method with fixed k1/k2 metadata and the given
  # call credentials; returns whatever the stub returns.
  def get_response(stub, credentials: nil)
    stub.request_response(@method, @sent_msg, noop, noop,
                          metadata: { k1: 'v1', k2: 'v2' },
                          credentials: credentials)
  end

  it_behaves_like 'request response'
end
describe 'via a call operation' do
def get_response(stub, credentials: nil)
op = stub.request_response(@method, @sent_msg, noop, noop,
murgatroid99
committed
metadata: { k1: 'v1', k2: 'v2' },
deadline: from_relative_time(2),
credentials: credentials)
expect(op).to be_a(GRPC::ActiveCall::Operation)
it_behaves_like 'request response'
end
end
describe '#client_streamer' do
shared_examples 'client streaming' do
before(:each) do
server_port = create_test_server
host = "localhost:#{server_port}"
murgatroid99
committed
@stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
@metadata = { k1: 'v1', k2: 'v2' }
@sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s }
it 'should send requests to/receive a reply from a server' do
th = run_client_streamer(@sent_msgs, @resp, @pass)
expect(get_response(@stub)).to eq(@resp)
# The fake server is handed @metadata as the key/values it should find
# on the incoming call.
it 'should send metadata to the server ok' do
  server_thread = run_client_streamer(@sent_msgs, @resp, @pass, **@metadata)
  reply = get_response(@stub)
  expect(reply).to eq(@resp)
  server_thread.join
end
# The fake server answers with @fail, which the client must see as a
# GRPC::BadStatus.
it 'should raise an error if the status is not ok' do
  server_thread = run_client_streamer(@sent_msgs, @resp, @fail)
  expect { get_response(@stub) }.to raise_error(GRPC::BadStatus)
  server_thread.join
end
# Adds an Integer metadata value and expects the call to be rejected
# with an ArgumentError.
it 'should raise ArgumentError if metadata contains invalid values' do
  @metadata[:k3] = 3
  expect { get_response(@stub) }.to raise_error(
    ArgumentError, /Header values must be of type string or array/)
end
end
# Runs the shared 'client streaming' examples with get_response issuing
# the call directly on the stub.
describe 'without a call operation' do
  # Streams @sent_msgs to @method with @metadata attached and returns
  # whatever the stub returns.
  def get_response(stub)
    call_opts = { metadata: @metadata }
    stub.client_streamer(@method, @sent_msgs, noop, noop, **call_opts)
  end

  it_behaves_like 'client streaming'
end
describe 'via a call operation' do
def get_response(stub)
op = stub.client_streamer(@method, @sent_msgs, noop, noop,
return_op: true, metadata: @metadata)
expect(op).to be_a(GRPC::ActiveCall::Operation)
it_behaves_like 'client streaming'
end
end
describe '#server_streamer' do
shared_examples 'server streaming' do
before(:each) do
@sent_msg = 'a_msg'
@replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s }
# Happy path: the fake server sends every entry of @replys and the
# client is expected to receive exactly those, in order.
it 'should send a request to/receive replies from a server' do
  server_port = create_test_server
  host = "localhost:#{server_port}"
  th = run_server_streamer(@sent_msg, @replys, @pass)
  stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
  # to_a drains the enumerator; it replaces the identity collect block.
  expect(get_responses(stub).to_a).to eq(@replys)
  # Wait for the fake server's thread to finish before the example ends.
  th.join
end
it 'should raise an error if the status is not ok' do
server_port = create_test_server
host = "localhost:#{server_port}"
th = run_server_streamer(@sent_msg, @replys, @fail)
murgatroid99
committed
stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
e = get_responses(stub)
expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
it 'should send metadata to the server ok' do
server_port = create_test_server
host = "localhost:#{server_port}"
th = run_server_streamer(@sent_msg, @replys, @fail,
k1: 'v1', k2: 'v2')
murgatroid99
committed
stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
e = get_responses(stub)
expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
end
# Runs the shared 'server streaming' examples with get_responses issuing
# the call directly on the stub.
describe 'without a call operation' do
  # Sends @sent_msg with fixed k1/k2 metadata, asserts that the stub
  # returned an Enumerator, and returns that enumerator.
  def get_responses(stub)
    stub.server_streamer(@method, @sent_msg, noop, noop,
                         metadata: { k1: 'v1', k2: 'v2' }).tap do |replies|
      expect(replies).to be_a(Enumerator)
    end
  end

  it_behaves_like 'server streaming'
end
describe 'via a call operation' do
def get_responses(stub)
op = stub.server_streamer(@method, @sent_msg, noop, noop,
return_op: true,
metadata: { k1: 'v1', k2: 'v2' })
expect(op).to be_a(GRPC::ActiveCall::Operation)
expect(e).to be_a(Enumerator)
it_behaves_like 'server streaming'
end
end
describe '#bidi_streamer' do
shared_examples 'bidi streaming' do
before(:each) do
@sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s }
@replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s }
server_port = create_test_server
@host = "localhost:#{server_port}"
it 'supports sending all the requests first', bidi: true do
th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys,
murgatroid99
committed
stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
expect(e.collect { |r| r }).to eq(@replys)
th.join
end
it 'supports client-initiated ping pong', bidi: true do
th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true)
murgatroid99
committed
stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
expect(e.collect { |r| r }).to eq(@sent_msgs)
th.join
end
it 'supports a server-initiated ping pong', bidi: true do
th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, false)
murgatroid99
committed
stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
expect(e.collect { |r| r }).to eq(@sent_msgs)
th.join
end
end
describe 'without a call operation' do
def get_responses(stub)
e = stub.bidi_streamer(@method, @sent_msgs, noop, noop)
expect(e).to be_a(Enumerator)
it_behaves_like 'bidi streaming'
end
describe 'via a call operation' do
def get_responses(stub)
op = stub.bidi_streamer(@method, @sent_msgs, noop, noop,
expect(op).to be_a(GRPC::ActiveCall::Operation)
e = op.execute
expect(e).to be_a(Enumerator)
it_behaves_like 'bidi streaming'
# Server side of a server-streaming call, run inside wakey_thread: checks
# the metadata and the single request received, sends every entry of
# `replys`, then sends `status` ('OK' text when it equals @pass, 'NOK'
# otherwise).
#
# NOTE(review): `wanted_metadata` is not assigned in the visible code and
# the closing `end`s of the block and the method are absent — lines
# appear to be missing here. Presumably wanted_metadata is derived from
# `kw`; confirm against the original file.
def run_server_streamer(expected_input, replys, status, **kw)
wakey_thread do |notifier|
c = expect_server_to_be_invoked(notifier)
# Metadata keys are looked up as strings on the received call.
wanted_metadata.each do |k, v|
expect(c.metadata[k.to_s]).to eq(v)
end
expect(c.remote_read).to eq(expected_input)
replys.each { |r| c.remote_send(r) }
c.send_status(status, status == @pass ? 'OK' : 'NOK', true)
# Server side of a bidi call that reads every expected input before it
# sends any reply, then sends `status`.
#
# NOTE(review): the parameter list is cut off after `replys,` and the
# closing `end`s are absent — lines appear to be missing here. The body
# reads `status`, so it is presumably the last parameter; confirm against
# the original file.
def run_bidi_streamer_handle_inputs_first(expected_inputs, replys,
wakey_thread do |notifier|
c = expect_server_to_be_invoked(notifier)
expected_inputs.each { |i| expect(c.remote_read).to eq(i) }
replys.each { |r| c.remote_send(r) }
c.send_status(status, status == @pass ? 'OK' : 'NOK', true)
# Runs a fake ping-pong server for a bidi call inside wakey_thread.
#
# For every entry of expected_inputs the server either reads a message,
# checks it and echoes it back (client_starts truthy), or sends the entry
# first and then expects to read the same value back. It finishes by
# sending `status`, with the text 'OK' when status equals @pass and 'NOK'
# otherwise.
#
# Returns whatever wakey_thread returns; callers in this file call `join`
# on it.
def run_bidi_streamer_echo_ping_pong(expected_inputs, status, client_starts)
  wakey_thread do |notifier|
    c = expect_server_to_be_invoked(notifier)
    expected_inputs.each do |i|
      if client_starts
        expect(c.remote_read).to eq(i)
        c.remote_send(i)
      else
        c.remote_send(i)
        expect(c.remote_read).to eq(i)
      end
    end
    c.send_status(status, status == @pass ? 'OK' : 'NOK', true)
  end
end
# Server side of a client-streaming call, run inside wakey_thread: reads
# and checks every expected input, checks the received metadata, then
# sends `status` ('OK' text when it equals @pass, 'NOK' otherwise).
#
# NOTE(review): `wanted_metadata` is not assigned and `resp` is never
# used in the visible code, and the closing `end`s are absent — lines
# appear to be missing here (presumably a `kw`-derived assignment and a
# send of `resp`); confirm against the original file.
def run_client_streamer(expected_inputs, resp, status, **kw)
wakey_thread do |notifier|
c = expect_server_to_be_invoked(notifier)
expected_inputs.each { |i| expect(c.remote_read).to eq(i) }
# Metadata keys are looked up as strings on the received call.
wanted_metadata.each do |k, v|
expect(c.metadata[k.to_s]).to eq(v)
end
c.send_status(status, status == @pass ? 'OK' : 'NOK', true)
# Server side of a unary call, run inside wakey_thread: reads and checks
# the single request, checks the received metadata, then sends `status`
# ('OK' text when it equals @pass, 'NOK' otherwise).
#
# NOTE(review): `wanted_metadata` is not assigned and `resp` is never
# used in the visible code, and the closing `end`s are absent — lines
# appear to be missing here (presumably a `kw`-derived assignment and a
# send of `resp`); confirm against the original file.
def run_request_response(expected_input, resp, status, **kw)
wakey_thread do |notifier|
c = expect_server_to_be_invoked(notifier)
expect(c.remote_read).to eq(expected_input)
# Metadata keys are looked up as strings on the received call.
wanted_metadata.each do |k, v|
expect(c.metadata[k.to_s]).to eq(v)
end
c.send_status(status, status == @pass ? 'OK' : 'NOK', true)
# Creates @server and adds a secured port on '0.0.0.0:0' to it, using
# the server1 key and certificate from the test fixtures.
#
# Returns the result of add_http2_port, which callers in this file use
# as the port number to connect to.
def create_secure_test_server
  _ca, server_key, server_cert = load_test_certs
  key_cert_pairs = [{ private_key: server_key, cert_chain: server_cert }]
  secure_credentials = GRPC::Core::ServerCredentials.new(
    nil, key_cert_pairs, false)

  @server = GRPC::Core::Server.new(nil)
  @server.add_http2_port('0.0.0.0:0', secure_credentials)
end
murgatroid99
committed
@server = GRPC::Core::Server.new(nil)
@server.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
# Accepts one incoming call on @server and wraps it in a GRPC::ActiveCall
# that uses `noop` as both of the procs passed to it and INFINITE_FUTURE
# as the deadline.
#
# NOTE(review): `notifier` is never used in the visible body, the method
# has no closing `end`, and the bare `murgatroid99` / `committed` lines
# look like residue from a blame view rather than code — lines appear to
# be missing here; confirm against the original file.
def expect_server_to_be_invoked(notifier)
murgatroid99
committed
recvd_rpc = @server.request_call
recvd_call = recvd_rpc.call
# Copy the metadata received with the RPC onto the call object.
recvd_call.metadata = recvd_rpc.metadata
murgatroid99
committed
recvd_call.run_batch(SEND_INITIAL_METADATA => nil)
GRPC::ActiveCall.new(recvd_call, noop, noop, INFINITE_FUTURE,
metadata_received: true)