Skip to content
Snippets Groups Projects
Commit 24e69bf0 authored by Aaron Isotton's avatar Aaron Isotton
Browse files

Added a channel argument to set the maximum reconnect backoff duration....

Added a channel argument to set the maximum reconnect backoff duration. Extended the interop test to test the custom reconnect backoffs.

This closes #5377.
parent 0e67d191
Branches
Tags
No related merge requests found
...@@ -142,6 +142,8 @@ typedef struct { ...@@ -142,6 +142,8 @@ typedef struct {
/** Secondary user agent: goes at the end of the user-agent metadata /** Secondary user agent: goes at the end of the user-agent metadata
sent on each request */ sent on each request */
#define GRPC_ARG_SECONDARY_USER_AGENT_STRING "grpc.secondary_user_agent" #define GRPC_ARG_SECONDARY_USER_AGENT_STRING "grpc.secondary_user_agent"
/** The maximum time between subsequent connection attempts, in ms */
#define GRPC_ARG_MAX_RECONNECT_BACKOFF_MS "grpc.max_reconnect_backoff_ms"
/* The caller of the secure_channel_create functions may override the target /* The caller of the secure_channel_create functions may override the target
name used for SSL host name checking using this channel argument which is of name used for SSL host name checking using this channel argument which is of
type GRPC_ARG_STRING. This *should* be used for testing only. type GRPC_ARG_STRING. This *should* be used for testing only.
......
...@@ -603,6 +603,20 @@ static void update_reconnect_parameters(grpc_subchannel *c) { ...@@ -603,6 +603,20 @@ static void update_reconnect_parameters(grpc_subchannel *c) {
gpr_time_from_millis(c->args->args[i].value.integer, GPR_TIMESPAN)); gpr_time_from_millis(c->args->args[i].value.integer, GPR_TIMESPAN));
return; return;
} }
if (0 ==
strcmp(c->args->args[i].key, GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) {
if (c->args->args[i].type == GRPC_ARG_INTEGER) {
if (c->args->args[i].value.integer >= 0) {
max_backoff_millis = c->args->args[i].value.integer;
} else {
gpr_log(GPR_ERROR, GRPC_ARG_MAX_RECONNECT_BACKOFF_MS
" : must be non-negative");
}
} else {
gpr_log(GPR_ERROR,
GRPC_ARG_MAX_RECONNECT_BACKOFF_MS " : must be an integer");
}
}
} }
} }
......
// Copyright 2015, Google Inc. // Copyright 2015-2016, Google Inc.
// All rights reserved. // All rights reserved.
// //
// Redistribution and use in source and binary forms, with or without // Redistribution and use in source and binary forms, with or without
...@@ -158,6 +158,12 @@ message StreamingOutputCallResponse { ...@@ -158,6 +158,12 @@ message StreamingOutputCallResponse {
Payload payload = 1; Payload payload = 1;
} }
// For reconnect interop test only.
// Client tells server what reconnection parameters it used.
message ReconnectParams {
int32 max_reconnect_backoff_ms = 1;
}
// For reconnect interop test only. // For reconnect interop test only.
// Server tells client whether its reconnects are following the spec and the // Server tells client whether its reconnects are following the spec and the
// reconnect backoffs it saw. // reconnect backoffs it saw.
......
// Copyright 2015, Google Inc. // Copyright 2015-2016, Google Inc.
// All rights reserved. // All rights reserved.
// //
// Redistribution and use in source and binary forms, with or without // Redistribution and use in source and binary forms, with or without
...@@ -80,6 +80,6 @@ service UnimplementedService { ...@@ -80,6 +80,6 @@ service UnimplementedService {
// A service used to control reconnect server. // A service used to control reconnect server.
service ReconnectService { service ReconnectService {
rpc Start(grpc.testing.Empty) returns (grpc.testing.Empty); rpc Start(grpc.testing.ReconnectParams) returns (grpc.testing.Empty);
rpc Stop(grpc.testing.Empty) returns (grpc.testing.ReconnectInfo); rpc Stop(grpc.testing.Empty) returns (grpc.testing.ReconnectInfo);
} }
...@@ -60,8 +60,12 @@ static void pretty_print_backoffs(reconnect_server *server) { ...@@ -60,8 +60,12 @@ static void pretty_print_backoffs(reconnect_server *server) {
i, backoff / 1000.0, expected_backoff / 1000.0, i, backoff / 1000.0, expected_backoff / 1000.0,
(backoff - expected_backoff) * 100.0 / expected_backoff); (backoff - expected_backoff) * 100.0 / expected_backoff);
expected_backoff *= 1.6; expected_backoff *= 1.6;
if (expected_backoff > 120 * 1000) { int max_reconnect_backoff_ms = 120 * 1000;
expected_backoff = 120 * 1000; if (server->max_reconnect_backoff_ms > 0) {
max_reconnect_backoff_ms = server->max_reconnect_backoff_ms;
}
if (expected_backoff > max_reconnect_backoff_ms) {
expected_backoff = max_reconnect_backoff_ms;
} }
} }
} }
...@@ -108,6 +112,7 @@ void reconnect_server_init(reconnect_server *server) { ...@@ -108,6 +112,7 @@ void reconnect_server_init(reconnect_server *server) {
server->head = NULL; server->head = NULL;
server->tail = NULL; server->tail = NULL;
server->peer = NULL; server->peer = NULL;
server->max_reconnect_backoff_ms = 0;
} }
void reconnect_server_start(reconnect_server *server, int port) { void reconnect_server_start(reconnect_server *server, int port) {
......
/* /*
* *
* Copyright 2015, Google Inc. * Copyright 2015-2016, Google Inc.
* All rights reserved. * All rights reserved.
* *
* Redistribution and use in source and binary forms, with or without * Redistribution and use in source and binary forms, with or without
...@@ -52,6 +52,7 @@ typedef struct reconnect_server { ...@@ -52,6 +52,7 @@ typedef struct reconnect_server {
timestamp_list *head; timestamp_list *head;
timestamp_list *tail; timestamp_list *tail;
char *peer; char *peer;
int max_reconnect_backoff_ms;
} reconnect_server; } reconnect_server;
void reconnect_server_init(reconnect_server *server); void reconnect_server_init(reconnect_server *server);
......
...@@ -39,6 +39,7 @@ ...@@ -39,6 +39,7 @@
#include <gflags/gflags.h> #include <gflags/gflags.h>
#include <grpc++/channel.h> #include <grpc++/channel.h>
#include <grpc++/client_context.h> #include <grpc++/client_context.h>
#include <grpc++/support/channel_arguments.h>
#include "test/cpp/util/create_test_channel.h" #include "test/cpp/util/create_test_channel.h"
#include "test/cpp/util/test_config.h" #include "test/cpp/util/test_config.h"
#include "src/proto/grpc/testing/test.grpc.pb.h" #include "src/proto/grpc/testing/test.grpc.pb.h"
...@@ -48,13 +49,18 @@ ...@@ -48,13 +49,18 @@
DEFINE_int32(server_control_port, 0, "Server port for control rpcs."); DEFINE_int32(server_control_port, 0, "Server port for control rpcs.");
DEFINE_int32(server_retry_port, 0, "Server port for testing reconnection."); DEFINE_int32(server_retry_port, 0, "Server port for testing reconnection.");
DEFINE_string(server_host, "127.0.0.1", "Server host to connect to"); DEFINE_string(server_host, "127.0.0.1", "Server host to connect to");
DEFINE_int32(max_reconnect_backoff_ms, 0,
"Maximum backoff time, or 0 for default.");
using grpc::CallCredentials;
using grpc::Channel; using grpc::Channel;
using grpc::ChannelArguments;
using grpc::ClientContext; using grpc::ClientContext;
using grpc::CreateTestChannel; using grpc::CreateTestChannel;
using grpc::Status; using grpc::Status;
using grpc::testing::Empty; using grpc::testing::Empty;
using grpc::testing::ReconnectInfo; using grpc::testing::ReconnectInfo;
using grpc::testing::ReconnectParams;
using grpc::testing::ReconnectService; using grpc::testing::ReconnectService;
int main(int argc, char** argv) { int main(int argc, char** argv) {
...@@ -68,17 +74,25 @@ int main(int argc, char** argv) { ...@@ -68,17 +74,25 @@ int main(int argc, char** argv) {
ReconnectService::NewStub( ReconnectService::NewStub(
CreateTestChannel(server_address.str(), false))); CreateTestChannel(server_address.str(), false)));
ClientContext start_context; ClientContext start_context;
Empty empty_request; ReconnectParams reconnect_params;
reconnect_params.set_max_reconnect_backoff_ms(FLAGS_max_reconnect_backoff_ms);
Empty empty_response; Empty empty_response;
Status start_status = Status start_status =
control_stub->Start(&start_context, empty_request, &empty_response); control_stub->Start(&start_context, reconnect_params, &empty_response);
GPR_ASSERT(start_status.ok()); GPR_ASSERT(start_status.ok());
gpr_log(GPR_INFO, "Starting connections with retries."); gpr_log(GPR_INFO, "Starting connections with retries.");
server_address.str(""); server_address.str("");
server_address << FLAGS_server_host << ':' << FLAGS_server_retry_port; server_address << FLAGS_server_host << ':' << FLAGS_server_retry_port;
ChannelArguments channel_args;
if (FLAGS_max_reconnect_backoff_ms > 0) {
channel_args.SetInt(GRPC_ARG_MAX_RECONNECT_BACKOFF_MS,
FLAGS_max_reconnect_backoff_ms);
}
std::shared_ptr<Channel> retry_channel = std::shared_ptr<Channel> retry_channel =
CreateTestChannel(server_address.str(), true); CreateTestChannel(server_address.str(), "foo.test.google.fr", true, false,
std::shared_ptr<CallCredentials>(), channel_args);
// About 13 retries. // About 13 retries.
const int kDeadlineSeconds = 540; const int kDeadlineSeconds = 540;
// Use any rpc to test retry. // Use any rpc to test retry.
...@@ -88,15 +102,15 @@ int main(int argc, char** argv) { ...@@ -88,15 +102,15 @@ int main(int argc, char** argv) {
retry_context.set_deadline(std::chrono::system_clock::now() + retry_context.set_deadline(std::chrono::system_clock::now() +
std::chrono::seconds(kDeadlineSeconds)); std::chrono::seconds(kDeadlineSeconds));
Status retry_status = Status retry_status =
retry_stub->Start(&retry_context, empty_request, &empty_response); retry_stub->Start(&retry_context, reconnect_params, &empty_response);
GPR_ASSERT(retry_status.error_code() == grpc::StatusCode::DEADLINE_EXCEEDED); GPR_ASSERT(retry_status.error_code() == grpc::StatusCode::DEADLINE_EXCEEDED);
gpr_log(GPR_INFO, "Done retrying, getting final data from server"); gpr_log(GPR_INFO, "Done retrying, getting final data from server");
ClientContext stop_context; ClientContext stop_context;
ReconnectInfo response; ReconnectInfo response;
Status stop_status = Status stop_status = control_stub->Stop(&stop_context, Empty(), &response);
control_stub->Stop(&stop_context, empty_request, &response);
GPR_ASSERT(stop_status.ok()); GPR_ASSERT(stop_status.ok());
GPR_ASSERT(response.passed() == true); GPR_ASSERT(response.passed() == true);
gpr_log(GPR_INFO, "Passed");
return 0; return 0;
} }
...@@ -69,6 +69,7 @@ using grpc::Status; ...@@ -69,6 +69,7 @@ using grpc::Status;
using grpc::testing::Empty; using grpc::testing::Empty;
using grpc::testing::ReconnectService; using grpc::testing::ReconnectService;
using grpc::testing::ReconnectInfo; using grpc::testing::ReconnectInfo;
using grpc::testing::ReconnectParams;
static bool got_sigint = false; static bool got_sigint = false;
...@@ -90,7 +91,8 @@ class ReconnectServiceImpl : public ReconnectService::Service { ...@@ -90,7 +91,8 @@ class ReconnectServiceImpl : public ReconnectService::Service {
void Poll(int seconds) { reconnect_server_poll(&tcp_server_, seconds); } void Poll(int seconds) { reconnect_server_poll(&tcp_server_, seconds); }
Status Start(ServerContext* context, const Empty* request, Empty* response) { Status Start(ServerContext* context, const ReconnectParams* request,
Empty* response) {
bool start_server = true; bool start_server = true;
std::unique_lock<std::mutex> lock(mu_); std::unique_lock<std::mutex> lock(mu_);
while (serving_ && !shutdown_) { while (serving_ && !shutdown_) {
...@@ -103,6 +105,8 @@ class ReconnectServiceImpl : public ReconnectService::Service { ...@@ -103,6 +105,8 @@ class ReconnectServiceImpl : public ReconnectService::Service {
if (server_started_) { if (server_started_) {
start_server = false; start_server = false;
} else { } else {
tcp_server_.max_reconnect_backoff_ms =
request->max_reconnect_backoff_ms();
server_started_ = true; server_started_ = true;
} }
lock.unlock(); lock.unlock();
...@@ -131,7 +135,9 @@ class ReconnectServiceImpl : public ReconnectService::Service { ...@@ -131,7 +135,9 @@ class ReconnectServiceImpl : public ReconnectService::Service {
const double kTransmissionDelay = 100.0; const double kTransmissionDelay = 100.0;
const double kBackoffMultiplier = 1.6; const double kBackoffMultiplier = 1.6;
const double kJitterFactor = 0.2; const double kJitterFactor = 0.2;
const int kMaxBackoffMs = 120 * 1000; const int kMaxBackoffMs = tcp_server_.max_reconnect_backoff_ms
? tcp_server_.max_reconnect_backoff_ms
: 120 * 1000;
bool passed = true; bool passed = true;
for (timestamp_list* cur = tcp_server_.head; cur && cur->next; for (timestamp_list* cur = tcp_server_.head; cur && cur->next;
cur = cur->next) { cur = cur->next) {
......
/* /*
* *
* Copyright 2015, Google Inc. * Copyright 2015-2016, Google Inc.
* All rights reserved. * All rights reserved.
* *
* Redistribution and use in source and binary forms, with or without * Redistribution and use in source and binary forms, with or without
...@@ -58,8 +58,9 @@ namespace grpc { ...@@ -58,8 +58,9 @@ namespace grpc {
std::shared_ptr<Channel> CreateTestChannel( std::shared_ptr<Channel> CreateTestChannel(
const grpc::string& server, const grpc::string& override_hostname, const grpc::string& server, const grpc::string& override_hostname,
bool enable_ssl, bool use_prod_roots, bool enable_ssl, bool use_prod_roots,
const std::shared_ptr<CallCredentials>& creds) { const std::shared_ptr<CallCredentials>& creds,
ChannelArguments channel_args; const ChannelArguments& args) {
ChannelArguments channel_args(args);
if (enable_ssl) { if (enable_ssl) {
const char* roots_certs = use_prod_roots ? "" : test_root_cert; const char* roots_certs = use_prod_roots ? "" : test_root_cert;
SslCredentialsOptions ssl_opts = {roots_certs, "", ""}; SslCredentialsOptions ssl_opts = {roots_certs, "", ""};
...@@ -81,6 +82,14 @@ std::shared_ptr<Channel> CreateTestChannel( ...@@ -81,6 +82,14 @@ std::shared_ptr<Channel> CreateTestChannel(
} }
} }
std::shared_ptr<Channel> CreateTestChannel(
const grpc::string& server, const grpc::string& override_hostname,
bool enable_ssl, bool use_prod_roots,
const std::shared_ptr<CallCredentials>& creds) {
return CreateTestChannel(server, override_hostname, enable_ssl,
use_prod_roots, creds, ChannelArguments());
}
std::shared_ptr<Channel> CreateTestChannel( std::shared_ptr<Channel> CreateTestChannel(
const grpc::string& server, const grpc::string& override_hostname, const grpc::string& server, const grpc::string& override_hostname,
bool enable_ssl, bool use_prod_roots) { bool enable_ssl, bool use_prod_roots) {
......
/* /*
* *
* Copyright 2015, Google Inc. * Copyright 2015-2016, Google Inc.
* All rights reserved. * All rights reserved.
* *
* Redistribution and use in source and binary forms, with or without * Redistribution and use in source and binary forms, with or without
...@@ -53,6 +53,12 @@ std::shared_ptr<Channel> CreateTestChannel( ...@@ -53,6 +53,12 @@ std::shared_ptr<Channel> CreateTestChannel(
bool enable_ssl, bool use_prod_roots, bool enable_ssl, bool use_prod_roots,
const std::shared_ptr<CallCredentials>& creds); const std::shared_ptr<CallCredentials>& creds);
std::shared_ptr<Channel> CreateTestChannel(
const grpc::string& server, const grpc::string& override_hostname,
bool enable_ssl, bool use_prod_roots,
const std::shared_ptr<CallCredentials>& creds,
const ChannelArguments& args);
} // namespace grpc } // namespace grpc
#endif // GRPC_TEST_CPP_UTIL_CREATE_TEST_CHANNEL_H #endif // GRPC_TEST_CPP_UTIL_CREATE_TEST_CHANNEL_H
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment