Skip to content
Snippets Groups Projects
Commit 2b78f039 authored by Craig Tiller's avatar Craig Tiller
Browse files

Merge branch 'timer_pool' into uberpoll

parents ce651a21 6b82f8c4
No related branches found
No related tags found
No related merge requests found
...@@ -37,6 +37,7 @@ ...@@ -37,6 +37,7 @@
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include <grpc/support/thd.h> #include <grpc/support/thd.h>
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/timer.h"
typedef struct completed_thread { typedef struct completed_thread {
...@@ -44,6 +45,8 @@ typedef struct completed_thread { ...@@ -44,6 +45,8 @@ typedef struct completed_thread {
struct completed_thread *next; struct completed_thread *next;
} completed_thread; } completed_thread;
extern grpc_tracer_flag grpc_timer_check_trace;
// global mutex // global mutex
static gpr_mu g_mu; static gpr_mu g_mu;
// are we multi-threaded // are we multi-threaded
...@@ -83,10 +86,13 @@ static void gc_completed_threads(void) { ...@@ -83,10 +86,13 @@ static void gc_completed_threads(void) {
} }
static void start_timer_thread_and_unlock(void) { static void start_timer_thread_and_unlock(void) {
GPR_ASSERT(g_threaded);
++g_waiter_count; ++g_waiter_count;
++g_thread_count; ++g_thread_count;
gpr_mu_unlock(&g_mu); gpr_mu_unlock(&g_mu);
gpr_log(GPR_DEBUG, "Spawn timer thread"); if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
gpr_log(GPR_DEBUG, "Spawn timer thread");
}
gpr_thd_id thd; gpr_thd_id thd;
gpr_thd_options opt = gpr_thd_options_default(); gpr_thd_options opt = gpr_thd_options_default();
gpr_thd_options_set_joinable(&opt); gpr_thd_options_set_joinable(&opt);
...@@ -122,7 +128,9 @@ static void timer_thread(void *unused) { ...@@ -122,7 +128,9 @@ static void timer_thread(void *unused) {
// if there's no thread waiting with a timeout, kick an existing waiter // if there's no thread waiting with a timeout, kick an existing waiter
// so that the next deadline is not missed // so that the next deadline is not missed
if (!g_has_timed_waiter) { if (!g_has_timed_waiter) {
gpr_log(GPR_DEBUG, "kick untimed waiter"); if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
gpr_log(GPR_DEBUG, "kick untimed waiter");
}
gpr_cv_signal(&g_cv_wait); gpr_cv_signal(&g_cv_wait);
} }
gpr_mu_unlock(&g_mu); gpr_mu_unlock(&g_mu);
...@@ -149,15 +157,21 @@ static void timer_thread(void *unused) { ...@@ -149,15 +157,21 @@ static void timer_thread(void *unused) {
// cancel an existing one quickly (and when it actually times out it'll // cancel an existing one quickly (and when it actually times out it'll
// figure stuff out instead of incurring a wakeup) // figure stuff out instead of incurring a wakeup)
my_timed_waiter_generation = ++g_timed_waiter_generation; my_timed_waiter_generation = ++g_timed_waiter_generation;
gpr_log(GPR_DEBUG, "sleep for a while"); if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
gpr_log(GPR_DEBUG, "sleep for a while");
}
} else { } else {
next = inf_future; next = inf_future;
gpr_log(GPR_DEBUG, "sleep until kicked"); if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
gpr_log(GPR_DEBUG, "sleep until kicked");
}
} }
gpr_cv_wait(&g_cv_wait, &g_mu, next); gpr_cv_wait(&g_cv_wait, &g_mu, next);
gpr_log(GPR_DEBUG, "wait ended: was_timed:%d kicked:%d", if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
my_timed_waiter_generation == g_timed_waiter_generation, gpr_log(GPR_DEBUG, "wait ended: was_timed:%d kicked:%d",
g_kicked); my_timed_waiter_generation == g_timed_waiter_generation,
g_kicked);
}
// if this was the timed waiter, then we need to check timers, and flag // if this was the timed waiter, then we need to check timers, and flag
// that there's now no timed waiter... we'll look for a replacement if // that there's now no timed waiter... we'll look for a replacement if
// there's work to do after checking timers (code above) // there's work to do after checking timers (code above)
...@@ -170,8 +184,8 @@ static void timer_thread(void *unused) { ...@@ -170,8 +184,8 @@ static void timer_thread(void *unused) {
grpc_timer_consume_kick(); grpc_timer_consume_kick();
g_kicked = false; g_kicked = false;
} }
gpr_mu_unlock(&g_mu);
} }
gpr_mu_unlock(&g_mu);
} }
// terminate the thread: drop the waiter count, thread count, and let whomever // terminate the thread: drop the waiter count, thread count, and let whomever
// stopped the threading stuff know that we're done // stopped the threading stuff know that we're done
...@@ -186,7 +200,9 @@ static void timer_thread(void *unused) { ...@@ -186,7 +200,9 @@ static void timer_thread(void *unused) {
g_completed_threads = ct; g_completed_threads = ct;
gpr_mu_unlock(&g_mu); gpr_mu_unlock(&g_mu);
grpc_exec_ctx_finish(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx);
gpr_log(GPR_DEBUG, "End timer thread"); if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
gpr_log(GPR_DEBUG, "End timer thread");
}
} }
static void start_threads(void) { static void start_threads(void) {
...@@ -214,11 +230,20 @@ void grpc_timer_manager_init(void) { ...@@ -214,11 +230,20 @@ void grpc_timer_manager_init(void) {
static void stop_threads(void) { static void stop_threads(void) {
gpr_mu_lock(&g_mu); gpr_mu_lock(&g_mu);
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
gpr_log(GPR_DEBUG, "stop timer threads: threaded=%d", g_threaded);
}
if (g_threaded) { if (g_threaded) {
g_threaded = false; g_threaded = false;
gpr_cv_broadcast(&g_cv_wait); gpr_cv_broadcast(&g_cv_wait);
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
gpr_log(GPR_DEBUG, "num timer threads: %d", g_thread_count);
}
while (g_thread_count > 0) { while (g_thread_count > 0) {
gpr_cv_wait(&g_cv_shutdown, &g_mu, gpr_inf_future(GPR_CLOCK_REALTIME)); gpr_cv_wait(&g_cv_shutdown, &g_mu, gpr_inf_future(GPR_CLOCK_REALTIME));
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
gpr_log(GPR_DEBUG, "num timer threads: %d", g_thread_count);
}
gc_completed_threads(); gc_completed_threads();
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment