Skip to content
Snippets Groups Projects
Commit 47e12924 authored by David Garcia Quintas's avatar David Garcia Quintas
Browse files

PR comments

parent ecfe2d6c
No related branches found
No related tags found
No related merge requests found
...@@ -191,6 +191,21 @@ static void rr_subchannel_list_unref(grpc_exec_ctx *exec_ctx, ...@@ -191,6 +191,21 @@ static void rr_subchannel_list_unref(grpc_exec_ctx *exec_ctx,
} }
} }
static rr_subchannel_list *rr_subchannel_list_create(round_robin_lb_policy *p,
size_t num_subchannels) {
rr_subchannel_list *subchannel_list = gpr_zalloc(sizeof(*subchannel_list));
subchannel_list->policy = p;
subchannel_list->subchannels =
gpr_zalloc(sizeof(subchannel_data) * num_subchannels);
subchannel_list->num_subchannels = num_subchannels;
gpr_ref_init(&subchannel_list->refcount, 1);
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_DEBUG, "Created subchannel list %p for %lu subchannels",
(void *)subchannel_list, (unsigned long)num_subchannels);
}
return subchannel_list;
}
/** Mark \a subchannel_list as discarded. Unsubscribes all its subchannels. The /** Mark \a subchannel_list as discarded. Unsubscribes all its subchannels. The
* watcher's callback will ultimately unref \a subchannel_list. */ * watcher's callback will ultimately unref \a subchannel_list. */
static void rr_subchannel_list_shutdown(grpc_exec_ctx *exec_ctx, static void rr_subchannel_list_shutdown(grpc_exec_ctx *exec_ctx,
...@@ -217,7 +232,8 @@ static void rr_subchannel_list_shutdown(grpc_exec_ctx *exec_ctx, ...@@ -217,7 +232,8 @@ static void rr_subchannel_list_shutdown(grpc_exec_ctx *exec_ctx,
&sd->connectivity_changed_closure); &sd->connectivity_changed_closure);
} }
} }
rr_subchannel_list_unref(exec_ctx, subchannel_list, reason); // Corresponds to the creation ref.
rr_subchannel_list_unref(exec_ctx, subchannel_list, "creation");
} }
/** Returns the index into p->subchannel_list->subchannels of the next /** Returns the index into p->subchannel_list->subchannels of the next
...@@ -316,14 +332,16 @@ static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { ...@@ -316,14 +332,16 @@ static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list, rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list,
"sl_shutdown_rr_shutdown"); "sl_shutdown_rr_shutdown");
rr_subchannel_list_unref(exec_ctx, p->subchannel_list, rr_subchannel_list_unref(exec_ctx, p->subchannel_list,
"sl_unref_rr_shutdown"); "sl_shutdown_current+make_pending");
if (!latest_is_current && p->latest_pending_subchannel_list != NULL && if (!latest_is_current && p->latest_pending_subchannel_list != NULL &&
!p->latest_pending_subchannel_list->shutting_down) { !p->latest_pending_subchannel_list->shutting_down) {
rr_subchannel_list_shutdown(exec_ctx, p->latest_pending_subchannel_list, rr_subchannel_list_shutdown(exec_ctx, p->latest_pending_subchannel_list,
"sl_shutdown_pending_rr_shutdown"); "sl_shutdown_pending_rr_shutdown");
rr_subchannel_list_unref(exec_ctx, p->latest_pending_subchannel_list,
"sl_shutdown_pending+make_pending");
p->latest_pending_subchannel_list = NULL;
} }
p->subchannel_list = NULL; p->subchannel_list = NULL;
p->latest_pending_subchannel_list = NULL;
} }
static void rr_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, static void rr_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
...@@ -380,7 +398,7 @@ static void start_picking_locked(grpc_exec_ctx *exec_ctx, ...@@ -380,7 +398,7 @@ static void start_picking_locked(grpc_exec_ctx *exec_ctx,
for (size_t i = 0; i < p->subchannel_list->num_subchannels; i++) { for (size_t i = 0; i < p->subchannel_list->num_subchannels; i++) {
subchannel_data *sd = &p->subchannel_list->subchannels[i]; subchannel_data *sd = &p->subchannel_list->subchannels[i];
GRPC_LB_POLICY_WEAK_REF(&p->base, "start_picking_locked"); GRPC_LB_POLICY_WEAK_REF(&p->base, "start_picking_locked");
rr_subchannel_list_ref(sd->subchannel_list, "start_picking_locked"); rr_subchannel_list_ref(sd->subchannel_list, "started_picking");
grpc_subchannel_notify_on_state_change( grpc_subchannel_notify_on_state_change(
exec_ctx, sd->subchannel, p->base.interested_parties, exec_ctx, sd->subchannel, p->base.interested_parties,
&sd->pending_connectivity_state_unsafe, &sd->pending_connectivity_state_unsafe,
...@@ -548,15 +566,18 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, ...@@ -548,15 +566,18 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
} }
// If the policy is shutting down, unref and return. // If the policy is shutting down, unref and return.
if (p->shutdown) { if (p->shutdown) {
rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "pol_shutdown"); rr_subchannel_list_unref(exec_ctx, sd->subchannel_list,
"pol_shutdown+started_picking");
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pol_shutdown"); GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pol_shutdown");
return; return;
} }
if (sd->subchannel_list->shutting_down && error == GRPC_ERROR_CANCELLED) { if (sd->subchannel_list->shutting_down && error == GRPC_ERROR_CANCELLED) {
// the subchannel list associated with sd has been discarded. This callback // the subchannel list associated with sd has been discarded. This callback
// corresponds to the unsubscription. // corresponds to the unsubscription. The unrefs correspond to the picking
rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "sl_shutdown"); // ref (start_picking_locked or update_started_picking).
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "sl_shutdown"); rr_subchannel_list_unref(exec_ctx, sd->subchannel_list,
"sl_shutdown+started_picking");
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "sl_shutdown+picking");
return; return;
} }
// Dispose of outdated subchannel lists. // Dispose of outdated subchannel lists.
...@@ -565,7 +586,7 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, ...@@ -565,7 +586,7 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
// sd belongs to an outdated subchannel_list: get rid of it. // sd belongs to an outdated subchannel_list: get rid of it.
rr_subchannel_list_shutdown(exec_ctx, sd->subchannel_list, "sl_outdated"); rr_subchannel_list_shutdown(exec_ctx, sd->subchannel_list, "sl_outdated");
rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, rr_subchannel_list_unref(exec_ctx, sd->subchannel_list,
"sl_outdated+unref"); "sl_outdated+started_picking");
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "sl_outdated"); GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "sl_outdated");
return; return;
} }
...@@ -589,7 +610,7 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, ...@@ -589,7 +610,7 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
sd->user_data = NULL; sd->user_data = NULL;
} }
if (new_policy_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { if (new_policy_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
/* the policy is shutting down. Flush all the pending picks... */ // the policy is shutting down. Flush all the pending picks...
pending_pick *pp; pending_pick *pp;
while ((pp = p->pending_picks)) { while ((pp = p->pending_picks)) {
p->pending_picks = pp->next; p->pending_picks = pp->next;
...@@ -598,8 +619,9 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, ...@@ -598,8 +619,9 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
gpr_free(pp); gpr_free(pp);
} }
} }
/* unref the "rr_connectivity" weak ref from start_picking */ rr_subchannel_list_unref(exec_ctx, sd->subchannel_list,
rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "sd_shutdown"); "sd_shutdown+started_picking");
// unref the "rr_connectivity_update" weak ref from start_picking.
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base,
"rr_connectivity_sd_shutdown"); "rr_connectivity_sd_shutdown");
} else { // sd not in SHUTDOWN } else { // sd not in SHUTDOWN
...@@ -627,10 +649,11 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, ...@@ -627,10 +649,11 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list, rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list,
"sl_phase_out_shutdown"); "sl_phase_out_shutdown");
rr_subchannel_list_unref(exec_ctx, p->subchannel_list, rr_subchannel_list_unref(exec_ctx, p->subchannel_list,
"sl_phase_out_shutdown+unref"); "sl_phase_out_shutdown+started_picking");
} }
rr_subchannel_list_ref(p->latest_pending_subchannel_list, // Promote pending list: No need to take a ref on
"sl_promotion"); // p->latest_pending_subchannel_list: reusing its "make_pending"
// one.
p->subchannel_list = p->latest_pending_subchannel_list; p->subchannel_list = p->latest_pending_subchannel_list;
p->latest_pending_subchannel_list = NULL; p->latest_pending_subchannel_list = NULL;
} }
...@@ -665,8 +688,8 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, ...@@ -665,8 +688,8 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
gpr_free(pp); gpr_free(pp);
} }
} }
/* renew notification: reuses the "rr_connectivity" weak ref on the policy /* renew notification: reuses the "rr_connectivity_update" weak ref on the
* as well as the sd->subchannel_list ref. */ * policy as well as the sd->subchannel_list ref. */
grpc_subchannel_notify_on_state_change( grpc_subchannel_notify_on_state_change(
exec_ctx, sd->subchannel, p->base.interested_parties, exec_ctx, sd->subchannel, p->base.interested_parties,
&sd->pending_connectivity_state_unsafe, &sd->pending_connectivity_state_unsafe,
...@@ -744,18 +767,13 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, ...@@ -744,18 +767,13 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list, rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list,
"sl_shutdown_empty_update"); "sl_shutdown_empty_update");
rr_subchannel_list_unref(exec_ctx, p->subchannel_list, rr_subchannel_list_unref(exec_ctx, p->subchannel_list,
"sl_shutdown_empty_update+unref"); "sl_shutdown_empty_update+make_pending");
p->subchannel_list = NULL; p->subchannel_list = NULL;
} }
return; return;
} }
size_t subchannel_index = 0; size_t subchannel_index = 0;
rr_subchannel_list *subchannel_list = gpr_zalloc(sizeof(*subchannel_list)); rr_subchannel_list *subchannel_list = rr_subchannel_list_create(p, num_addrs);
subchannel_list->policy = p;
subchannel_list->subchannels =
gpr_zalloc(sizeof(subchannel_data) * num_addrs);
subchannel_list->num_subchannels = num_addrs;
gpr_ref_init(&subchannel_list->refcount, 1);
if (p->latest_pending_subchannel_list != NULL && p->started_picking) { if (p->latest_pending_subchannel_list != NULL && p->started_picking) {
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_DEBUG, gpr_log(GPR_DEBUG,
...@@ -766,12 +784,11 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, ...@@ -766,12 +784,11 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
} }
rr_subchannel_list_shutdown(exec_ctx, p->latest_pending_subchannel_list, rr_subchannel_list_shutdown(exec_ctx, p->latest_pending_subchannel_list,
"sl_outdated_dont_smash"); "sl_outdated_dont_smash");
rr_subchannel_list_unref(exec_ctx, p->latest_pending_subchannel_list,
"sl_outdated_dont_smash+make_pending");
} }
p->latest_pending_subchannel_list = subchannel_list; p->latest_pending_subchannel_list = subchannel_list;
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { rr_subchannel_list_ref(p->latest_pending_subchannel_list, "make_pending");
gpr_log(GPR_DEBUG, "Created subchannel list %p for %lu subchannels",
(void *)subchannel_list, (unsigned long)num_addrs);
}
grpc_subchannel_args sc_args; grpc_subchannel_args sc_args;
/* We need to remove the LB addresses in order to be able to compare the /* We need to remove the LB addresses in order to be able to compare the
* subchannel keys of subchannels from a different batch of addresses. */ * subchannel keys of subchannels from a different batch of addresses. */
...@@ -840,8 +857,10 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, ...@@ -840,8 +857,10 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
if (p->subchannel_list != NULL) { if (p->subchannel_list != NULL) {
rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list, rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list,
"rr_update_before_started_picking"); "rr_update_before_started_picking");
rr_subchannel_list_unref(exec_ctx, subchannel_list,
"rr_update_before_started_picking+make_pending");
} }
rr_subchannel_list_ref(subchannel_list, "sl_initial_promotion"); // Recycles "make_pending" reference.
p->subchannel_list = subchannel_list; p->subchannel_list = subchannel_list;
p->latest_pending_subchannel_list = NULL; p->latest_pending_subchannel_list = NULL;
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment