From de2c41c394770fd87f5e406af5dfe70ba6656b4c Mon Sep 17 00:00:00 2001
From: Craig Tiller <ctiller@google.com>
Date: Thu, 1 Sep 2016 15:08:08 -0700
Subject: [PATCH] Call closures directly where safe

---
 .../chttp2/transport/chttp2_transport.c       | 19 +++++++------------
 src/core/lib/iomgr/closure.c                  | 10 ++++++++++
 src/core/lib/iomgr/closure.h                  |  5 +++++
 src/core/lib/iomgr/exec_ctx.c                 |  6 +-----
 src/core/lib/iomgr/tcp_posix.c                |  9 +++------
 src/core/lib/surface/server.c                 |  3 +--
 6 files changed, 27 insertions(+), 25 deletions(-)

diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 94440a9539..13fc8ab374 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -759,7 +759,7 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx,
       grpc_transport_move_stats(&s->stats, s->collecting_stats);
       s->collecting_stats = NULL;
     }
-    grpc_exec_ctx_sched(exec_ctx, closure, closure->error_data.error, NULL);
+    grpc_closure_run(exec_ctx, closure, closure->error_data.error);
   }
   *pclosure = NULL;
 }
@@ -1155,7 +1155,7 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
     close_transport_locked(exec_ctx, t, close_transport);
   }
 
-  grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL);
+  grpc_closure_run(exec_ctx, op->on_consumed, GRPC_ERROR_NONE);
 
   GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "transport_op");
 }
@@ -1199,8 +1199,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
       }
       grpc_chttp2_incoming_metadata_buffer_publish(&s->metadata_buffer[0],
                                                    s->recv_initial_metadata);
-      grpc_exec_ctx_sched(exec_ctx, s->recv_initial_metadata_ready,
-                          GRPC_ERROR_NONE, NULL);
+      grpc_closure_run(exec_ctx, s->recv_initial_metadata_ready, GRPC_ERROR_NONE);
       s->recv_initial_metadata_ready = NULL;
     }
     if (s->recv_message_ready != NULL) {
@@ -1213,13 +1212,11 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
         *s->recv_message =
             grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames);
         GPR_ASSERT(*s->recv_message != NULL);
-        grpc_exec_ctx_sched(exec_ctx, s->recv_message_ready, GRPC_ERROR_NONE,
-                            NULL);
+        grpc_closure_run(exec_ctx, s->recv_message_ready, GRPC_ERROR_NONE);
         s->recv_message_ready = NULL;
       } else if (s->published_metadata[1]) {
         *s->recv_message = NULL;
-        grpc_exec_ctx_sched(exec_ctx, s->recv_message_ready, GRPC_ERROR_NONE,
-                            NULL);
+        grpc_closure_run(exec_ctx, s->recv_message_ready, GRPC_ERROR_NONE);
         s->recv_message_ready = NULL;
       }
     }
@@ -1853,11 +1850,9 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx,
   gpr_mu_lock(&bs->slice_mu);
   if (bs->slices.count > 0) {
     *bs->next_action.slice = gpr_slice_buffer_take_first(&bs->slices);
-    grpc_exec_ctx_sched(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_NONE,
-                        NULL);
+    grpc_closure_run(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_NONE);
   } else if (bs->error != GRPC_ERROR_NONE) {
-    grpc_exec_ctx_sched(exec_ctx, bs->next_action.on_complete,
-                        GRPC_ERROR_REF(bs->error), NULL);
+    grpc_closure_run(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_REF(bs->error));
   } else {
     bs->on_next = bs->next_action.on_complete;
     bs->next = bs->next_action.slice;
diff --git a/src/core/lib/iomgr/closure.c b/src/core/lib/iomgr/closure.c
index 6200cda5dc..5cbd6bd7a5 100644
--- a/src/core/lib/iomgr/closure.c
+++ b/src/core/lib/iomgr/closure.c
@@ -35,6 +35,8 @@
 
 #include <grpc/support/alloc.h>
 
+#include "src/core/lib/profiling/timers.h"
+
 void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb,
                        void *cb_arg) {
   closure->cb = cb;
@@ -110,3 +112,11 @@ grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg) {
   grpc_closure_init(&wc->wrapper, closure_wrapper, wc);
   return &wc->wrapper;
 }
+
+void grpc_closure_run(grpc_exec_ctx *exec_ctx, grpc_closure *c, grpc_error *error) {
+  GPR_TIMER_BEGIN("grpc_closure_run", 0);
+  c->cb(exec_ctx, c->cb_arg, error);
+  GRPC_ERROR_UNREF(error);
+  GPR_TIMER_END("grpc_closure_run", 0);
+}
+
diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h
index bf7c006097..29ed19cb4f 100644
--- a/src/core/lib/iomgr/closure.h
+++ b/src/core/lib/iomgr/closure.h
@@ -109,4 +109,9 @@ void grpc_closure_list_move(grpc_closure_list *src, grpc_closure_list *dst);
 /** return whether \a list is empty. */
 bool grpc_closure_list_empty(grpc_closure_list list);
 
+/** Run a closure directly. Caller ensures that no locks are being held above.
+ *  Note that calling this at the end of a closure callback function itself is
+ *  by definition safe. */
+void grpc_closure_run(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_error *error);
+
 #endif /* GRPC_CORE_LIB_IOMGR_CLOSURE_H */
diff --git a/src/core/lib/iomgr/exec_ctx.c b/src/core/lib/iomgr/exec_ctx.c
index eec32a4f26..a3c40e8092 100644
--- a/src/core/lib/iomgr/exec_ctx.c
+++ b/src/core/lib/iomgr/exec_ctx.c
@@ -67,12 +67,8 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) {
       exec_ctx->closure_list.head = exec_ctx->closure_list.tail = NULL;
       while (c != NULL) {
         grpc_closure *next = c->next_data.next;
-        grpc_error *error = c->error_data.error;
         did_something = true;
-        GPR_TIMER_BEGIN("grpc_exec_ctx_flush.cb", 0);
-        c->cb(exec_ctx, c->cb_arg, error);
-        GRPC_ERROR_UNREF(error);
-        GPR_TIMER_END("grpc_exec_ctx_flush.cb", 0);
+        grpc_closure_run(exec_ctx, c, c->error_data.error);
         c = next;
       }
       continue;
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index caaed23212..ffdc7c7b42 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -177,7 +177,7 @@ static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp,
 
   tcp->read_cb = NULL;
   tcp->incoming_buffer = NULL;
-  grpc_exec_ctx_sched(exec_ctx, cb, error, NULL);
+  grpc_closure_run(exec_ctx, cb, error);
 }
 
 #define MAX_READ_IOVEC 4
@@ -279,7 +279,7 @@ static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
     tcp->finished_edge = false;
     grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure);
   } else {
-    grpc_exec_ctx_sched(exec_ctx, &tcp->read_closure, GRPC_ERROR_NONE, NULL);
+    grpc_exec_ctx_sched(exec_ctx, &tcp->read_closure, GRPC_ERROR_NONE, grpc_fd_get_workqueue(tcp->em_fd));
   }
 }
 
@@ -392,11 +392,8 @@ static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
       grpc_error_free_string(str);
     }
 
-    GPR_TIMER_BEGIN("tcp_handle_write.cb", 0);
-    cb->cb(exec_ctx, cb->cb_arg, error);
-    GPR_TIMER_END("tcp_handle_write.cb", 0);
+    grpc_closure_run(exec_ctx, cb, error);
     TCP_UNREF(exec_ctx, tcp, "write");
-    GRPC_ERROR_UNREF(error);
   }
 }
 
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 9a517b0a7a..8f9b995265 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -773,8 +773,7 @@ static void server_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr,
         GRPC_ERROR_CREATE_REFERENCING("Missing :authority or :path", &error, 1);
   }
 
-  grpc_exec_ctx_sched(exec_ctx, calld->on_done_recv_initial_metadata, error,
-                      NULL);
+  grpc_closure_run(exec_ctx, calld->on_done_recv_initial_metadata, error);
 }
 
 static void server_mutate_op(grpc_call_element *elem,
-- 
GitLab