From 5e56f00d3a82da4d28f66791fa58faf4644a9d09 Mon Sep 17 00:00:00 2001
From: Craig Tiller <ctiller@google.com>
Date: Tue, 16 May 2017 15:02:50 -0700
Subject: [PATCH] Fixes to new executor

---
 src/core/lib/iomgr/combiner.c                |  2 +-
 src/core/lib/iomgr/executor.c                | 81 ++++++++++++--------
 src/core/lib/iomgr/executor.h                |  9 ++-
 src/core/lib/iomgr/iomgr.c                   |  9 +--
 src/core/lib/iomgr/iomgr.h                   |  4 +-
 src/core/lib/surface/init.c                  |  6 +-
 test/core/end2end/fuzzers/api_fuzzer.c       |  6 ++
 test/core/end2end/fuzzers/client_fuzzer.c    |  2 +
 test/core/end2end/fuzzers/server_fuzzer.c    |  2 +
 test/core/iomgr/ev_epollsig_linux_test.c     | 12 ++-
 test/core/iomgr/fd_conservation_posix_test.c | 15 ++--
 test/core/iomgr/fd_posix_test.c              |  4 +-
 test/core/iomgr/pollset_set_test.c           |  4 +-
 test/core/iomgr/resolve_address_posix_test.c | 15 ++--
 test/core/iomgr/resolve_address_test.c       | 15 ++--
 15 files changed, 105 insertions(+), 81 deletions(-)

diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c
index aa7a8c1c70..38eace12c7 100644
--- a/src/core/lib/iomgr/combiner.c
+++ b/src/core/lib/iomgr/combiner.c
@@ -214,7 +214,7 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
                               lock, grpc_exec_ctx_ready_to_finish(exec_ctx),
                               lock->time_to_execute_final_list));
 
-  if (grpc_exec_ctx_ready_to_finish(exec_ctx)) {
+  if (grpc_exec_ctx_ready_to_finish(exec_ctx) && grpc_executor_is_threaded()) {
     GPR_TIMER_MARK("offload_from_finished_exec_ctx", 0);
     // this execution context wants to move on, and we have a workqueue (and
     // so can help the execution context out): schedule remaining work to be
diff --git a/src/core/lib/iomgr/executor.c b/src/core/lib/iomgr/executor.c
index 2a2544dc1f..513248ca57 100644
--- a/src/core/lib/iomgr/executor.c
+++ b/src/core/lib/iomgr/executor.c
@@ -66,22 +66,6 @@ GPR_TLS_DECL(g_this_thread_state);
 
 static void executor_thread(void *arg);
 
-void grpc_executor_init() {
-  g_max_threads = GPR_MAX(1, 2 * gpr_cpu_num_cores());
-  gpr_atm_no_barrier_store(&g_cur_threads, 1);
-  gpr_tls_init(&g_this_thread_state);
-  g_thread_state = gpr_zalloc(sizeof(thread_state) * g_max_threads);
-  for (size_t i = 0; i < g_max_threads; i++) {
-    gpr_mu_init(&g_thread_state[i].mu);
-    gpr_cv_init(&g_thread_state[i].cv);
-    g_thread_state[i].elems = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
-  }
-
-  gpr_thd_options opt = gpr_thd_options_default();
-  gpr_thd_options_set_joinable(&opt);
-  gpr_thd_new(&g_thread_state[0].id, executor_thread, &g_thread_state[0], &opt);
-}
-
 static size_t run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list list) {
   size_t n = 0;
 
@@ -100,24 +84,57 @@ static size_t run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list list) {
   return n;
 }
 
-void grpc_executor_shutdown(grpc_exec_ctx *exec_ctx) {
-  for (size_t i = 0; i < g_max_threads; i++) {
-    gpr_mu_lock(&g_thread_state[i].mu);
-    g_thread_state[i].shutdown = true;
-    gpr_cv_signal(&g_thread_state[i].cv);
-    gpr_mu_unlock(&g_thread_state[i].mu);
-  }
-  for (gpr_atm i = 0; i < g_cur_threads; i++) {
-    gpr_thd_join(g_thread_state[i].id);
+bool grpc_executor_is_threaded() {
+  return gpr_atm_no_barrier_load(&g_cur_threads) > 0;
+}
+
+void grpc_executor_set_threading(grpc_exec_ctx *exec_ctx, bool threading) {
+  gpr_atm cur_threads = gpr_atm_no_barrier_load(&g_cur_threads);
+  if (threading) {
+    if (cur_threads > 0) return;
+    g_max_threads = GPR_MAX(1, 2 * gpr_cpu_num_cores());
+    gpr_atm_no_barrier_store(&g_cur_threads, 1);
+    gpr_tls_init(&g_this_thread_state);
+    g_thread_state = gpr_zalloc(sizeof(thread_state) * g_max_threads);
+    for (size_t i = 0; i < g_max_threads; i++) {
+      gpr_mu_init(&g_thread_state[i].mu);
+      gpr_cv_init(&g_thread_state[i].cv);
+      g_thread_state[i].elems = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
+    }
+
+    gpr_thd_options opt = gpr_thd_options_default();
+    gpr_thd_options_set_joinable(&opt);
+    gpr_thd_new(&g_thread_state[0].id, executor_thread, &g_thread_state[0],
+                &opt);
+  } else {
+    if (cur_threads == 0) return;
+    for (size_t i = 0; i < g_max_threads; i++) {
+      gpr_mu_lock(&g_thread_state[i].mu);
+      g_thread_state[i].shutdown = true;
+      gpr_cv_signal(&g_thread_state[i].cv);
+      gpr_mu_unlock(&g_thread_state[i].mu);
+    }
+    for (gpr_atm i = 0; i < g_cur_threads; i++) {
+      gpr_thd_join(g_thread_state[i].id);
+    }
+    gpr_atm_no_barrier_store(&g_cur_threads, 0);
+    for (size_t i = 0; i < g_max_threads; i++) {
+      gpr_mu_destroy(&g_thread_state[i].mu);
+      gpr_cv_destroy(&g_thread_state[i].cv);
+      run_closures(exec_ctx, g_thread_state[i].elems);
+    }
+    gpr_free(g_thread_state);
+    gpr_tls_destroy(&g_this_thread_state);
   }
+}
+
+void grpc_executor_init(grpc_exec_ctx *exec_ctx) {
   gpr_atm_no_barrier_store(&g_cur_threads, 0);
-  for (size_t i = 0; i < g_max_threads; i++) {
-    gpr_mu_destroy(&g_thread_state[i].mu);
-    gpr_cv_destroy(&g_thread_state[i].cv);
-    run_closures(exec_ctx, g_thread_state[i].elems);
-  }
-  gpr_free(g_thread_state);
-  gpr_tls_destroy(&g_this_thread_state);
+  grpc_executor_set_threading(exec_ctx, true);
+}
+
+void grpc_executor_shutdown(grpc_exec_ctx *exec_ctx) {
+  grpc_executor_set_threading(exec_ctx, false);
 }
 
 static void executor_thread(void *arg) {
diff --git a/src/core/lib/iomgr/executor.h b/src/core/lib/iomgr/executor.h
index 1213016383..792d5056cb 100644
--- a/src/core/lib/iomgr/executor.h
+++ b/src/core/lib/iomgr/executor.h
@@ -41,11 +41,18 @@
  * This mechanism is meant to outsource work (grpc_closure instances) to a
  * thread, for those cases where blocking isn't an option but there isn't a
  * non-blocking solution available. */
-void grpc_executor_init();
+void grpc_executor_init(grpc_exec_ctx *exec_ctx);
 
 extern grpc_closure_scheduler *grpc_executor_scheduler;
 
 /** Shutdown the executor, running all pending work as part of the call */
 void grpc_executor_shutdown(grpc_exec_ctx *exec_ctx);
 
+/** Is the executor multi-threaded? */
+bool grpc_executor_is_threaded();
+
+/* enable/disable threading - must be called after grpc_executor_init and before
+   grpc_executor_shutdown */
+void grpc_executor_set_threading(grpc_exec_ctx *exec_ctx, bool enable);
+
 #endif /* GRPC_CORE_LIB_IOMGR_EXECUTOR_H */
diff --git a/src/core/lib/iomgr/iomgr.c b/src/core/lib/iomgr/iomgr.c
index 9e0e4dbfe0..0623acc597 100644
--- a/src/core/lib/iomgr/iomgr.c
+++ b/src/core/lib/iomgr/iomgr.c
@@ -57,12 +57,12 @@ static gpr_cv g_rcv;
 static int g_shutdown;
 static grpc_iomgr_object g_root_object;
 
-void grpc_iomgr_init(void) {
+void grpc_iomgr_init(grpc_exec_ctx *exec_ctx) {
   g_shutdown = 0;
   gpr_mu_init(&g_mu);
   gpr_cv_init(&g_rcv);
   grpc_exec_ctx_global_init();
-  grpc_executor_init();
+  grpc_executor_init(exec_ctx);
   grpc_timer_list_init(gpr_now(GPR_CLOCK_MONOTONIC));
   g_root_object.next = g_root_object.prev = &g_root_object;
   g_root_object.name = "root";
@@ -70,7 +70,7 @@ void grpc_iomgr_init(void) {
   grpc_iomgr_platform_init();
 }
 
-void grpc_iomgr_start(void) { grpc_timer_manager_init(); }
+void grpc_iomgr_start(grpc_exec_ctx *exec_ctx) { grpc_timer_manager_init(); }
 
 static size_t count_objects(void) {
   grpc_iomgr_object *obj;
@@ -95,6 +95,7 @@ void grpc_iomgr_shutdown(grpc_exec_ctx *exec_ctx) {
 
   grpc_timer_manager_shutdown();
   grpc_iomgr_platform_flush();
+  grpc_executor_shutdown(exec_ctx);
 
   gpr_mu_lock(&g_mu);
   g_shutdown = 1;
@@ -145,8 +146,6 @@ void grpc_iomgr_shutdown(grpc_exec_ctx *exec_ctx) {
 
   grpc_timer_list_shutdown(exec_ctx);
   grpc_exec_ctx_flush(exec_ctx);
-  grpc_executor_shutdown(exec_ctx);
-  grpc_exec_ctx_flush(exec_ctx);
 
   /* ensure all threads have left g_mu */
   gpr_mu_lock(&g_mu);
diff --git a/src/core/lib/iomgr/iomgr.h b/src/core/lib/iomgr/iomgr.h
index 6e2e023615..bd6ca4a0b8 100644
--- a/src/core/lib/iomgr/iomgr.h
+++ b/src/core/lib/iomgr/iomgr.h
@@ -38,10 +38,10 @@
 #include "src/core/lib/iomgr/port.h"
 
 /** Initializes the iomgr. */
-void grpc_iomgr_init(void);
+void grpc_iomgr_init(grpc_exec_ctx *exec_ctx);
 
 /** Starts any background threads for iomgr. */
-void grpc_iomgr_start(void);
+void grpc_iomgr_start(grpc_exec_ctx *exec_ctx);
 
 /** Signals the intention to shutdown the iomgr. Expects to be able to flush
  * exec_ctx. */
diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c
index 452e6c444b..86ce4bde61 100644
--- a/src/core/lib/surface/init.c
+++ b/src/core/lib/surface/init.c
@@ -128,6 +128,7 @@ void grpc_init(void) {
   int i;
   gpr_once_init(&g_basic_init, do_basic_init);
 
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
   gpr_mu_lock(&g_init_mu);
   if (++g_initializations == 1) {
     gpr_time_init();
@@ -154,7 +155,7 @@ void grpc_init(void) {
     grpc_register_tracer("pending_tags", &grpc_trace_pending_tags);
 #endif
     grpc_security_pre_init();
-    grpc_iomgr_init();
+    grpc_iomgr_init(&exec_ctx);
     gpr_timers_global_init();
     grpc_handshaker_factory_registry_init();
     grpc_security_init();
@@ -170,9 +171,10 @@ void grpc_init(void) {
     grpc_tracer_init("GRPC_TRACE");
     /* no more changes to channel init pipelines */
     grpc_channel_init_finalize();
-    grpc_iomgr_start();
+    grpc_iomgr_start(&exec_ctx);
   }
   gpr_mu_unlock(&g_init_mu);
+  grpc_exec_ctx_finish(&exec_ctx);
   GRPC_API_TRACE("grpc_init(void)", 0, ());
 }
 
diff --git a/test/core/end2end/fuzzers/api_fuzzer.c b/test/core/end2end/fuzzers/api_fuzzer.c
index b33b43dac5..d8d34fba9c 100644
--- a/test/core/end2end/fuzzers/api_fuzzer.c
+++ b/test/core/end2end/fuzzers/api_fuzzer.c
@@ -41,6 +41,7 @@
 
 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
 #include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/iomgr/executor.h"
 #include "src/core/lib/iomgr/resolve_address.h"
 #include "src/core/lib/iomgr/tcp_client.h"
 #include "src/core/lib/iomgr/timer.h"
@@ -724,6 +725,11 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) {
   gpr_now_impl = now_impl;
   grpc_init();
   grpc_timer_manager_set_threading(false);
+  {
+    grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+    grpc_executor_set_threading(&exec_ctx, false);
+    grpc_exec_ctx_finish(&exec_ctx);
+  }
   grpc_resolve_address = my_resolve_address;
 
   GPR_ASSERT(g_channel == NULL);
diff --git a/test/core/end2end/fuzzers/client_fuzzer.c b/test/core/end2end/fuzzers/client_fuzzer.c
index 6f49baffd2..2307a3c771 100644
--- a/test/core/end2end/fuzzers/client_fuzzer.c
+++ b/test/core/end2end/fuzzers/client_fuzzer.c
@@ -37,6 +37,7 @@
 #include <grpc/support/alloc.h>
 
 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
+#include "src/core/lib/iomgr/executor.h"
 #include "src/core/lib/slice/slice_internal.h"
 #include "src/core/lib/surface/channel.h"
 #include "test/core/util/memory_counters.h"
@@ -58,6 +59,7 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) {
   if (leak_check) grpc_memory_counters_init();
   grpc_init();
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+  grpc_executor_set_threading(&exec_ctx, false);
 
   grpc_resource_quota *resource_quota =
       grpc_resource_quota_create("client_fuzzer");
diff --git a/test/core/end2end/fuzzers/server_fuzzer.c b/test/core/end2end/fuzzers/server_fuzzer.c
index 6d65fe1847..e6f6be2325 100644
--- a/test/core/end2end/fuzzers/server_fuzzer.c
+++ b/test/core/end2end/fuzzers/server_fuzzer.c
@@ -34,6 +34,7 @@
 #include <grpc/grpc.h>
 
 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
+#include "src/core/lib/iomgr/executor.h"
 #include "src/core/lib/slice/slice_internal.h"
 #include "src/core/lib/surface/server.h"
 #include "test/core/util/memory_counters.h"
@@ -56,6 +57,7 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) {
   if (leak_check) grpc_memory_counters_init();
   grpc_init();
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+  grpc_executor_set_threading(&exec_ctx, false);
 
   grpc_resource_quota *resource_quota =
       grpc_resource_quota_create("server_fuzzer");
diff --git a/test/core/iomgr/ev_epollsig_linux_test.c b/test/core/iomgr/ev_epollsig_linux_test.c
index a20c4f2b98..952e774670 100644
--- a/test/core/iomgr/ev_epollsig_linux_test.c
+++ b/test/core/iomgr/ev_epollsig_linux_test.c
@@ -321,8 +321,9 @@ static void test_threading(void) {
 int main(int argc, char **argv) {
   const char *poll_strategy = NULL;
   grpc_test_init(argc, argv);
-  grpc_iomgr_init();
-  grpc_iomgr_start();
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+  grpc_iomgr_init(&exec_ctx);
+  grpc_iomgr_start(&exec_ctx);
 
   poll_strategy = grpc_get_poll_strategy_name();
   if (poll_strategy != NULL && strcmp(poll_strategy, "epollsig") == 0) {
@@ -335,11 +336,8 @@ int main(int argc, char **argv) {
             poll_strategy);
   }
 
-  {
-    grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
-    grpc_iomgr_shutdown(&exec_ctx);
-    grpc_exec_ctx_finish(&exec_ctx);
-  }
+  grpc_iomgr_shutdown(&exec_ctx);
+  grpc_exec_ctx_finish(&exec_ctx);
   return 0;
 }
 #else /* defined(GRPC_LINUX_EPOLL) */
diff --git a/test/core/iomgr/fd_conservation_posix_test.c b/test/core/iomgr/fd_conservation_posix_test.c
index f662070655..18d8cf4ec8 100644
--- a/test/core/iomgr/fd_conservation_posix_test.c
+++ b/test/core/iomgr/fd_conservation_posix_test.c
@@ -45,8 +45,9 @@ int main(int argc, char **argv) {
   grpc_endpoint_pair p;
 
   grpc_test_init(argc, argv);
-  grpc_iomgr_init();
-  grpc_iomgr_start();
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+  grpc_iomgr_init(&exec_ctx);
+  grpc_iomgr_start(&exec_ctx);
 
   /* set max # of file descriptors to a low value, and
      verify we can create and destroy many more than this number
@@ -57,19 +58,15 @@ int main(int argc, char **argv) {
       grpc_resource_quota_create("fd_conservation_posix_test");
 
   for (i = 0; i < 100; i++) {
-    grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
     p = grpc_iomgr_create_endpoint_pair("test", NULL);
     grpc_endpoint_destroy(&exec_ctx, p.client);
     grpc_endpoint_destroy(&exec_ctx, p.server);
-    grpc_exec_ctx_finish(&exec_ctx);
+    grpc_exec_ctx_flush(&exec_ctx);
   }
 
   grpc_resource_quota_unref(resource_quota);
 
-  {
-    grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
-    grpc_iomgr_shutdown(&exec_ctx);
-    grpc_exec_ctx_finish(&exec_ctx);
-  }
+  grpc_iomgr_shutdown(&exec_ctx);
+  grpc_exec_ctx_finish(&exec_ctx);
   return 0;
 }
diff --git a/test/core/iomgr/fd_posix_test.c b/test/core/iomgr/fd_posix_test.c
index 9e8fe8bffa..d0f31e087d 100644
--- a/test/core/iomgr/fd_posix_test.c
+++ b/test/core/iomgr/fd_posix_test.c
@@ -542,8 +542,8 @@ int main(int argc, char **argv) {
   grpc_closure destroyed;
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
   grpc_test_init(argc, argv);
-  grpc_iomgr_init();
-  grpc_iomgr_start();
+  grpc_iomgr_init(&exec_ctx);
+  grpc_iomgr_start(&exec_ctx);
   g_pollset = gpr_zalloc(grpc_pollset_size());
   grpc_pollset_init(g_pollset, &g_mu);
   test_grpc_fd();
diff --git a/test/core/iomgr/pollset_set_test.c b/test/core/iomgr/pollset_set_test.c
index 092711381d..587130704d 100644
--- a/test/core/iomgr/pollset_set_test.c
+++ b/test/core/iomgr/pollset_set_test.c
@@ -447,8 +447,8 @@ int main(int argc, char **argv) {
   const char *poll_strategy = grpc_get_poll_strategy_name();
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
   grpc_test_init(argc, argv);
-  grpc_iomgr_init();
-  grpc_iomgr_start();
+  grpc_iomgr_init(&exec_ctx);
+  grpc_iomgr_start(&exec_ctx);
 
   if (poll_strategy != NULL &&
       (strcmp(poll_strategy, "epoll") == 0 ||
diff --git a/test/core/iomgr/resolve_address_posix_test.c b/test/core/iomgr/resolve_address_posix_test.c
index bee7036ec8..f07bd045b6 100644
--- a/test/core/iomgr/resolve_address_posix_test.c
+++ b/test/core/iomgr/resolve_address_posix_test.c
@@ -174,16 +174,13 @@ static void test_unix_socket_path_name_too_long(void) {
 
 int main(int argc, char **argv) {
   grpc_test_init(argc, argv);
-  grpc_executor_init();
-  grpc_iomgr_init();
-  grpc_iomgr_start();
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+  grpc_iomgr_init(&exec_ctx);
+  grpc_iomgr_start(&exec_ctx);
   test_unix_socket();
   test_unix_socket_path_name_too_long();
-  {
-    grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
-    grpc_executor_shutdown(&exec_ctx);
-    grpc_iomgr_shutdown(&exec_ctx);
-    grpc_exec_ctx_finish(&exec_ctx);
-  }
+  grpc_executor_shutdown(&exec_ctx);
+  grpc_iomgr_shutdown(&exec_ctx);
+  grpc_exec_ctx_finish(&exec_ctx);
   return 0;
 }
diff --git a/test/core/iomgr/resolve_address_test.c b/test/core/iomgr/resolve_address_test.c
index 83f73070dc..2c3240aaac 100644
--- a/test/core/iomgr/resolve_address_test.c
+++ b/test/core/iomgr/resolve_address_test.c
@@ -263,9 +263,9 @@ static void test_unparseable_hostports(void) {
 
 int main(int argc, char **argv) {
   grpc_test_init(argc, argv);
-  grpc_executor_init();
-  grpc_iomgr_init();
-  grpc_iomgr_start();
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+  grpc_iomgr_init(&exec_ctx);
+  grpc_iomgr_start(&exec_ctx);
   test_localhost();
   test_default_port();
   test_non_numeric_default_port();
@@ -274,11 +274,8 @@ int main(int argc, char **argv) {
   test_ipv6_without_port();
   test_invalid_ip_addresses();
   test_unparseable_hostports();
-  {
-    grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
-    grpc_executor_shutdown(&exec_ctx);
-    grpc_iomgr_shutdown(&exec_ctx);
-    grpc_exec_ctx_finish(&exec_ctx);
-  }
+  grpc_executor_shutdown(&exec_ctx);
+  grpc_iomgr_shutdown(&exec_ctx);
+  grpc_exec_ctx_finish(&exec_ctx);
   return 0;
 }
-- 
GitLab