diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c index d2f6a90ca827c9faa0a26aa57fa6f58158ca2f01..5e09a050ee86845cdfcb98c5ced49cc9379a8889 100644 --- a/src/core/channel/channel_stack.c +++ b/src/core/channel/channel_stack.c @@ -106,8 +106,7 @@ void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, int initial_refs, const grpc_channel_filter **filters, size_t filter_count, const grpc_channel_args *channel_args, - const char *name, - grpc_channel_stack *stack) { + const char *name, grpc_channel_stack *stack) { size_t call_size = ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call_stack)) + ROUND_UP_TO_ALIGNMENT_SIZE(filter_count * sizeof(grpc_call_element)); diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h index bb7081b2a28a0b0e8d0f104fff89d4ebd8ff9013..c01050e7179aacd982e079c6d2e5c2910a8c27f9 100644 --- a/src/core/channel/channel_stack.h +++ b/src/core/channel/channel_stack.h @@ -183,8 +183,7 @@ void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg, const grpc_channel_filter **filters, size_t filter_count, const grpc_channel_args *args, - const char *name, - grpc_channel_stack *stack); + const char *name, grpc_channel_stack *stack); /* Destroy a channel stack */ void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_channel_stack *stack); diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 9d3690dc7d272ef32b8a61a386995df709559234..60f2acb9ef45eb7e5a0e49c4dfc817b429c26be8 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -120,7 +120,8 @@ static void on_lb_policy_state_changed_locked( /* check if the notification is for a stale policy */ if (w->lb_policy != w->chand->lb_policy) return; - if (publish_state == GRPC_CHANNEL_FATAL_FAILURE && w->chand->resolver != NULL) { + if (publish_state == GRPC_CHANNEL_FATAL_FAILURE && + w->chand->resolver != NULL) { publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE; grpc_resolver_channel_saw_error(exec_ctx, w->chand->resolver); } @@ -180,7 +181,8 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, chand->incoming_configuration = NULL; if (lb_policy != NULL) { - grpc_pollset_set_add_pollset_set(exec_ctx, &lb_policy->interested_parties, &chand->interested_parties); + grpc_pollset_set_add_pollset_set(exec_ctx, &lb_policy->interested_parties, + &chand->interested_parties); } gpr_mu_lock(&chand->mu_config); @@ -226,7 +228,9 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, } if (old_lb_policy != NULL) { - grpc_pollset_set_del_pollset_set(exec_ctx, &old_lb_policy->interested_parties, &chand->interested_parties); + grpc_pollset_set_del_pollset_set(exec_ctx, + &old_lb_policy->interested_parties, + &chand->interested_parties); GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel"); } @@ -265,7 +269,9 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, destroy_resolver = chand->resolver; chand->resolver = NULL; if (chand->lb_policy != NULL) { - grpc_pollset_set_del_pollset_set(exec_ctx, &chand->lb_policy->interested_parties, &chand->interested_parties); + grpc_pollset_set_del_pollset_set(exec_ctx, + &chand->lb_policy->interested_parties, + &chand->interested_parties); GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); chand->lb_policy = NULL; } @@ -401,7 +407,9 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); } if (chand->lb_policy != NULL) { - grpc_pollset_set_del_pollset_set(exec_ctx, &chand->lb_policy->interested_parties, &chand->interested_parties); + grpc_pollset_set_del_pollset_set(exec_ctx, + &chand->lb_policy->interested_parties, + &chand->interested_parties); GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); } grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker); @@ -472,11 +480,14 @@ typedef struct { grpc_closure my_closure; } external_connectivity_watcher; -static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) { +static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg, + int iomgr_success) { external_connectivity_watcher *w = arg; grpc_closure *follow_up = w->on_complete; - grpc_pollset_set_del_pollset(exec_ctx, &w->chand->interested_parties, w->pollset); - GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "external_connectivity_watcher"); + grpc_pollset_set_del_pollset(exec_ctx, &w->chand->interested_parties, + w->pollset); + GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, + "external_connectivity_watcher"); gpr_free(w); follow_up->cb(exec_ctx, follow_up->cb_arg, iomgr_success); } @@ -491,7 +502,8 @@ void grpc_client_channel_watch_connectivity_state( w->on_complete = on_complete; grpc_pollset_set_add_pollset(exec_ctx, &chand->interested_parties, pollset); grpc_closure_init(&w->my_closure, on_external_watch_complete, w); - GRPC_CHANNEL_STACK_REF(w->chand->owning_stack, "external_connectivity_watcher"); + GRPC_CHANNEL_STACK_REF(w->chand->owning_stack, + "external_connectivity_watcher"); gpr_mu_lock(&chand->mu_config); grpc_connectivity_state_notify_on_state_change( exec_ctx, &chand->state_tracker, state, &w->my_closure); diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index acaf356df4c5844104da1a80d32b772615cc0f81..3bfa7a86fb15dc3a729016951fb83a472bceec8f 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -102,15 +102,19 @@ void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "shutdown"); if (p->selected != NULL) { - grpc_connected_subchannel_notify_on_state_change(exec_ctx, p->selected, NULL, &p->connectivity_changed); + grpc_connected_subchannel_notify_on_state_change( + exec_ctx, p->selected, NULL, &p->connectivity_changed); } else { - grpc_subchannel_notify_on_state_change(exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL, &p->connectivity_changed); + grpc_subchannel_notify_on_state_change( + exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL, + &p->connectivity_changed); } gpr_mu_unlock(&p->mu); while (pp != NULL) { pending_pick *next = pp->next; *pp->target = NULL; - grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset); + grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, + pp->pollset); grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); gpr_free(pp); pp = next; @@ -127,7 +131,8 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, while (pp != NULL) { pending_pick *next = pp->next; if (pp->target == target) { - grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset); + grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, + pp->pollset); *target = NULL; grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 0); gpr_free(pp); @@ -147,8 +152,8 @@ static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) { GRPC_LB_POLICY_WEAK_REF(&p->base, "pick_first_connectivity"); grpc_subchannel_notify_on_state_change( exec_ctx, p->subchannels[p->checking_subchannel], - &p->base.interested_parties, - &p->checking_connectivity, &p->connectivity_changed); + &p->base.interested_parties, &p->checking_connectivity, + &p->connectivity_changed); } void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { @@ -174,7 +179,8 @@ int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, if (!p->started_picking) { start_picking(exec_ctx, p); } - grpc_pollset_set_add_pollset(exec_ctx, &p->base.interested_parties, pollset); + grpc_pollset_set_add_pollset(exec_ctx, &p->base.interested_parties, + pollset); pp = gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; pp->pollset = pollset; @@ -254,7 +260,8 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = p->selected; - grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset); + grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, + pp->pollset); grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); gpr_free(pp); } @@ -273,8 +280,8 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) { grpc_subchannel_notify_on_state_change( exec_ctx, p->subchannels[p->checking_subchannel], - &p->base.interested_parties, - &p->checking_connectivity, &p->connectivity_changed); + &p->base.interested_parties, &p->checking_connectivity, + &p->connectivity_changed); } else { goto loop; } @@ -286,8 +293,8 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, "connecting_changed"); grpc_subchannel_notify_on_state_change( exec_ctx, p->subchannels[p->checking_subchannel], - &p->base.interested_parties, - &p->checking_connectivity, &p->connectivity_changed); + &p->base.interested_parties, &p->checking_connectivity, + &p->connectivity_changed); break; case GRPC_CHANNEL_FATAL_FAILURE: p->num_subchannels--; @@ -305,7 +312,8 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); gpr_free(pp); } - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, + "pick_first_connectivity"); } else { grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, diff --git a/src/core/client_config/lb_policies/round_robin.c b/src/core/client_config/lb_policies/round_robin.c index f0b8ebe6fecf555abe1b060b3c27dc1b080adfed..b86dba20ee11dd3adb71bbca93401f41389fb6ce 100644 --- a/src/core/client_config/lb_policies/round_robin.c +++ b/src/core/client_config/lb_policies/round_robin.c @@ -244,9 +244,7 @@ void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { GRPC_CHANNEL_FATAL_FAILURE, "shutdown"); for (i = 0; i < p->num_subchannels; i++) { subchannel_data *sd = p->subchannels[i]; - grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, - NULL, - NULL, + grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, NULL, &sd->connectivity_changed_closure); } gpr_mu_unlock(&p->mu); @@ -262,7 +260,8 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, while (pp != NULL) { pending_pick *next = pp->next; if (pp->target == target) { - grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset); + grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, + pp->pollset); *target = NULL; grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 0); gpr_free(pp); @@ -279,15 +278,15 @@ static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) { size_t i; p->started_picking = 1; - gpr_log(GPR_DEBUG, "LB_POLICY: p=%p num_subchannels=%d", p, p->num_subchannels); + gpr_log(GPR_DEBUG, "LB_POLICY: p=%p num_subchannels=%d", p, + p->num_subchannels); for (i = 0; i < p->num_subchannels; i++) { subchannel_data *sd = p->subchannels[i]; sd->connectivity_state = GRPC_CHANNEL_IDLE; - grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, - &p->base.interested_parties, - &sd->connectivity_state, - &sd->connectivity_changed_closure); + grpc_subchannel_notify_on_state_change( + exec_ctx, sd->subchannel, &p->base.interested_parties, + &sd->connectivity_state, &sd->connectivity_changed_closure); GRPC_LB_POLICY_WEAK_REF(&p->base, "round_robin_connectivity"); } } @@ -323,7 +322,8 @@ int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, if (!p->started_picking) { start_picking(exec_ctx, p); } - grpc_pollset_set_add_pollset(exec_ctx, &p->base.interested_parties, pollset); + grpc_pollset_set_add_pollset(exec_ctx, &p->base.interested_parties, + pollset); pp = gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; pp->pollset = pollset; @@ -355,8 +355,7 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, GRPC_CHANNEL_READY, "connecting_ready"); /* add the newly connected subchannel to the list of connected ones. * Note that it goes to the "end of the line". */ - sd->ready_list_node = - add_connected_sc_locked(p, sd->subchannel); + sd->ready_list_node = add_connected_sc_locked(p, sd->subchannel); /* at this point we know there's at least one suitable subchannel. Go * ahead and pick one and notify the pending suitors in * p->pending_picks. This preemtively replicates rr_pick()'s actions. */ @@ -375,34 +374,29 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", selected->subchannel, selected); } - grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset); + grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, + pp->pollset); grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); gpr_free(pp); } grpc_subchannel_notify_on_state_change( - exec_ctx, - sd->subchannel, - &p->base.interested_parties, - &sd->connectivity_state, - &sd->connectivity_changed_closure); + exec_ctx, sd->subchannel, &p->base.interested_parties, + &sd->connectivity_state, &sd->connectivity_changed_closure); break; case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_IDLE: grpc_connectivity_state_set(exec_ctx, &p->state_tracker, - sd->connectivity_state, "connecting_changed"); + sd->connectivity_state, + "connecting_changed"); grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, - &p->base.interested_parties, - &sd->connectivity_state, - &sd->connectivity_changed_closure); + exec_ctx, sd->subchannel, &p->base.interested_parties, + &sd->connectivity_state, &sd->connectivity_changed_closure); break; case GRPC_CHANNEL_TRANSIENT_FAILURE: /* renew state notification */ grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, - &p->base.interested_parties, - &sd->connectivity_state, - &sd->connectivity_changed_closure); + exec_ctx, sd->subchannel, &p->base.interested_parties, + &sd->connectivity_state, &sd->connectivity_changed_closure); /* remove from ready list if still present */ if (sd->ready_list_node != NULL) { @@ -415,16 +409,14 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, break; case GRPC_CHANNEL_FATAL_FAILURE: if (sd->ready_list_node != NULL) { - remove_disconnected_sc_locked( - p, sd->ready_list_node); + remove_disconnected_sc_locked(p, sd->ready_list_node); sd->ready_list_node = NULL; } p->num_subchannels--; GPR_SWAP(subchannel_data *, p->subchannels[sd->index], p->subchannels[p->num_subchannels]); - GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, - "round_robin"); + GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "round_robin"); p->subchannels[sd->index]->index = sd->index; gpr_free(sd); @@ -491,8 +483,7 @@ static grpc_lb_policy *create_round_robin(grpc_lb_policy_factory *factory, memset(p, 0, sizeof(*p)); grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable); p->num_subchannels = args->num_subchannels; - p->subchannels = - gpr_malloc(sizeof(*p->subchannels) * p->num_subchannels); + p->subchannels = gpr_malloc(sizeof(*p->subchannels) * p->num_subchannels); memset(p->subchannels, 0, sizeof(*p->subchannels) * p->num_subchannels); grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, "round_robin"); @@ -505,8 +496,8 @@ static grpc_lb_policy *create_round_robin(grpc_lb_policy_factory *factory, sd->policy = p; sd->index = i; sd->subchannel = args->subchannels[i]; - grpc_closure_init(&sd->connectivity_changed_closure, rr_connectivity_changed, - sd); + grpc_closure_init(&sd->connectivity_changed_closure, + rr_connectivity_changed, sd); } /* The (dummy node) root of the ready list */ diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c index 4208c15b3fcaa5ab5f56e6d34ce34fc63e711961..d25416154675803145e36695ffd1dd9bf6b637fa 100644 --- a/src/core/client_config/lb_policy.c +++ b/src/core/client_config/lb_policy.c @@ -54,10 +54,14 @@ void grpc_lb_policy_init(grpc_lb_policy *policy, #define REF_MUTATE_PASS_ARGS(x) #endif -static gpr_atm ref_mutate(grpc_lb_policy *c, gpr_atm delta, int barrier REF_MUTATE_EXTRA_ARGS) { - gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&c->ref_pair, delta) : gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta); +static gpr_atm ref_mutate(grpc_lb_policy *c, gpr_atm delta, + int barrier REF_MUTATE_EXTRA_ARGS) { + gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&c->ref_pair, delta) + : gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta); #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "LB_POLICY: %p % 12s 0x%08x -> 0x%08x [%s]", c, purpose, old_val, old_val + delta, reason); + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, + "LB_POLICY: %p % 12s 0x%08x -> 0x%08x [%s]", c, purpose, old_val, + old_val + delta, reason); #endif return old_val; } @@ -66,22 +70,28 @@ void grpc_lb_policy_ref(grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) { ref_mutate(policy, 1 << WEAK_REF_BITS, 0 REF_MUTATE_PASS_ARGS("STRONG_REF")); } -void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) { - gpr_atm old_val = ref_mutate(policy, (gpr_atm)1-(gpr_atm)(1 << WEAK_REF_BITS), 1 REF_MUTATE_PASS_ARGS("STRONG_UNREF")); +void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, + grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) { + gpr_atm old_val = + ref_mutate(policy, (gpr_atm)1 - (gpr_atm)(1 << WEAK_REF_BITS), + 1 REF_MUTATE_PASS_ARGS("STRONG_UNREF")); gpr_atm mask = ~(gpr_atm)((1 << WEAK_REF_BITS) - 1); gpr_atm check = 1 << WEAK_REF_BITS; if ((old_val & mask) == check) { policy->vtable->shutdown(exec_ctx, policy); } - grpc_lb_policy_weak_unref(exec_ctx, policy REF_FUNC_PASS_ARGS("strong-unref")); + grpc_lb_policy_weak_unref(exec_ctx, + policy REF_FUNC_PASS_ARGS("strong-unref")); } void grpc_lb_policy_weak_ref(grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) { ref_mutate(policy, 1, 0 REF_MUTATE_PASS_ARGS("WEAK_REF")); } -void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) { - gpr_atm old_val = ref_mutate(policy, -(gpr_atm)1, 1 REF_MUTATE_PASS_ARGS("WEAK_UNREF")); +void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, + grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) { + gpr_atm old_val = + ref_mutate(policy, -(gpr_atm)1, 1 REF_MUTATE_PASS_ARGS("WEAK_UNREF")); if (old_val == 1) { grpc_pollset_set_destroy(&policy->interested_parties); policy->vtable->destroy(exec_ctx, policy); diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h index 67546617458d1e547827b613db3e7254fc466da0..2f8d655558c980af1089bb3c9007c6bdd0a38e17 100644 --- a/src/core/client_config/lb_policy.h +++ b/src/core/client_config/lb_policy.h @@ -93,9 +93,9 @@ void grpc_lb_policy_ref(grpc_lb_policy *policy, const char *file, int line, void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, const char *file, int line, const char *reason); void grpc_lb_policy_weak_ref(grpc_lb_policy *policy, const char *file, int line, - const char *reason); + const char *reason); void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - const char *file, int line, const char *reason); + const char *file, int line, const char *reason); #else #define GRPC_LB_POLICY_REF(p, r) grpc_lb_policy_ref((p)) #define GRPC_LB_POLICY_UNREF(cl, p, r) grpc_lb_policy_unref((cl), (p)) diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index a0e51d57ec4c2d425cf3ea08d92a919e9fa32450..d89a5e9be17841190f7227d3a113f99c5176d52d 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -78,9 +78,9 @@ typedef struct external_state_watcher { struct grpc_subchannel { grpc_connector *connector; - /** refcount - - lower INTERNAL_REF_BITS bits are for internal references: - these do not keep the subchannel open. + /** refcount + - lower INTERNAL_REF_BITS bits are for internal references: + these do not keep the subchannel open. - upper remaining bits are for public references: these do keep the subchannel open */ gpr_atm ref_pair; @@ -155,7 +155,8 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel, #define UNREF_LOG(name, p) \ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p unref %d -> %d %s", \ (name), (p), (p)->refs.count, (p)->refs.count - 1, reason) -#define REF_MUTATE_EXTRA_ARGS GRPC_SUBCHANNEL_REF_EXTRA_ARGS, const char *purpose +#define REF_MUTATE_EXTRA_ARGS \ + GRPC_SUBCHANNEL_REF_EXTRA_ARGS, const char *purpose #define REF_MUTATE_PURPOSE(x) , file, line, reason, x #else #define REF_REASON "" @@ -209,21 +210,27 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, gpr_free(c); } -static gpr_atm ref_mutate(grpc_subchannel *c, gpr_atm delta, int barrier REF_MUTATE_EXTRA_ARGS) { - gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&c->ref_pair, delta) : gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta); +static gpr_atm ref_mutate(grpc_subchannel *c, gpr_atm delta, + int barrier REF_MUTATE_EXTRA_ARGS) { + gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&c->ref_pair, delta) + : gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta); #ifdef GRPC_STREAM_REFCOUNT_DEBUG - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SUBCHANNEL: %p % 12s 0x%08x -> 0x%08x [%s]", c, purpose, old_val, old_val + delta, reason); + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, + "SUBCHANNEL: %p % 12s 0x%08x -> 0x%08x [%s]", c, purpose, old_val, + old_val + delta, reason); #endif return old_val; } void grpc_subchannel_ref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { gpr_atm old_refs; - old_refs = ref_mutate(c, (1 << INTERNAL_REF_BITS), 0 REF_MUTATE_PURPOSE("STRONG_REF")); + old_refs = ref_mutate(c, (1 << INTERNAL_REF_BITS), + 0 REF_MUTATE_PURPOSE("STRONG_REF")); GPR_ASSERT((old_refs & STRONG_REF_MASK) != 0); } -void grpc_subchannel_weak_ref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +void grpc_subchannel_weak_ref(grpc_subchannel *c + GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { gpr_atm old_refs; old_refs = ref_mutate(c, 1, 0 REF_MUTATE_PURPOSE("WEAK_REF")); GPR_ASSERT(old_refs != 0); @@ -246,7 +253,8 @@ static void disconnect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx, grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { gpr_atm old_refs; - old_refs = ref_mutate(c, (gpr_atm)1 - (gpr_atm)(1 << INTERNAL_REF_BITS), 1 REF_MUTATE_PURPOSE("STRONG_UNREF")); + old_refs = ref_mutate(c, (gpr_atm)1 - (gpr_atm)(1 << INTERNAL_REF_BITS), + 1 REF_MUTATE_PURPOSE("STRONG_UNREF")); if ((old_refs & STRONG_REF_MASK) == (1 << INTERNAL_REF_BITS)) { disconnect(exec_ctx, c); } @@ -254,7 +262,8 @@ void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx, } void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx, - grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { + grpc_subchannel *c + GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { gpr_atm old_refs; old_refs = ref_mutate(c, -(gpr_atm)1, 1 REF_MUTATE_PURPOSE("WEAK_UNREF")); if (old_refs == 0) { @@ -286,7 +295,8 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, &c->initial_connect_string); c->args = grpc_channel_args_copy(args->args); c->random = random_seed(); - c->root_external_state_watcher.next = c->root_external_state_watcher.prev = &c->root_external_state_watcher; + c->root_external_state_watcher.next = c->root_external_state_watcher.prev = + &c->root_external_state_watcher; grpc_closure_init(&c->connected, subchannel_connected, c); grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE, "subchannel"); @@ -326,11 +336,13 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) { return state; } -static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg, int success) { +static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg, + int success) { external_state_watcher *w = arg; grpc_closure *follow_up = w->notify; if (w->pollset_set != NULL) { - grpc_pollset_set_del_pollset_set(exec_ctx, &w->subchannel->pollset_set, w->pollset_set); + grpc_pollset_set_del_pollset_set(exec_ctx, &w->subchannel->pollset_set, + w->pollset_set); } gpr_mu_lock(&w->subchannel->mu); w->next->prev = w->prev; @@ -341,21 +353,20 @@ static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg, i follow_up->cb(exec_ctx, follow_up->cb_arg, success); } -void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx, - grpc_subchannel *c, - grpc_pollset_set *interested_parties, - grpc_connectivity_state *state, - grpc_closure *notify) { +void grpc_subchannel_notify_on_state_change( + grpc_exec_ctx *exec_ctx, grpc_subchannel *c, + grpc_pollset_set *interested_parties, grpc_connectivity_state *state, + grpc_closure *notify) { int do_connect = 0; external_state_watcher *w; if (state == NULL) { gpr_mu_lock(&c->mu); - for (w = c->root_external_state_watcher.next; - w != &c->root_external_state_watcher; - w = w->next) { + for (w = c->root_external_state_watcher.next; + w != &c->root_external_state_watcher; w = w->next) { if (w->notify == notify) { - grpc_connectivity_state_notify_on_state_change(exec_ctx, &c->state_tracker, NULL, &w->closure); + grpc_connectivity_state_notify_on_state_change( + exec_ctx, &c->state_tracker, NULL, &w->closure); } } gpr_mu_unlock(&c->mu); @@ -366,7 +377,8 @@ void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx, w->notify = notify; grpc_closure_init(&w->closure, on_external_state_watcher_done, w); if (interested_parties != NULL) { - grpc_pollset_set_add_pollset_set(exec_ctx, &c->pollset_set, interested_parties); + grpc_pollset_set_add_pollset_set(exec_ctx, &c->pollset_set, + interested_parties); } GRPC_SUBCHANNEL_WEAK_REF(c, "external_state_watcher"); gpr_mu_lock(&c->mu); diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index b735d2ccab3a107825c4b009801a6ecf5668b79c..e45708f91d1729f426194d2bdd41f62cc2decde7 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -68,7 +68,8 @@ typedef struct grpc_subchannel_args grpc_subchannel_args; #define GRPC_SUBCHANNEL_REF(p, r) grpc_subchannel_ref((p)) #define GRPC_SUBCHANNEL_UNREF(cl, p, r) grpc_subchannel_unref((cl), (p)) #define GRPC_SUBCHANNEL_WEAK_REF(p, r) grpc_subchannel_weak_ref((p)) -#define GRPC_SUBCHANNEL_WEAK_UNREF(cl, p, r) grpc_subchannel_weak_unref((cl), (p)) +#define GRPC_SUBCHANNEL_WEAK_UNREF(cl, p, r) \ + grpc_subchannel_weak_unref((cl), (p)) #define GRPC_CONNECTED_SUBCHANNEL_REF(p, r) grpc_connected_subchannel_ref((p)) #define GRPC_CONNECTED_SUBCHANNEL_UNREF(cl, p, r) \ grpc_connected_subchannel_unref((cl), (p)) @@ -84,10 +85,10 @@ void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx, grpc_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void grpc_subchannel_weak_ref(grpc_subchannel *channel - GRPC_SUBCHANNEL_REF_EXTRA_ARGS); + GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx, - grpc_subchannel *channel - GRPC_SUBCHANNEL_REF_EXTRA_ARGS); + grpc_subchannel *channel + GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void grpc_connected_subchannel_ref(grpc_connected_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx, @@ -115,11 +116,10 @@ grpc_connectivity_state grpc_subchannel_check_connectivity( /** call notify when the connectivity state of a channel changes from *state. Updates *state with the new state of the channel */ -void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx, - grpc_subchannel *channel, - grpc_pollset_set *interested_parties, - grpc_connectivity_state *state, - grpc_closure *notify); +void grpc_subchannel_notify_on_state_change( + grpc_exec_ctx *exec_ctx, grpc_subchannel *channel, + grpc_pollset_set *interested_parties, grpc_connectivity_state *state, + grpc_closure *notify); void grpc_connected_subchannel_notify_on_state_change( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *channel, grpc_connectivity_state *state, grpc_closure *notify); diff --git a/src/core/iomgr/pollset_set.h b/src/core/iomgr/pollset_set.h index e93a3dbb5683370c5653f9ff3b9590ce0438307c..09c04438f71e5aa98f3aa6866fd796ed8debc6bf 100644 --- a/src/core/iomgr/pollset_set.h +++ b/src/core/iomgr/pollset_set.h @@ -58,10 +58,10 @@ void grpc_pollset_set_del_pollset(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pollset_set, grpc_pollset *pollset); void grpc_pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *bag, - grpc_pollset_set *item); + grpc_pollset_set *bag, + grpc_pollset_set *item); void grpc_pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *bag, - grpc_pollset_set *item); + grpc_pollset_set *bag, + grpc_pollset_set *item); #endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_H */ diff --git a/src/core/iomgr/pollset_set_posix.c b/src/core/iomgr/pollset_set_posix.c index 7f0b34c36b227c1f8d41e253abadd443f4c6d35b..4ec92202e30d3cc30bb3d69c21b026f58987934f 100644 --- a/src/core/iomgr/pollset_set_posix.c +++ b/src/core/iomgr/pollset_set_posix.c @@ -107,8 +107,9 @@ void grpc_pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&bag->mu); if (bag->pollset_set_count == bag->pollset_set_capacity) { bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity); - bag->pollset_sets = gpr_realloc(bag->pollset_sets, - bag->pollset_set_capacity * sizeof(*bag->pollset_sets)); + bag->pollset_sets = + gpr_realloc(bag->pollset_sets, + bag->pollset_set_capacity * sizeof(*bag->pollset_sets)); } bag->pollset_sets[bag->pollset_set_count++] = item; for (i = 0, j = 0; i < bag->fd_count; i++) { @@ -131,9 +132,8 @@ void grpc_pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, for (i = 0; i < bag->pollset_set_count; i++) { if (bag->pollset_sets[i] == item) { bag->pollset_set_count--; - GPR_SWAP(grpc_pollset_set *, - bag->pollset_sets[i], - bag->pollset_sets[bag->pollset_set_count]); + GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i], + bag->pollset_sets[bag->pollset_set_count]); break; } } diff --git a/src/core/iomgr/pollset_set_windows.c b/src/core/iomgr/pollset_set_windows.c index 04d88839cb49ee4d095e4e6af90a5d76a6f8cc89..157b46ec32a6a1b05c9546e61a88895b4f9365df 100644 --- a/src/core/iomgr/pollset_set_windows.c +++ b/src/core/iomgr/pollset_set_windows.c @@ -49,12 +49,12 @@ void grpc_pollset_set_del_pollset(grpc_exec_ctx* exec_ctx, grpc_pollset_set* pollset_set, grpc_pollset* pollset) {} -void grpc_pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *bag, - grpc_pollset_set *item) {} +void grpc_pollset_set_add_pollset_set(grpc_exec_ctx* exec_ctx, + grpc_pollset_set* bag, + grpc_pollset_set* item) {} -void grpc_pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *bag, - grpc_pollset_set *item) {} +void grpc_pollset_set_del_pollset_set(grpc_exec_ctx* exec_ctx, + grpc_pollset_set* bag, + grpc_pollset_set* item) {} #endif /* GPR_WINSOCK_SOCKET */ diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index 92fd3dadd7e1b26e05418353ea4d09f96efd07bb..b4959069c666b034c3f1c7ec051dbf3d3206f0c5 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -152,7 +152,8 @@ grpc_channel *grpc_channel_create_from_filters( } grpc_channel_stack_init(exec_ctx, 1, destroy_channel, channel, filters, - num_filters, args, is_client ? "CLIENT_CHANNEL" : "SERVER_CHANNEL", + num_filters, args, + is_client ? "CLIENT_CHANNEL" : "SERVER_CHANNEL", CHANNEL_STACK_FROM_CHANNEL(channel)); return channel; diff --git a/src/core/transport/connectivity_state.c b/src/core/transport/connectivity_state.c index 569d3c30b245e8c3e4eaa1ddf1cf492aefd078d5..c409d983da736322176caaad9134b7ad5e783e89 100644 --- a/src/core/transport/connectivity_state.c +++ b/src/core/transport/connectivity_state.c @@ -99,11 +99,11 @@ int grpc_connectivity_state_notify_on_state_change( grpc_connectivity_state *current, grpc_closure *notify) { if (grpc_connectivity_state_trace) { if (current == NULL) { - gpr_log(GPR_DEBUG, "CONWATCH: %p %s: unsubscribe notify=%p", - tracker, tracker->name, notify); + gpr_log(GPR_DEBUG, "CONWATCH: %p %s: unsubscribe notify=%p", tracker, + tracker->name, notify); } else { - gpr_log(GPR_DEBUG, "CONWATCH: %p %s: from %s [cur=%s] notify=%p", - tracker, tracker->name, grpc_connectivity_state_name(*current), + gpr_log(GPR_DEBUG, "CONWATCH: %p %s: from %s [cur=%s] notify=%p", tracker, + tracker->name, grpc_connectivity_state_name(*current), grpc_connectivity_state_name(tracker->current_state), notify); } } diff --git a/src/core/transport/connectivity_state.h b/src/core/transport/connectivity_state.h index d8b5b38da0e47a385526dbfb3b3688ebd3a768f8..1f3a130e901a798308bd24276ea4adfd80f06458 100644 --- a/src/core/transport/connectivity_state.h +++ b/src/core/transport/connectivity_state.h @@ -74,7 +74,7 @@ grpc_connectivity_state grpc_connectivity_state_check( grpc_connectivity_state_tracker *tracker); /** Return 1 if the channel should start connecting, 0 otherwise. - If current==NULL cancel notify if it is already queued (success==0 in that + If current==NULL cancel notify if it is already queued (success==0 in that case) */ int grpc_connectivity_state_notify_on_state_change( grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker, diff --git a/test/core/client_config/lb_policies_test.c b/test/core/client_config/lb_policies_test.c index 5aa8140e080b71a7b6228ed4d2dd8b0f6be496a5..6f218e7f08be2405b13d9f82814496842e5be565 100644 --- a/test/core/client_config/lb_policies_test.c +++ b/test/core/client_config/lb_policies_test.c @@ -135,9 +135,8 @@ static void kill_server(const servers_fixture *f, size_t i) { gpr_log(GPR_INFO, "KILLING SERVER %d", i); GPR_ASSERT(f->servers[i] != NULL); grpc_server_shutdown_and_notify(f->servers[i], f->cq, tag(10000)); - GPR_ASSERT( - grpc_completion_queue_pluck(f->cq, tag(10000), n_millis_time(5000), NULL) - .type == GRPC_OP_COMPLETE); + GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(10000), n_millis_time(5000), + NULL).type == GRPC_OP_COMPLETE); grpc_server_destroy(f->servers[i]); f->servers[i] = NULL; } @@ -203,8 +202,8 @@ static void teardown_servers(servers_fixture *f) { if (f->servers[i] == NULL) continue; grpc_server_shutdown_and_notify(f->servers[i], f->cq, tag(10000)); GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(10000), - n_millis_time(5000), NULL) - .type == GRPC_OP_COMPLETE); + n_millis_time(5000), + NULL).type == GRPC_OP_COMPLETE); grpc_server_destroy(f->servers[i]); } grpc_completion_queue_shutdown(f->cq); @@ -304,8 +303,8 @@ int *perform_request(servers_fixture *f, grpc_channel *client, s_idx = -1; while ((ev = grpc_completion_queue_next( - f->cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1), NULL)) - .type != GRPC_QUEUE_TIMEOUT) { + f->cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1), NULL)).type != + GRPC_QUEUE_TIMEOUT) { GPR_ASSERT(ev.type == GRPC_OP_COMPLETE); read_tag = ((int)(gpr_intptr)ev.tag); gpr_log(GPR_DEBUG, "EVENT: success:%d, type:%d, tag:%d iter:%d", diff --git a/test/core/client_config/set_initial_connect_string_test.c b/test/core/client_config/set_initial_connect_string_test.c index 13517b7c1f591d6ea8a701edb5483855f0e3e9b6..ceca56c833bf444d0fd6fc62ae9d63e117101e3a 100644 --- a/test/core/client_config/set_initial_connect_string_test.c +++ b/test/core/client_config/set_initial_connect_string_test.c @@ -66,15 +66,15 @@ static grpc_closure on_read; static void handle_read(grpc_exec_ctx *exec_ctx, void *arg, int success) { GPR_ASSERT(success); - gpr_slice_buffer_move_into( - &state.temp_incoming_buffer, &state.incoming_buffer); + gpr_slice_buffer_move_into(&state.temp_incoming_buffer, + &state.incoming_buffer); if (state.incoming_buffer.length > strlen(magic_connect_string)) { state.done = 1; grpc_endpoint_shutdown(exec_ctx, state.tcp); grpc_endpoint_destroy(exec_ctx, state.tcp); } else { - grpc_endpoint_read( - exec_ctx, state.tcp, &state.temp_incoming_buffer, &on_read); + grpc_endpoint_read(exec_ctx, state.tcp, &state.temp_incoming_buffer, + &on_read); } } diff --git a/test/core/end2end/fixtures/h2_uchannel.c b/test/core/end2end/fixtures/h2_uchannel.c index 6123bf061e3928fe6fe503a402eca8c922eea15b..dd9a625ac287c7e5a3bd9c627981c6b78b6ddbab 100644 --- a/test/core/end2end/fixtures/h2_uchannel.c +++ b/test/core/end2end/fixtures/h2_uchannel.c @@ -239,7 +239,8 @@ grpc_pollset_set g_interested_parties; static void state_changed(grpc_exec_ctx *exec_ctx, void *arg, int success) { if (g_state != GRPC_CHANNEL_READY) { grpc_subchannel_notify_on_state_change( - exec_ctx, arg, &g_interested_parties, &g_state, grpc_closure_create(state_changed, arg)); + exec_ctx, arg, &g_interested_parties, &g_state, + grpc_closure_create(state_changed, arg)); } } @@ -253,7 +254,8 @@ static grpc_connected_subchannel *connect_subchannel(grpc_subchannel *c) { grpc_pollset_init(&pollset); grpc_pollset_set_init(&g_interested_parties); grpc_pollset_set_add_pollset(&exec_ctx, &g_interested_parties, &pollset); - grpc_subchannel_notify_on_state_change(&exec_ctx, c, &g_interested_parties, &g_state, + grpc_subchannel_notify_on_state_change(&exec_ctx, c, &g_interested_parties, + &g_state, grpc_closure_create(state_changed, c)); grpc_exec_ctx_flush(&exec_ctx); gpr_mu_lock(GRPC_POLLSET_MU(&pollset)); @@ -330,9 +332,9 @@ static void chttp2_tear_down_micro_fullstack(grpc_end2end_test_fixture *f) { /* All test configurations */ static grpc_end2end_test_config configs[] = { - {"chttp2/micro_fullstack", 0, - chttp2_create_fixture_micro_fullstack, chttp2_init_client_micro_fullstack, - chttp2_init_server_micro_fullstack, chttp2_tear_down_micro_fullstack}, + {"chttp2/micro_fullstack", 0, chttp2_create_fixture_micro_fullstack, + chttp2_init_client_micro_fullstack, chttp2_init_server_micro_fullstack, + chttp2_tear_down_micro_fullstack}, }; int main(int argc, char **argv) { diff --git a/test/core/end2end/tests/hpack_size.c b/test/core/end2end/tests/hpack_size.c index 0b1a9faad8799b95f9d4fe30181fa6395af849e1..f16883ecfdf6660c7534b0ca073f7160fa956f4e 100644 --- a/test/core/end2end/tests/hpack_size.c +++ b/test/core/end2end/tests/hpack_size.c @@ -262,9 +262,9 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - f->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5), NULL) - .type == GRPC_OP_COMPLETE); + GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(1000), + GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5), + NULL).type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); f->server = NULL; }