Skip to content
Snippets Groups Projects
Commit 4374fd3a authored by Craig Tiller's avatar Craig Tiller Committed by GitHub
Browse files

Merge pull request #9750 from ctiller/deadline_mu

Eliminate mutex in deadline_filter
parents 473380e9 7e1d5178
No related branches found
No related tags found
No related merge requests found
......@@ -52,9 +52,6 @@ static void timer_callback(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
grpc_call_element* elem = arg;
grpc_deadline_state* deadline_state = elem->call_data;
gpr_mu_lock(&deadline_state->timer_mu);
deadline_state->timer_pending = false;
gpr_mu_unlock(&deadline_state->timer_mu);
if (error != GRPC_ERROR_CANCELLED) {
grpc_call_element_signal_error(
exec_ctx, elem,
......@@ -66,53 +63,64 @@ static void timer_callback(grpc_exec_ctx* exec_ctx, void* arg,
}
// Starts the deadline timer.
static void start_timer_if_needed_locked(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem,
gpr_timespec deadline) {
grpc_deadline_state* deadline_state = elem->call_data;
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
// Note: We do not start the timer if there is already a timer
// pending. This should be okay, because this is only called from two
// functions exported by this module: grpc_deadline_state_start(), which
// starts the initial timer, and grpc_deadline_state_reset(), which
// cancels any pre-existing timer before starting a new one. In
// particular, we want to ensure that if grpc_deadline_state_start()
// winds up trying to start the timer after grpc_deadline_state_reset()
// has already done so, we ignore the value from the former.
if (!deadline_state->timer_pending &&
gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) {
// Take a reference to the call stack, to be owned by the timer.
GRPC_CALL_STACK_REF(deadline_state->call_stack, "deadline_timer");
deadline_state->timer_pending = true;
grpc_closure_init(&deadline_state->timer_callback, timer_callback, elem,
grpc_schedule_on_exec_ctx);
grpc_timer_init(exec_ctx, &deadline_state->timer, deadline,
&deadline_state->timer_callback,
gpr_now(GPR_CLOCK_MONOTONIC));
}
}
static void start_timer_if_needed(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem,
gpr_timespec deadline) {
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) == 0) {
return;
}
grpc_deadline_state* deadline_state = elem->call_data;
gpr_mu_lock(&deadline_state->timer_mu);
start_timer_if_needed_locked(exec_ctx, elem, deadline);
gpr_mu_unlock(&deadline_state->timer_mu);
grpc_deadline_timer_state cur_state;
grpc_closure* closure = NULL;
retry:
cur_state =
(grpc_deadline_timer_state)gpr_atm_acq_load(&deadline_state->timer_state);
switch (cur_state) {
case GRPC_DEADLINE_STATE_PENDING:
// Note: We do not start the timer if there is already a timer
return;
case GRPC_DEADLINE_STATE_FINISHED:
if (gpr_atm_rel_cas(&deadline_state->timer_state,
GRPC_DEADLINE_STATE_FINISHED,
GRPC_DEADLINE_STATE_PENDING)) {
// If we've already created and destroyed a timer, we always create a
// new closure: we have no other guarantee that the inlined closure is
// not in use (it may hold a pending call to timer_callback)
closure = grpc_closure_create(timer_callback, elem,
grpc_schedule_on_exec_ctx);
} else {
goto retry;
}
break;
case GRPC_DEADLINE_STATE_INITIAL:
if (gpr_atm_rel_cas(&deadline_state->timer_state,
GRPC_DEADLINE_STATE_INITIAL,
GRPC_DEADLINE_STATE_PENDING)) {
closure =
grpc_closure_init(&deadline_state->timer_callback, timer_callback,
elem, grpc_schedule_on_exec_ctx);
} else {
goto retry;
}
break;
}
GPR_ASSERT(closure);
GRPC_CALL_STACK_REF(deadline_state->call_stack, "deadline_timer");
grpc_timer_init(exec_ctx, &deadline_state->timer, deadline, closure,
gpr_now(GPR_CLOCK_MONOTONIC));
}
// Cancels the deadline timer.
static void cancel_timer_if_needed_locked(grpc_exec_ctx* exec_ctx,
grpc_deadline_state* deadline_state) {
if (deadline_state->timer_pending) {
grpc_timer_cancel(exec_ctx, &deadline_state->timer);
deadline_state->timer_pending = false;
}
}
static void cancel_timer_if_needed(grpc_exec_ctx* exec_ctx,
grpc_deadline_state* deadline_state) {
gpr_mu_lock(&deadline_state->timer_mu);
cancel_timer_if_needed_locked(exec_ctx, deadline_state);
gpr_mu_unlock(&deadline_state->timer_mu);
if (gpr_atm_rel_cas(&deadline_state->timer_state, GRPC_DEADLINE_STATE_PENDING,
GRPC_DEADLINE_STATE_FINISHED)) {
grpc_timer_cancel(exec_ctx, &deadline_state->timer);
} else {
// timer was either in STATE_INITAL (nothing to cancel)
// OR in STATE_FINISHED (again nothing to cancel)
}
}
// Callback run when the call is complete.
......@@ -120,8 +128,8 @@ static void on_complete(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
grpc_deadline_state* deadline_state = arg;
cancel_timer_if_needed(exec_ctx, deadline_state);
// Invoke the next callback.
deadline_state->next_on_complete->cb(
exec_ctx, deadline_state->next_on_complete->cb_arg, error);
grpc_closure_run(exec_ctx, deadline_state->next_on_complete,
GRPC_ERROR_REF(error));
}
// Inject our own on_complete callback into op.
......@@ -138,14 +146,12 @@ void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
grpc_deadline_state* deadline_state = elem->call_data;
memset(deadline_state, 0, sizeof(*deadline_state));
deadline_state->call_stack = call_stack;
gpr_mu_init(&deadline_state->timer_mu);
}
void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem) {
grpc_deadline_state* deadline_state = elem->call_data;
cancel_timer_if_needed(exec_ctx, deadline_state);
gpr_mu_destroy(&deadline_state->timer_mu);
}
// Callback and associated state for starting the timer after call stack
......@@ -187,10 +193,8 @@ void grpc_deadline_state_start(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
gpr_timespec new_deadline) {
grpc_deadline_state* deadline_state = elem->call_data;
gpr_mu_lock(&deadline_state->timer_mu);
cancel_timer_if_needed_locked(exec_ctx, deadline_state);
start_timer_if_needed_locked(exec_ctx, elem, new_deadline);
gpr_mu_unlock(&deadline_state->timer_mu);
cancel_timer_if_needed(exec_ctx, deadline_state);
start_timer_if_needed(exec_ctx, elem, new_deadline);
}
void grpc_deadline_state_client_start_transport_stream_op(
......
......@@ -35,16 +35,18 @@
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/iomgr/timer.h"
typedef enum grpc_deadline_timer_state {
GRPC_DEADLINE_STATE_INITIAL,
GRPC_DEADLINE_STATE_PENDING,
GRPC_DEADLINE_STATE_FINISHED
} grpc_deadline_timer_state;
// State used for filters that enforce call deadlines.
// Must be the first field in the filter's call_data.
typedef struct grpc_deadline_state {
// We take a reference to the call stack for the timer callback.
grpc_call_stack* call_stack;
// Guards access to timer_pending and timer.
gpr_mu timer_mu;
// True if the timer callback is currently pending.
bool timer_pending;
// The deadline timer.
gpr_atm timer_state;
grpc_timer timer;
grpc_closure timer_callback;
// Closure to invoke when the call is complete.
......
......@@ -180,25 +180,25 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
GPR_ASSERT(now.clock_type == g_clock_type);
timer->closure = closure;
timer->deadline = deadline;
timer->triggered = 0;
if (!g_initialized) {
timer->triggered = 1;
timer->pending = false;
grpc_closure_sched(
exec_ctx, timer->closure,
GRPC_ERROR_CREATE("Attempt to create timer before initialization"));
return;
}
gpr_mu_lock(&shard->mu);
timer->pending = true;
if (gpr_time_cmp(deadline, now) <= 0) {
timer->triggered = 1;
timer->pending = false;
grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_NONE);
gpr_mu_unlock(&shard->mu);
/* early out */
return;
}
/* TODO(ctiller): check deadline expired */
gpr_mu_lock(&shard->mu);
grpc_time_averaged_stats_add_sample(&shard->stats,
ts_to_dbl(gpr_time_sub(deadline, now)));
if (gpr_time_cmp(deadline, shard->queue_deadline_cap) < 0) {
......@@ -243,9 +243,9 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
shard_type *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)];
gpr_mu_lock(&shard->mu);
if (!timer->triggered) {
if (timer->pending) {
grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_CANCELLED);
timer->triggered = 1;
timer->pending = false;
if (timer->heap_index == INVALID_HEAP_INDEX) {
list_remove(timer);
} else {
......@@ -296,7 +296,7 @@ static grpc_timer *pop_one(shard_type *shard, gpr_timespec now) {
}
timer = grpc_timer_heap_top(&shard->heap);
if (gpr_time_cmp(timer->deadline, now) > 0) return NULL;
timer->triggered = 1;
timer->pending = false;
grpc_timer_heap_pop(&shard->heap);
return timer;
}
......
......@@ -40,7 +40,7 @@
struct grpc_timer {
gpr_timespec deadline;
uint32_t heap_index; /* INVALID_HEAP_INDEX if not in heap */
int triggered;
bool pending;
struct grpc_timer *next;
struct grpc_timer *prev;
grpc_closure *closure;
......
......@@ -53,8 +53,8 @@ static void stop_uv_timer(uv_timer_t *handle) {
void run_expired_timer(uv_timer_t *handle) {
grpc_timer *timer = (grpc_timer *)handle->data;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GPR_ASSERT(!timer->triggered);
timer->triggered = 1;
GPR_ASSERT(timer->pending);
timer->pending = 0;
grpc_closure_sched(&exec_ctx, timer->closure, GRPC_ERROR_NONE);
stop_uv_timer(handle);
grpc_exec_ctx_finish(&exec_ctx);
......@@ -67,11 +67,11 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
uv_timer_t *uv_timer;
timer->closure = closure;
if (gpr_time_cmp(deadline, now) <= 0) {
timer->triggered = 1;
timer->pending = 0;
grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_NONE);
return;
}
timer->triggered = 0;
timer->pending = 1;
timeout = (uint64_t)gpr_time_to_millis(gpr_time_sub(deadline, now));
uv_timer = gpr_malloc(sizeof(uv_timer_t));
uv_timer_init(uv_default_loop(), uv_timer);
......@@ -81,8 +81,8 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
}
void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
if (!timer->triggered) {
timer->triggered = 1;
if (timer->pending) {
timer->pending = 0;
grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_CANCELLED);
stop_uv_timer((uv_timer_t *)timer->uv_timer);
}
......
......@@ -41,7 +41,7 @@ struct grpc_timer {
/* This is actually a uv_timer_t*, but we want to keep platform-specific
types out of headers */
void *uv_timer;
int triggered;
int pending;
};
#endif /* GRPC_CORE_LIB_IOMGR_TIMER_UV_H */
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment