diff --git a/src/core/lib/channel/deadline_filter.c b/src/core/lib/channel/deadline_filter.c index fbc858bccc8879be99fcc70294b6edc157e4e024..8a8c3cb830494439cb65c26ef9e76b0c4acb7253 100644 --- a/src/core/lib/channel/deadline_filter.c +++ b/src/core/lib/channel/deadline_filter.c @@ -73,45 +73,48 @@ static void start_timer_if_needed(grpc_exec_ctx* exec_ctx, return; } grpc_deadline_state* deadline_state = elem->call_data; - for (size_t i = 0; i < GPR_ARRAY_SIZE(deadline_state->timers); i++) { - if (gpr_atm_acq_load(&deadline_state->timers[i]) == 0) { - grpc_deadline_timer* timer = (i == 0 ? &deadline_state->inlined_timer - : gpr_malloc(sizeof(*timer))); - if (gpr_atm_rel_cas(&deadline_state->timers[i], 0, (gpr_atm)timer)) { - grpc_timer_init( - exec_ctx, &timer->timer, deadline, - grpc_closure_init(&timer->timer_callback, timer_callback, elem, - grpc_schedule_on_exec_ctx), - gpr_now(GPR_CLOCK_MONOTONIC)); - } else if (i != 0) { - gpr_free(timer); + grpc_deadline_timer_state cur_state; + grpc_closure* closure = NULL; +retry: + cur_state = + (grpc_deadline_timer_state)gpr_atm_acq_load(&deadline_state->timer_state); + switch (cur_state) { + case GRPC_DEADLINE_STATE_PENDING: + return; + case GRPC_DEADLINE_STATE_FINISHED: + if (gpr_atm_rel_cas(&deadline_state->timer_state, + GRPC_DEADLINE_STATE_FINISHED, + GRPC_DEADLINE_STATE_PENDING)) { + closure = grpc_closure_create(timer_callback, elem, + grpc_schedule_on_exec_ctx); + } else { + goto retry; } - } + break; + case GRPC_DEADLINE_STATE_INITIAL: + if (gpr_atm_rel_cas(&deadline_state->timer_state, + GRPC_DEADLINE_STATE_INITIAL, + GRPC_DEADLINE_STATE_PENDING)) { + closure = + grpc_closure_init(&deadline_state->timer_callback, timer_callback, + elem, grpc_schedule_on_exec_ctx); + } else { + goto retry; + } + break; } + GPR_ASSERT(closure); + grpc_timer_init(exec_ctx, &deadline_state->timer, deadline, closure, + gpr_now(GPR_CLOCK_MONOTONIC)); GPR_UNREACHABLE_CODE(return;); } // Cancels the deadline timer. static void cancel_timer_if_needed(grpc_exec_ctx* exec_ctx, grpc_deadline_state* deadline_state) { - for (size_t i = 0; i < GPR_ARRAY_SIZE(deadline_state->timers); i++) { - gpr_atm timer_val; - timer_val = gpr_atm_acq_load(&deadline_state->timers[i]); - switch (timer_val) { - case 0: - return; - case TOMBSTONE_TIMER: - break; - default: - if (!gpr_atm_rel_cas(&deadline_state->timers[i], timer_val, - TOMBSTONE_TIMER)) { - break; // must have become a tombstone - } - grpc_deadline_timer* timer = (grpc_deadline_timer*)timer_val; - grpc_timer_cancel(exec_ctx, &timer->timer); - if (i != 0) gpr_free(timer); - break; - } + if (gpr_atm_acq_load(&deadline_state->timer_state) != + GRPC_DEADLINE_STATE_INITIAL) { + grpc_timer_cancel(exec_ctx, &deadline_state->timer); } } @@ -120,8 +123,8 @@ static void on_complete(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { grpc_deadline_state* deadline_state = arg; cancel_timer_if_needed(exec_ctx, deadline_state); // Invoke the next callback. - deadline_state->next_on_complete->cb( - exec_ctx, deadline_state->next_on_complete->cb_arg, error); + grpc_closure_run(exec_ctx, deadline_state->next_on_complete, + GRPC_ERROR_REF(error)); } // Inject our own on_complete callback into op. diff --git a/src/core/lib/channel/deadline_filter.h b/src/core/lib/channel/deadline_filter.h index 45d524a830290c5e1714db315290ea6787ddaab7..94717f6bc70a33c03d1f7cec327c105f54e2a774 100644 --- a/src/core/lib/channel/deadline_filter.h +++ b/src/core/lib/channel/deadline_filter.h @@ -35,21 +35,20 @@ #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/iomgr/timer.h" -typedef struct grpc_deadline_timer { - grpc_timer timer; - grpc_closure timer_callback; -} grpc_deadline_timer; +typedef enum grpc_deadline_timer_state { + GRPC_DEADLINE_STATE_INITIAL, + GRPC_DEADLINE_STATE_PENDING, + GRPC_DEADLINE_STATE_FINISHED +} grpc_deadline_timer_state; // State used for filters that enforce call deadlines. // Must be the first field in the filter's call_data. typedef struct grpc_deadline_state { // We take a reference to the call stack for the timer callback. grpc_call_stack* call_stack; - // We allow an initial timer and one reset.. these atomics point to the - // grpc_deadline_timer instances - gpr_atm timers[2]; - // Pre-allocated initial timer. - grpc_deadline_timer inlined_timer; + gpr_atm timer_state; + grpc_timer timer; + grpc_closure timer_callback; // Closure to invoke when the call is complete. // We use this to cancel the timer. grpc_closure on_complete; diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c index 40c83514724c61daef6521ee66540891236bb151..45a32f10da0bf6148cf05316c4ddeab8b114d247 100644 --- a/src/core/lib/iomgr/timer_generic.c +++ b/src/core/lib/iomgr/timer_generic.c @@ -186,25 +186,25 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, GPR_ASSERT(now.clock_type == g_clock_type); timer->closure = closure; timer->deadline = deadline; - timer->triggered = 0; if (!g_initialized) { - timer->triggered = 1; + timer->pending = false; grpc_closure_sched( exec_ctx, timer->closure, GRPC_ERROR_CREATE("Attempt to create timer before initialization")); return; } + gpr_mu_lock(&shard->mu); + timer->pending = true; if (gpr_time_cmp(deadline, now) <= 0) { - timer->triggered = 1; + timer->pending = false; grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_NONE); + gpr_mu_unlock(&shard->mu); + /* early out */ return; } - /* TODO(ctiller): check deadline expired */ - - gpr_mu_lock(&shard->mu); grpc_time_averaged_stats_add_sample(&shard->stats, ts_to_dbl(gpr_time_sub(deadline, now))); if (gpr_time_cmp(deadline, shard->queue_deadline_cap) < 0) { @@ -249,9 +249,9 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { shard_type *shard = &g_shards[shard_idx(timer)]; gpr_mu_lock(&shard->mu); - if (!timer->triggered) { + if (timer->pending) { grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_CANCELLED); - timer->triggered = 1; + timer->pending = false; if (timer->heap_index == INVALID_HEAP_INDEX) { list_remove(timer); } else { @@ -302,7 +302,7 @@ static grpc_timer *pop_one(shard_type *shard, gpr_timespec now) { } timer = grpc_timer_heap_top(&shard->heap); if (gpr_time_cmp(timer->deadline, now) > 0) return NULL; - timer->triggered = 1; + timer->pending = false; grpc_timer_heap_pop(&shard->heap); return timer; } diff --git a/src/core/lib/iomgr/timer_generic.h b/src/core/lib/iomgr/timer_generic.h index 9d901c7e68ff96a272f7d723f052926336cc7663..1608dce9fb1058bf0dadc414538c8a724568e016 100644 --- a/src/core/lib/iomgr/timer_generic.h +++ b/src/core/lib/iomgr/timer_generic.h @@ -40,7 +40,7 @@ struct grpc_timer { gpr_timespec deadline; uint32_t heap_index; /* INVALID_HEAP_INDEX if not in heap */ - int triggered; + bool pending; struct grpc_timer *next; struct grpc_timer *prev; grpc_closure *closure; diff --git a/src/core/lib/iomgr/timer_uv.c b/src/core/lib/iomgr/timer_uv.c index fa2cdee9646afae7af224baa769e5665c6cf0a13..f28a14405dbb4e18537e53adbf302df6f1befabc 100644 --- a/src/core/lib/iomgr/timer_uv.c +++ b/src/core/lib/iomgr/timer_uv.c @@ -53,8 +53,8 @@ static void stop_uv_timer(uv_timer_t *handle) { void run_expired_timer(uv_timer_t *handle) { grpc_timer *timer = (grpc_timer *)handle->data; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - GPR_ASSERT(!timer->triggered); - timer->triggered = 1; + GPR_ASSERT(timer->pending); + timer->pending = 0; grpc_closure_sched(&exec_ctx, timer->closure, GRPC_ERROR_NONE); stop_uv_timer(handle); grpc_exec_ctx_finish(&exec_ctx); @@ -67,11 +67,11 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, uv_timer_t *uv_timer; timer->closure = closure; if (gpr_time_cmp(deadline, now) <= 0) { - timer->triggered = 1; + timer->pending = 0; grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_NONE); return; } - timer->triggered = 0; + timer->pending = 1; timeout = (uint64_t)gpr_time_to_millis(gpr_time_sub(deadline, now)); uv_timer = gpr_malloc(sizeof(uv_timer_t)); uv_timer_init(uv_default_loop(), uv_timer); @@ -81,8 +81,8 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, } void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { - if (!timer->triggered) { - timer->triggered = 1; + if (timer->pending) { + timer->pending = 0; grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_CANCELLED); stop_uv_timer((uv_timer_t *)timer->uv_timer); } diff --git a/src/core/lib/iomgr/timer_uv.h b/src/core/lib/iomgr/timer_uv.h index 13cf8bd4fa0f593876a314f4920711013d712970..9870cd4a5ca2f10f2ad19a796bf3807875495657 100644 --- a/src/core/lib/iomgr/timer_uv.h +++ b/src/core/lib/iomgr/timer_uv.h @@ -41,7 +41,7 @@ struct grpc_timer { /* This is actually a uv_timer_t*, but we want to keep platform-specific types out of headers */ void *uv_timer; - int triggered; + int pending; }; #endif /* GRPC_CORE_LIB_IOMGR_TIMER_UV_H */