/*
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include <grpc++/client_context.h>
#include <grpc++/create_channel.h>
#include <grpc++/security/auth_metadata_processor.h>
#include <grpc++/security/credentials.h>
#include <grpc++/security/server_credentials.h>
#include <grpc++/server.h>
#include <grpc++/server_builder.h>
#include <grpc++/server_context.h>
#include <grpc/support/thd.h>
#include <grpc/support/time.h>
#include "src/core/lib/security/credentials/credentials.h"
#include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
#include "test/cpp/end2end/test_service_impl.h"
#include "test/cpp/util/test_credentials_provider.h"
using grpc::testing::EchoRequest;
using grpc::testing::EchoResponse;
using grpc::testing::kTlsCredentialsType;
// Returns true when |addr| starts with one of the loopback peer prefixes
// "ipv4:127.0.0.1:", "ipv6:[::ffff:127.0.0.1]:" or "ipv6:[::1]:".
bool CheckIsLocalhost(const grpc::string& addr) {
  static const char* const kLoopbackPrefixes[] = {
      "ipv4:127.0.0.1:",
      "ipv6:[::ffff:127.0.0.1]:",
      "ipv6:[::1]:",
  };
  for (const char* raw_prefix : kLoopbackPrefixes) {
    const grpc::string prefix(raw_prefix);
    // compare() clamps the length, so an |addr| shorter than the prefix simply
    // compares unequal.
    if (addr.compare(0, prefix.size(), prefix) == 0) {
      return true;
    }
  }
  return false;
}
const char kTestCredsPluginErrorMsg[] = "Could not find plugin metadata.";
class TestMetadataCredentialsPlugin : public MetadataCredentialsPlugin {
public:
static const char kGoodMetadataKey[];
static const char kBadMetadataKey[];
TestMetadataCredentialsPlugin(grpc::string_ref metadata_key,
grpc::string_ref metadata_value,
bool is_blocking, bool is_successful)
: metadata_key_(metadata_key.data(), metadata_key.length()),
metadata_value_(metadata_value.data(), metadata_value.length()),
is_blocking_(is_blocking),
is_successful_(is_successful) {}
bool IsBlocking() const override { return is_blocking_; }
Status GetMetadata(
grpc::string_ref service_url, grpc::string_ref method_name,
const grpc::AuthContext& channel_auth_context,
std::multimap<grpc::string, grpc::string>* metadata) override {
EXPECT_GT(service_url.length(), 0UL);
EXPECT_GT(method_name.length(), 0UL);
EXPECT_TRUE(channel_auth_context.IsPeerAuthenticated());
EXPECT_TRUE(metadata != nullptr);
if (is_successful_) {
metadata->insert(std::make_pair(metadata_key_, metadata_value_));
return Status::OK;
} else {
return Status(StatusCode::NOT_FOUND, kTestCredsPluginErrorMsg);
grpc::string metadata_value_;
bool is_blocking_;
bool is_successful_;
};
const char TestMetadataCredentialsPlugin::kBadMetadataKey[] =
"TestPluginMetadata";
const char TestMetadataCredentialsPlugin::kGoodMetadataKey[] =
"test-plugin-metadata";
class TestAuthMetadataProcessor : public AuthMetadataProcessor {
public:
static const char kGoodGuy[];
TestAuthMetadataProcessor(bool is_blocking) : is_blocking_(is_blocking) {}
std::shared_ptr<CallCredentials> GetCompatibleClientCreds() {
return MetadataCredentialsFromPlugin(
std::unique_ptr<MetadataCredentialsPlugin>(
new TestMetadataCredentialsPlugin(
TestMetadataCredentialsPlugin::kGoodMetadataKey, kGoodGuy,
is_blocking_, true)));
std::shared_ptr<CallCredentials> GetIncompatibleClientCreds() {
return MetadataCredentialsFromPlugin(
std::unique_ptr<MetadataCredentialsPlugin>(
new TestMetadataCredentialsPlugin(
TestMetadataCredentialsPlugin::kGoodMetadataKey, "Mr Hyde",
is_blocking_, true)));
bool IsBlocking() const override { return is_blocking_; }
Status Process(const InputMetadata& auth_metadata, AuthContext* context,
OutputMetadata* consumed_auth_metadata,
EXPECT_TRUE(consumed_auth_metadata != nullptr);
EXPECT_TRUE(context != nullptr);
EXPECT_TRUE(response_metadata != nullptr);
auth_metadata.find(TestMetadataCredentialsPlugin::kGoodMetadataKey);
EXPECT_NE(auth_md, auth_metadata.end());
string_ref auth_md_value = auth_md->second;
if (auth_md_value == kGoodGuy) {
context->AddProperty(kIdentityPropName, kGoodGuy);
context->SetPeerIdentityPropertyName(kIdentityPropName);
consumed_auth_metadata->insert(std::make_pair(
string(auth_md->first.data(), auth_md->first.length()),
string(auth_md->second.data(), auth_md->second.length())));
return Status::OK;
} else {
return Status(StatusCode::UNAUTHENTICATED,
string("Invalid principal: ") +
string(auth_md_value.data(), auth_md_value.length()));
}
}
static const char kIdentityPropName[];
bool is_blocking_;
};
const char TestAuthMetadataProcessor::kGoodGuy[] = "Dr Jekyll";
const char TestAuthMetadataProcessor::kIdentityPropName[] = "novel identity";
class Proxy : public ::grpc::testing::EchoTestService::Service {
Proxy(std::shared_ptr<Channel> channel)
: stub_(grpc::testing::EchoTestService::NewStub(channel)) {}
Status Echo(ServerContext* server_context, const EchoRequest* request,
std::unique_ptr<ClientContext> client_context =
ClientContext::FromServerContext(*server_context);
return stub_->Echo(client_context.get(), *request, response);
}
private:
std::unique_ptr< ::grpc::testing::EchoTestService::Stub> stub_;
// Implementation of the EchoTestService that lives in the "duplicate" package.
// It ignores the request and always answers "no package".
class TestServiceImplDupPkg
    : public ::grpc::testing::duplicate::EchoTestService::Service {
 public:
  Status Echo(ServerContext* context, const EchoRequest* request,
              EchoResponse* response) override {
    response->set_message("no package");
    return Status::OK;
  }
};
class TestScenario {
public:
TestScenario(bool proxy, const grpc::string& creds_type)
: use_proxy(proxy), credentials_type(creds_type) {}
// Although the below grpc::string is logically const, we can't declare
// them const because of a limitation in the way old compilers (e.g., gcc-4.4)
// manage vector insertion using a copy constructor
grpc::string credentials_type;
static std::ostream& operator<<(std::ostream& out,
const TestScenario& scenario) {
return out << "TestScenario{use_proxy="
<< (scenario.use_proxy ? "true" : "false") << ", credentials='"
<< scenario.credentials_type << "'}";
}
void TestScenario::Log() const {
std::ostringstream out;
out << *this;
gpr_log(GPR_DEBUG, "%s", out.str().c_str());
}
// Parameterized test fixture; each instantiation is driven by a TestScenario
// obtained through GetParam().
// NOTE(review): a number of lines of this class appear to be missing from this
// view (access specifier, constructor/teardown headers, some local variable
// and member declarations, closing braces). The notes below mark the gaps that
// are visible - confirm against the complete file before editing the code.
class End2endTest : public ::testing::TestWithParam<TestScenario> {
// NOTE(review): the constructor declarator is missing; only its member
// initializer list and body are visible.
: is_server_started_(false),
kMaxMessageSize_(8192),
special_service_("special") {
// Log the scenario this fixture instance runs under.
GetParam().Log();
}
// NOTE(review): the header of this routine is missing. The visible body shuts
// down the server, and the proxy server if one exists, only when the server
// was started.
if (is_server_started_) {
server_->Shutdown();
if (proxy_server_) proxy_server_->Shutdown();
}
}
// Configures a server with the scenario's credentials and registers services.
// |processor| is installed on the server credentials unless the scenario uses
// insecure credentials.
// NOTE(review): the declaration of |builder|, the setup of |server_address_|
// and the end of this method are not visible here.
void StartServer(const std::shared_ptr<AuthMetadataProcessor>& processor) {
auto server_creds = GetServerCredentials(GetParam().credentials_type);
if (GetParam().credentials_type != kInsecureCredentialsType) {
server_creds->SetAuthMetadataProcessor(processor);
}
builder.AddListeningPort(server_address_.str(), server_creds);
builder.RegisterService("foo.test.youtube.com", &special_service_);
builder.SetSyncServerOption(ServerBuilder::SyncServerOption::NUM_CQS, 4);
builder.SetSyncServerOption(
ServerBuilder::SyncServerOption::CQ_TIMEOUT_MSEC, 10);
// Hook for adjusting the ServerBuilder; this version only sets the maximum
// message size to kMaxMessageSize_.
virtual void ConfigureServerBuilder(ServerBuilder* builder) {
builder->SetMaxMessageSize(
kMaxMessageSize_); // For testing max message size.
}
// Starts the server if it is not running yet (with an empty auth processor)
// and creates |channel_| to it using the scenario's credentials.
// NOTE(review): the declaration of |args| is not visible here.
void ResetChannel() {
if (!is_server_started_) {
StartServer(std::shared_ptr<AuthMetadataProcessor>());
}
EXPECT_TRUE(is_server_started_);
auto channel_creds =
GetChannelCredentials(GetParam().credentials_type, &args);
if (!user_agent_prefix_.empty()) {
args.SetUserAgentPrefix(user_agent_prefix_);
}
args.SetString(GRPC_ARG_SECONDARY_USER_AGENT_STRING, "end2end_test");
channel_ = CreateCustomChannel(server_address_.str(), channel_creds, args);
// NOTE(review): the end of ResetChannel() and the header of the following
// routine are missing. The visible code below starts an insecure proxy server
// that forwards to |channel_|, re-points |channel_| at the proxy and creates
// |stub_| on |channel_|; presumably the proxy part is conditional on the
// scenario's use_proxy flag - confirm.
proxy_service_.reset(new Proxy(channel_));
int port = grpc_pick_unused_port_or_die();
std::ostringstream proxyaddr;
proxyaddr << "localhost:" << port;
ServerBuilder builder;
builder.AddListeningPort(proxyaddr.str(), InsecureServerCredentials());
builder.RegisterService(proxy_service_.get());
builder.SetSyncServerOption(ServerBuilder::SyncServerOption::NUM_CQS, 4);
builder.SetSyncServerOption(
ServerBuilder::SyncServerOption::CQ_TIMEOUT_MSEC, 10);
proxy_server_ = builder.BuildAndStart();
channel_ = CreateChannel(proxyaddr.str(), InsecureChannelCredentials());
stub_ = grpc::testing::EchoTestService::NewStub(channel_);
}
// NOTE(review): declarations for is_server_started_, server_, server_address_
// and kMaxMessageSize_ are used above but not visible in this list.
// Channel and stub used by the tests to issue RPCs.
std::shared_ptr<Channel> channel_;
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
// Proxy server and the service it hosts (only set when a proxy is created).
std::unique_ptr<Server> proxy_server_;
std::unique_ptr<Proxy> proxy_service_;
// Service implementations; special_service_ is registered for the host
// "foo.test.youtube.com".
TestServiceImpl service_;
TestServiceImpl special_service_;
TestServiceImplDupPkg dup_pkg_service_;
// When non-empty, passed to the channel arguments as the user-agent prefix.
grpc::string user_agent_prefix_;
// Issues |num_rpcs| sequential Echo calls on |stub| with gzip compression and
// checks that every call succeeds and echoes the request message. When
// |with_binary_metadata| is set, each call also carries an 8-byte "custom-bin"
// metadata value whose last byte is the call index.
static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs,
                    bool with_binary_metadata) {
  EchoRequest request;
  EchoResponse response;
  request.set_message("Hello hello hello hello");

  for (int i = 0; i < num_rpcs; ++i) {
    ClientContext context;
    if (with_binary_metadata) {
      char bytes[8] = {'\0', '\1', '\2', '\3',
                       '\4', '\5', '\6', static_cast<char>(i)};
      context.AddMetadata("custom-bin", grpc::string(bytes, 8));
    }
    context.set_compression_algorithm(GRPC_COMPRESS_GZIP);
    Status s = stub->Echo(&context, request, &response);
    EXPECT_EQ(response.message(), request.message());
    // The status was previously computed but never verified.
    EXPECT_TRUE(s.ok());
  }
}
// This class is for testing scenarios where RPCs are cancelled on the server
// by calling ServerContext::TryCancel()
class End2endServerTryCancelTest : public End2endTest {
 protected:
  // Helper for testing client-streaming RPCs which are cancelled on the server.
  // Depending on the value of server_try_cancel parameter, this will test one
  // of the following three scenarios:
  //   CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading
  //   any messages from the client
  //
  //   CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading
  //   messages from the client
  //
  //   CANCEL_AFTER_PROCESSING: Rpc is cancelled by server after reading all
  //   the messages from the client
  //
  // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL.
  void TestRequestStreamServerCancel(
      ServerTryCancelRequestPhase server_try_cancel, int num_msgs_to_send) {
    ResetStub();
    EchoRequest request;
    EchoResponse response;
    ClientContext context;

    // Send server_try_cancel value in the client metadata
    context.AddMetadata(kServerTryCancelRequest,
                        grpc::to_string(server_try_cancel));

    auto stream = stub_->RequestStream(&context, &response);

    int num_msgs_sent = 0;
    while (num_msgs_sent < num_msgs_to_send) {
      request.set_message("hello");
      if (!stream->Write(request)) {
        break;
      }
      num_msgs_sent++;
    }
    gpr_log(GPR_INFO, "Sent %d messages", num_msgs_sent);

    stream->WritesDone();
    Status s = stream->Finish();

    // At this point, we know for sure that RPC was cancelled by the server
    // since we passed server_try_cancel value in the metadata. Depending on the
    // value of server_try_cancel, the RPC might have been cancelled by the
    // server at different stages. The following validates our expectations of
    // number of messages sent in various cancellation scenarios:
    switch (server_try_cancel) {
      case CANCEL_BEFORE_PROCESSING:
      case CANCEL_DURING_PROCESSING:
        // If the RPC is cancelled by server before / during messages from the
        // client, it means that the client most likely did not get a chance to
        // send all the messages it wanted to send. i.e num_msgs_sent <=
        // num_msgs_to_send
        EXPECT_LE(num_msgs_sent, num_msgs_to_send);
        break;

      case CANCEL_AFTER_PROCESSING:
        // If the RPC was cancelled after all messages were read by the server,
        // the client did get a chance to send all its messages
        EXPECT_EQ(num_msgs_sent, num_msgs_to_send);
        break;

      default:
        gpr_log(GPR_ERROR, "Invalid server_try_cancel value: %d",
                server_try_cancel);
        EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL &&
                    server_try_cancel <= CANCEL_AFTER_PROCESSING);
        break;
    }

    EXPECT_FALSE(s.ok());
    EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
  }

  // Helper for testing server-streaming RPCs which are cancelled on the server.
  // Depending on the value of server_try_cancel parameter, this will test one
  // of the following three scenarios:
  //   CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before writing
  //   any messages to the client
  //
  //   CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while writing
  //   messages to the client
  //
  //   CANCEL_AFTER_PROCESSING: Rpc is cancelled by server after writing all
  //   the messages to the client
  //
  // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL.
  void TestResponseStreamServerCancel(
      ServerTryCancelRequestPhase server_try_cancel) {
    ResetStub();
    EchoRequest request;
    EchoResponse response;
    ClientContext context;

    // Send server_try_cancel in the client metadata
    context.AddMetadata(kServerTryCancelRequest,
                        grpc::to_string(server_try_cancel));

    request.set_message("hello");
    auto stream = stub_->ResponseStream(&context, request);

    int num_msgs_read = 0;
    while (num_msgs_read < kNumResponseStreamsMsgs) {
      if (!stream->Read(&response)) {
        break;
      }
      EXPECT_EQ(response.message(),
                request.message() + grpc::to_string(num_msgs_read));
      num_msgs_read++;
    }
    gpr_log(GPR_INFO, "Read %d messages", num_msgs_read);

    Status s = stream->Finish();

    // Depending on the value of server_try_cancel, the RPC might have been
    // cancelled by the server at different stages. The following validates our
    // expectations of number of messages read in various cancellation
    // scenarios:
    switch (server_try_cancel) {
      case CANCEL_BEFORE_PROCESSING:
        // Server cancelled before sending any messages. Which means the client
        // wouldn't have read any
        EXPECT_EQ(num_msgs_read, 0);
        break;

      case CANCEL_DURING_PROCESSING:
        // Server cancelled while writing messages. Client must have read less
        // than or equal to the expected number of messages
        EXPECT_LE(num_msgs_read, kNumResponseStreamsMsgs);
        break;

      case CANCEL_AFTER_PROCESSING:
        // Even though the Server cancelled after writing all messages, the RPC
        // may be cancelled before the Client got a chance to read all the
        // messages.
        EXPECT_LE(num_msgs_read, kNumResponseStreamsMsgs);
        break;

      default: {
        gpr_log(GPR_ERROR, "Invalid server_try_cancel value: %d",
                server_try_cancel);
        EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL &&
                    server_try_cancel <= CANCEL_AFTER_PROCESSING);
        break;
      }
    }

    EXPECT_FALSE(s.ok());
    EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
  }

  // Helper for testing bidirectional-streaming RPCs which are cancelled on the
  // server. Depending on the value of server_try_cancel parameter, this will
  // test one of the following three scenarios:
  //   CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading/
  //   writing any messages from/to the client
  //
  //   CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading/
  //   writing messages from/to the client
  //
  //   CANCEL_AFTER_PROCESSING: Rpc is cancelled by server after reading/writing
  //   all the messages from/to the client
  //
  // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL.
  void TestBidiStreamServerCancel(ServerTryCancelRequestPhase server_try_cancel,
                                  int num_messages) {
    ResetStub();
    EchoRequest request;
    EchoResponse response;
    ClientContext context;

    // Send server_try_cancel in the client metadata
    context.AddMetadata(kServerTryCancelRequest,
                        grpc::to_string(server_try_cancel));

    auto stream = stub_->BidiStream(&context);

    int num_msgs_read = 0;
    int num_msgs_sent = 0;
    while (num_msgs_sent < num_messages) {
      request.set_message("hello " + grpc::to_string(num_msgs_sent));
      if (!stream->Write(request)) {
        break;
      }
      num_msgs_sent++;

      if (!stream->Read(&response)) {
        break;
      }
      num_msgs_read++;

      EXPECT_EQ(response.message(), request.message());
    }
    gpr_log(GPR_INFO, "Sent %d messages", num_msgs_sent);
    gpr_log(GPR_INFO, "Read %d messages", num_msgs_read);

    stream->WritesDone();
    Status s = stream->Finish();

    // Depending on the value of server_try_cancel, the RPC might have been
    // cancelled by the server at different stages. The following validates our
    // expectations of number of messages read in various cancellation
    // scenarios:
    switch (server_try_cancel) {
      case CANCEL_BEFORE_PROCESSING:
        EXPECT_EQ(num_msgs_read, 0);
        break;

      case CANCEL_DURING_PROCESSING:
        EXPECT_LE(num_msgs_sent, num_messages);
        EXPECT_LE(num_msgs_read, num_msgs_sent);
        break;

      case CANCEL_AFTER_PROCESSING:
        EXPECT_EQ(num_msgs_sent, num_messages);

        // The Server cancelled after reading the last message and after writing
        // the message to the client. However, the RPC cancellation might have
        // taken effect before the client actually read the response.
        EXPECT_LE(num_msgs_read, num_msgs_sent);
        break;

      default:
        gpr_log(GPR_ERROR, "Invalid server_try_cancel value: %d",
                server_try_cancel);
        EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL &&
                    server_try_cancel <= CANCEL_AFTER_PROCESSING);
        break;
    }

    EXPECT_FALSE(s.ok());
    EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
  }
};
// Unary Echo call for which the server is asked (through client metadata) to
// cancel before processing; the call must finish with CANCELLED.
TEST_P(End2endServerTryCancelTest, RequestEchoServerCancel) {
  ResetStub();
  EchoRequest request;
  EchoResponse response;
  ClientContext context;

  context.AddMetadata(kServerTryCancelRequest,
                      grpc::to_string(CANCEL_BEFORE_PROCESSING));
  Status s = stub_->Echo(&context, request, &response);
  EXPECT_FALSE(s.ok());
  EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
}
// Server cancels before reading any request from the stream.
TEST_P(End2endServerTryCancelTest, RequestStreamServerCancelBeforeReads) {
  const int kMsgsToSend = 1;
  TestRequestStreamServerCancel(CANCEL_BEFORE_PROCESSING, kMsgsToSend);
}
// Server cancels in parallel with reading requests from the stream.
TEST_P(End2endServerTryCancelTest, RequestStreamServerCancelDuringRead) {
  const int kMsgsToSend = 10;
  TestRequestStreamServerCancel(CANCEL_DURING_PROCESSING, kMsgsToSend);
}
// Server cancels once every request has been read, but before it returns to
// the client.
TEST_P(End2endServerTryCancelTest, RequestStreamServerCancelAfterReads) {
  const int kMsgsToSend = 4;
  TestRequestStreamServerCancel(CANCEL_AFTER_PROCESSING, kMsgsToSend);
}
// Server cancels before it sends any response message.
TEST_P(End2endServerTryCancelTest, ResponseStreamServerCancelBefore) {
  const ServerTryCancelRequestPhase phase = CANCEL_BEFORE_PROCESSING;
  TestResponseStreamServerCancel(phase);
}
// Server cancels in parallel with writing responses to the stream.
TEST_P(End2endServerTryCancelTest, ResponseStreamServerCancelDuring) {
  const ServerTryCancelRequestPhase phase = CANCEL_DURING_PROCESSING;
  TestResponseStreamServerCancel(phase);
}
// Server cancels once all the responses have been written to the stream, but
// before it returns to the client.
TEST_P(End2endServerTryCancelTest, ResponseStreamServerCancelAfter) {
  const ServerTryCancelRequestPhase phase = CANCEL_AFTER_PROCESSING;
  TestResponseStreamServerCancel(phase);
}
// Server cancels before any request/response is read/written on the stream.
TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelBefore) {
  const int kNumMessages = 2;
  TestBidiStreamServerCancel(CANCEL_BEFORE_PROCESSING, kNumMessages);
}
// Server cancels in parallel with reading/writing requests/responses on the
// stream.
TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelDuring) {
  const int kNumMessages = 10;
  TestBidiStreamServerCancel(CANCEL_DURING_PROCESSING, kNumMessages);
}
// Server cancels once all requests/responses have been read/written on the
// stream, but before it returns to the client.
TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelAfter) {
  const int kNumMessages = 5;
  TestBidiStreamServerCancel(CANCEL_AFTER_PROCESSING, kNumMessages);
}
// Sets a custom user-agent prefix on the channel, asks the server to echo the
// request metadata back, and checks that the echoed "user-agent" trailing
// metadata starts with "<prefix> grpc-c++/".
TEST_P(End2endTest, SimpleRpcWithCustomUserAgentPrefix) {
  user_agent_prefix_ = "custom_prefix";
  ResetStub();
  EchoRequest request;
  EchoResponse response;
  request.set_message("Hello hello hello hello");
  request.mutable_param()->set_echo_metadata(true);

  ClientContext context;
  Status s = stub_->Echo(&context, request, &response);
  EXPECT_EQ(response.message(), request.message());
  EXPECT_TRUE(s.ok());
  const auto& trailing_metadata = context.GetServerTrailingMetadata();
  auto iter = trailing_metadata.find("user-agent");
  // Fatal assertion: |iter| is dereferenced below, which must not happen when
  // the entry is absent.
  ASSERT_TRUE(iter != trailing_metadata.end());
  grpc::string expected_prefix = user_agent_prefix_ + " grpc-c++/";
  EXPECT_TRUE(iter->second.starts_with(expected_prefix)) << iter->second;
}
// Runs SendRpc concurrently from several threads, each issuing 10 RPCs that
// carry binary metadata.
TEST_P(End2endTest, MultipleRpcsWithVariedBinaryMetadataValue) {
  ResetStub();
  // NOTE(review): the loop header was missing from this view; the thread count
  // is an assumption - confirm against the complete file.
  const int kNumThreads = 10;
  std::vector<std::thread> threads;
  for (int i = 0; i < kNumThreads; ++i) {
    threads.emplace_back(SendRpc, stub_.get(), 10, true);
  }
  // Every thread must be joined before |threads| is destroyed; destroying a
  // joinable std::thread terminates the process.
  for (auto& worker : threads) {
    worker.join();
  }
}
// Runs SendRpc concurrently from several threads, each issuing 10 RPCs
// without binary metadata.
TEST_P(End2endTest, MultipleRpcs) {
  ResetStub();
  // NOTE(review): the loop header was missing from this view; the thread count
  // is an assumption - confirm against the complete file.
  const int kNumThreads = 10;
  std::vector<std::thread> threads;
  for (int i = 0; i < kNumThreads; ++i) {
    threads.emplace_back(SendRpc, stub_.get(), 10, false);
  }
  // Every thread must be joined before |threads| is destroyed; destroying a
  // joinable std::thread terminates the process.
  for (auto& worker : threads) {
    worker.join();
  }
}
// Client-streaming call with a single request; the response must echo it.
TEST_P(End2endTest, RequestStreamOneRequest) {
  ResetStub();
  EchoRequest request;
  EchoResponse response;
  ClientContext context;

  auto stream = stub_->RequestStream(&context, &response);
  request.set_message("hello");
  EXPECT_TRUE(stream->Write(request));
  stream->WritesDone();
  // Finish the call before inspecting |response|, and verify the status.
  Status s = stream->Finish();
  EXPECT_EQ(response.message(), request.message());
  EXPECT_TRUE(s.ok());
}
// Client-streaming call with two identical requests; the response must be
// their concatenation.
TEST_P(End2endTest, RequestStreamTwoRequests) {
  ResetStub();
  EchoRequest request;
  EchoResponse response;
  ClientContext context;

  auto stream = stub_->RequestStream(&context, &response);
  request.set_message("hello");
  EXPECT_TRUE(stream->Write(request));
  EXPECT_TRUE(stream->Write(request));
  stream->WritesDone();
  // Finish the call before inspecting |response|, and verify the status.
  Status s = stream->Finish();
  EXPECT_EQ(response.message(), "hellohello");
  EXPECT_TRUE(s.ok());
}
// Server-streaming call: expects exactly three responses, "<msg>0" through
// "<msg>2", followed by end of stream and an OK status.
TEST_P(End2endTest, ResponseStream) {
  ResetStub();
  EchoRequest request;
  EchoResponse response;
  ClientContext context;
  request.set_message("hello");

  auto stream = stub_->ResponseStream(&context, request);
  EXPECT_TRUE(stream->Read(&response));
  EXPECT_EQ(response.message(), request.message() + "0");
  EXPECT_TRUE(stream->Read(&response));
  EXPECT_EQ(response.message(), request.message() + "1");
  EXPECT_TRUE(stream->Read(&response));
  EXPECT_EQ(response.message(), request.message() + "2");
  EXPECT_FALSE(stream->Read(&response));

  // Retrieve and verify the final status of the call.
  Status s = stream->Finish();
  EXPECT_TRUE(s.ok());
}
// Bidirectional streaming call: three write/read round trips, each response
// echoing its request, then half-close and verify the stream ends with OK.
TEST_P(End2endTest, BidiStream) {
  ResetStub();
  EchoRequest request;
  EchoResponse response;
  ClientContext context;
  grpc::string msg("hello");

  auto stream = stub_->BidiStream(&context);

  request.set_message(msg + "0");
  EXPECT_TRUE(stream->Write(request));
  EXPECT_TRUE(stream->Read(&response));
  EXPECT_EQ(response.message(), request.message());

  request.set_message(msg + "1");
  EXPECT_TRUE(stream->Write(request));
  EXPECT_TRUE(stream->Read(&response));
  EXPECT_EQ(response.message(), request.message());

  request.set_message(msg + "2");
  EXPECT_TRUE(stream->Write(request));
  EXPECT_TRUE(stream->Read(&response));
  EXPECT_EQ(response.message(), request.message());

  stream->WritesDone();
  // Reading past the end of the stream must keep failing.
  EXPECT_FALSE(stream->Read(&response));
  EXPECT_FALSE(stream->Read(&response));

  // Retrieve and verify the final status of the call.
  Status s = stream->Finish();
  EXPECT_TRUE(s.ok());
}
// Talk to the two services with the same name but different package names.
// The two stubs are created on the same channel.
TEST_P(End2endTest, DiffPackageServices) {
  ResetStub();
  EchoRequest request;
  EchoResponse response;
  request.set_message("Hello");

  ClientContext context;
  Status s = stub_->Echo(&context, request, &response);
  EXPECT_EQ(response.message(), request.message());
  EXPECT_TRUE(s.ok());

  std::unique_ptr<grpc::testing::duplicate::EchoTestService::Stub> dup_pkg_stub(
      grpc::testing::duplicate::EchoTestService::NewStub(channel_));
  ClientContext context2;
  s = dup_pkg_stub->Echo(&context2, request, &response);
  EXPECT_EQ("no package", response.message());
  EXPECT_TRUE(s.ok());
}
// Spins until |service| reports signal_client(), then cancels the call bound
// to |context|.
// NOTE(review): |delay_us| is not used by the visible code - confirm whether a
// delay before polling was intended.
void CancelRpc(ClientContext* context, int delay_us, TestServiceImpl* service) {
  for (;;) {
    if (service->signal_client()) {
      break;
    }
  }
  context->TryCancel();
}
// Cancels the client context before the call is issued: the call must finish
// with CANCELLED and leave the response message empty.
TEST_P(End2endTest, CancelRpcBeforeStart) {
  ResetStub();

  EchoRequest echo_request;
  echo_request.set_message("hello");
  EchoResponse echo_response;

  ClientContext cancelled_context;
  cancelled_context.TryCancel();

  Status status = stub_->Echo(&cancelled_context, echo_request, &echo_response);
  EXPECT_EQ("", echo_response.message());
  EXPECT_EQ(grpc::StatusCode::CANCELLED, status.error_code());
}
// Client cancels request stream after sending two messages
TEST_P(End2endTest, ClientCancelsRequestStream) {
  ResetStub();
  EchoRequest request;
  EchoResponse response;
  ClientContext context;
  request.set_message("hello");

  auto stream = stub_->RequestStream(&context, &response);
  EXPECT_TRUE(stream->Write(request));
  EXPECT_TRUE(stream->Write(request));

  context.TryCancel();

  Status s = stream->Finish();
  EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
}
// Client cancels server stream after sending some messages
TEST_P(End2endTest, ClientCancelsResponseStream) {
  ResetStub();
  EchoRequest request;
  EchoResponse response;
  ClientContext context;
  request.set_message("hello");

  auto stream = stub_->ResponseStream(&context, request);

  EXPECT_TRUE(stream->Read(&response));
  EXPECT_EQ(response.message(), request.message() + "0");
  EXPECT_TRUE(stream->Read(&response));
  EXPECT_EQ(response.message(), request.message() + "1");

  context.TryCancel();

  // The cancellation races with responses, so there might be zero or
  // one responses pending, read till failure
  if (stream->Read(&response)) {
    EXPECT_EQ(response.message(), request.message() + "2");
    // Since we have cancelled, we expect the next attempt to read to fail
    EXPECT_FALSE(stream->Read(&response));
  }

  Status s = stream->Finish();
  // The final status could be either of CANCELLED or OK depending on
  // who won the race.
  EXPECT_GE(grpc::StatusCode::CANCELLED, s.error_code());
}
// Client cancels bidi stream after sending some messages
TEST_P(End2endTest, ClientCancelsBidi) {
  ResetStub();
  EchoRequest request;
  EchoResponse response;
  ClientContext context;
  grpc::string msg("hello");

  auto stream = stub_->BidiStream(&context);

  request.set_message(msg + "0");
  EXPECT_TRUE(stream->Write(request));
  EXPECT_TRUE(stream->Read(&response));
  EXPECT_EQ(response.message(), request.message());

  request.set_message(msg + "1");
  EXPECT_TRUE(stream->Write(request));

  context.TryCancel();

  // The cancellation races with responses, so there might be zero or
  // one responses pending, read till failure
  if (stream->Read(&response)) {
    EXPECT_EQ(response.message(), request.message());
    // Since we have cancelled, we expect the next attempt to read to fail
    EXPECT_FALSE(stream->Read(&response));
  }

  Status s = stream->Finish();
  EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
}
// Sends a request larger than the maximum message size configured on the
// server builder and expects the call to fail.
// NOTE(review): the oversized payload and the failure expectation were missing
// from this view and are reconstructed from the test name and
// kMaxMessageSize_ - confirm against the complete file.
TEST_P(End2endTest, RpcMaxMessageSize) {
  ResetStub();
  EchoRequest request;
  EchoResponse response;
  request.set_message(grpc::string(kMaxMessageSize_ * 2, 'a'));

  ClientContext context;
  Status s = stub_->Echo(&context, request, &response);
  EXPECT_FALSE(s.ok());
}
// Client sends 20 requests and the server returns CANCELLED status after
// reading 10 requests.
TEST_P(End2endTest, RequestStreamServerEarlyCancelTest) {
  ResetStub();
  EchoRequest request;
  EchoResponse response;
  ClientContext context;

  context.AddMetadata(kServerCancelAfterReads, "10");
  auto stream = stub_->RequestStream(&context, &response);
  request.set_message("hello");
  int send_messages = 20;
  // The first 10 writes happen before the server cancels and must succeed.
  while (send_messages > 10) {
    EXPECT_TRUE(stream->Write(request));
    send_messages--;
  }
  // The remaining writes race with the cancellation, so their result is not
  // checked.
  while (send_messages > 0) {
    stream->Write(request);
    send_messages--;
  }
  stream->WritesDone();
  Status s = stream->Finish();
  EXPECT_EQ(s.error_code(), StatusCode::CANCELLED);
}
// Thread body: sets |ev| to announce that it is running, then keeps reading
// from |stream| until Read() fails, logging each message received.
void ReaderThreadFunc(ClientReaderWriter<EchoRequest, EchoResponse>* stream,
                      gpr_event* ev) {
  EchoResponse incoming;
  gpr_event_set(ev, reinterpret_cast<void*>(1));
  for (;;) {
    if (!stream->Read(&incoming)) {
      break;
    }
    gpr_log(GPR_INFO, "Read message");
  }
}
// Run a Read and a WritesDone simultaneously.
TEST_P(End2endTest, SimultaneousReadWritesDone) {
  ResetStub();
  ClientContext context;
  gpr_event ev;
  gpr_event_init(&ev);
  auto stream = stub_->BidiStream(&context);
  std::thread reader_thread(ReaderThreadFunc, stream.get(), &ev);
  // Wait until the reader thread has signalled that it is running.
  gpr_event_wait(&ev, gpr_inf_future(GPR_CLOCK_REALTIME));
  stream->WritesDone();
  reader_thread.join();
  // Retrieve and verify the final status once the reader has drained the
  // stream.
  Status s = stream->Finish();
  EXPECT_TRUE(s.ok());
}
// Checks channel connectivity state transitions: the channel starts IDLE,
// stays IDLE while nobody asks it to connect, and leaves IDLE once
// GetState(true) requests a connection.
TEST_P(End2endTest, ChannelState) {
  ResetStub();
  // Start IDLE
  EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false));

  // Did not ask to connect, no state change.
  CompletionQueue cq;
  std::chrono::system_clock::time_point deadline =
      std::chrono::system_clock::now() + std::chrono::milliseconds(10);
  channel_->NotifyOnStateChange(GRPC_CHANNEL_IDLE, deadline, &cq, nullptr);
  void* tag;
  bool ok = true;
  cq.Next(&tag, &ok);
  EXPECT_FALSE(ok);

  EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(true));
  EXPECT_TRUE(channel_->WaitForStateChange(GRPC_CHANNEL_IDLE,
                                           gpr_inf_future(GPR_CLOCK_REALTIME)));
  auto state = channel_->GetState(false);
  EXPECT_TRUE(state == GRPC_CHANNEL_CONNECTING || state == GRPC_CHANNEL_READY);
}
// Watches the state of a channel whose target has no server behind it.
// Runs ten state-change waits of up to one second each, so it can take about
// 10s. Only runs for the insecure credentials scenario.
TEST_P(End2endTest, ChannelStateTimeout) {
  const bool is_insecure =
      !(GetParam().credentials_type != kInsecureCredentialsType);
  if (!is_insecure) {
    return;
  }

  // Channel to non-existing server
  int unused_port = grpc_pick_unused_port_or_die();
  std::ostringstream target;
  target << "127.0.0.1:" << unused_port;
  auto channel = CreateChannel(target.str(), InsecureChannelCredentials());

  // Start IDLE
  EXPECT_EQ(GRPC_CHANNEL_IDLE, channel->GetState(true));

  auto last_state = GRPC_CHANNEL_IDLE;
  int rounds_left = 10;
  while (rounds_left > 0) {
    const auto wait_deadline =
        std::chrono::system_clock::now() + std::chrono::seconds(1);
    channel->WaitForStateChange(last_state, wait_deadline);
    last_state = channel->GetState(false);
    --rounds_left;
  }
}