diff --git a/include/grpc/support/useful.h b/include/grpc/support/useful.h index 003e096cf9a43f96ae1838fb1a44a64576a63da9..9d8314e4bef472d5e482fd40d36463a854733015 100644 --- a/include/grpc/support/useful.h +++ b/include/grpc/support/useful.h @@ -74,4 +74,7 @@ #define GPR_ICMP(a, b) ((a) < (b) ? -1 : ((a) > (b) ? 1 : 0)) +#define GPR_HASH_POINTER(x, range) \ + ((((size_t)x) >> 4) ^ (((size_t)x) >> 9) ^ (((size_t)x) >> 14)) % (range) + #endif /* GRPC_SUPPORT_USEFUL_H */ diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c index 6ff6944d959f24353520a6ae728a2e2877838838..0508b92aa2cacc31dc753013f9e52697641241d8 100644 --- a/src/core/ext/client_channel/client_channel.c +++ b/src/core/ext/client_channel/client_channel.c @@ -51,6 +51,7 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/connected_channel.h" #include "src/core/lib/channel/deadline_filter.h" +#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/profiling/timers.h" @@ -160,13 +161,10 @@ typedef struct client_channel_channel_data { /** client channel factory */ grpc_client_channel_factory *client_channel_factory; - /** mutex protecting all variables below in this data structure */ - gpr_mu mu; + /** combiner protecting all variables below in this data structure */ + grpc_combiner *combiner; /** currently active load balancer */ - char *lb_policy_name; grpc_lb_policy *lb_policy; - /** service config in JSON form */ - char *service_config_json; /** maps method names to method_parameters structs */ grpc_slice_hash_table *method_params_table; /** incoming resolver result - set by resolver.next() */ @@ -183,6 +181,13 @@ typedef struct client_channel_channel_data { grpc_channel_stack *owning_stack; /** interested parties (owned) */ grpc_pollset_set *interested_parties; + + /* the following properties are guarded by a mutex since API's require them + to be instantaniously available */ + gpr_mu info_mu; + char *info_lb_policy_name; + /** service config in JSON form */ + char *info_service_config_json; } channel_data; /** We create one watcher for each new lb_policy that is returned from a @@ -218,32 +223,23 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx, } static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx, - lb_policy_connectivity_watcher *w, - grpc_error *error) { + void *arg, grpc_error *error) { + lb_policy_connectivity_watcher *w = arg; grpc_connectivity_state publish_state = w->state; - /* check if the notification is for a stale policy */ - if (w->lb_policy != w->chand->lb_policy) return; - - if (publish_state == GRPC_CHANNEL_SHUTDOWN && w->chand->resolver != NULL) { - publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE; - grpc_resolver_channel_saw_error(exec_ctx, w->chand->resolver); - GRPC_LB_POLICY_UNREF(exec_ctx, w->chand->lb_policy, "channel"); - w->chand->lb_policy = NULL; - } - set_channel_connectivity_state_locked(exec_ctx, w->chand, publish_state, - GRPC_ERROR_REF(error), "lb_changed"); - if (w->state != GRPC_CHANNEL_SHUTDOWN) { - watch_lb_policy(exec_ctx, w->chand, w->lb_policy, w->state); + /* check if the notification is for the latest policy */ + if (w->lb_policy == w->chand->lb_policy) { + if (publish_state == GRPC_CHANNEL_SHUTDOWN && w->chand->resolver != NULL) { + publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE; + grpc_resolver_channel_saw_error(exec_ctx, w->chand->resolver); + GRPC_LB_POLICY_UNREF(exec_ctx, w->chand->lb_policy, "channel"); + w->chand->lb_policy = NULL; + } + set_channel_connectivity_state_locked(exec_ctx, w->chand, publish_state, + GRPC_ERROR_REF(error), "lb_changed"); + if (w->state != GRPC_CHANNEL_SHUTDOWN) { + watch_lb_policy(exec_ctx, w->chand, w->lb_policy, w->state); + } } -} - -static void on_lb_policy_state_changed(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - lb_policy_connectivity_watcher *w = arg; - - gpr_mu_lock(&w->chand->mu); - on_lb_policy_state_changed_locked(exec_ctx, w, error); - gpr_mu_unlock(&w->chand->mu); GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "watch_lb_policy"); gpr_free(w); @@ -256,16 +252,16 @@ static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand, GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy"); w->chand = chand; - grpc_closure_init(&w->on_changed, on_lb_policy_state_changed, w, - grpc_schedule_on_exec_ctx); + grpc_closure_init(&w->on_changed, on_lb_policy_state_changed_locked, w, + grpc_combiner_scheduler(chand->combiner, false)); w->state = current_state; w->lb_policy = lb_policy; grpc_lb_policy_notify_on_state_change(exec_ctx, lb_policy, &w->state, &w->on_changed); } -static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, + void *arg, grpc_error *error) { channel_data *chand = arg; char *lb_policy_name = NULL; grpc_lb_policy *lb_policy = NULL; @@ -353,17 +349,18 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, chand->interested_parties); } - gpr_mu_lock(&chand->mu); + gpr_mu_lock(&chand->info_mu); if (lb_policy_name != NULL) { - gpr_free(chand->lb_policy_name); - chand->lb_policy_name = lb_policy_name; + gpr_free(chand->info_lb_policy_name); + chand->info_lb_policy_name = lb_policy_name; } old_lb_policy = chand->lb_policy; chand->lb_policy = lb_policy; if (service_config_json != NULL) { - gpr_free(chand->service_config_json); - chand->service_config_json = service_config_json; + gpr_free(chand->info_service_config_json); + chand->info_service_config_json = service_config_json; } + gpr_mu_unlock(&chand->info_mu); if (chand->method_params_table != NULL) { grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table); } @@ -391,7 +388,6 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result, &chand->on_resolver_result_changed); - gpr_mu_unlock(&chand->mu); } else { if (chand->resolver != NULL) { grpc_resolver_shutdown(exec_ctx, chand->resolver); @@ -404,7 +400,6 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, GRPC_ERROR_CREATE_REFERENCING("Got config after disconnection", refs, GPR_ARRAY_SIZE(refs)), "resolver_gone"); - gpr_mu_unlock(&chand->mu); } if (exit_idle) { @@ -426,20 +421,12 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, GRPC_ERROR_UNREF(state_error); } -static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, - grpc_transport_op *op) { +static void start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error_ignored) { + grpc_transport_op *op = arg; + grpc_channel_element *elem = op->transport_private.args[0]; channel_data *chand = elem->channel_data; - grpc_closure_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE); - - GPR_ASSERT(op->set_accept_stream == false); - if (op->bind_pollset != NULL) { - grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, - op->bind_pollset); - } - - gpr_mu_lock(&chand->mu); if (op->on_connectivity_state_change != NULL) { grpc_connectivity_state_notify_on_state_change( exec_ctx, &chand->state_tracker, op->connectivity_state, @@ -482,25 +469,48 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, } GRPC_ERROR_UNREF(op->disconnect_with_error); } - gpr_mu_unlock(&chand->mu); + GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "start_transport_op"); + + grpc_closure_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE); +} + +static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + grpc_transport_op *op) { + channel_data *chand = elem->channel_data; + + GPR_ASSERT(op->set_accept_stream == false); + if (op->bind_pollset != NULL) { + grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, + op->bind_pollset); + } + + op->transport_private.args[0] = elem; + GRPC_CHANNEL_STACK_REF(chand->owning_stack, "start_transport_op"); + grpc_closure_sched( + exec_ctx, grpc_closure_init( + &op->transport_private.closure, start_transport_op_locked, + op, grpc_combiner_scheduler(chand->combiner, false)), + GRPC_ERROR_NONE); } static void cc_get_channel_info(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, const grpc_channel_info *info) { channel_data *chand = elem->channel_data; - gpr_mu_lock(&chand->mu); + gpr_mu_lock(&chand->info_mu); if (info->lb_policy_name != NULL) { - *info->lb_policy_name = chand->lb_policy_name == NULL + *info->lb_policy_name = chand->info_lb_policy_name == NULL ? NULL - : gpr_strdup(chand->lb_policy_name); + : gpr_strdup(chand->info_lb_policy_name); } if (info->service_config_json != NULL) { - *info->service_config_json = chand->service_config_json == NULL - ? NULL - : gpr_strdup(chand->service_config_json); + *info->service_config_json = + chand->info_service_config_json == NULL + ? NULL + : gpr_strdup(chand->info_service_config_json); } - gpr_mu_unlock(&chand->mu); + gpr_mu_unlock(&chand->info_mu); } /* Constructor for channel_data */ @@ -511,11 +521,12 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx, GPR_ASSERT(args->is_last); GPR_ASSERT(elem->filter == &grpc_client_channel_filter); // Initialize data members. - gpr_mu_init(&chand->mu); + chand->combiner = grpc_combiner_create(NULL); + gpr_mu_init(&chand->info_mu); chand->owning_stack = args->channel_stack; grpc_closure_init(&chand->on_resolver_result_changed, - on_resolver_result_changed, chand, - grpc_schedule_on_exec_ctx); + on_resolver_result_changed_locked, chand, + grpc_combiner_scheduler(chand->combiner, false)); chand->interested_parties = grpc_pollset_set_create(); grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_channel"); @@ -564,14 +575,15 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, chand->interested_parties); GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); } - gpr_free(chand->lb_policy_name); - gpr_free(chand->service_config_json); + gpr_free(chand->info_lb_policy_name); + gpr_free(chand->info_service_config_json); if (chand->method_params_table != NULL) { grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table); } grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker); - grpc_pollset_set_destroy(chand->interested_parties); - gpr_mu_destroy(&chand->mu); + grpc_pollset_set_destroy(exec_ctx, chand->interested_parties); + GRPC_COMBINER_UNREF(exec_ctx, chand->combiner, "client_channel"); + gpr_mu_destroy(&chand->info_mu); } /************************************************************************* @@ -614,8 +626,6 @@ typedef struct client_channel_call_data { grpc_subchannel_call */ gpr_atm subchannel_call; - gpr_mu mu; - subchannel_creation_phase creation_phase; grpc_connected_subchannel *connected_subchannel; grpc_polling_entity *pollent; @@ -660,52 +670,32 @@ static void fail_locked(grpc_exec_ctx *exec_ctx, call_data *calld, GRPC_ERROR_UNREF(error); } -typedef struct { - grpc_transport_stream_op **ops; - size_t nops; - grpc_subchannel_call *call; -} retry_ops_args; - -static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) { - retry_ops_args *a = args; - size_t i; - for (i = 0; i < a->nops; i++) { - grpc_subchannel_call_process_op(exec_ctx, a->call, a->ops[i]); - } - GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, a->call, "retry_ops"); - gpr_free(a->ops); - gpr_free(a); -} - static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) { if (calld->waiting_ops_count == 0) { return; } - retry_ops_args *a = gpr_malloc(sizeof(*a)); - a->ops = calld->waiting_ops; - a->nops = calld->waiting_ops_count; - a->call = GET_CALL(calld); - if (a->call == CANCELLED_CALL) { - gpr_free(a); + grpc_subchannel_call *call = GET_CALL(calld); + grpc_transport_stream_op **ops = calld->waiting_ops; + size_t nops = calld->waiting_ops_count; + if (call == CANCELLED_CALL) { fail_locked(exec_ctx, calld, GRPC_ERROR_CANCELLED); return; } calld->waiting_ops = NULL; calld->waiting_ops_count = 0; calld->waiting_ops_capacity = 0; - GRPC_SUBCHANNEL_CALL_REF(a->call, "retry_ops"); - grpc_closure_sched( - exec_ctx, grpc_closure_create(retry_ops, a, grpc_schedule_on_exec_ctx), - GRPC_ERROR_NONE); + for (size_t i = 0; i < nops; i++) { + grpc_subchannel_call_process_op(exec_ctx, call, ops[i]); + } + gpr_free(ops); } -static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { grpc_call_element *elem = arg; call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; - gpr_mu_lock(&calld->mu); GPR_ASSERT(calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL); grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent, @@ -741,7 +731,6 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, (gpr_atm)(uintptr_t)subchannel_call); retry_waiting_locked(exec_ctx, calld); } - gpr_mu_unlock(&calld->mu); GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel"); } @@ -767,37 +756,35 @@ typedef struct { /** Return true if subchannel is available immediately (in which case on_ready should not be called), or false otherwise (in which case on_ready should be called when the subchannel is available). */ -static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_metadata_batch *initial_metadata, - uint32_t initial_metadata_flags, - grpc_connected_subchannel **connected_subchannel, - grpc_closure *on_ready, grpc_error *error); - -static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static bool pick_subchannel_locked( + grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags, + grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready, + grpc_error *error); + +static void continue_picking_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { continue_picking_args *cpa = arg; if (cpa->connected_subchannel == NULL) { /* cancelled, do nothing */ } else if (error != GRPC_ERROR_NONE) { grpc_closure_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error)); } else { - call_data *calld = cpa->elem->call_data; - gpr_mu_lock(&calld->mu); - if (pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata, - cpa->initial_metadata_flags, cpa->connected_subchannel, - cpa->on_ready, GRPC_ERROR_NONE)) { + if (pick_subchannel_locked(exec_ctx, cpa->elem, cpa->initial_metadata, + cpa->initial_metadata_flags, + cpa->connected_subchannel, cpa->on_ready, + GRPC_ERROR_NONE)) { grpc_closure_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE); } - gpr_mu_unlock(&calld->mu); } gpr_free(cpa); } -static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_metadata_batch *initial_metadata, - uint32_t initial_metadata_flags, - grpc_connected_subchannel **connected_subchannel, - grpc_closure *on_ready, grpc_error *error) { +static bool pick_subchannel_locked( + grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags, + grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready, + grpc_error *error) { GPR_TIMER_BEGIN("pick_subchannel", 0); channel_data *chand = elem->channel_data; @@ -807,7 +794,6 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, GPR_ASSERT(connected_subchannel); - gpr_mu_lock(&chand->mu); if (initial_metadata == NULL) { if (chand->lb_policy != NULL) { grpc_lb_policy_cancel_pick(exec_ctx, chand->lb_policy, @@ -823,7 +809,6 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1)); } } - gpr_mu_unlock(&chand->mu); GPR_TIMER_END("pick_subchannel", 0); GRPC_ERROR_UNREF(error); return true; @@ -832,7 +817,6 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, if (chand->lb_policy != NULL) { grpc_lb_policy *lb_policy = chand->lb_policy; GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel"); - gpr_mu_unlock(&chand->mu); // If the application explicitly set wait_for_ready, use that. // Otherwise, if the service config specified a value for this // method, use that. @@ -871,88 +855,66 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, cpa->connected_subchannel = connected_subchannel; cpa->on_ready = on_ready; cpa->elem = elem; - grpc_closure_init(&cpa->closure, continue_picking, cpa, - grpc_schedule_on_exec_ctx); + grpc_closure_init(&cpa->closure, continue_picking_locked, cpa, + grpc_combiner_scheduler(chand->combiner, true)); grpc_closure_list_append(&chand->waiting_for_config_closures, &cpa->closure, GRPC_ERROR_NONE); } else { grpc_closure_sched(exec_ctx, on_ready, GRPC_ERROR_CREATE("Disconnected")); } - gpr_mu_unlock(&chand->mu); GPR_TIMER_END("pick_subchannel", 0); return false; } -// The logic here is fairly complicated, due to (a) the fact that we -// need to handle the case where we receive the send op before the -// initial metadata op, and (b) the need for efficiency, especially in -// the streaming case. -// TODO(ctiller): Explain this more thoroughly. -static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op *op) { - call_data *calld = elem->call_data; +static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx, + grpc_transport_stream_op *op, + grpc_call_element *elem) { channel_data *chand = elem->channel_data; - GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - grpc_deadline_state_client_start_transport_stream_op(exec_ctx, elem, op); - /* try to (atomically) get the call */ - grpc_subchannel_call *call = GET_CALL(calld); - GPR_TIMER_BEGIN("cc_start_transport_stream_op", 0); - if (call == CANCELLED_CALL) { - grpc_transport_stream_op_finish_with_failure( - exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error)); - GPR_TIMER_END("cc_start_transport_stream_op", 0); - return; - } - if (call != NULL) { - grpc_subchannel_call_process_op(exec_ctx, call, op); - GPR_TIMER_END("cc_start_transport_stream_op", 0); - return; - } - /* we failed; lock and figure out what to do */ - gpr_mu_lock(&calld->mu); -retry: + call_data *calld = elem->call_data; + grpc_subchannel_call *call; + /* need to recheck that another thread hasn't set the call */ call = GET_CALL(calld); if (call == CANCELLED_CALL) { - gpr_mu_unlock(&calld->mu); grpc_transport_stream_op_finish_with_failure( exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error)); - GPR_TIMER_END("cc_start_transport_stream_op", 0); + /* early out */ return; } if (call != NULL) { - gpr_mu_unlock(&calld->mu); grpc_subchannel_call_process_op(exec_ctx, call, op); - GPR_TIMER_END("cc_start_transport_stream_op", 0); + /* early out */ return; } /* if this is a cancellation, then we can raise our cancelled flag */ if (op->cancel_error != GRPC_ERROR_NONE) { if (!gpr_atm_rel_cas(&calld->subchannel_call, 0, (gpr_atm)(uintptr_t)CANCELLED_CALL)) { - goto retry; + /* recurse to retry */ + start_transport_stream_op_locked_inner(exec_ctx, op, elem); + /* early out */ + return; } else { - // Stash a copy of cancel_error in our call data, so that we can use - // it for subsequent operations. This ensures that if the call is - // cancelled before any ops are passed down (e.g., if the deadline - // is in the past when the call starts), we can return the right - // error to the caller when the first op does get passed down. + /* Stash a copy of cancel_error in our call data, so that we can use + it for subsequent operations. This ensures that if the call is + cancelled before any ops are passed down (e.g., if the deadline + is in the past when the call starts), we can return the right + error to the caller when the first op does get passed down. */ calld->cancel_error = GRPC_ERROR_REF(op->cancel_error); switch (calld->creation_phase) { case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING: fail_locked(exec_ctx, calld, GRPC_ERROR_REF(op->cancel_error)); break; case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL: - pick_subchannel(exec_ctx, elem, NULL, 0, &calld->connected_subchannel, - NULL, GRPC_ERROR_REF(op->cancel_error)); + pick_subchannel_locked(exec_ctx, elem, NULL, 0, + &calld->connected_subchannel, NULL, + GRPC_ERROR_REF(op->cancel_error)); break; } - gpr_mu_unlock(&calld->mu); grpc_transport_stream_op_finish_with_failure( exec_ctx, op, GRPC_ERROR_REF(op->cancel_error)); - GPR_TIMER_END("cc_start_transport_stream_op", 0); + /* early out */ return; } } @@ -961,16 +923,16 @@ retry: calld->connected_subchannel == NULL && op->send_initial_metadata != NULL) { calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL; - grpc_closure_init(&calld->next_step, subchannel_ready, elem, - grpc_schedule_on_exec_ctx); + grpc_closure_init(&calld->next_step, subchannel_ready_locked, elem, + grpc_combiner_scheduler(chand->combiner, true)); GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel"); /* If a subchannel is not available immediately, the polling entity from call_data should be provided to channel_data's interested_parties, so that IO of the lb_policy and resolver could be done under it. */ - if (pick_subchannel(exec_ctx, elem, op->send_initial_metadata, - op->send_initial_metadata_flags, - &calld->connected_subchannel, &calld->next_step, - GRPC_ERROR_NONE)) { + if (pick_subchannel_locked(exec_ctx, elem, op->send_initial_metadata, + op->send_initial_metadata_flags, + &calld->connected_subchannel, &calld->next_step, + GRPC_ERROR_NONE)) { calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel"); } else { @@ -993,31 +955,89 @@ retry: gpr_atm_rel_store(&calld->subchannel_call, (gpr_atm)(uintptr_t)subchannel_call); retry_waiting_locked(exec_ctx, calld); - goto retry; + /* recurse to retry */ + start_transport_stream_op_locked_inner(exec_ctx, op, elem); + /* early out */ + return; } /* nothing to be done but wait */ add_waiting_locked(calld, op); - gpr_mu_unlock(&calld->mu); +} + +static void cc_start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, + void *arg, + grpc_error *error_ignored) { + GPR_TIMER_BEGIN("cc_start_transport_stream_op_locked", 0); + + grpc_transport_stream_op *op = arg; + grpc_call_element *elem = op->handler_private.args[0]; + call_data *calld = elem->call_data; + + start_transport_stream_op_locked_inner(exec_ctx, op, elem); + + GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, + "start_transport_stream_op"); + GPR_TIMER_END("cc_start_transport_stream_op_locked", 0); +} + +/* The logic here is fairly complicated, due to (a) the fact that we + need to handle the case where we receive the send op before the + initial metadata op, and (b) the need for efficiency, especially in + the streaming case. + + We use double-checked locking to initially see if initialization has been + performed. If it has not, we acquire the combiner and perform initialization. + If it has, we proceed on the fast path. */ +static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op *op) { + call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; + GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + grpc_deadline_state_client_start_transport_stream_op(exec_ctx, elem, op); + /* try to (atomically) get the call */ + grpc_subchannel_call *call = GET_CALL(calld); + GPR_TIMER_BEGIN("cc_start_transport_stream_op", 0); + if (call == CANCELLED_CALL) { + grpc_transport_stream_op_finish_with_failure( + exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error)); + GPR_TIMER_END("cc_start_transport_stream_op", 0); + /* early out */ + return; + } + if (call != NULL) { + grpc_subchannel_call_process_op(exec_ctx, call, op); + GPR_TIMER_END("cc_start_transport_stream_op", 0); + /* early out */ + return; + } + /* we failed; lock and figure out what to do */ + GRPC_CALL_STACK_REF(calld->owning_call, "start_transport_stream_op"); + op->handler_private.args[0] = elem; + grpc_closure_sched( + exec_ctx, + grpc_closure_init(&op->handler_private.closure, + cc_start_transport_stream_op_locked, op, + grpc_combiner_scheduler(chand->combiner, false)), + GRPC_ERROR_NONE); GPR_TIMER_END("cc_start_transport_stream_op", 0); } // Gets data from the service config. Invoked when the resolver returns // its initial result. -static void read_service_config(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void read_service_config_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { grpc_call_element *elem = arg; channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; // If this is an error, there's no point in looking at the service config. if (error == GRPC_ERROR_NONE) { // Get the method config table from channel data. - gpr_mu_lock(&chand->mu); grpc_slice_hash_table *method_params_table = NULL; if (chand->method_params_table != NULL) { method_params_table = grpc_slice_hash_table_ref(chand->method_params_table); } - gpr_mu_unlock(&chand->mu); // If the method config table was present, use it. if (method_params_table != NULL) { const method_parameters *method_params = grpc_method_config_table_get( @@ -1027,7 +1047,6 @@ static void read_service_config(grpc_exec_ctx *exec_ctx, void *arg, gpr_time_cmp(method_params->timeout, gpr_time_0(GPR_TIMESPAN)) != 0; if (have_method_timeout || method_params->wait_for_ready != WAIT_FOR_READY_UNSET) { - gpr_mu_lock(&calld->mu); if (have_method_timeout) { const gpr_timespec per_method_deadline = gpr_time_add(calld->call_start_time, method_params->timeout); @@ -1041,7 +1060,6 @@ static void read_service_config(grpc_exec_ctx *exec_ctx, void *arg, calld->wait_for_ready_from_service_config = method_params->wait_for_ready; } - gpr_mu_unlock(&calld->mu); } } grpc_slice_hash_table_unref(exec_ctx, method_params_table); @@ -1050,43 +1068,25 @@ static void read_service_config(grpc_exec_ctx *exec_ctx, void *arg, GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "read_service_config"); } -/* Constructor for call_data */ -static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_call_element_args *args) { +static void initial_read_service_config_locked(grpc_exec_ctx *exec_ctx, + void *arg, + grpc_error *error_ignored) { + grpc_call_element *elem = arg; channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; - // Initialize data members. - grpc_deadline_state_init(exec_ctx, elem, args->call_stack); - calld->path = grpc_slice_ref_internal(args->path); - calld->call_start_time = args->start_time; - calld->deadline = gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC); - calld->wait_for_ready_from_service_config = WAIT_FOR_READY_UNSET; - calld->cancel_error = GRPC_ERROR_NONE; - gpr_atm_rel_store(&calld->subchannel_call, 0); - gpr_mu_init(&calld->mu); - calld->connected_subchannel = NULL; - calld->waiting_ops = NULL; - calld->waiting_ops_count = 0; - calld->waiting_ops_capacity = 0; - calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; - calld->owning_call = args->call_stack; - calld->pollent = NULL; // If the resolver has already returned results, then we can access // the service config parameters immediately. Otherwise, we need to // defer that work until the resolver returns an initial result. // TODO(roth): This code is almost but not quite identical to the code // in read_service_config() above. It would be nice to find a way to // combine them, to avoid having to maintain it twice. - gpr_mu_lock(&chand->mu); if (chand->lb_policy != NULL) { // We already have a resolver result, so check for service config. if (chand->method_params_table != NULL) { grpc_slice_hash_table *method_params_table = grpc_slice_hash_table_ref(chand->method_params_table); - gpr_mu_unlock(&chand->mu); method_parameters *method_params = grpc_method_config_table_get( - exec_ctx, method_params_table, args->path); + exec_ctx, method_params_table, calld->path); if (method_params != NULL) { if (gpr_time_cmp(method_params->timeout, gpr_time_0(GPR_CLOCK_MONOTONIC)) != 0) { @@ -1100,24 +1100,53 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, } } grpc_slice_hash_table_unref(exec_ctx, method_params_table); - } else { - gpr_mu_unlock(&chand->mu); } } else { // We don't yet have a resolver result, so register a callback to // get the service config data once the resolver returns. // Take a reference to the call stack to be owned by the callback. GRPC_CALL_STACK_REF(calld->owning_call, "read_service_config"); - grpc_closure_init(&calld->read_service_config, read_service_config, elem, - grpc_schedule_on_exec_ctx); + grpc_closure_init(&calld->read_service_config, read_service_config_locked, + elem, grpc_combiner_scheduler(chand->combiner, false)); grpc_closure_list_append(&chand->waiting_for_config_closures, &calld->read_service_config, GRPC_ERROR_NONE); - gpr_mu_unlock(&chand->mu); } // Start the deadline timer with the current deadline value. If we // do not yet have service config data, then the timer may be reset // later. grpc_deadline_state_start(exec_ctx, elem, calld->deadline); + GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, + "initial_read_service_config"); +} + +/* Constructor for call_data */ +static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_call_element_args *args) { + channel_data *chand = elem->channel_data; + call_data *calld = elem->call_data; + // Initialize data members. + grpc_deadline_state_init(exec_ctx, elem, args->call_stack); + calld->path = grpc_slice_ref_internal(args->path); + calld->call_start_time = args->start_time; + calld->deadline = gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC); + calld->wait_for_ready_from_service_config = WAIT_FOR_READY_UNSET; + calld->cancel_error = GRPC_ERROR_NONE; + gpr_atm_rel_store(&calld->subchannel_call, 0); + calld->connected_subchannel = NULL; + calld->waiting_ops = NULL; + calld->waiting_ops_count = 0; + calld->waiting_ops_capacity = 0; + calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; + calld->owning_call = args->call_stack; + calld->pollent = NULL; + GRPC_CALL_STACK_REF(calld->owning_call, "initial_read_service_config"); + grpc_closure_sched( + exec_ctx, + grpc_closure_init(&calld->read_service_config, + initial_read_service_config_locked, elem, + grpc_combiner_scheduler(chand->combiner, false)), + GRPC_ERROR_NONE); return GRPC_ERROR_NONE; } @@ -1135,7 +1164,6 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "client_channel_destroy_call"); } GPR_ASSERT(calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING); - gpr_mu_destroy(&calld->mu); GPR_ASSERT(calld->waiting_ops_count == 0); if (calld->connected_subchannel != NULL) { GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, calld->connected_subchannel, @@ -1171,26 +1199,36 @@ const grpc_channel_filter grpc_client_channel_filter = { "client-channel", }; +static void try_to_connect_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error_ignored) { + channel_data *chand = arg; + if (chand->lb_policy != NULL) { + grpc_lb_policy_exit_idle(exec_ctx, chand->lb_policy); + } else { + chand->exit_idle_when_lb_policy_arrives = true; + if (!chand->started_resolving && chand->resolver != NULL) { + GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); + chand->started_resolving = true; + grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result, + &chand->on_resolver_result_changed); + } + } + GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "try_to_connect"); +} + grpc_connectivity_state grpc_client_channel_check_connectivity_state( grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect) { channel_data *chand = elem->channel_data; - grpc_connectivity_state out; - gpr_mu_lock(&chand->mu); - out = grpc_connectivity_state_check(&chand->state_tracker, NULL); + grpc_connectivity_state out = + grpc_connectivity_state_check(&chand->state_tracker); if (out == GRPC_CHANNEL_IDLE && try_to_connect) { - if (chand->lb_policy != NULL) { - grpc_lb_policy_exit_idle(exec_ctx, chand->lb_policy); - } else { - chand->exit_idle_when_lb_policy_arrives = true; - if (!chand->started_resolving && chand->resolver != NULL) { - GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); - chand->started_resolving = true; - grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result, - &chand->on_resolver_result_changed); - } - } + GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect"); + grpc_closure_sched( + exec_ctx, + grpc_closure_create(try_to_connect_locked, chand, + grpc_combiner_scheduler(chand->combiner, false)), + GRPC_ERROR_NONE); } - gpr_mu_unlock(&chand->mu); return out; } @@ -1198,6 +1236,7 @@ typedef struct { channel_data *chand; grpc_pollset *pollset; grpc_closure *on_complete; + grpc_connectivity_state *state; grpc_closure my_closure; } external_connectivity_watcher; @@ -1210,7 +1249,16 @@ static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg, GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "external_connectivity_watcher"); gpr_free(w); - follow_up->cb(exec_ctx, follow_up->cb_arg, error); + grpc_closure_run(exec_ctx, follow_up, GRPC_ERROR_REF(error)); +} + +static void watch_connectivity_state_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error_ignored) { + external_connectivity_watcher *w = arg; + grpc_closure_init(&w->my_closure, on_external_watch_complete, w, + grpc_schedule_on_exec_ctx); + grpc_connectivity_state_notify_on_state_change( + exec_ctx, &w->chand->state_tracker, w->state, &w->my_closure); } void grpc_client_channel_watch_connectivity_state( @@ -1221,13 +1269,13 @@ void grpc_client_channel_watch_connectivity_state( w->chand = chand; w->pollset = pollset; w->on_complete = on_complete; + w->state = state; grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, pollset); - grpc_closure_init(&w->my_closure, on_external_watch_complete, w, - grpc_schedule_on_exec_ctx); GRPC_CHANNEL_STACK_REF(w->chand->owning_stack, "external_connectivity_watcher"); - gpr_mu_lock(&chand->mu); - grpc_connectivity_state_notify_on_state_change( - exec_ctx, &chand->state_tracker, state, &w->my_closure); - gpr_mu_unlock(&chand->mu); + grpc_closure_sched( + exec_ctx, + grpc_closure_init(&w->my_closure, watch_connectivity_state_locked, w, + grpc_combiner_scheduler(chand->combiner, true)), + GRPC_ERROR_NONE); } diff --git a/src/core/ext/client_channel/lb_policy.c b/src/core/ext/client_channel/lb_policy.c index 45ee72e2f0c937cc21bc51b24c47576759535def..90401b586f2ab4852986230ad3144fb0bed299fc 100644 --- a/src/core/ext/client_channel/lb_policy.c +++ b/src/core/ext/client_channel/lb_policy.c @@ -94,7 +94,7 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, gpr_atm old_val = ref_mutate(policy, -(gpr_atm)1, 1 REF_MUTATE_PASS_ARGS("WEAK_UNREF")); if (old_val == 1) { - grpc_pollset_set_destroy(policy->interested_parties); + grpc_pollset_set_destroy(exec_ctx, policy->interested_parties); policy->vtable->destroy(exec_ctx, policy); } } diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c index 60c5f3bc5599537469074494a749547fd1df2a50..6ae7e22553dc83e13aaf42d6e2e4bb0d9d3359ac 100644 --- a/src/core/ext/client_channel/subchannel.c +++ b/src/core/ext/client_channel/subchannel.c @@ -217,7 +217,7 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, grpc_slice_unref_internal(exec_ctx, c->initial_connect_string); grpc_connectivity_state_destroy(exec_ctx, &c->state_tracker); grpc_connector_unref(exec_ctx, c->connector); - grpc_pollset_set_destroy(c->pollset_set); + grpc_pollset_set_destroy(exec_ctx, c->pollset_set); grpc_subchannel_key_destroy(exec_ctx, c->key); gpr_free(c); } @@ -418,7 +418,7 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c, grpc_error **error) { grpc_connectivity_state state; gpr_mu_lock(&c->mu); - state = grpc_connectivity_state_check(&c->state_tracker, error); + state = grpc_connectivity_state_get(&c->state_tracker, error); gpr_mu_unlock(&c->mu); return state; } diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index 4b3c42542857b71cbb41aebe1438409a1e7a98a0..d60aab8d6a758f88f9ab1fb308a95a828f4b70d2 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -488,9 +488,8 @@ static grpc_lb_addresses *process_serverlist_locked( static bool update_lb_connectivity_status_locked( grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, grpc_connectivity_state new_rr_state, grpc_error *new_rr_state_error) { - grpc_error *curr_state_error; - const grpc_connectivity_state curr_glb_state = grpc_connectivity_state_check( - &glb_policy->state_tracker, &curr_state_error); + const grpc_connectivity_state curr_glb_state = + grpc_connectivity_state_check(&glb_policy->state_tracker); /* The new connectivity status is a function of the previous one and the new * input coming from the status of the RR policy. @@ -1091,8 +1090,8 @@ static grpc_connectivity_state glb_check_connectivity( glb_lb_policy *glb_policy = (glb_lb_policy *)pol; grpc_connectivity_state st; gpr_mu_lock(&glb_policy->mu); - st = grpc_connectivity_state_check(&glb_policy->state_tracker, - connectivity_error); + st = grpc_connectivity_state_get(&glb_policy->state_tracker, + connectivity_error); gpr_mu_unlock(&glb_policy->mu); return st; } diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c index 36c612ef089d71f2ab2f298f8bc9b17792e3c242..0ffb53f536091b261d85b8fadc909c8eb1d7589e 100644 --- a/src/core/ext/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/lb_policy/pick_first/pick_first.c @@ -398,7 +398,7 @@ static grpc_connectivity_state pf_check_connectivity(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p = (pick_first_lb_policy *)pol; grpc_connectivity_state st; gpr_mu_lock(&p->mu); - st = grpc_connectivity_state_check(&p->state_tracker, error); + st = grpc_connectivity_state_get(&p->state_tracker, error); gpr_mu_unlock(&p->mu); return st; } diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index 96fe236360cf1291b2652082f0e76714a5013446..1e78a9c7c8757a6bf1b1a8b12f63c63efc092618 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -654,7 +654,7 @@ static grpc_connectivity_state rr_check_connectivity(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p = (round_robin_lb_policy *)pol; grpc_connectivity_state st; gpr_mu_lock(&p->mu); - st = grpc_connectivity_state_check(&p->state_tracker, error); + st = grpc_connectivity_state_get(&p->state_tracker, error); gpr_mu_unlock(&p->mu); return st; } diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c index 264b2fc347a7da8a8f516e659c06a9d18b76a727..143cb261d25670fe14f2860d5d0d56e77fe1d4c7 100644 --- a/src/core/ext/resolver/dns/native/dns_resolver.c +++ b/src/core/ext/resolver/dns/native/dns_resolver.c @@ -244,7 +244,7 @@ static void dns_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) { if (r->resolved_result != NULL) { grpc_channel_args_destroy(exec_ctx, r->resolved_result); } - grpc_pollset_set_destroy(r->interested_parties); + grpc_pollset_set_destroy(exec_ctx, r->interested_parties); gpr_free(r->name_to_resolve); gpr_free(r->default_port); grpc_channel_args_destroy(exec_ctx, r->channel_args); diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 75a8b5a55bc95c490a12c74eb0f395539e1e26cc..5b5f6e54f278eff936f2e3e8f083a42c9852080f 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -1037,8 +1037,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, GPR_TIMER_BEGIN("perform_stream_op_locked", 0); grpc_transport_stream_op *op = stream_op; - grpc_chttp2_transport *t = op->transport_private.args[0]; - grpc_chttp2_stream *s = op->transport_private.args[1]; + grpc_chttp2_transport *t = op->handler_private.args[0]; + grpc_chttp2_stream *s = op->handler_private.args[1]; if (grpc_http_trace) { char *str = grpc_transport_stream_op_string(op); @@ -1259,13 +1259,13 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, gpr_free(str); } - op->transport_private.args[0] = gt; - op->transport_private.args[1] = gs; + op->handler_private.args[0] = gt; + op->handler_private.args[1] = gs; GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op"); grpc_closure_sched( exec_ctx, grpc_closure_init( - &op->transport_private.closure, perform_stream_op_locked, op, + &op->handler_private.closure, perform_stream_op_locked, op, grpc_combiner_scheduler(t->combiner, op->covered_by_poller)), GRPC_ERROR_NONE); GPR_TIMER_END("perform_stream_op", 0); diff --git a/src/core/lib/http/httpcli.c b/src/core/lib/http/httpcli.c index fb2108987b15baaebd27b95be08dc6ee92daba39..6d7aa43b815fc1b45748261b9c204e39794b8132 100644 --- a/src/core/lib/http/httpcli.c +++ b/src/core/lib/http/httpcli.c @@ -93,8 +93,9 @@ void grpc_httpcli_context_init(grpc_httpcli_context *context) { context->pollset_set = grpc_pollset_set_create(); } -void grpc_httpcli_context_destroy(grpc_httpcli_context *context) { - grpc_pollset_set_destroy(context->pollset_set); +void grpc_httpcli_context_destroy(grpc_exec_ctx *exec_ctx, + grpc_httpcli_context *context) { + grpc_pollset_set_destroy(exec_ctx, context->pollset_set); } static void next_address(grpc_exec_ctx *exec_ctx, internal_request *req, diff --git a/src/core/lib/http/httpcli.h b/src/core/lib/http/httpcli.h index 11e03b44dfd37c126431dae7a2835f3afc07938c..8ae03ee78f45d1cae598188c73dd456969a6c114 100644 --- a/src/core/lib/http/httpcli.h +++ b/src/core/lib/http/httpcli.h @@ -83,7 +83,8 @@ typedef struct grpc_httpcli_request { typedef struct grpc_http_response grpc_httpcli_response; void grpc_httpcli_context_init(grpc_httpcli_context *context); -void grpc_httpcli_context_destroy(grpc_httpcli_context *context); +void grpc_httpcli_context_destroy(grpc_exec_ctx *exec_ctx, + grpc_httpcli_context *context); /* Asynchronously perform a HTTP GET. 'context' specifies the http context under which to do the get diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c index fc56843128ebe378265e925399429e83d30b91a2..fac3705142b974534e7948212ebcadb79414d2bf 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.c +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -1842,13 +1842,12 @@ static grpc_pollset_set *pollset_set_create(void) { return pss; } -static void pollset_set_destroy(grpc_pollset_set *pss) { +static void pollset_set_destroy(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *pss) { gpr_mu_destroy(&pss->po.mu); if (pss->po.pi != NULL) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - PI_UNREF(&exec_ctx, pss->po.pi, "pss_destroy"); - grpc_exec_ctx_finish(&exec_ctx); + PI_UNREF(exec_ctx, pss->po.pi, "pss_destroy"); } gpr_free(pss); diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index c06c6544ca37c4c51a6e2ce98744d87052d8209e..1a12e79da6a123ca467bb850056d90ec4b09ce65 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -149,7 +149,7 @@ static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *rec, static bool fd_is_orphaned(grpc_fd *fd); /* Reference counting for fds */ -/*#define GRPC_FD_REF_COUNT_DEBUG*/ +//#define GRPC_FD_REF_COUNT_DEBUG #ifdef GRPC_FD_REF_COUNT_DEBUG static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line); static void fd_unref(grpc_fd *fd, const char *reason, const char *file, @@ -191,6 +191,7 @@ struct grpc_pollset { int kicked_without_pollers; grpc_closure *shutdown_done; grpc_closure_list idle_jobs; + int pollset_set_count; /* all polled fds */ size_t fd_count; size_t fd_capacity; @@ -228,7 +229,7 @@ static grpc_error *pollset_kick_ext(grpc_pollset *p, /* Return 1 if the pollset has active threads in pollset_work (pollset must * be locked) */ -static int pollset_has_workers(grpc_pollset *pollset); +static bool pollset_has_workers(grpc_pollset *pollset); /******************************************************************************* * pollset_set definitions @@ -282,8 +283,8 @@ cv_fd_table g_cvfds; static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file, int line) { gpr_log(GPR_DEBUG, "FD %d %p ref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n, - gpr_atm_no_barrier_load(&fd->refst), - gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line); + (int)gpr_atm_no_barrier_load(&fd->refst), + (int)gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line); #else #define REF_BY(fd, n, reason) ref_by(fd, n) #define UNREF_BY(fd, n, reason) unref_by(fd, n) @@ -297,8 +298,8 @@ static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file, int line) { gpr_atm old; gpr_log(GPR_DEBUG, "FD %d %p unref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n, - gpr_atm_no_barrier_load(&fd->refst), - gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line); + (int)gpr_atm_no_barrier_load(&fd->refst), + (int)gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line); #else static void unref_by(grpc_fd *fd, int n) { gpr_atm old; @@ -658,10 +659,18 @@ static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) { worker->next->prev = worker->prev; } -static int pollset_has_workers(grpc_pollset *p) { +static bool pollset_has_workers(grpc_pollset *p) { return p->root_worker.next != &p->root_worker; } +static bool pollset_in_pollset_sets(grpc_pollset *p) { + return p->pollset_set_count; +} + +static bool pollset_has_observers(grpc_pollset *p) { + return pollset_has_workers(p) || pollset_in_pollset_sets(p); +} + static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) { if (pollset_has_workers(p)) { grpc_pollset_worker *w = p->root_worker.next; @@ -800,6 +809,7 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { pollset->fd_count = 0; pollset->fd_capacity = 0; pollset->fds = NULL; + pollset->pollset_set_count = 0; } static void pollset_destroy(grpc_pollset *pollset) { @@ -1061,7 +1071,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (pollset->shutting_down) { if (pollset_has_workers(pollset)) { pollset_kick(pollset, NULL); - } else if (!pollset->called_shutdown) { + } else if (!pollset->called_shutdown && !pollset_has_observers(pollset)) { pollset->called_shutdown = 1; gpr_mu_unlock(&pollset->mu); finish_shutdown(exec_ctx, pollset); @@ -1093,7 +1103,7 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (!pollset_has_workers(pollset)) { grpc_closure_list_sched(exec_ctx, &pollset->idle_jobs); } - if (!pollset->called_shutdown && !pollset_has_workers(pollset)) { + if (!pollset->called_shutdown && !pollset_has_observers(pollset)) { pollset->called_shutdown = 1; finish_shutdown(exec_ctx, pollset); } @@ -1126,12 +1136,27 @@ static grpc_pollset_set *pollset_set_create(void) { return pollset_set; } -static void pollset_set_destroy(grpc_pollset_set *pollset_set) { +static void pollset_set_destroy(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *pollset_set) { size_t i; gpr_mu_destroy(&pollset_set->mu); for (i = 0; i < pollset_set->fd_count; i++) { GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set"); } + for (i = 0; i < pollset_set->pollset_count; i++) { + grpc_pollset *pollset = pollset_set->pollsets[i]; + gpr_mu_lock(&pollset->mu); + pollset->pollset_set_count--; + /* check shutdown */ + if (pollset->shutting_down && !pollset->called_shutdown && + !pollset_has_observers(pollset)) { + pollset->called_shutdown = 1; + gpr_mu_unlock(&pollset->mu); + finish_shutdown(exec_ctx, pollset); + } else { + gpr_mu_unlock(&pollset->mu); + } + } gpr_free(pollset_set->pollsets); gpr_free(pollset_set->pollset_sets); gpr_free(pollset_set->fds); @@ -1142,6 +1167,9 @@ static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pollset_set, grpc_pollset *pollset) { size_t i, j; + gpr_mu_lock(&pollset->mu); + pollset->pollset_set_count++; + gpr_mu_unlock(&pollset->mu); gpr_mu_lock(&pollset_set->mu); if (pollset_set->pollset_count == pollset_set->pollset_capacity) { pollset_set->pollset_capacity = @@ -1177,6 +1205,17 @@ static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx, } } gpr_mu_unlock(&pollset_set->mu); + gpr_mu_lock(&pollset->mu); + pollset->pollset_set_count--; + /* check shutdown */ + if (pollset->shutting_down && !pollset->called_shutdown && + !pollset_has_observers(pollset)) { + pollset->called_shutdown = 1; + gpr_mu_unlock(&pollset->mu); + finish_shutdown(exec_ctx, pollset); + } else { + gpr_mu_unlock(&pollset->mu); + } } static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index 95b1d99d7577773612aa09f766e1e4e5a0aa5102..b5be5504b9d044462e6fff3b3999a56dae36fa8f 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -215,8 +215,9 @@ grpc_pollset_set *grpc_pollset_set_create(void) { return g_event_engine->pollset_set_create(); } -void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set) { - g_event_engine->pollset_set_destroy(pollset_set); +void grpc_pollset_set_destroy(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *pollset_set) { + g_event_engine->pollset_set_destroy(exec_ctx, pollset_set); } void grpc_pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index 1aea7d61f32e65b1720a2cbb4df46abb6ef28ffe..1a9e5c115aec82206ec680cadc4ab2820adb70f4 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -74,7 +74,8 @@ typedef struct grpc_event_engine_vtable { struct grpc_fd *fd); grpc_pollset_set *(*pollset_set_create)(void); - void (*pollset_set_destroy)(grpc_pollset_set *pollset_set); + void (*pollset_set_destroy)(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *pollset_set); void (*pollset_set_add_pollset)(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pollset_set, grpc_pollset *pollset); diff --git a/src/core/lib/iomgr/network_status_tracker.c b/src/core/lib/iomgr/network_status_tracker.c index 1601a390028f81bbff9ebe6211d849528059d09d..4104bf927a181349d1a2d6b73925cacc6a6f3b64 100644 --- a/src/core/lib/iomgr/network_status_tracker.c +++ b/src/core/lib/iomgr/network_status_tracker.c @@ -31,95 +31,18 @@ * */ -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> #include "src/core/lib/iomgr/endpoint.h" -typedef struct endpoint_ll_node { - grpc_endpoint *ep; - struct endpoint_ll_node *next; -} endpoint_ll_node; - -static endpoint_ll_node *head = NULL; -static gpr_mu g_endpoint_mutex; - -void grpc_network_status_shutdown(void) { - if (head != NULL) { - gpr_log(GPR_ERROR, - "Memory leaked as not all network endpoints were shut down"); - } - gpr_mu_destroy(&g_endpoint_mutex); -} +void grpc_network_status_shutdown(void) {} void grpc_network_status_init(void) { - gpr_mu_init(&g_endpoint_mutex); // TODO(makarandd): Install callback with OS to monitor network status. } -void grpc_destroy_network_status_monitor() { - for (endpoint_ll_node *curr = head; curr != NULL;) { - endpoint_ll_node *next = curr->next; - gpr_free(curr); - curr = next; - } - gpr_mu_destroy(&g_endpoint_mutex); -} - -void grpc_network_status_register_endpoint(grpc_endpoint *ep) { - gpr_mu_lock(&g_endpoint_mutex); - if (head == NULL) { - head = (endpoint_ll_node *)gpr_malloc(sizeof(endpoint_ll_node)); - head->ep = ep; - head->next = NULL; - } else { - endpoint_ll_node *prev_head = head; - head = (endpoint_ll_node *)gpr_malloc(sizeof(endpoint_ll_node)); - head->ep = ep; - head->next = prev_head; - } - gpr_mu_unlock(&g_endpoint_mutex); -} +void grpc_destroy_network_status_monitor() {} -void grpc_network_status_unregister_endpoint(grpc_endpoint *ep) { - gpr_mu_lock(&g_endpoint_mutex); - GPR_ASSERT(head); - bool found = false; - endpoint_ll_node *prev = head; - // if we're unregistering the head, just move head to the next - if (ep == head->ep) { - head = head->next; - gpr_free(prev); - found = true; - } else { - for (endpoint_ll_node *curr = head->next; curr != NULL; curr = curr->next) { - if (ep == curr->ep) { - prev->next = curr->next; - gpr_free(curr); - found = true; - break; - } - prev = curr; - } - } - gpr_mu_unlock(&g_endpoint_mutex); - GPR_ASSERT(found); -} +void grpc_network_status_register_endpoint(grpc_endpoint *ep) { (void)ep; } -// Walk the linked-list from head and execute shutdown. It is possible that -// other threads might be in the process of shutdown as well, but that has -// no side effect since endpoint shutdown is idempotent. -void grpc_network_status_shutdown_all_endpoints() { - gpr_mu_lock(&g_endpoint_mutex); - if (head == NULL) { - gpr_mu_unlock(&g_endpoint_mutex); - return; - } - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; +void grpc_network_status_unregister_endpoint(grpc_endpoint *ep) { (void)ep; } - for (endpoint_ll_node *curr = head; curr != NULL; curr = curr->next) { - curr->ep->vtable->shutdown(&exec_ctx, curr->ep, - GRPC_ERROR_CREATE("Network unavailable")); - } - gpr_mu_unlock(&g_endpoint_mutex); - grpc_exec_ctx_finish(&exec_ctx); -} +void grpc_network_status_shutdown_all_endpoints() {} diff --git a/src/core/lib/iomgr/pollset_set.h b/src/core/lib/iomgr/pollset_set.h index 34bb728c410983b856878dc84cd7db2cc6967d3a..d11801d63b013a64883088c5a49492cfb9d0b26e 100644 --- a/src/core/lib/iomgr/pollset_set.h +++ b/src/core/lib/iomgr/pollset_set.h @@ -44,7 +44,8 @@ typedef struct grpc_pollset_set grpc_pollset_set; grpc_pollset_set *grpc_pollset_set_create(void); -void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set); +void grpc_pollset_set_destroy(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *pollset_set); void grpc_pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pollset_set, grpc_pollset *pollset); diff --git a/src/core/lib/iomgr/pollset_set_uv.c b/src/core/lib/iomgr/pollset_set_uv.c index e5ef8b29e0397ece488db93d55c81f66d24ceaaa..836cfee4efc1597e24e5391ce689a8c3d304608d 100644 --- a/src/core/lib/iomgr/pollset_set_uv.c +++ b/src/core/lib/iomgr/pollset_set_uv.c @@ -41,7 +41,8 @@ grpc_pollset_set* grpc_pollset_set_create(void) { return (grpc_pollset_set*)((intptr_t)0xdeafbeef); } -void grpc_pollset_set_destroy(grpc_pollset_set* pollset_set) {} +void grpc_pollset_set_destroy(grpc_exec_ctx* exec_ctx, + grpc_pollset_set* pollset_set) {} void grpc_pollset_set_add_pollset(grpc_exec_ctx* exec_ctx, grpc_pollset_set* pollset_set, diff --git a/src/core/lib/iomgr/pollset_set_windows.c b/src/core/lib/iomgr/pollset_set_windows.c index 645650db9bebc3ccd0f01844dbe3363f03458a9c..ae18c8a3ce113e793f0a03907cfb896e225be921 100644 --- a/src/core/lib/iomgr/pollset_set_windows.c +++ b/src/core/lib/iomgr/pollset_set_windows.c @@ -42,7 +42,8 @@ grpc_pollset_set* grpc_pollset_set_create(void) { return (grpc_pollset_set*)((intptr_t)0xdeafbeef); } -void grpc_pollset_set_destroy(grpc_pollset_set* pollset_set) {} +void grpc_pollset_set_destroy(grpc_exec_ctx* exec_ctx, + grpc_pollset_set* pollset_set) {} void grpc_pollset_set_add_pollset(grpc_exec_ctx* exec_ctx, grpc_pollset_set* pollset_set, diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c index 40c83514724c61daef6521ee66540891236bb151..8a5617e7c13dde50b1d621437bb329162c70fb1f 100644 --- a/src/core/lib/iomgr/timer_generic.c +++ b/src/core/lib/iomgr/timer_generic.c @@ -121,12 +121,6 @@ void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) { g_initialized = false; } -/* This is a cheap, but good enough, pointer hash for sharding the tasks: */ -static size_t shard_idx(const grpc_timer *info) { - size_t x = (size_t)info; - return ((x >> 4) ^ (x >> 9) ^ (x >> 14)) & (NUM_SHARDS - 1); -} - static double ts_to_dbl(gpr_timespec ts) { return (double)ts.tv_sec + 1e-9 * ts.tv_nsec; } @@ -181,7 +175,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, gpr_timespec deadline, grpc_closure *closure, gpr_timespec now) { int is_first_timer = 0; - shard_type *shard = &g_shards[shard_idx(timer)]; + shard_type *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)]; GPR_ASSERT(deadline.clock_type == g_clock_type); GPR_ASSERT(now.clock_type == g_clock_type); timer->closure = closure; @@ -247,7 +241,7 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { return; } - shard_type *shard = &g_shards[shard_idx(timer)]; + shard_type *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)]; gpr_mu_lock(&shard->mu); if (!timer->triggered) { grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_CANCELLED); diff --git a/src/core/lib/security/credentials/google_default/google_default_credentials.c b/src/core/lib/security/credentials/google_default/google_default_credentials.c index b188d6ef9e439843fe370dfef0137ae84f0f0515..dd446213477c0ef850609d515ebba6900022155f 100644 --- a/src/core/lib/security/credentials/google_default/google_default_credentials.c +++ b/src/core/lib/security/credentials/google_default/google_default_credentials.c @@ -154,7 +154,7 @@ static int is_stack_running_on_compute_engine(grpc_exec_ctx *exec_ctx) { } gpr_mu_unlock(g_polling_mu); - grpc_httpcli_context_destroy(&context); + grpc_httpcli_context_destroy(exec_ctx, &context); grpc_closure_init(&destroy_closure, destroy_pollset, grpc_polling_entity_pollset(&detector.pollent), grpc_schedule_on_exec_ctx); diff --git a/src/core/lib/security/credentials/jwt/jwt_verifier.c b/src/core/lib/security/credentials/jwt/jwt_verifier.c index 2480f1c2faa6d0f4c3119fb39d2cb2975452bf55..5c59cf0f4ad1fa76a3015e068a1635f2f89b0953 100644 --- a/src/core/lib/security/credentials/jwt/jwt_verifier.c +++ b/src/core/lib/security/credentials/jwt/jwt_verifier.c @@ -895,10 +895,10 @@ grpc_jwt_verifier *grpc_jwt_verifier_create( return v; } -void grpc_jwt_verifier_destroy(grpc_jwt_verifier *v) { +void grpc_jwt_verifier_destroy(grpc_exec_ctx *exec_ctx, grpc_jwt_verifier *v) { size_t i; if (v == NULL) return; - grpc_httpcli_context_destroy(&v->http_ctx); + grpc_httpcli_context_destroy(exec_ctx, &v->http_ctx); if (v->mappings != NULL) { for (i = 0; i < v->num_mappings; i++) { gpr_free(v->mappings[i].email_domain); diff --git a/src/core/lib/security/credentials/jwt/jwt_verifier.h b/src/core/lib/security/credentials/jwt/jwt_verifier.h index 4fa320a415943dc5549f209f65478d5347405b4b..5c3d2a778870e9064f336a48dcf8c18f8e1d3dbb 100644 --- a/src/core/lib/security/credentials/jwt/jwt_verifier.h +++ b/src/core/lib/security/credentials/jwt/jwt_verifier.h @@ -109,7 +109,8 @@ grpc_jwt_verifier *grpc_jwt_verifier_create( size_t num_mappings); /*The verifier must not be destroyed if there are still outstanding callbacks.*/ -void grpc_jwt_verifier_destroy(grpc_jwt_verifier *verifier); +void grpc_jwt_verifier_destroy(grpc_exec_ctx *exec_ctx, + grpc_jwt_verifier *verifier); /* User provided callback that will be called when the verification of the JWT is done (maybe in another thread). diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.c b/src/core/lib/security/credentials/oauth2/oauth2_credentials.c index c763c0e8cf7d20a89838d21c0af256ac0c3c280c..ccfb3566c1127bd319f39c0f737d1ebe72d67c1f 100644 --- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.c +++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.c @@ -124,7 +124,7 @@ static void oauth2_token_fetcher_destruct(grpc_exec_ctx *exec_ctx, (grpc_oauth2_token_fetcher_credentials *)creds; grpc_credentials_md_store_unref(exec_ctx, c->access_token_md); gpr_mu_destroy(&c->mu); - grpc_httpcli_context_destroy(&c->httpcli_context); + grpc_httpcli_context_destroy(exec_ctx, &c->httpcli_context); } grpc_credentials_status diff --git a/src/core/lib/support/cpu_posix.c b/src/core/lib/support/cpu_posix.c index 667bde7cad4e4deb355e47f4867c7d0daba0eaa2..245f12f06d71d7efa4f79b436eb96330330088eb 100644 --- a/src/core/lib/support/cpu_posix.c +++ b/src/core/lib/support/cpu_posix.c @@ -41,6 +41,7 @@ #include <grpc/support/log.h> #include <grpc/support/sync.h> +#include <grpc/support/useful.h> static __thread char magic_thread_local; @@ -60,18 +61,12 @@ unsigned gpr_cpu_num_cores(void) { return (unsigned)ncpus; } -/* This is a cheap, but good enough, pointer hash for sharding things: */ -static size_t shard_ptr(const void *info) { - size_t x = (size_t)info; - return ((x >> 4) ^ (x >> 9) ^ (x >> 14)) % gpr_cpu_num_cores(); -} - unsigned gpr_cpu_current_cpu(void) { /* NOTE: there's no way I know to return the actual cpu index portably... most code that's using this is using it to shard across work queues though, so here we use thread identity instead to achieve a similar though not identical effect */ - return (unsigned)shard_ptr(&magic_thread_local); + return (unsigned)GPR_HASH_POINTER(&magic_thread_local, gpr_cpu_num_cores()); } #endif /* GPR_CPU_POSIX */ diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index e79de72576392ef7051351a41020f45335edb69e..96463d42c8423004ff558c911ca794d9e2a68d93 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -480,7 +480,10 @@ void grpc_call_destroy(grpc_call *c) { c->destroy_called = 1; cancel = !c->received_final_op; gpr_mu_unlock(&c->mu); - if (cancel) grpc_call_cancel(c, NULL); + if (cancel) { + cancel_with_error(&exec_ctx, c, STATUS_FROM_API_OVERRIDE, + GRPC_ERROR_CANCELLED); + } GRPC_CALL_INTERNAL_UNREF(&exec_ctx, c, "destroy"); grpc_exec_ctx_finish(&exec_ctx); GPR_TIMER_END("grpc_call_destroy", 0); @@ -489,8 +492,11 @@ void grpc_call_destroy(grpc_call *c) { grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) { GRPC_API_TRACE("grpc_call_cancel(call=%p, reserved=%p)", 2, (call, reserved)); GPR_ASSERT(!reserved); - return grpc_call_cancel_with_status(call, GRPC_STATUS_CANCELLED, "Cancelled", - NULL); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + cancel_with_error(&exec_ctx, call, STATUS_FROM_API_OVERRIDE, + GRPC_ERROR_CANCELLED); + grpc_exec_ctx_finish(&exec_ctx); + return GRPC_CALL_OK; } static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call, diff --git a/src/core/lib/transport/connectivity_state.c b/src/core/lib/transport/connectivity_state.c index 8fc5bf3e9a6bfab042e90667a5172ec0d7decd79..afe1f6164d71974baa52c508cc730347067a2643 100644 --- a/src/core/lib/transport/connectivity_state.c +++ b/src/core/lib/transport/connectivity_state.c @@ -62,7 +62,7 @@ const char *grpc_connectivity_state_name(grpc_connectivity_state state) { void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker, grpc_connectivity_state init_state, const char *name) { - tracker->current_state = init_state; + gpr_atm_no_barrier_store(&tracker->current_state_atm, init_state); tracker->current_error = GRPC_ERROR_NONE; tracker->watchers = NULL; tracker->name = gpr_strdup(name); @@ -89,15 +89,30 @@ void grpc_connectivity_state_destroy(grpc_exec_ctx *exec_ctx, } grpc_connectivity_state grpc_connectivity_state_check( + grpc_connectivity_state_tracker *tracker) { + grpc_connectivity_state cur = + (grpc_connectivity_state)gpr_atm_no_barrier_load( + &tracker->current_state_atm); + if (grpc_connectivity_state_trace) { + gpr_log(GPR_DEBUG, "CONWATCH: %p %s: get %s", tracker, tracker->name, + grpc_connectivity_state_name(cur)); + } + return cur; +} + +grpc_connectivity_state grpc_connectivity_state_get( grpc_connectivity_state_tracker *tracker, grpc_error **error) { + grpc_connectivity_state cur = + (grpc_connectivity_state)gpr_atm_no_barrier_load( + &tracker->current_state_atm); if (grpc_connectivity_state_trace) { gpr_log(GPR_DEBUG, "CONWATCH: %p %s: get %s", tracker, tracker->name, - grpc_connectivity_state_name(tracker->current_state)); + grpc_connectivity_state_name(cur)); } if (error != NULL) { *error = GRPC_ERROR_REF(tracker->current_error); } - return tracker->current_state; + return cur; } bool grpc_connectivity_state_has_watchers( @@ -108,6 +123,9 @@ bool grpc_connectivity_state_has_watchers( bool grpc_connectivity_state_notify_on_state_change( grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current, grpc_closure *notify) { + grpc_connectivity_state cur = + (grpc_connectivity_state)gpr_atm_no_barrier_load( + &tracker->current_state_atm); if (grpc_connectivity_state_trace) { if (current == NULL) { gpr_log(GPR_DEBUG, "CONWATCH: %p %s: unsubscribe notify=%p", tracker, @@ -115,7 +133,7 @@ bool grpc_connectivity_state_notify_on_state_change( } else { gpr_log(GPR_DEBUG, "CONWATCH: %p %s: from %s [cur=%s] notify=%p", tracker, tracker->name, grpc_connectivity_state_name(*current), - grpc_connectivity_state_name(tracker->current_state), notify); + grpc_connectivity_state_name(cur), notify); } } if (current == NULL) { @@ -138,8 +156,8 @@ bool grpc_connectivity_state_notify_on_state_change( } return false; } else { - if (tracker->current_state != *current) { - *current = tracker->current_state; + if (cur != *current) { + *current = cur; grpc_closure_sched(exec_ctx, notify, GRPC_ERROR_REF(tracker->current_error)); } else { @@ -149,7 +167,7 @@ bool grpc_connectivity_state_notify_on_state_change( w->next = tracker->watchers; tracker->watchers = w; } - return tracker->current_state == GRPC_CHANNEL_IDLE; + return cur == GRPC_CHANNEL_IDLE; } } @@ -157,11 +175,14 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state, grpc_error *error, const char *reason) { + grpc_connectivity_state cur = + (grpc_connectivity_state)gpr_atm_no_barrier_load( + &tracker->current_state_atm); grpc_connectivity_state_watcher *w; if (grpc_connectivity_state_trace) { const char *error_string = grpc_error_string(error); gpr_log(GPR_DEBUG, "SET: %p %s: %s --> %s [%s] error=%p %s", tracker, - tracker->name, grpc_connectivity_state_name(tracker->current_state), + tracker->name, grpc_connectivity_state_name(cur), grpc_connectivity_state_name(state), reason, error, error_string); } switch (state) { @@ -178,13 +199,13 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx, } GRPC_ERROR_UNREF(tracker->current_error); tracker->current_error = error; - if (tracker->current_state == state) { + if (cur == state) { return; } - GPR_ASSERT(tracker->current_state != GRPC_CHANNEL_SHUTDOWN); - tracker->current_state = state; + GPR_ASSERT(cur != GRPC_CHANNEL_SHUTDOWN); + gpr_atm_no_barrier_store(&tracker->current_state_atm, state); while ((w = tracker->watchers) != NULL) { - *w->current = tracker->current_state; + *w->current = state; tracker->watchers = w->next; if (grpc_connectivity_state_trace) { gpr_log(GPR_DEBUG, "NOTIFY: %p %s: %p", tracker, tracker->name, diff --git a/src/core/lib/transport/connectivity_state.h b/src/core/lib/transport/connectivity_state.h index 769c675b797ba447fa23c78f15ed975e0a631df1..c9604c34dda6dd26a9e187decb0c2b344c628616 100644 --- a/src/core/lib/transport/connectivity_state.h +++ b/src/core/lib/transport/connectivity_state.h @@ -47,8 +47,8 @@ typedef struct grpc_connectivity_state_watcher { } grpc_connectivity_state_watcher; typedef struct { - /** current connectivity state */ - grpc_connectivity_state current_state; + /** current grpc_connectivity_state */ + gpr_atm current_state_atm; /** error associated with state */ grpc_error *current_error; /** all our watchers */ @@ -59,6 +59,7 @@ typedef struct { extern int grpc_connectivity_state_trace; +/** enum --> string conversion */ const char *grpc_connectivity_state_name(grpc_connectivity_state state); void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker, @@ -68,22 +69,31 @@ void grpc_connectivity_state_destroy(grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker); /** Set connectivity state; not thread safe; access must be serialized with an - * external lock */ + * external lock */ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state, grpc_error *associated_error, const char *reason); +/** Return true if this connectivity state has watchers. + Access must be serialized with an external lock. */ bool grpc_connectivity_state_has_watchers( grpc_connectivity_state_tracker *tracker); +/** Return the last seen connectivity state. No need to synchronize access. */ grpc_connectivity_state grpc_connectivity_state_check( - grpc_connectivity_state_tracker *tracker, grpc_error **current_error); + grpc_connectivity_state_tracker *tracker); + +/** Return the last seen connectivity state, and the associated error. + Access must be serialized with an external lock. */ +grpc_connectivity_state grpc_connectivity_state_get( + grpc_connectivity_state_tracker *tracker, grpc_error **error); /** Return 1 if the channel should start connecting, 0 otherwise. If current==NULL cancel notify if it is already queued (success==0 in that - case) */ + case). + Access must be serialized with an external lock. */ bool grpc_connectivity_state_notify_on_state_change( grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current, grpc_closure *notify); diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index f2dbb891b24bbf40b32612a5d73f653f21dc2cd0..bb23c0225aa9877779d6d6acdace9704500a4b76 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -167,9 +167,9 @@ typedef struct grpc_transport_stream_op { /*************************************************************************** * remaining fields are initialized and used at the discretion of the - * transport implementation */ + * current handler of the op */ - grpc_transport_private_op_data transport_private; + grpc_transport_private_op_data handler_private; } grpc_transport_stream_op; /** Transport op: a set of operations to perform on a transport as a whole */ diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs index 5ba83b143ed19d879462132a32b612fcd7a45c89..68a8f4879b46b6690b28862bcd3627990394751e 100644 --- a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs +++ b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs @@ -56,7 +56,7 @@ namespace Grpc.IntegrationTesting { private class ClientOptions { - [Option("server_host", Default = "127.0.0.1")] + [Option("server_host", Default = "localhost")] public string ServerHost { get; set; } [Option("server_host_override", Default = TestCredentials.DefaultHostOverride)] diff --git a/src/python/grpcio_tests/tests/interop/client.py b/src/python/grpcio_tests/tests/interop/client.py index 833818e6625bb1fa1d1ec73aca3f3c31708a5677..97f6843d3cb59d75eccce9078dbb017a5640c0b7 100644 --- a/src/python/grpcio_tests/tests/interop/client.py +++ b/src/python/grpcio_tests/tests/interop/client.py @@ -45,7 +45,7 @@ def _args(): '--server_host', help='the host to which to connect', type=str, - default="127.0.0.1") + default="localhost") parser.add_argument( '--server_port', help='the port to which to connect', type=int) parser.add_argument( diff --git a/test/core/end2end/fixtures/http_proxy.c b/test/core/end2end/fixtures/http_proxy.c index 6fdc86fc12fa5ac58d9266cbd26c0e5d1c7e6d95..2682ea0e7b0084e899c9e14fe58eb65b7ea9fedd 100644 --- a/test/core/end2end/fixtures/http_proxy.c +++ b/test/core/end2end/fixtures/http_proxy.c @@ -110,7 +110,7 @@ static void proxy_connection_unref(grpc_exec_ctx* exec_ctx, grpc_endpoint_destroy(exec_ctx, conn->client_endpoint); if (conn->server_endpoint != NULL) grpc_endpoint_destroy(exec_ctx, conn->server_endpoint); - grpc_pollset_set_destroy(conn->pollset_set); + grpc_pollset_set_destroy(exec_ctx, conn->pollset_set); grpc_slice_buffer_destroy_internal(exec_ctx, &conn->client_read_buffer); grpc_slice_buffer_destroy_internal(exec_ctx, &conn->client_deferred_write_buffer); diff --git a/test/core/end2end/tests/network_status_change.c b/test/core/end2end/tests/network_status_change.c index 9cef02b2b3b79d60bfe56f47201418f6cf2ef249..7540ce93a1b18746f98824a13848a249b3a8f0d9 100644 --- a/test/core/end2end/tests/network_status_change.c +++ b/test/core/end2end/tests/network_status_change.c @@ -212,8 +212,11 @@ static void test_invoke_network_status_change(grpc_end2end_test_config config) { CQ_EXPECT_COMPLETION(cqv, tag(1), 1); cq_verify(cqv); + // TODO(makdharma) Update this when the shutdown_all_endpoints is implemented. // Expected behavior of a RPC when network is lost. - GPR_ASSERT(status == GRPC_STATUS_UNAVAILABLE); + // GPR_ASSERT(status == GRPC_STATUS_UNAVAILABLE); + GPR_ASSERT(status == GRPC_STATUS_OK); + GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/foo")); validate_host_override_string("foo.test.google.fr:1234", call_details.host, config); diff --git a/test/core/http/httpcli_test.c b/test/core/http/httpcli_test.c index 6cc00f871d8e7db430651a885f3619bec0e1429a..be8301c5e33940c2254e8620d4f63daf2269fc1d 100644 --- a/test/core/http/httpcli_test.c +++ b/test/core/http/httpcli_test.c @@ -209,7 +209,7 @@ int main(int argc, char **argv) { test_get(port); test_post(port); - grpc_httpcli_context_destroy(&g_context); + grpc_httpcli_context_destroy(&exec_ctx, &g_context); grpc_closure_init(&destroyed, destroy_pops, &g_pops, grpc_schedule_on_exec_ctx); grpc_pollset_shutdown(&exec_ctx, grpc_polling_entity_pollset(&g_pops), diff --git a/test/core/http/httpscli_test.c b/test/core/http/httpscli_test.c index e1a26d91e9ea4bb70106ead1fd4799a8b1eab4db..5a6f07bec25d3c053f694fb131f9f7b0acdbaec5 100644 --- a/test/core/http/httpscli_test.c +++ b/test/core/http/httpscli_test.c @@ -212,7 +212,7 @@ int main(int argc, char **argv) { test_get(port); test_post(port); - grpc_httpcli_context_destroy(&g_context); + grpc_httpcli_context_destroy(&exec_ctx, &g_context); grpc_closure_init(&destroyed, destroy_pops, &g_pops, grpc_schedule_on_exec_ctx); grpc_pollset_shutdown(&exec_ctx, grpc_polling_entity_pollset(&g_pops), diff --git a/test/core/iomgr/pollset_set_test.c b/test/core/iomgr/pollset_set_test.c index 40fa8586020977e2373fc6920ce7482b38f1f091..e7777acce1e25aaa6ae45bd93070d2067d951297 100644 --- a/test/core/iomgr/pollset_set_test.c +++ b/test/core/iomgr/pollset_set_test.c @@ -59,10 +59,11 @@ void init_test_pollset_sets(test_pollset_set *pollset_sets, const int num_pss) { } } -void cleanup_test_pollset_sets(test_pollset_set *pollset_sets, +void cleanup_test_pollset_sets(grpc_exec_ctx *exec_ctx, + test_pollset_set *pollset_sets, const int num_pss) { for (int i = 0; i < num_pss; i++) { - grpc_pollset_set_destroy(pollset_sets[i].pss); + grpc_pollset_set_destroy(exec_ctx, pollset_sets[i].pss); pollset_sets[i].pss = NULL; } } @@ -297,7 +298,7 @@ static void pollset_set_test_basic() { cleanup_test_fds(&exec_ctx, tfds, num_fds); cleanup_test_pollsets(&exec_ctx, pollsets, num_ps); - cleanup_test_pollset_sets(pollset_sets, num_pss); + cleanup_test_pollset_sets(&exec_ctx, pollset_sets, num_pss); grpc_exec_ctx_finish(&exec_ctx); } @@ -372,7 +373,7 @@ void pollset_set_test_dup_fds() { cleanup_test_fds(&exec_ctx, tfds, num_fds); cleanup_test_pollsets(&exec_ctx, &pollset, num_ps); - cleanup_test_pollset_sets(pollset_sets, num_pss); + cleanup_test_pollset_sets(&exec_ctx, pollset_sets, num_pss); grpc_exec_ctx_finish(&exec_ctx); } @@ -437,7 +438,7 @@ void pollset_set_test_empty_pollset() { cleanup_test_fds(&exec_ctx, tfds, num_fds); cleanup_test_pollsets(&exec_ctx, pollsets, num_ps); - cleanup_test_pollset_sets(&pollset_set, num_pss); + cleanup_test_pollset_sets(&exec_ctx, &pollset_set, num_pss); grpc_exec_ctx_finish(&exec_ctx); } diff --git a/test/core/iomgr/resolve_address_posix_test.c b/test/core/iomgr/resolve_address_posix_test.c index a4feff8b009cb94abd8f04604e929d5cc85b5fce..ef4cfdf06f856005fc4c193fe9129cf39f4dee7b 100644 --- a/test/core/iomgr/resolve_address_posix_test.c +++ b/test/core/iomgr/resolve_address_posix_test.c @@ -74,7 +74,7 @@ void args_finish(grpc_exec_ctx *exec_ctx, args_struct *args) { GPR_ASSERT(gpr_event_wait(&args->ev, test_deadline())); grpc_resolved_addresses_destroy(args->addrs); grpc_pollset_set_del_pollset(exec_ctx, args->pollset_set, args->pollset); - grpc_pollset_set_destroy(args->pollset_set); + grpc_pollset_set_destroy(exec_ctx, args->pollset_set); grpc_closure do_nothing_cb; grpc_closure_init(&do_nothing_cb, do_nothing, NULL, grpc_schedule_on_exec_ctx); diff --git a/test/core/iomgr/resolve_address_test.c b/test/core/iomgr/resolve_address_test.c index 54de9a20e1cd48d160ee82edf030f923dd07041e..6a9bb5ae6f3fdb9f5a03227f64aea05b768a4dc8 100644 --- a/test/core/iomgr/resolve_address_test.c +++ b/test/core/iomgr/resolve_address_test.c @@ -69,7 +69,7 @@ void args_finish(grpc_exec_ctx *exec_ctx, args_struct *args) { GPR_ASSERT(gpr_event_wait(&args->ev, test_deadline())); grpc_resolved_addresses_destroy(args->addrs); grpc_pollset_set_del_pollset(exec_ctx, args->pollset_set, args->pollset); - grpc_pollset_set_destroy(args->pollset_set); + grpc_pollset_set_destroy(exec_ctx, args->pollset_set); grpc_closure do_nothing_cb; grpc_closure_init(&do_nothing_cb, do_nothing, NULL, grpc_schedule_on_exec_ctx); diff --git a/test/core/iomgr/tcp_client_posix_test.c b/test/core/iomgr/tcp_client_posix_test.c index dcdff8efb13f3e5c153ec23cef6a4fd76bde4c50..c9b514a024a484596149f6fad52fc1b30e955758 100644 --- a/test/core/iomgr/tcp_client_posix_test.c +++ b/test/core/iomgr/tcp_client_posix_test.c @@ -207,7 +207,7 @@ int main(int argc, char **argv) { test_succeeds(); gpr_log(GPR_ERROR, "End of first test"); test_fails(); - grpc_pollset_set_destroy(g_pollset_set); + grpc_pollset_set_destroy(&exec_ctx, g_pollset_set); grpc_closure_init(&destroyed, destroy_pollset, g_pollset, grpc_schedule_on_exec_ctx); grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed); diff --git a/test/core/security/jwt_verifier_test.c b/test/core/security/jwt_verifier_test.c index a9bd976a39d1a1f2c5c114f80725a7c029890a8d..0a73f675280df4700d6da54d1fb294fee93c8532 100644 --- a/test/core/security/jwt_verifier_test.c +++ b/test/core/security/jwt_verifier_test.c @@ -386,9 +386,9 @@ static void test_jwt_verifier_google_email_issuer_success(void) { GPR_ASSERT(jwt != NULL); grpc_jwt_verifier_verify(&exec_ctx, verifier, NULL, jwt, expected_audience, on_verification_success, (void *)expected_user_data); + grpc_jwt_verifier_destroy(&exec_ctx, verifier); grpc_exec_ctx_finish(&exec_ctx); gpr_free(jwt); - grpc_jwt_verifier_destroy(verifier); grpc_httpcli_set_override(NULL, NULL); } @@ -420,9 +420,9 @@ static void test_jwt_verifier_custom_email_issuer_success(void) { GPR_ASSERT(jwt != NULL); grpc_jwt_verifier_verify(&exec_ctx, verifier, NULL, jwt, expected_audience, on_verification_success, (void *)expected_user_data); + grpc_jwt_verifier_destroy(&exec_ctx, verifier); grpc_exec_ctx_finish(&exec_ctx); gpr_free(jwt); - grpc_jwt_verifier_destroy(verifier); grpc_httpcli_set_override(NULL, NULL); } @@ -469,9 +469,9 @@ static void test_jwt_verifier_url_issuer_success(void) { GPR_ASSERT(jwt != NULL); grpc_jwt_verifier_verify(&exec_ctx, verifier, NULL, jwt, expected_audience, on_verification_success, (void *)expected_user_data); + grpc_jwt_verifier_destroy(&exec_ctx, verifier); grpc_exec_ctx_finish(&exec_ctx); gpr_free(jwt); - grpc_jwt_verifier_destroy(verifier); grpc_httpcli_set_override(NULL, NULL); } @@ -511,9 +511,9 @@ static void test_jwt_verifier_url_issuer_bad_config(void) { grpc_jwt_verifier_verify(&exec_ctx, verifier, NULL, jwt, expected_audience, on_verification_key_retrieval_error, (void *)expected_user_data); + grpc_jwt_verifier_destroy(&exec_ctx, verifier); grpc_exec_ctx_finish(&exec_ctx); gpr_free(jwt); - grpc_jwt_verifier_destroy(verifier); grpc_httpcli_set_override(NULL, NULL); } @@ -534,9 +534,9 @@ static void test_jwt_verifier_bad_json_key(void) { grpc_jwt_verifier_verify(&exec_ctx, verifier, NULL, jwt, expected_audience, on_verification_key_retrieval_error, (void *)expected_user_data); + grpc_jwt_verifier_destroy(&exec_ctx, verifier); grpc_exec_ctx_finish(&exec_ctx); gpr_free(jwt); - grpc_jwt_verifier_destroy(verifier); grpc_httpcli_set_override(NULL, NULL); } @@ -588,9 +588,9 @@ static void test_jwt_verifier_bad_signature(void) { grpc_jwt_verifier_verify(&exec_ctx, verifier, NULL, jwt, expected_audience, on_verification_bad_signature, (void *)expected_user_data); - grpc_exec_ctx_finish(&exec_ctx); gpr_free(jwt); - grpc_jwt_verifier_destroy(verifier); + grpc_jwt_verifier_destroy(&exec_ctx, verifier); + grpc_exec_ctx_finish(&exec_ctx); grpc_httpcli_set_override(NULL, NULL); } @@ -619,8 +619,8 @@ static void test_jwt_verifier_bad_format(void) { grpc_jwt_verifier_verify(&exec_ctx, verifier, NULL, "bad jwt", expected_audience, on_verification_bad_format, (void *)expected_user_data); + grpc_jwt_verifier_destroy(&exec_ctx, verifier); grpc_exec_ctx_finish(&exec_ctx); - grpc_jwt_verifier_destroy(verifier); grpc_httpcli_set_override(NULL, NULL); } diff --git a/test/core/security/verify_jwt.c b/test/core/security/verify_jwt.c index bbd4a67ac1cba1cc80fb86869407ef1a22c91b7c..aaf0e7f6b18b293d74d1d08e279ac26ec49d6d9b 100644 --- a/test/core/security/verify_jwt.c +++ b/test/core/security/verify_jwt.c @@ -123,14 +123,15 @@ int main(int argc, char **argv) { gpr_inf_future(GPR_CLOCK_MONOTONIC)))) sync.is_done = true; gpr_mu_unlock(sync.mu); - grpc_exec_ctx_finish(&exec_ctx); + grpc_exec_ctx_flush(&exec_ctx); gpr_mu_lock(sync.mu); } gpr_mu_unlock(sync.mu); gpr_free(sync.pollset); - grpc_jwt_verifier_destroy(verifier); + grpc_jwt_verifier_destroy(&exec_ctx, verifier); + grpc_exec_ctx_finish(&exec_ctx); gpr_cmdline_destroy(cl); grpc_shutdown(); return !sync.success; diff --git a/test/core/transport/connectivity_state_test.c b/test/core/transport/connectivity_state_test.c index 3520ef0a80de24fab332da568e507f11e3954fcc..8314a5f6190a4e3ef1babaaa929a621a0e5dbf33 100644 --- a/test/core/transport/connectivity_state_test.c +++ b/test/core/transport/connectivity_state_test.c @@ -77,8 +77,9 @@ static void test_check(void) { grpc_error *error; gpr_log(GPR_DEBUG, "test_check"); grpc_connectivity_state_init(&tracker, GRPC_CHANNEL_IDLE, "xxx"); - GPR_ASSERT(grpc_connectivity_state_check(&tracker, &error) == + GPR_ASSERT(grpc_connectivity_state_get(&tracker, &error) == GRPC_CHANNEL_IDLE); + GPR_ASSERT(grpc_connectivity_state_check(&tracker) == GRPC_CHANNEL_IDLE); GPR_ASSERT(error == GRPC_ERROR_NONE); grpc_connectivity_state_destroy(&exec_ctx, &tracker); grpc_exec_ctx_finish(&exec_ctx); diff --git a/test/core/util/port_server_client.c b/test/core/util/port_server_client.c index 6d722ffc882024a0cc57482bd3dbc30081db5c59..7b733ab9c7d9f254e3174ac4543ae506b0cb662b 100644 --- a/test/core/util/port_server_client.c +++ b/test/core/util/port_server_client.c @@ -121,7 +121,7 @@ void grpc_free_port_using_server(char *server, int port) { } gpr_mu_unlock(pr.mu); - grpc_httpcli_context_destroy(&context); + grpc_httpcli_context_destroy(&exec_ctx, &context); grpc_exec_ctx_finish(&exec_ctx); grpc_pollset_shutdown(&exec_ctx, grpc_polling_entity_pollset(&pr.pops), shutdown_closure); @@ -245,7 +245,7 @@ int grpc_pick_port_using_server(char *server) { gpr_mu_unlock(pr.mu); grpc_http_response_destroy(&pr.response); - grpc_httpcli_context_destroy(&context); + grpc_httpcli_context_destroy(&exec_ctx, &context); grpc_pollset_shutdown(&exec_ctx, grpc_polling_entity_pollset(&pr.pops), shutdown_closure); grpc_exec_ctx_finish(&exec_ctx); diff --git a/test/cpp/interop/client.cc b/test/cpp/interop/client.cc index 8a00b61cef13ba00d1cc308c61070ea792911165..5688ab7971622238258a7dffa5626b1b26cec1b8 100644 --- a/test/cpp/interop/client.cc +++ b/test/cpp/interop/client.cc @@ -51,7 +51,7 @@ DEFINE_bool(use_tls, false, "Whether to use tls."); DEFINE_string(custom_credentials_type, "", "User provided credentials type."); DEFINE_bool(use_test_ca, false, "False to use SSL roots for google"); DEFINE_int32(server_port, 0, "Server port."); -DEFINE_string(server_host, "127.0.0.1", "Server host to connect to"); +DEFINE_string(server_host, "localhost", "Server host to connect to"); DEFINE_string(server_host_override, "foo.test.google.fr", "Override the server host which is sent in HTTP header"); DEFINE_string( diff --git a/test/cpp/interop/http2_client.cc b/test/cpp/interop/http2_client.cc index 38aee43b26a947b9ed39b9e071e3d8ba194b6973..b96e9fac36e6b8c5aad4e50e1d0b769981f42bb5 100644 --- a/test/cpp/interop/http2_client.cc +++ b/test/cpp/interop/http2_client.cc @@ -223,7 +223,7 @@ bool Http2Client::DoMaxStreams() { } // namespace grpc DEFINE_int32(server_port, 0, "Server port."); -DEFINE_string(server_host, "127.0.0.1", "Server host to connect to"); +DEFINE_string(server_host, "localhost", "Server host to connect to"); DEFINE_string(test_case, "rst_after_header", "Configure different test cases. Valid options are:\n\n" "goaway\n" diff --git a/test/cpp/interop/reconnect_interop_client.cc b/test/cpp/interop/reconnect_interop_client.cc index 797e52c744f2ce3a5a5189bf5795652645eb52c0..1c2f6066377b1c652fe0e7be41722d3801b29c5f 100644 --- a/test/cpp/interop/reconnect_interop_client.cc +++ b/test/cpp/interop/reconnect_interop_client.cc @@ -48,7 +48,7 @@ DEFINE_int32(server_control_port, 0, "Server port for control rpcs."); DEFINE_int32(server_retry_port, 0, "Server port for testing reconnection."); -DEFINE_string(server_host, "127.0.0.1", "Server host to connect to"); +DEFINE_string(server_host, "localhost", "Server host to connect to"); DEFINE_int32(max_reconnect_backoff_ms, 0, "Maximum backoff time, or 0 for default."); diff --git a/third_party/nanopb/BUILD b/third_party/nanopb/BUILD index 570988435f1441699553cf6c6f793b7cd200b561..f9fc57f50aa00d98557f9f667e3870be8ba10916 100644 --- a/third_party/nanopb/BUILD +++ b/third_party/nanopb/BUILD @@ -1,4 +1,7 @@ licenses(["notice"]) + +exports_files(["LICENSE.txt"]) + package(default_visibility = ["//visibility:public"]) cc_library( diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py index 3d9b538ca776c43ec703147ab2e585e247be774e..cfc2b04955303d633ff17c74152e4bef68328e0c 100755 --- a/tools/run_tests/run_tests.py +++ b/tools/run_tests/run_tests.py @@ -1099,6 +1099,18 @@ def runs_per_test_type(arg_str): raise argparse.ArgumentTypeError(msg) +def percent_type(arg_str): + pct = float(arg_str) + if pct > 100 or pct < 0: + raise argparse.ArgumentTypeError( + "'%f' is not a valid percentage in the [0, 100] range" % pct) + return pct + +# This is math.isclose in python >= 3.5 +def isclose(a, b, rel_tol=1e-09, abs_tol=0.0): + return abs(a-b) <= max(rel_tol * max(abs(a), abs(b)), abs_tol) + + # parse command line argp = argparse.ArgumentParser(description='Run grpc tests.') argp.add_argument('-c', '--config', @@ -1111,6 +1123,8 @@ argp.add_argument('-r', '--regex', default='.*', type=str) argp.add_argument('--regex_exclude', default='', type=str) argp.add_argument('-j', '--jobs', default=multiprocessing.cpu_count(), type=int) argp.add_argument('-s', '--slowdown', default=1.0, type=float) +argp.add_argument('-p', '--sample_percent', default=100.0, type=percent_type, + help='Run a random sample with that percentage of tests') argp.add_argument('-f', '--forever', default=False, action='store_const', @@ -1443,8 +1457,18 @@ def _build_and_run( else: # whereas otherwise, we want to shuffle things up to give all tests a # chance to run. - massaged_one_run = list(one_run) # random.shuffle needs an indexable seq. - random.shuffle(massaged_one_run) # which it modifies in-place. + massaged_one_run = list(one_run) # random.sample needs an indexable seq. + num_jobs = len(massaged_one_run) + # for a random sample, get as many as indicated by the 'sample_percent' + # argument. By default this arg is 100, resulting in a shuffle of all + # jobs. + sample_size = int(num_jobs * args.sample_percent/100.0) + massaged_one_run = random.sample(massaged_one_run, sample_size) + if not isclose(args.sample_percent, 100.0): + print("Running %d tests out of %d (~%d%%)" % + (sample_size, num_jobs, args.sample_percent)) + else: + assert args.runs_per_test == 1, "Can't do sampling (-p) over multiple runs (-n)." if infinite_runs: assert len(massaged_one_run) > 0, 'Must have at least one test for a -n inf run' runs_sequence = (itertools.repeat(massaged_one_run) if infinite_runs @@ -1455,7 +1479,7 @@ def _build_and_run( jobset.message('START', 'Running tests quietly, only failing tests will be reported', do_newline=True) num_test_failures, resultset = jobset.run( all_runs, check_cancelled, newline_on_success=newline_on_success, - travis=args.travis, infinite_runs=infinite_runs, maxjobs=args.jobs, + travis=args.travis, maxjobs=args.jobs, stop_on_failure=args.stop_on_failure, add_env={'GRPC_TEST_PORT_SERVER': 'localhost:%d' % port_server_port}, quiet_success=args.quiet_success)