Skip to content
Snippets Groups Projects
Commit 536fc539 authored by Craig Tiller's avatar Craig Tiller
Browse files

Integrate backoff library with subchannel

parent c72cc422
No related branches found
No related tags found
No related merge requests found
...@@ -45,6 +45,7 @@ ...@@ -45,6 +45,7 @@
#include "src/core/client_config/subchannel_index.h" #include "src/core/client_config/subchannel_index.h"
#include "src/core/iomgr/timer.h" #include "src/core/iomgr/timer.h"
#include "src/core/profiling/timers.h" #include "src/core/profiling/timers.h"
#include "src/core/support/backoff.h"
#include "src/core/surface/channel.h" #include "src/core/surface/channel.h"
#include "src/core/transport/connectivity_state.h" #include "src/core/transport/connectivity_state.h"
...@@ -127,8 +128,8 @@ struct grpc_subchannel { ...@@ -127,8 +128,8 @@ struct grpc_subchannel {
/** next connect attempt time */ /** next connect attempt time */
gpr_timespec next_attempt; gpr_timespec next_attempt;
/** amount to backoff each failure */ /** backoff state */
gpr_timespec backoff_delta; gpr_backoff backoff_state;
/** do we have an active alarm? */ /** do we have an active alarm? */
int have_alarm; int have_alarm;
/** our alarm */ /** our alarm */
...@@ -146,7 +147,6 @@ struct grpc_subchannel_call { ...@@ -146,7 +147,6 @@ struct grpc_subchannel_call {
#define CALLSTACK_TO_SUBCHANNEL_CALL(callstack) \ #define CALLSTACK_TO_SUBCHANNEL_CALL(callstack) \
(((grpc_subchannel_call *)(callstack)) - 1) (((grpc_subchannel_call *)(callstack)) - 1)
static gpr_timespec compute_connect_deadline(grpc_subchannel *c);
static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel, static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel,
bool iomgr_success); bool iomgr_success);
...@@ -184,8 +184,8 @@ static void connection_destroy(grpc_exec_ctx *exec_ctx, void *arg, ...@@ -184,8 +184,8 @@ static void connection_destroy(grpc_exec_ctx *exec_ctx, void *arg,
gpr_free(c); gpr_free(c);
} }
void grpc_connected_subchannel_ref(grpc_connected_subchannel *c void grpc_connected_subchannel_ref(
GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { grpc_connected_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
GRPC_CHANNEL_STACK_REF(CHANNEL_STACK_FROM_CONNECTION(c), REF_REASON); GRPC_CHANNEL_STACK_REF(CHANNEL_STACK_FROM_CONNECTION(c), REF_REASON);
} }
...@@ -226,8 +226,8 @@ static gpr_atm ref_mutate(grpc_subchannel *c, gpr_atm delta, ...@@ -226,8 +226,8 @@ static gpr_atm ref_mutate(grpc_subchannel *c, gpr_atm delta,
return old_val; return old_val;
} }
grpc_subchannel *grpc_subchannel_ref(grpc_subchannel *c grpc_subchannel *grpc_subchannel_ref(
GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
gpr_atm old_refs; gpr_atm old_refs;
old_refs = ref_mutate(c, (1 << INTERNAL_REF_BITS), old_refs = ref_mutate(c, (1 << INTERNAL_REF_BITS),
0 REF_MUTATE_PURPOSE("STRONG_REF")); 0 REF_MUTATE_PURPOSE("STRONG_REF"));
...@@ -235,8 +235,8 @@ grpc_subchannel *grpc_subchannel_ref(grpc_subchannel *c ...@@ -235,8 +235,8 @@ grpc_subchannel *grpc_subchannel_ref(grpc_subchannel *c
return c; return c;
} }
grpc_subchannel *grpc_subchannel_weak_ref(grpc_subchannel *c grpc_subchannel *grpc_subchannel_weak_ref(
GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
gpr_atm old_refs; gpr_atm old_refs;
old_refs = ref_mutate(c, 1, 0 REF_MUTATE_PURPOSE("WEAK_REF")); old_refs = ref_mutate(c, 1, 0 REF_MUTATE_PURPOSE("WEAK_REF"));
GPR_ASSERT(old_refs != 0); GPR_ASSERT(old_refs != 0);
...@@ -337,6 +337,22 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, ...@@ -337,6 +337,22 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
grpc_closure_init(&c->connected, subchannel_connected, c); grpc_closure_init(&c->connected, subchannel_connected, c);
grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE, grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE,
"subchannel"); "subchannel");
gpr_backoff_init(&c->backoff_state,
GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER,
GRPC_SUBCHANNEL_RECONNECT_JITTER,
GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000,
GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
if (c->args) {
for (size_t i = 0; i < c->args->num_args; i++) {
if (0 == strcmp(c->args->args[i].key,
"grpc.testing.fixed_reconnect_backoff")) {
GPR_ASSERT(c->args->args[i].type == GRPC_ARG_INTEGER);
gpr_backoff_init(&c->backoff_state, 1.0, 0.0,
c->args->args[i].value.integer,
c->args->args[i].value.integer);
}
}
}
gpr_mu_init(&c->mu); gpr_mu_init(&c->mu);
return grpc_subchannel_index_register(exec_ctx, key, c); return grpc_subchannel_index_register(exec_ctx, key, c);
...@@ -348,7 +364,7 @@ static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { ...@@ -348,7 +364,7 @@ static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
args.interested_parties = c->pollset_set; args.interested_parties = c->pollset_set;
args.addr = c->addr; args.addr = c->addr;
args.addr_len = c->addr_len; args.addr_len = c->addr_len;
args.deadline = compute_connect_deadline(c); args.deadline = c->next_attempt;
args.channel_args = c->args; args.channel_args = c->args;
args.initial_connect_string = c->initial_connect_string; args.initial_connect_string = c->initial_connect_string;
...@@ -359,10 +375,8 @@ static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { ...@@ -359,10 +375,8 @@ static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
} }
static void start_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { static void start_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
c->backoff_delta = gpr_time_from_seconds(
GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS, GPR_TIMESPAN);
c->next_attempt = c->next_attempt =
gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), c->backoff_delta); gpr_backoff_begin(&c->backoff_state, gpr_now(GPR_CLOCK_MONOTONIC));
continue_connect(exec_ctx, c); continue_connect(exec_ctx, c);
} }
...@@ -579,50 +593,6 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { ...@@ -579,50 +593,6 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
gpr_free((void *)filters); gpr_free((void *)filters);
} }
/* Generate a random number between 0 and 1. */
static double generate_uniform_random_number(grpc_subchannel *c) {
c->random = (1103515245 * c->random + 12345) % ((uint32_t)1 << 31);
return c->random / (double)((uint32_t)1 << 31);
}
/* Update backoff_delta and next_attempt in subchannel */
static void update_reconnect_parameters(grpc_subchannel *c) {
size_t i;
int32_t backoff_delta_millis, jitter;
int32_t max_backoff_millis =
GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000;
double jitter_range;
if (c->args) {
for (i = 0; i < c->args->num_args; i++) {
if (0 == strcmp(c->args->args[i].key,
"grpc.testing.fixed_reconnect_backoff")) {
GPR_ASSERT(c->args->args[i].type == GRPC_ARG_INTEGER);
c->next_attempt = gpr_time_add(
gpr_now(GPR_CLOCK_MONOTONIC),
gpr_time_from_millis(c->args->args[i].value.integer, GPR_TIMESPAN));
return;
}
}
}
backoff_delta_millis =
(int32_t)(gpr_time_to_millis(c->backoff_delta) *
GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER);
if (backoff_delta_millis > max_backoff_millis) {
backoff_delta_millis = max_backoff_millis;
}
c->backoff_delta = gpr_time_from_millis(backoff_delta_millis, GPR_TIMESPAN);
c->next_attempt =
gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), c->backoff_delta);
jitter_range = GRPC_SUBCHANNEL_RECONNECT_JITTER * backoff_delta_millis;
jitter =
(int32_t)((2 * generate_uniform_random_number(c) - 1) * jitter_range);
c->next_attempt =
gpr_time_add(c->next_attempt, gpr_time_from_millis(jitter, GPR_TIMESPAN));
}
static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, bool iomgr_success) { static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, bool iomgr_success) {
grpc_subchannel *c = arg; grpc_subchannel *c = arg;
gpr_mu_lock(&c->mu); gpr_mu_lock(&c->mu);
...@@ -631,7 +601,8 @@ static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, bool iomgr_success) { ...@@ -631,7 +601,8 @@ static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, bool iomgr_success) {
iomgr_success = 0; iomgr_success = 0;
} }
if (iomgr_success) { if (iomgr_success) {
update_reconnect_parameters(c); c->next_attempt =
gpr_backoff_step(&c->backoff_state, gpr_now(GPR_CLOCK_MONOTONIC));
continue_connect(exec_ctx, c); continue_connect(exec_ctx, c);
gpr_mu_unlock(&c->mu); gpr_mu_unlock(&c->mu);
} else { } else {
...@@ -661,17 +632,6 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg, ...@@ -661,17 +632,6 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
} }
} }
static gpr_timespec compute_connect_deadline(grpc_subchannel *c) {
gpr_timespec current_deadline =
gpr_time_add(c->next_attempt, c->backoff_delta);
gpr_timespec min_deadline = gpr_time_add(
gpr_now(GPR_CLOCK_MONOTONIC),
gpr_time_from_seconds(GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS,
GPR_TIMESPAN));
return gpr_time_cmp(current_deadline, min_deadline) > 0 ? current_deadline
: min_deadline;
}
/* /*
* grpc_subchannel_call implementation * grpc_subchannel_call implementation
*/ */
...@@ -686,8 +646,8 @@ static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call, ...@@ -686,8 +646,8 @@ static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call,
GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0); GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0);
} }
void grpc_subchannel_call_ref(grpc_subchannel_call *c void grpc_subchannel_call_ref(
GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON); GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON);
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment