Skip to content
Snippets Groups Projects
Commit 6b82f8c4 authored by Craig Tiller's avatar Craig Tiller
Browse files

Fix misplaced unlock, cleanup tracing

parent 25a01850
No related branches found
No related tags found
No related merge requests found
...@@ -37,6 +37,7 @@ ...@@ -37,6 +37,7 @@
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include <grpc/support/thd.h> #include <grpc/support/thd.h>
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/timer.h"
typedef struct completed_thread { typedef struct completed_thread {
...@@ -44,6 +45,8 @@ typedef struct completed_thread { ...@@ -44,6 +45,8 @@ typedef struct completed_thread {
struct completed_thread *next; struct completed_thread *next;
} completed_thread; } completed_thread;
extern grpc_tracer_flag grpc_timer_check_trace;
// global mutex // global mutex
static gpr_mu g_mu; static gpr_mu g_mu;
// are we multi-threaded // are we multi-threaded
...@@ -83,10 +86,13 @@ static void gc_completed_threads(void) { ...@@ -83,10 +86,13 @@ static void gc_completed_threads(void) {
} }
static void start_timer_thread_and_unlock(void) { static void start_timer_thread_and_unlock(void) {
GPR_ASSERT(g_threaded);
++g_waiter_count; ++g_waiter_count;
++g_thread_count; ++g_thread_count;
gpr_mu_unlock(&g_mu); gpr_mu_unlock(&g_mu);
gpr_log(GPR_DEBUG, "Spawn timer thread"); if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
gpr_log(GPR_DEBUG, "Spawn timer thread");
}
gpr_thd_id thd; gpr_thd_id thd;
gpr_thd_options opt = gpr_thd_options_default(); gpr_thd_options opt = gpr_thd_options_default();
gpr_thd_options_set_joinable(&opt); gpr_thd_options_set_joinable(&opt);
...@@ -122,7 +128,9 @@ static void timer_thread(void *unused) { ...@@ -122,7 +128,9 @@ static void timer_thread(void *unused) {
// if there's no thread waiting with a timeout, kick an existing waiter // if there's no thread waiting with a timeout, kick an existing waiter
// so that the next deadline is not missed // so that the next deadline is not missed
if (!g_has_timed_waiter) { if (!g_has_timed_waiter) {
gpr_log(GPR_DEBUG, "kick untimed waiter"); if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
gpr_log(GPR_DEBUG, "kick untimed waiter");
}
gpr_cv_signal(&g_cv_wait); gpr_cv_signal(&g_cv_wait);
} }
gpr_mu_unlock(&g_mu); gpr_mu_unlock(&g_mu);
...@@ -149,15 +157,21 @@ static void timer_thread(void *unused) { ...@@ -149,15 +157,21 @@ static void timer_thread(void *unused) {
// cancel an existing one quickly (and when it actually times out it'll // cancel an existing one quickly (and when it actually times out it'll
// figure stuff out instead of incurring a wakeup) // figure stuff out instead of incurring a wakeup)
my_timed_waiter_generation = ++g_timed_waiter_generation; my_timed_waiter_generation = ++g_timed_waiter_generation;
gpr_log(GPR_DEBUG, "sleep for a while"); if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
gpr_log(GPR_DEBUG, "sleep for a while");
}
} else { } else {
next = inf_future; next = inf_future;
gpr_log(GPR_DEBUG, "sleep until kicked"); if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
gpr_log(GPR_DEBUG, "sleep until kicked");
}
} }
gpr_cv_wait(&g_cv_wait, &g_mu, next); gpr_cv_wait(&g_cv_wait, &g_mu, next);
gpr_log(GPR_DEBUG, "wait ended: was_timed:%d kicked:%d", if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
my_timed_waiter_generation == g_timed_waiter_generation, gpr_log(GPR_DEBUG, "wait ended: was_timed:%d kicked:%d",
g_kicked); my_timed_waiter_generation == g_timed_waiter_generation,
g_kicked);
}
// if this was the timed waiter, then we need to check timers, and flag // if this was the timed waiter, then we need to check timers, and flag
// that there's now no timed waiter... we'll look for a replacement if // that there's now no timed waiter... we'll look for a replacement if
// there's work to do after checking timers (code above) // there's work to do after checking timers (code above)
...@@ -170,8 +184,8 @@ static void timer_thread(void *unused) { ...@@ -170,8 +184,8 @@ static void timer_thread(void *unused) {
grpc_timer_consume_kick(); grpc_timer_consume_kick();
g_kicked = false; g_kicked = false;
} }
gpr_mu_unlock(&g_mu);
} }
gpr_mu_unlock(&g_mu);
} }
// terminate the thread: drop the waiter count, thread count, and let whomever // terminate the thread: drop the waiter count, thread count, and let whomever
// stopped the threading stuff know that we're done // stopped the threading stuff know that we're done
...@@ -186,7 +200,9 @@ static void timer_thread(void *unused) { ...@@ -186,7 +200,9 @@ static void timer_thread(void *unused) {
g_completed_threads = ct; g_completed_threads = ct;
gpr_mu_unlock(&g_mu); gpr_mu_unlock(&g_mu);
grpc_exec_ctx_finish(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx);
gpr_log(GPR_DEBUG, "End timer thread"); if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
gpr_log(GPR_DEBUG, "End timer thread");
}
} }
static void start_threads(void) { static void start_threads(void) {
...@@ -214,11 +230,20 @@ void grpc_timer_manager_init(void) { ...@@ -214,11 +230,20 @@ void grpc_timer_manager_init(void) {
static void stop_threads(void) { static void stop_threads(void) {
gpr_mu_lock(&g_mu); gpr_mu_lock(&g_mu);
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
gpr_log(GPR_DEBUG, "stop timer threads: threaded=%d", g_threaded);
}
if (g_threaded) { if (g_threaded) {
g_threaded = false; g_threaded = false;
gpr_cv_broadcast(&g_cv_wait); gpr_cv_broadcast(&g_cv_wait);
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
gpr_log(GPR_DEBUG, "num timer threads: %d", g_thread_count);
}
while (g_thread_count > 0) { while (g_thread_count > 0) {
gpr_cv_wait(&g_cv_shutdown, &g_mu, gpr_inf_future(GPR_CLOCK_REALTIME)); gpr_cv_wait(&g_cv_shutdown, &g_mu, gpr_inf_future(GPR_CLOCK_REALTIME));
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
gpr_log(GPR_DEBUG, "num timer threads: %d", g_thread_count);
}
gc_completed_threads(); gc_completed_threads();
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment