Skip to content
Snippets Groups Projects
Commit e2a6510a authored by Craig Tiller's avatar Craig Tiller
Browse files

Fix refcounting problem, closure reordering problem

parent 2a1bb7f0
No related branches found
No related tags found
No related merge requests found
...@@ -38,6 +38,8 @@ ...@@ -38,6 +38,8 @@
#include <grpc/support/alloc.h> #include <grpc/support/alloc.h>
#include "src/core/transport/connectivity_state.h" #include "src/core/transport/connectivity_state.h"
typedef struct round_robin_lb_policy round_robin_lb_policy;
int grpc_lb_round_robin_trace = 0; int grpc_lb_round_robin_trace = 0;
/** List of entities waiting for a pick. /** List of entities waiting for a pick.
...@@ -58,22 +60,27 @@ typedef struct ready_list { ...@@ -58,22 +60,27 @@ typedef struct ready_list {
} ready_list; } ready_list;
typedef struct { typedef struct {
size_t subchannel_idx; /**< Index over p->subchannels */ /** index within policy->subchannels */
void *p; /**< round_robin_lb_policy instance */ size_t index;
} connectivity_changed_cb_arg; /** backpointer to owning policy */
round_robin_lb_policy *policy;
typedef struct { /** subchannel itself */
grpc_subchannel *subchannel;
/** notification that connectivity has changed on subchannel */
grpc_closure connectivity_changed_closure;
/** this subchannels current position in subchannel->ready_list */
ready_list *ready_list_node;
/** last observed connectivity */
grpc_connectivity_state connectivity_state;
} subchannel_data;
struct round_robin_lb_policy {
/** base policy: must be first */ /** base policy: must be first */
grpc_lb_policy base; grpc_lb_policy base;
/** all our subchannels */ /** all our subchannels */
grpc_subchannel **subchannels;
size_t num_subchannels; size_t num_subchannels;
subchannel_data **subchannels;
/** Callbacks, one per subchannel being watched, to be called when their
* respective connectivity changes */
grpc_closure *connectivity_changed_cbs;
connectivity_changed_cb_arg *cb_args;
/** mutex protecting remaining members */ /** mutex protecting remaining members */
gpr_mu mu; gpr_mu mu;
...@@ -81,8 +88,6 @@ typedef struct { ...@@ -81,8 +88,6 @@ typedef struct {
int started_picking; int started_picking;
/** are we shutting down? */ /** are we shutting down? */
int shutdown; int shutdown;
/** Connectivity state of the subchannels being watched */
grpc_connectivity_state *subchannel_connectivity;
/** List of picks that are waiting on connectivity */ /** List of picks that are waiting on connectivity */
pending_pick *pending_picks; pending_pick *pending_picks;
...@@ -93,13 +98,7 @@ typedef struct { ...@@ -93,13 +98,7 @@ typedef struct {
ready_list ready_list; ready_list ready_list;
/** Last pick from the ready list. */ /** Last pick from the ready list. */
ready_list *ready_list_last_pick; ready_list *ready_list_last_pick;
};
/** Subchannel index to ready_list node.
*
* Kept in order to remove nodes from the ready list associated with a
* subchannel */
ready_list **subchannel_index_to_readylist_node;
} round_robin_lb_policy;
/** Returns the next subchannel from the connected list or NULL if the list is /** Returns the next subchannel from the connected list or NULL if the list is
* empty. * empty.
...@@ -205,10 +204,10 @@ void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { ...@@ -205,10 +204,10 @@ void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
size_t i; size_t i;
ready_list *elem; ready_list *elem;
for (i = 0; i < p->num_subchannels; i++) { for (i = 0; i < p->num_subchannels; i++) {
GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "round_robin"); subchannel_data *sd = p->subchannels[i];
GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "round_robin");
gpr_free(sd);
} }
gpr_free(p->connectivity_changed_cbs);
gpr_free(p->subchannel_connectivity);
grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
gpr_free(p->subchannels); gpr_free(p->subchannels);
...@@ -224,8 +223,6 @@ void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { ...@@ -224,8 +223,6 @@ void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
gpr_free(elem); gpr_free(elem);
elem = tmp; elem = tmp;
} }
gpr_free(p->subchannel_index_to_readylist_node);
gpr_free(p->cb_args);
gpr_free(p); gpr_free(p);
} }
...@@ -248,10 +245,11 @@ gpr_log(GPR_DEBUG, "LB_POLICY: rr_shutdown: p=%p num_subchannels=%d", p, p->num_ ...@@ -248,10 +245,11 @@ gpr_log(GPR_DEBUG, "LB_POLICY: rr_shutdown: p=%p num_subchannels=%d", p, p->num_
grpc_connectivity_state_set(exec_ctx, &p->state_tracker, grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_FATAL_FAILURE, "shutdown"); GRPC_CHANNEL_FATAL_FAILURE, "shutdown");
for (i = 0; i < p->num_subchannels; i++) { for (i = 0; i < p->num_subchannels; i++) {
grpc_subchannel_notify_on_state_change(exec_ctx, p->subchannels[i], subchannel_data *sd = p->subchannels[i];
grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel,
NULL, NULL,
NULL, NULL,
&p->connectivity_changed_cbs[i]); &sd->connectivity_changed_closure);
} }
gpr_mu_unlock(&p->mu); gpr_mu_unlock(&p->mu);
} }
...@@ -286,11 +284,12 @@ static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) { ...@@ -286,11 +284,12 @@ static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) {
gpr_log(GPR_DEBUG, "LB_POLICY: p=%p num_subchannels=%d", p, p->num_subchannels); gpr_log(GPR_DEBUG, "LB_POLICY: p=%p num_subchannels=%d", p, p->num_subchannels);
for (i = 0; i < p->num_subchannels; i++) { for (i = 0; i < p->num_subchannels; i++) {
p->subchannel_connectivity[i] = GRPC_CHANNEL_IDLE; subchannel_data *sd = p->subchannels[i];
grpc_subchannel_notify_on_state_change(exec_ctx, p->subchannels[i], sd->connectivity_state = GRPC_CHANNEL_IDLE;
grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel,
&p->base.interested_parties, &p->base.interested_parties,
&p->subchannel_connectivity[i], &sd->connectivity_state,
&p->connectivity_changed_cbs[i]); &sd->connectivity_changed_closure);
GRPC_LB_POLICY_WEAK_REF(&p->base, "round_robin_connectivity"); GRPC_LB_POLICY_WEAK_REF(&p->base, "round_robin_connectivity");
} }
} }
...@@ -340,33 +339,26 @@ int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, ...@@ -340,33 +339,26 @@ int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset,
static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
int iomgr_success) { int iomgr_success) {
connectivity_changed_cb_arg *cb_arg = arg; subchannel_data *sd = arg;
round_robin_lb_policy *p = cb_arg->p; round_robin_lb_policy *p = sd->policy;
/* index over p->subchannels of this cb's subchannel */
const size_t this_idx = cb_arg->subchannel_idx;
pending_pick *pp; pending_pick *pp;
ready_list *selected; ready_list *selected;
int unref = 0; int unref = 0;
/* connectivity state of this cb's subchannel */
grpc_connectivity_state *this_connectivity;
gpr_mu_lock(&p->mu); gpr_mu_lock(&p->mu);
this_connectivity = &p->subchannel_connectivity[this_idx];
if (p->shutdown) { if (p->shutdown) {
unref = 1; unref = 1;
} else { } else {
switch (*this_connectivity) { switch (sd->connectivity_state) {
case GRPC_CHANNEL_READY: case GRPC_CHANNEL_READY:
grpc_connectivity_state_set(exec_ctx, &p->state_tracker, grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_READY, "connecting_ready"); GRPC_CHANNEL_READY, "connecting_ready");
/* add the newly connected subchannel to the list of connected ones. /* add the newly connected subchannel to the list of connected ones.
* Note that it goes to the "end of the line". */ * Note that it goes to the "end of the line". */
p->subchannel_index_to_readylist_node[this_idx] = sd->ready_list_node =
add_connected_sc_locked(p, p->subchannels[this_idx]); add_connected_sc_locked(p, sd->subchannel);
/* at this point we know there's at least one suitable subchannel. Go /* at this point we know there's at least one suitable subchannel. Go
* ahead and pick one and notify the pending suitors in * ahead and pick one and notify the pending suitors in
* p->pending_picks. This preemtively replicates rr_pick()'s actions. */ * p->pending_picks. This preemtively replicates rr_pick()'s actions. */
...@@ -391,52 +383,54 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, ...@@ -391,52 +383,54 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
} }
grpc_subchannel_notify_on_state_change( grpc_subchannel_notify_on_state_change(
exec_ctx, exec_ctx,
p->subchannels[this_idx], sd->subchannel,
&p->base.interested_parties, &p->base.interested_parties,
this_connectivity, &sd->connectivity_state,
&p->connectivity_changed_cbs[this_idx]); &sd->connectivity_changed_closure);
break; break;
case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_IDLE: case GRPC_CHANNEL_IDLE:
grpc_connectivity_state_set(exec_ctx, &p->state_tracker, grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
*this_connectivity, "connecting_changed"); sd->connectivity_state, "connecting_changed");
grpc_subchannel_notify_on_state_change( grpc_subchannel_notify_on_state_change(
exec_ctx, p->subchannels[this_idx], exec_ctx, sd->subchannel,
&p->base.interested_parties, &p->base.interested_parties,
this_connectivity, &sd->connectivity_state,
&p->connectivity_changed_cbs[this_idx]); &sd->connectivity_changed_closure);
break; break;
case GRPC_CHANNEL_TRANSIENT_FAILURE: case GRPC_CHANNEL_TRANSIENT_FAILURE:
/* renew state notification */ /* renew state notification */
grpc_subchannel_notify_on_state_change( grpc_subchannel_notify_on_state_change(
exec_ctx, p->subchannels[this_idx], exec_ctx, sd->subchannel,
&p->base.interested_parties, &p->base.interested_parties,
this_connectivity, &sd->connectivity_state,
&p->connectivity_changed_cbs[this_idx]); &sd->connectivity_changed_closure);
/* remove from ready list if still present */ /* remove from ready list if still present */
if (p->subchannel_index_to_readylist_node[this_idx] != NULL) { if (sd->ready_list_node != NULL) {
remove_disconnected_sc_locked( remove_disconnected_sc_locked(p, sd->ready_list_node);
p, p->subchannel_index_to_readylist_node[this_idx]); sd->ready_list_node = NULL;
p->subchannel_index_to_readylist_node[this_idx] = NULL;
} }
grpc_connectivity_state_set(exec_ctx, &p->state_tracker, grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_CHANNEL_TRANSIENT_FAILURE,
"connecting_transient_failure"); "connecting_transient_failure");
break; break;
case GRPC_CHANNEL_FATAL_FAILURE: case GRPC_CHANNEL_FATAL_FAILURE:
if (p->subchannel_index_to_readylist_node[this_idx] != NULL) { if (sd->ready_list_node != NULL) {
remove_disconnected_sc_locked( remove_disconnected_sc_locked(
p, p->subchannel_index_to_readylist_node[this_idx]); p, sd->ready_list_node);
p->subchannel_index_to_readylist_node[this_idx] = NULL; sd->ready_list_node = NULL;
} }
GPR_SWAP(grpc_subchannel *, p->subchannels[this_idx],
p->subchannels[p->num_subchannels - 1]);
p->num_subchannels--; p->num_subchannels--;
GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[p->num_subchannels], GPR_SWAP(subchannel_data *, p->subchannels[sd->index],
p->subchannels[p->num_subchannels]);
GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel,
"round_robin"); "round_robin");
p->subchannels[sd->index]->index = sd->index;
gpr_free(sd);
unref = 1;
if (p->num_subchannels == 0) { if (p->num_subchannels == 0) {
grpc_connectivity_state_set(exec_ctx, &p->state_tracker, grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_FATAL_FAILURE, GRPC_CHANNEL_FATAL_FAILURE,
...@@ -447,7 +441,6 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, ...@@ -447,7 +441,6 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
gpr_free(pp); gpr_free(pp);
} }
unref = 1;
} else { } else {
grpc_connectivity_state_set(exec_ctx, &p->state_tracker, grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_CHANNEL_TRANSIENT_FAILURE,
...@@ -499,27 +492,23 @@ static grpc_lb_policy *create_round_robin(grpc_lb_policy_factory *factory, ...@@ -499,27 +492,23 @@ static grpc_lb_policy *create_round_robin(grpc_lb_policy_factory *factory,
GPR_ASSERT(args->num_subchannels > 0); GPR_ASSERT(args->num_subchannels > 0);
memset(p, 0, sizeof(*p)); memset(p, 0, sizeof(*p));
grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable); grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable);
p->subchannels =
gpr_malloc(sizeof(grpc_subchannel *) * args->num_subchannels);
p->num_subchannels = args->num_subchannels; p->num_subchannels = args->num_subchannels;
p->subchannels =
gpr_malloc(sizeof(*p->subchannels) * p->num_subchannels);
memset(p->subchannels, 0, sizeof(*p->subchannels) * p->num_subchannels);
grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE,
"round_robin"); "round_robin");
memcpy(p->subchannels, args->subchannels,
sizeof(grpc_subchannel *) * args->num_subchannels);
gpr_mu_init(&p->mu); gpr_mu_init(&p->mu);
p->connectivity_changed_cbs =
gpr_malloc(sizeof(grpc_closure) * args->num_subchannels);
p->subchannel_connectivity =
gpr_malloc(sizeof(grpc_connectivity_state) * args->num_subchannels);
p->cb_args =
gpr_malloc(sizeof(connectivity_changed_cb_arg) * args->num_subchannels);
for (i = 0; i < args->num_subchannels; i++) { for (i = 0; i < args->num_subchannels; i++) {
p->cb_args[i].subchannel_idx = i; subchannel_data *sd = gpr_malloc(sizeof(*sd));
p->cb_args[i].p = p; memset(sd, 0, sizeof(*sd));
grpc_closure_init(&p->connectivity_changed_cbs[i], rr_connectivity_changed, p->subchannels[i] = sd;
&p->cb_args[i]); sd->policy = p;
sd->index = i;
sd->subchannel = args->subchannels[i];
grpc_closure_init(&sd->connectivity_changed_closure, rr_connectivity_changed,
sd);
} }
/* The (dummy node) root of the ready list */ /* The (dummy node) root of the ready list */
...@@ -528,10 +517,6 @@ static grpc_lb_policy *create_round_robin(grpc_lb_policy_factory *factory, ...@@ -528,10 +517,6 @@ static grpc_lb_policy *create_round_robin(grpc_lb_policy_factory *factory,
p->ready_list.next = NULL; p->ready_list.next = NULL;
p->ready_list_last_pick = &p->ready_list; p->ready_list_last_pick = &p->ready_list;
p->subchannel_index_to_readylist_node =
gpr_malloc(sizeof(grpc_subchannel *) * args->num_subchannels);
memset(p->subchannel_index_to_readylist_node, 0,
sizeof(grpc_subchannel *) * args->num_subchannels);
return &p->base; return &p->base;
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment