diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c index b2d6559751482039feacad8eec41c7dc6a0b2e0b..273505f8b810a474e7be1376c1725eed4c5a67b6 100644 --- a/src/core/lib/iomgr/combiner.c +++ b/src/core/lib/iomgr/combiner.c @@ -189,7 +189,7 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { } if (lock->optional_workqueue != NULL && - grpc_exec_ctx_ready_to_finish(exec_ctx) && is_covered_by_poller(lock)) { + is_covered_by_poller(lock) && grpc_exec_ctx_ready_to_finish(exec_ctx)) { GPR_TIMER_MARK("offload_from_finished_exec_ctx", 0); // this execution context wants to move on, and we have a workqueue (and // so can help the execution context out): schedule remaining work to be diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c index 740920d760e90a66c73e40041770fdf9e6b2bdbf..f473e4765c4b932d3721c5c3d2b648a232111cb6 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.c +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -1299,7 +1299,7 @@ static void pollset_reset(grpc_pollset *pollset) { GPR_ASSERT(pollset->polling_island == NULL); } -#define GRPC_EPOLL_MAX_EVENTS 1000 +#define GRPC_EPOLL_MAX_EVENTS 100 /* Note: sig_mask contains the signal mask to use *during* epoll_wait() */ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, diff --git a/src/core/lib/iomgr/workqueue_posix.c b/src/core/lib/iomgr/workqueue_posix.c index 836bb8b6e0f252d7e83ed1993951c3fa95cfc291..6f8a26684abe07a5df1a78765ea752443f85b8db 100644 --- a/src/core/lib/iomgr/workqueue_posix.c +++ b/src/core/lib/iomgr/workqueue_posix.c @@ -146,42 +146,67 @@ static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { GPR_ASSERT(gpr_atm_no_barrier_load(&workqueue->state) == 0); gpr_free(workqueue); } else { - error = grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd); - if (error != GRPC_ERROR_NONE) { - /* recurse to get error handling */ - on_readable(exec_ctx, arg, error); - } else { - gpr_mpscq_node *n = gpr_mpscq_pop(&workqueue->queue); - if (n == NULL) { - /* try again - queue in an ephemerally inconsistent state */ - wakeup(exec_ctx, workqueue); - grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd, - &workqueue->read_closure); - } else { + gpr_mpscq_node *n = NULL; + for (int i = 0; i < 100; i++) { + n = gpr_mpscq_pop(&workqueue->queue); + if (n != NULL) { + grpc_closure *c = (grpc_closure *)n; + grpc_closure_run(exec_ctx, c, c->error_data.error); + grpc_exec_ctx_flush(exec_ctx); gpr_atm last = gpr_atm_full_fetch_add(&workqueue->state, -2); switch (last) { default: - // schedule a wakeup since there's more to do - wakeup(exec_ctx, workqueue); - break; + // there's more to do, keep going + goto keep_going; case 3: // had one count, one unorphaned --> done, unorphaned - break; + goto switch_to_idle; case 2: // had one count, one orphaned --> done, orphaned - workqueue_destroy(exec_ctx, workqueue); - break; + goto destroy; case 1: case 0: // these values are illegal - representing an already done or // deleted workqueue GPR_UNREACHABLE_CODE(break); } - grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd, - &workqueue->read_closure); - grpc_closure *cl = (grpc_closure *)n; - grpc_error *clerr = cl->error_data.error; - grpc_closure_run(exec_ctx, cl, clerr); } } + /* fall through to wakeup_next -- we tried a bunch of times to pull a node + * but failed */ +wakeup_next: + error = grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd); + if (error != GRPC_ERROR_NONE) { + /* recurse to get error handling */ + on_readable(exec_ctx, arg, error); + } else { + grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd, + &workqueue->read_closure); + wakeup(exec_ctx, workqueue); + } + return; + +keep_going: + if (grpc_exec_ctx_ready_to_finish(exec_ctx)) { + goto wakeup_next; + } else { + /* recurse to continue */ + on_readable(exec_ctx, arg, GRPC_ERROR_NONE); + } + return; + +switch_to_idle: + error = grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd); + if (error != GRPC_ERROR_NONE) { + /* recurse to get error handling */ + on_readable(exec_ctx, arg, error); + } else { + grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd, + &workqueue->read_closure); + } + return; + +destroy: + workqueue_destroy(exec_ctx, workqueue); + return; } GPR_TIMER_END("workqueue.on_readable", 0);