Skip to content
Snippets Groups Projects
Commit ad0f7922 authored by Sree Kuchibhotla's avatar Sree Kuchibhotla
Browse files

Interop client that is resilient to server restarts

parent 48d833a9
Branches
Tags
No related merge requests found
...@@ -174,6 +174,7 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, bool success) { ...@@ -174,6 +174,7 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL); GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL);
holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
if (holder->connected_subchannel == NULL) { if (holder->connected_subchannel == NULL) {
gpr_atm_no_barrier_store(&holder->subchannel_call, 1);
fail_locked(exec_ctx, holder); fail_locked(exec_ctx, holder);
} else if (1 == gpr_atm_acq_load(&holder->subchannel_call)) { } else if (1 == gpr_atm_acq_load(&holder->subchannel_call)) {
/* already cancelled before subchannel became ready */ /* already cancelled before subchannel became ready */
......
...@@ -81,6 +81,14 @@ DEFINE_string(default_service_account, "", ...@@ -81,6 +81,14 @@ DEFINE_string(default_service_account, "",
DEFINE_string(service_account_key_file, "", DEFINE_string(service_account_key_file, "",
"Path to service account json key file."); "Path to service account json key file.");
DEFINE_string(oauth_scope, "", "Scope for OAuth tokens."); DEFINE_string(oauth_scope, "", "Scope for OAuth tokens.");
DEFINE_bool(do_not_abort_on_transient_failures, false,
"If set to 'true', abort() is not called in case of transient "
"failures (i.e failures that are temporary and will likely go away "
"on retrying; like a temporary connection failure) and an error "
"message is printed instead. Note that this flag just controls "
"whether abort() is called or not. It does not control whether the "
"test is retried in case of transient failures (and currently the "
"interop tests are not retried even if this flag is set to true)");
using grpc::testing::CreateChannelForTestCase; using grpc::testing::CreateChannelForTestCase;
using grpc::testing::GetServiceAccountJsonKey; using grpc::testing::GetServiceAccountJsonKey;
...@@ -89,8 +97,9 @@ int main(int argc, char** argv) { ...@@ -89,8 +97,9 @@ int main(int argc, char** argv) {
grpc::testing::InitTest(&argc, &argv, true); grpc::testing::InitTest(&argc, &argv, true);
gpr_log(GPR_INFO, "Testing these cases: %s", FLAGS_test_case.c_str()); gpr_log(GPR_INFO, "Testing these cases: %s", FLAGS_test_case.c_str());
int ret = 0; int ret = 0;
grpc::testing::InteropClient client( grpc::testing::InteropClient client(CreateChannelForTestCase(FLAGS_test_case),
CreateChannelForTestCase(FLAGS_test_case)); true,
FLAGS_do_not_abort_on_transient_failures);
if (FLAGS_test_case == "empty_unary") { if (FLAGS_test_case == "empty_unary") {
client.DoEmpty(); client.DoEmpty();
} else if (FLAGS_test_case == "large_unary") { } else if (FLAGS_test_case == "large_unary") {
......
This diff is collapsed.
...@@ -51,41 +51,42 @@ using CheckerFn = ...@@ -51,41 +51,42 @@ using CheckerFn =
class InteropClient { class InteropClient {
public: public:
explicit InteropClient(std::shared_ptr<Channel> channel); /// If new_stub_every_test_case is true, a new TestService::Stub object is
explicit InteropClient( /// created for every test case
std::shared_ptr<Channel> channel, /// If do_not_abort_on_transient_failures is true, abort() is not called in
bool new_stub_every_test_case); // If new_stub_every_test_case is true, /// case of transient failures (like connection failures)
// a new TestService::Stub object is explicit InteropClient(std::shared_ptr<Channel> channel,
// created for every test case below bool new_stub_every_test_case,
bool do_not_abort_on_transient_failures);
~InteropClient() {} ~InteropClient() {}
void Reset(std::shared_ptr<Channel> channel); void Reset(std::shared_ptr<Channel> channel);
void DoEmpty(); bool DoEmpty();
void DoLargeUnary(); bool DoLargeUnary();
void DoLargeCompressedUnary(); bool DoLargeCompressedUnary();
void DoPingPong(); bool DoPingPong();
void DoHalfDuplex(); bool DoHalfDuplex();
void DoRequestStreaming(); bool DoRequestStreaming();
void DoResponseStreaming(); bool DoResponseStreaming();
void DoResponseCompressedStreaming(); bool DoResponseCompressedStreaming();
void DoResponseStreamingWithSlowConsumer(); bool DoResponseStreamingWithSlowConsumer();
void DoCancelAfterBegin(); bool DoCancelAfterBegin();
void DoCancelAfterFirstResponse(); bool DoCancelAfterFirstResponse();
void DoTimeoutOnSleepingServer(); bool DoTimeoutOnSleepingServer();
void DoEmptyStream(); bool DoEmptyStream();
void DoStatusWithMessage(); bool DoStatusWithMessage();
void DoCustomMetadata(); bool DoCustomMetadata();
// Auth tests. // Auth tests.
// username is a string containing the user email // username is a string containing the user email
void DoJwtTokenCreds(const grpc::string& username); bool DoJwtTokenCreds(const grpc::string& username);
void DoComputeEngineCreds(const grpc::string& default_service_account, bool DoComputeEngineCreds(const grpc::string& default_service_account,
const grpc::string& oauth_scope); const grpc::string& oauth_scope);
// username the GCE default service account email // username the GCE default service account email
void DoOauth2AuthToken(const grpc::string& username, bool DoOauth2AuthToken(const grpc::string& username,
const grpc::string& oauth_scope); const grpc::string& oauth_scope);
// username is a string containing the user email // username is a string containing the user email
void DoPerRpcCreds(const grpc::string& json_key); bool DoPerRpcCreds(const grpc::string& json_key);
private: private:
class ServiceStub { class ServiceStub {
...@@ -105,13 +106,18 @@ class InteropClient { ...@@ -105,13 +106,18 @@ class InteropClient {
// Get() call // Get() call
}; };
void PerformLargeUnary(SimpleRequest* request, SimpleResponse* response); bool PerformLargeUnary(SimpleRequest* request, SimpleResponse* response);
/// Run \a custom_check_fn as an additional check. /// Run \a custom_check_fn as an additional check.
void PerformLargeUnary(SimpleRequest* request, SimpleResponse* response, bool PerformLargeUnary(SimpleRequest* request, SimpleResponse* response,
CheckerFn custom_checks_fn); CheckerFn custom_checks_fn);
void AssertOkOrPrintErrorStatus(const Status& s); bool AssertStatusOk(const Status& s);
bool AssertStatusCode(const Status& s, StatusCode expected_code);
bool TransientFailureOrAbort();
ServiceStub serviceStub_; ServiceStub serviceStub_;
/// If true, abort() is not called for transient failures
bool do_not_abort_on_transient_failures_;
}; };
} // namespace testing } // namespace testing
......
...@@ -84,11 +84,12 @@ StressTestInteropClient::StressTestInteropClient( ...@@ -84,11 +84,12 @@ StressTestInteropClient::StressTestInteropClient(
int test_id, const grpc::string& server_address, int test_id, const grpc::string& server_address,
std::shared_ptr<Channel> channel, std::shared_ptr<Channel> channel,
const WeightedRandomTestSelector& test_selector, long test_duration_secs, const WeightedRandomTestSelector& test_selector, long test_duration_secs,
long sleep_duration_ms) long sleep_duration_ms, bool do_not_abort_on_transient_failures)
: test_id_(test_id), : test_id_(test_id),
server_address_(server_address), server_address_(server_address),
channel_(channel), channel_(channel),
interop_client_(new InteropClient(channel, false)), interop_client_(new InteropClient(channel, false,
do_not_abort_on_transient_failures)),
test_selector_(test_selector), test_selector_(test_selector),
test_duration_secs_(test_duration_secs), test_duration_secs_(test_duration_secs),
sleep_duration_ms_(sleep_duration_ms) {} sleep_duration_ms_(sleep_duration_ms) {}
......
...@@ -87,7 +87,8 @@ class StressTestInteropClient { ...@@ -87,7 +87,8 @@ class StressTestInteropClient {
StressTestInteropClient(int test_id, const grpc::string& server_address, StressTestInteropClient(int test_id, const grpc::string& server_address,
std::shared_ptr<Channel> channel, std::shared_ptr<Channel> channel,
const WeightedRandomTestSelector& test_selector, const WeightedRandomTestSelector& test_selector,
long test_duration_secs, long sleep_duration_ms); long test_duration_secs, long sleep_duration_ms,
bool do_not_abort_on_transient_failures);
// The main function. Use this as the thread entry point. // The main function. Use this as the thread entry point.
// qps_gauge is the QpsGauge to record the requests per second metric // qps_gauge is the QpsGauge to record the requests per second metric
......
...@@ -101,6 +101,10 @@ DEFINE_int32(log_level, GPR_LOG_SEVERITY_INFO, ...@@ -101,6 +101,10 @@ DEFINE_int32(log_level, GPR_LOG_SEVERITY_INFO,
"The choices are: 0 (GPR_LOG_SEVERITY_DEBUG), 1 " "The choices are: 0 (GPR_LOG_SEVERITY_DEBUG), 1 "
"(GPR_LOG_SEVERITY_INFO) and 2 (GPR_LOG_SEVERITY_ERROR)"); "(GPR_LOG_SEVERITY_INFO) and 2 (GPR_LOG_SEVERITY_ERROR)");
DEFINE_bool(do_not_abort_on_transient_failures, true,
"If set to 'true', abort() is not called in case of transient "
"failures like temporary connection failures.");
using grpc::testing::kTestCaseList; using grpc::testing::kTestCaseList;
using grpc::testing::MetricsService; using grpc::testing::MetricsService;
using grpc::testing::MetricsServiceImpl; using grpc::testing::MetricsServiceImpl;
...@@ -189,6 +193,12 @@ void LogParameterInfo(const std::vector<grpc::string>& addresses, ...@@ -189,6 +193,12 @@ void LogParameterInfo(const std::vector<grpc::string>& addresses,
gpr_log(GPR_INFO, "test_cases : %s", FLAGS_test_cases.c_str()); gpr_log(GPR_INFO, "test_cases : %s", FLAGS_test_cases.c_str());
gpr_log(GPR_INFO, "sleep_duration_ms: %d", FLAGS_sleep_duration_ms); gpr_log(GPR_INFO, "sleep_duration_ms: %d", FLAGS_sleep_duration_ms);
gpr_log(GPR_INFO, "test_duration_secs: %d", FLAGS_test_duration_secs); gpr_log(GPR_INFO, "test_duration_secs: %d", FLAGS_test_duration_secs);
gpr_log(GPR_INFO, "num_channels_per_server: %d",
FLAGS_num_channels_per_server);
gpr_log(GPR_INFO, "num_stubs_per_channel: %d", FLAGS_num_stubs_per_channel);
gpr_log(GPR_INFO, "log_level: %d", FLAGS_log_level);
gpr_log(GPR_INFO, "do_not_abort_on_transient_failures: %s",
FLAGS_do_not_abort_on_transient_failures ? "true" : "false");
int num = 0; int num = 0;
for (auto it = addresses.begin(); it != addresses.end(); it++) { for (auto it = addresses.begin(); it != addresses.end(); it++) {
...@@ -272,7 +282,7 @@ int main(int argc, char** argv) { ...@@ -272,7 +282,7 @@ int main(int argc, char** argv) {
stub_idx++) { stub_idx++) {
StressTestInteropClient* client = new StressTestInteropClient( StressTestInteropClient* client = new StressTestInteropClient(
++thread_idx, *it, channel, test_selector, FLAGS_test_duration_secs, ++thread_idx, *it, channel, test_selector, FLAGS_test_duration_secs,
FLAGS_sleep_duration_ms); FLAGS_sleep_duration_ms, FLAGS_do_not_abort_on_transient_failures);
bool is_already_created = false; bool is_already_created = false;
// QpsGauge name // QpsGauge name
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment