Skip to content
Snippets Groups Projects
Commit 4b2def36 authored by David Garcia Quintas's avatar David Garcia Quintas
Browse files

Fix RR concurrent updates

parent af724acc
No related branches found
No related tags found
No related merge requests found
......@@ -195,11 +195,28 @@ static void rr_subchannel_list_unref(grpc_exec_ctx *exec_ctx,
static void rr_subchannel_list_shutdown(grpc_exec_ctx *exec_ctx,
rr_subchannel_list *subchannel_list,
const char *reason) {
if (subchannel_list->shutting_down) {
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_DEBUG, "Subchannel list %p already shutting down",
(void *)subchannel_list);
}
return;
};
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_DEBUG, "Shutting down subchannel_list %p",
(void *)subchannel_list);
}
GPR_ASSERT(!subchannel_list->shutting_down);
subchannel_list->shutting_down = true;
for (size_t i = 0; i < subchannel_list->num_subchannels; i++) {
subchannel_data *sd = &subchannel_list->subchannels[i];
if (sd->subchannel != NULL) { // if subchannel isn't shutdown, unsubscribe.
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_DEBUG,
"Unsubscribing from subchannel %p as part of shutting down "
"subchannel_list %p",
(void *)sd->subchannel, (void *)subchannel_list);
}
grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL,
NULL,
&sd->connectivity_changed_closure);
......@@ -228,13 +245,14 @@ static size_t get_next_ready_subchannel_index_locked(
const size_t index = (i + p->last_ready_subchannel_index + 1) %
p->subchannel_list->num_subchannels;
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_DEBUG,
"[RR %p] checking subchannel %p, subchannel_list %p, index %lu: "
"state=%d",
(void *)p,
(void *)p->subchannel_list->subchannels[index].subchannel,
(void *)p->subchannel_list, (unsigned long)index,
p->subchannel_list->subchannels[index].curr_connectivity_state);
gpr_log(
GPR_DEBUG,
"[RR %p] checking subchannel %p, subchannel_list %p, index %lu: "
"state=%s",
(void *)p, (void *)p->subchannel_list->subchannels[index].subchannel,
(void *)p->subchannel_list, (unsigned long)index,
grpc_connectivity_state_name(
p->subchannel_list->subchannels[index].curr_connectivity_state));
}
if (p->subchannel_list->subchannels[index].curr_connectivity_state ==
GRPC_CHANNEL_READY) {
......@@ -511,16 +529,27 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
subchannel_data *sd = arg;
round_robin_lb_policy *p = sd->subchannel_list->policy;
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(
GPR_DEBUG,
"[RR %p] connectivity changed for subchannel %p, subchannel_list %p: "
"prev_state=%s new_state=%s p->shutdown=%d "
"sd->subchannel_list->shutting_down=%d error=%s",
(void *)p, (void *)sd->subchannel, (void *)sd->subchannel_list,
grpc_connectivity_state_name(sd->prev_connectivity_state),
grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe),
p->shutdown, sd->subchannel_list->shutting_down,
grpc_error_string(error));
}
// If the policy is shutting down, unref and return.
if (p->shutdown) {
rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "pol_shutdown");
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pol_shutdown");
return;
}
if (sd->subchannel_list->shutting_down) {
if (sd->subchannel_list->shutting_down && error == GRPC_ERROR_CANCELLED) {
// the subchannel list associated with sd has been discarded. This callback
// corresponds to the unsubscription.
GPR_ASSERT(error == GRPC_ERROR_CANCELLED);
rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "sl_shutdown");
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "sl_shutdown");
return;
......@@ -536,13 +565,6 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
// state (which was set by the connectivity state watcher) to
// curr_connectivity_state, which is what we use inside of the combiner.
sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe;
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_DEBUG,
"[RR %p] connectivity changed for subchannel %p: "
"prev_state=%d new_state=%d",
(void *)p, (void *)sd->subchannel, sd->prev_connectivity_state,
sd->curr_connectivity_state);
}
// Update state counters and determine new overall state.
update_state_counters_locked(sd);
sd->prev_connectivity_state = sd->curr_connectivity_state;
......
......@@ -463,6 +463,11 @@ TEST_F(ClientLbEnd2endTest, RoundRobinManyUpdates) {
EXPECT_EQ("round_robin", channel_->GetLoadBalancingPolicyName());
}
TEST_F(ClientLbEnd2endTest, RoundRobinConcurrentUpdates) {
// TODO(dgq): replicate the way internal testing exercises the concurrent
// update provisions of RR.
}
TEST_F(ClientLbEnd2endTest, RoundRobinReconnect) {
// Start servers and send one RPC per server.
const int kNumServers = 1;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment