Skip to content
Snippets Groups Projects
Commit e9f6eac1 authored by Sree Kuchibhotla, committed by GitHub
Browse files

Merge pull request #11684 from sreecha/timer_manager_fix

timer_manager fix 
parents 17bf3e14 65dd99fe
No related branches found
No related tags found
No related merge requests found
...@@ -50,6 +50,9 @@ static completed_thread *g_completed_threads; ...@@ -50,6 +50,9 @@ static completed_thread *g_completed_threads;
static bool g_kicked; static bool g_kicked;
// is there a thread waiting until the next timer should fire? // is there a thread waiting until the next timer should fire?
static bool g_has_timed_waiter; static bool g_has_timed_waiter;
// the deadline of the current timed waiter thread (only relevant if
// g_has_timed_waiter is true)
static gpr_timespec g_timed_waiter_deadline;
// generation counter to track which thread is waiting for the next timer // generation counter to track which thread is waiting for the next timer
static uint64_t g_timed_waiter_generation; static uint64_t g_timed_waiter_generation;
...@@ -101,8 +104,7 @@ static void run_some_timers(grpc_exec_ctx *exec_ctx) { ...@@ -101,8 +104,7 @@ static void run_some_timers(grpc_exec_ctx *exec_ctx) {
start_timer_thread_and_unlock(); start_timer_thread_and_unlock();
} else { } else {
// if there's no thread waiting with a timeout, kick an existing // if there's no thread waiting with a timeout, kick an existing
// waiter // waiter so that the next deadline is not missed
// so that the next deadline is not missed
if (!g_has_timed_waiter) { if (!g_has_timed_waiter) {
if (GRPC_TRACER_ON(grpc_timer_check_trace)) { if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
gpr_log(GPR_DEBUG, "kick untimed waiter"); gpr_log(GPR_DEBUG, "kick untimed waiter");
...@@ -132,44 +134,79 @@ static bool wait_until(gpr_timespec next) { ...@@ -132,44 +134,79 @@ static bool wait_until(gpr_timespec next) {
gpr_mu_unlock(&g_mu); gpr_mu_unlock(&g_mu);
return false; return false;
} }
// if there's no timed waiter, we should become one: that waiter waits
// only until the next timer should expire // If g_kicked is true at this point, it means there was a kick from the timer
// all other timers wait forever // system that the timer-manager threads here missed. We cannot trust 'next'
uint64_t my_timed_waiter_generation = g_timed_waiter_generation - 1; // here any longer (since there might be an earlier deadline). So if g_kicked
if (!g_has_timed_waiter && gpr_time_cmp(next, inf_future) != 0) { // is true at this point, we should quickly exit this and get the next
g_has_timed_waiter = true; // deadline from the timer system
// we use a generation counter to track the timed waiter so we can
// cancel an existing one quickly (and when it actually times out it'll if (!g_kicked) {
// figure stuff out instead of incurring a wakeup) // if there's no timed waiter, we should become one: that waiter waits
my_timed_waiter_generation = ++g_timed_waiter_generation; // only until the next timer should expire. All other timers wait forever
if (GRPC_TRACER_ON(grpc_timer_check_trace)) { //
gpr_timespec wait_time = gpr_time_sub(next, gpr_now(GPR_CLOCK_MONOTONIC)); // 'g_timed_waiter_generation' is a global generation counter. The idea here
gpr_log(GPR_DEBUG, "sleep for a %" PRId64 ".%09d seconds", // is that the thread becoming a timed-waiter increments and stores this
wait_time.tv_sec, wait_time.tv_nsec); // global counter locally in 'my_timed_waiter_generation' before going to
// sleep. After waking up, if my_timed_waiter_generation ==
// g_timed_waiter_generation, it can be sure that it was the timed_waiter
// thread (and that no other thread took over while this was asleep)
//
// Initialize my_timed_waiter_generation to some value that is NOT equal to
// g_timed_waiter_generation
uint64_t my_timed_waiter_generation = g_timed_waiter_generation - 1;
/* If there's no timed waiter, we should become one: that waiter waits only
until the next timer should expire. All other timer threads wait forever
unless their 'next' is earlier than the current timed-waiter's deadline
(in which case the thread with earlier 'next' takes over as the new timed
waiter) */
if (gpr_time_cmp(next, inf_future) != 0) {
if (!g_has_timed_waiter ||
(gpr_time_cmp(next, g_timed_waiter_deadline) < 0)) {
my_timed_waiter_generation = ++g_timed_waiter_generation;
g_has_timed_waiter = true;
g_timed_waiter_deadline = next;
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
gpr_timespec wait_time =
gpr_time_sub(next, gpr_now(GPR_CLOCK_MONOTONIC));
gpr_log(GPR_DEBUG, "sleep for a %" PRId64 ".%09d seconds",
wait_time.tv_sec, wait_time.tv_nsec);
}
} else { // g_timed_waiter == true && next >= g_timed_waiter_deadline
next = inf_future;
}
} }
} else {
next = inf_future; if (GRPC_TRACER_ON(grpc_timer_check_trace) &&
if (GRPC_TRACER_ON(grpc_timer_check_trace)) { gpr_time_cmp(next, inf_future) == 0) {
gpr_log(GPR_DEBUG, "sleep until kicked"); gpr_log(GPR_DEBUG, "sleep until kicked");
} }
gpr_cv_wait(&g_cv_wait, &g_mu, next);
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
gpr_log(GPR_DEBUG, "wait ended: was_timed:%d kicked:%d",
my_timed_waiter_generation == g_timed_waiter_generation,
g_kicked);
}
// if this was the timed waiter, then we need to check timers, and flag
// that there's now no timed waiter... we'll look for a replacement if
// there's work to do after checking timers (code above)
if (my_timed_waiter_generation == g_timed_waiter_generation) {
g_has_timed_waiter = false;
g_timed_waiter_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
}
} }
gpr_cv_wait(&g_cv_wait, &g_mu, next);
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
gpr_log(GPR_DEBUG, "wait ended: was_timed:%d kicked:%d",
my_timed_waiter_generation == g_timed_waiter_generation, g_kicked);
}
// if this was the timed waiter, then we need to check timers, and flag
// that there's now no timed waiter... we'll look for a replacement if
// there's work to do after checking timers (code above)
if (my_timed_waiter_generation == g_timed_waiter_generation) {
g_has_timed_waiter = false;
}
// if this was a kick from the timer system, consume it (and don't stop // if this was a kick from the timer system, consume it (and don't stop
// this thread yet) // this thread yet)
if (g_kicked) { if (g_kicked) {
grpc_timer_consume_kick(); grpc_timer_consume_kick();
g_kicked = false; g_kicked = false;
} }
gpr_mu_unlock(&g_mu); gpr_mu_unlock(&g_mu);
return true; return true;
} }
...@@ -257,6 +294,9 @@ void grpc_timer_manager_init(void) { ...@@ -257,6 +294,9 @@ void grpc_timer_manager_init(void) {
g_waiter_count = 0; g_waiter_count = 0;
g_completed_threads = NULL; g_completed_threads = NULL;
g_has_timed_waiter = false;
g_timed_waiter_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
start_threads(); start_threads();
} }
...@@ -302,6 +342,7 @@ void grpc_kick_poller(void) { ...@@ -302,6 +342,7 @@ void grpc_kick_poller(void) {
gpr_mu_lock(&g_mu); gpr_mu_lock(&g_mu);
g_kicked = true; g_kicked = true;
g_has_timed_waiter = false; g_has_timed_waiter = false;
g_timed_waiter_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
++g_timed_waiter_generation; ++g_timed_waiter_generation;
gpr_cv_signal(&g_cv_wait); gpr_cv_signal(&g_cv_wait);
gpr_mu_unlock(&g_mu); gpr_mu_unlock(&g_mu);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment