Skip to content
Snippets Groups Projects
Commit 956f7009 authored by David Garcia Quintas's avatar David Garcia Quintas
Browse files

Keep LB policy alive during high freq of resolver updates

parent 7f2a15a6
Branches
Tags
No related merge requests found
......@@ -236,14 +236,19 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
grpc_connectivity_state state,
grpc_error *error,
const char *reason) {
if ((state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
state == GRPC_CHANNEL_SHUTDOWN) &&
chand->lb_policy != NULL) {
/* cancel picks with wait_for_ready=false */
grpc_lb_policy_cancel_picks_locked(
exec_ctx, chand->lb_policy,
/* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY,
/* check= */ 0, GRPC_ERROR_REF(error));
if (chand->lb_policy != NULL) {
if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
/* cancel picks with wait_for_ready=false */
grpc_lb_policy_cancel_picks_locked(
exec_ctx, chand->lb_policy,
/* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY,
/* check= */ 0, GRPC_ERROR_REF(error));
} else if (state == GRPC_CHANNEL_SHUTDOWN) {
/* cancel all picks */
grpc_lb_policy_cancel_picks_locked(exec_ctx, chand->lb_policy,
/* mask= */ 0, /* check= */ 0,
GRPC_ERROR_REF(error));
}
}
grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, error,
reason);
......@@ -346,6 +351,37 @@ static void parse_retry_throttle_params(const grpc_json *field, void *arg) {
}
}
// Wrap a closure associated with \a lb_policy. The associated callback (\a
// wrapped_on_pick_closure_cb) is responsible for unref'ing \a lb_policy after
// scheduling \a wrapped_closure.
typedef struct wrapped_on_pick_closure_arg {
/* the closure instance using this struct as argument */
grpc_closure wrapper_closure;
/* the original closure. Usually a on_complete/notify cb for pick() and ping()
* calls against the internal RR instance, respectively. */
grpc_closure *wrapped_closure;
/* The policy instance related to the closure */
grpc_lb_policy *lb_policy;
/* heap memory to be freed upon closure execution. Usually this arg. */
void *free_when_done;
} wrapped_on_pick_closure_arg;
// Invoke \a arg->wrapped_closure, unref \a arg->lb_policy and free
// arg->free_when_done (usually \a arg itself).
static void wrapped_on_pick_closure_cb(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
wrapped_on_pick_closure_arg *wc_arg = arg;
GPR_ASSERT(wc_arg->wrapped_closure != NULL);
GPR_ASSERT(wc_arg->lb_policy != NULL);
GPR_ASSERT(wc_arg->free_when_done != NULL);
grpc_closure_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error));
GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->lb_policy, "pick_subchannel_wrapping");
gpr_free(wc_arg->free_when_done);
}
static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
void *arg, grpc_error *error) {
channel_data *chand = arg;
......@@ -1031,11 +1067,30 @@ static bool pick_subchannel_locked(
const grpc_lb_policy_pick_args inputs = {
initial_metadata, initial_metadata_flags, &calld->lb_token_mdelem,
gpr_inf_future(GPR_CLOCK_MONOTONIC)};
const bool result = grpc_lb_policy_pick_locked(
exec_ctx, lb_policy, &inputs, connected_subchannel, NULL, on_ready);
// Wrap the user-provided callback in order to hold a strong reference to
// the LB policy for the duration of the pick.
wrapped_on_pick_closure_arg *w_on_pick_arg =
gpr_zalloc(sizeof(*w_on_pick_arg));
grpc_closure_init(&w_on_pick_arg->wrapper_closure,
wrapped_on_pick_closure_cb, w_on_pick_arg,
grpc_schedule_on_exec_ctx);
w_on_pick_arg->wrapped_closure = on_ready;
GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel_wrapping");
w_on_pick_arg->lb_policy = lb_policy;
w_on_pick_arg->free_when_done = w_on_pick_arg;
const bool pick_done = grpc_lb_policy_pick_locked(
exec_ctx, lb_policy, &inputs, connected_subchannel, NULL,
&w_on_pick_arg->wrapper_closure);
if (pick_done) {
/* synchronous grpc_lb_policy_pick call. Unref the LB policy. */
GRPC_LB_POLICY_UNREF(exec_ctx, w_on_pick_arg->lb_policy,
"pick_subchannel_wrapping");
gpr_free(w_on_pick_arg->free_when_done);
}
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick_subchannel");
GPR_TIMER_END("pick_subchannel", 0);
return result;
return pick_done;
}
if (chand->resolver != NULL && !chand->started_resolving) {
chand->started_resolving = true;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment