Skip to content
Snippets Groups Projects
Commit 14a7f9a2 authored by Craig Tiller's avatar Craig Tiller
Browse files

Reduce wakeups, comment code

parent 9fe1bb1b
No related branches found
No related tags found
No related merge requests found
......@@ -52,8 +52,10 @@ static int g_thread_count;
static int g_waiter_count;
static completed_thread *g_completed_threads;
static bool g_kicked;
static bool g_has_timed_waiter;
static uint64_t g_timed_waiter_generation;
#define MAX_WAITERS 3
#define MAX_WAITERS 2
static void timer_thread(void *unused);
......@@ -92,39 +94,87 @@ void grpc_timer_manager_tick() {
}
static void timer_thread(void *unused) {
// this threads exec_ctx: we try to run things through to completion here
// since it's easy to spin up new threads
grpc_exec_ctx exec_ctx =
GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL);
const gpr_timespec inf_future = gpr_inf_future(GPR_CLOCK_MONOTONIC);
for (;;) {
gpr_timespec next = gpr_inf_future(GPR_CLOCK_MONOTONIC);
gpr_timespec next = inf_future;
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
// check timer state, updates next to the next time to run a check
if (grpc_timer_check(&exec_ctx, now, &next)) {
// if there's something to execute...
gpr_mu_lock(&g_mu);
// remove a waiter from the pool, and start another thread if necessary
--g_waiter_count;
bool start_thread = g_waiter_count == 0;
if (start_thread && g_threaded) {
start_timer_thread_and_unlock();
} else {
// if there's no thread waiting with a timeout, kick an existing waiter
// so that the next deadline is not missed
if (!g_has_timed_waiter) {
gpr_log(GPR_DEBUG, "kick untimed waiter");
gpr_cv_signal(&g_cv_wait);
}
gpr_mu_unlock(&g_mu);
}
// without our lock, flush the exec_ctx
grpc_exec_ctx_flush(&exec_ctx);
gpr_mu_lock(&g_mu);
// garbage collect any threads hanging out that are dead
gc_completed_threads();
// get ready to wait again
++g_waiter_count;
gpr_mu_unlock(&g_mu);
} else {
gpr_mu_lock(&g_mu);
// if we're not threaded anymore, leave
if (!g_threaded) break;
if (gpr_cv_wait(&g_cv_wait, &g_mu, next)) {
if (g_kicked) {
grpc_timer_consume_kick();
g_kicked = false;
} else if (g_waiter_count > MAX_WAITERS) {
break;
}
// if there's no timed waiter, we should become one: that waiter waits
// only until the next timer should expire
// all other timers wait forever
uint64_t my_timed_waiter_generation = g_timed_waiter_generation - 1;
if (!g_has_timed_waiter) {
g_has_timed_waiter = true;
// we use a generation counter to track the timed waiter so we can
// cancel an existing one quickly (and when it actually times out it'll
// figure stuff out instead of incurring a wakeup)
my_timed_waiter_generation = ++g_timed_waiter_generation;
gpr_log(GPR_DEBUG, "sleep for a while");
} else {
next = inf_future;
gpr_log(GPR_DEBUG, "sleep until kicked");
}
bool timed_out = gpr_cv_wait(&g_cv_wait, &g_mu, next);
// if we timed out and we have too many waiters, maybe exit this thread
bool should_stop = (timed_out && g_waiter_count > MAX_WAITERS);
gpr_log(GPR_DEBUG, "wait ended: was_timed:%d timed_out:%d kicked:%d",
my_timed_waiter_generation == g_timed_waiter_generation,
timed_out, g_kicked);
// if this was the timed waiter, then we need to check timers, and flag
// that there's now no timed waiter... we'll look for a replacement if
// there's work to do after checking timers (code above)
if (my_timed_waiter_generation == g_timed_waiter_generation) {
g_has_timed_waiter = false;
should_stop = false;
}
// if this was a kick from the timer system, consume it (and don't stop
// this thread yet)
if (g_kicked) {
grpc_timer_consume_kick();
g_kicked = false;
should_stop = false;
}
if (should_stop) {
break;
}
gpr_mu_unlock(&g_mu);
}
gpr_mu_unlock(&g_mu);
}
// terminate the thread: drop the waiter count, thread count, and let whomever
// stopped the threading stuff know that we're done
--g_waiter_count;
--g_thread_count;
if (0 == g_thread_count) {
......@@ -135,6 +185,7 @@ static void timer_thread(void *unused) {
ct->next = g_completed_threads;
g_completed_threads = ct;
gpr_mu_unlock(&g_mu);
grpc_exec_ctx_finish(&exec_ctx);
gpr_log(GPR_DEBUG, "End timer thread");
}
......@@ -193,6 +244,8 @@ void grpc_timer_manager_set_threading(bool threaded) {
void grpc_kick_poller(void) {
gpr_mu_lock(&g_mu);
g_kicked = true;
g_has_timed_waiter = false;
++g_timed_waiter_generation;
gpr_cv_signal(&g_cv_wait);
gpr_mu_unlock(&g_mu);
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment