From 84ea341aa3c1435182d8f2e0a687fa45bd4a8d1c Mon Sep 17 00:00:00 2001
From: Craig Tiller <ctiller@google.com>
Date: Thu, 8 Sep 2016 14:57:56 -0700
Subject: [PATCH] Minor perf improvements

---
 src/core/lib/iomgr/combiner.c        |  2 +-
 src/core/lib/iomgr/ev_epoll_linux.c  |  2 +-
 src/core/lib/iomgr/workqueue_posix.c | 71 +++++++++++++++++++---------
 3 files changed, 50 insertions(+), 25 deletions(-)

diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c
index b2d6559751..273505f8b8 100644
--- a/src/core/lib/iomgr/combiner.c
+++ b/src/core/lib/iomgr/combiner.c
@@ -189,7 +189,7 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
   }
 
   if (lock->optional_workqueue != NULL &&
-      grpc_exec_ctx_ready_to_finish(exec_ctx) && is_covered_by_poller(lock)) {
+      is_covered_by_poller(lock) && grpc_exec_ctx_ready_to_finish(exec_ctx)) {
     GPR_TIMER_MARK("offload_from_finished_exec_ctx", 0);
     // this execution context wants to move on, and we have a workqueue (and
     // so can help the execution context out): schedule remaining work to be
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c
index 740920d760..f473e4765c 100644
--- a/src/core/lib/iomgr/ev_epoll_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_linux.c
@@ -1299,7 +1299,7 @@ static void pollset_reset(grpc_pollset *pollset) {
   GPR_ASSERT(pollset->polling_island == NULL);
 }
 
-#define GRPC_EPOLL_MAX_EVENTS 1000
+#define GRPC_EPOLL_MAX_EVENTS 100
 /* Note: sig_mask contains the signal mask to use *during* epoll_wait() */
 static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
                                     grpc_pollset *pollset,
diff --git a/src/core/lib/iomgr/workqueue_posix.c b/src/core/lib/iomgr/workqueue_posix.c
index 836bb8b6e0..6f8a26684a 100644
--- a/src/core/lib/iomgr/workqueue_posix.c
+++ b/src/core/lib/iomgr/workqueue_posix.c
@@ -146,42 +146,67 @@ static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
     GPR_ASSERT(gpr_atm_no_barrier_load(&workqueue->state) == 0);
     gpr_free(workqueue);
   } else {
-    error = grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd);
-    if (error != GRPC_ERROR_NONE) {
-      /* recurse to get error handling */
-      on_readable(exec_ctx, arg, error);
-    } else {
-      gpr_mpscq_node *n = gpr_mpscq_pop(&workqueue->queue);
-      if (n == NULL) {
-        /* try again - queue in an ephemerally inconsistent state */
-        wakeup(exec_ctx, workqueue);
-        grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd,
-                               &workqueue->read_closure);
-      } else {
+    gpr_mpscq_node *n = NULL;
+    for (int i = 0; i < 100; i++) {
+      n = gpr_mpscq_pop(&workqueue->queue);
+      if (n != NULL) {
+        grpc_closure *c = (grpc_closure *)n;
+        grpc_closure_run(exec_ctx, c, c->error_data.error);
+        grpc_exec_ctx_flush(exec_ctx);
         gpr_atm last = gpr_atm_full_fetch_add(&workqueue->state, -2);
         switch (last) {
           default:
-            // schedule a wakeup since there's more to do
-            wakeup(exec_ctx, workqueue);
-            break;
+            // there's more to do, keep going
+            goto keep_going;
           case 3:  // had one count, one unorphaned --> done, unorphaned
-            break;
+            goto switch_to_idle;
           case 2:  // had one count, one orphaned --> done, orphaned
-            workqueue_destroy(exec_ctx, workqueue);
-            break;
+            goto destroy;
           case 1:
           case 0:
             // these values are illegal - representing an already done or
             // deleted workqueue
             GPR_UNREACHABLE_CODE(break);
         }
-        grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd,
-                               &workqueue->read_closure);
-        grpc_closure *cl = (grpc_closure *)n;
-        grpc_error *clerr = cl->error_data.error;
-        grpc_closure_run(exec_ctx, cl, clerr);
       }
     }
+    /* fall through to wakeup_next -- we tried a bunch of times to pull a node
+     * but failed */
+wakeup_next:
+    error = grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd);
+    if (error != GRPC_ERROR_NONE) {
+      /* recurse to get error handling */
+      on_readable(exec_ctx, arg, error);
+    } else {
+      grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd,
+                             &workqueue->read_closure);
+      wakeup(exec_ctx, workqueue);
+    }
+    return;
+
+keep_going:
+    if (grpc_exec_ctx_ready_to_finish(exec_ctx)) {
+      goto wakeup_next;
+    } else {
+      /* recurse to continue */
+      on_readable(exec_ctx, arg, GRPC_ERROR_NONE);
+    }
+    return;
+
+switch_to_idle:
+    error = grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd);
+    if (error != GRPC_ERROR_NONE) {
+      /* recurse to get error handling */
+      on_readable(exec_ctx, arg, error);
+    } else {
+      grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd,
+                             &workqueue->read_closure);
+    }
+    return;
+
+destroy:
+    workqueue_destroy(exec_ctx, workqueue);
+    return;
   }
 
   GPR_TIMER_END("workqueue.on_readable", 0);
-- 
GitLab