From 0b4c9011846c9f9640eabefcbecc9d8736b3ccc0 Mon Sep 17 00:00:00 2001 From: Craig Tiller <ctiller@google.com> Date: Thu, 6 Apr 2017 17:19:37 -0700 Subject: [PATCH] fd-workqueue-melding --- src/core/lib/iomgr/ev_epollex_linux.c | 50 ++++++++++++++++++++++++++- 1 file changed, 49 insertions(+), 1 deletion(-) diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c index b3390c602e..608cda4a3b 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.c +++ b/src/core/lib/iomgr/ev_epollex_linux.c @@ -61,6 +61,7 @@ #include "src/core/lib/iomgr/workqueue.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/block_annotate.h" +#include "src/core/lib/support/spinlock.h" #ifndef EPOLLEXCLUSIVE #define EPOLLEXCLUSIVE (1u << 28) @@ -86,8 +87,16 @@ struct grpc_fd { Ref/Unref by two to avoid altering the orphaned bit */ gpr_atm refst; + /* Wakeup fd used to wake pollers to check the contents of workqueue_items */ grpc_wakeup_fd workqueue_wakeup_fd; grpc_closure_scheduler workqueue_scheduler; + /* Spinlock guarding the read end of the workqueue (must be held to pop from + * workqueue_items) */ + gpr_spinlock workqueue_read_mu; + /* Queue of closures to be executed */ + gpr_mpscq workqueue_items; + /* Count of items in workqueue_items */ + gpr_atm workqueue_item_count; /* The fd is either closed or we relinquished control of it. In either cases, this indicates that the 'fd' on this structure is no longer @@ -269,6 +278,9 @@ static grpc_fd *fd_create(int fd, const char *name) { GRPC_LOG_IF_ERROR("fd_create", grpc_wakeup_fd_init(&new_fd->workqueue_wakeup_fd)); new_fd->workqueue_scheduler.vtable = &workqueue_scheduler_vtable; + new_fd->workqueue_read_mu = GPR_SPINLOCK_INIT; + gpr_mpscq_init(&new_fd->workqueue_items); + gpr_atm_no_barrier_store(&new_fd->workqueue_item_count); new_fd->freelist_next = NULL; new_fd->on_done_closure = NULL; @@ -392,12 +404,46 @@ static void workqueue_unref(grpc_exec_ctx *exec_ctx, } #endif +static void workqueue_wakeup(grpc_fd *fd) { + GRPC_LOG_IF_ERROR("workqueue_enqueue", + grpc_wakeup_fd_wakeup(&fd->workqueue_wakeup_fd)); +} + static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_error *error) { GPR_TIMER_BEGIN("workqueue.enqueue", 0); grpc_fd *fd = (grpc_fd *)(((char *)closure->scheduler) - offsetof(grpc_fd, workqueue_scheduler)); - abort(); + REF_BY(fd, 2); + gpr_atm last = gpr_atm_no_barrier_fetch_add(&fd->workqueue_item_count, 1); + closure->error_data.error = error; + gpr_mpscq_push(&fd->workqueue_items, &closure->next_data.atm_next); + if (last == 0) { + workqueue_wakeup(fd); + } + UNREF_BY(fd, 2); +} + +static void fd_invoke_workqueue(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { + /* handle spurious wakeups */ + if (!gpr_spinlock_trylock(&fd->workqueue_read_mu)) return; + gpr_mpscq_node *n = gpr_mpscq_pop(&fd->workqueue_items); + gpr_spinlock_unlock(&fd->workqueue_read_mu); + if (n != NULL) { + if (gpr_atm_full_fetch_add(&pi->workqueue_item_count, -1) > 1) { + workqueue_wakeup(fd); + } + grpc_closure *c = (grpc_closure *)n; + grpc_error *error = c->error_data.error; + c->cb(exec_ctx, c->cb_arg, error); + GRPC_ERROR_UNREF(error); + return true; + } else if (gpr_atm_no_barrier_load(&pi->workqueue_item_count) > 0) { + /* n == NULL might mean there's work but it's not available to be popped + * yet - try to ensure another workqueue wakes up to check shortly if so + */ + workqueue_wakeup(fd); + } } static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) { @@ -556,8 +602,10 @@ static grpc_error *pollset_poll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, "epoll fd failed to initialize"); } + GRPC_SCHEDULING_START_BLOCKING_REGION; int r = epoll_wait(pollset->epfd, events, MAX_EPOLL_EVENTS, poll_deadline_to_millis_timeout(deadline, now)); + GRPC_SCHEDULING_END_BLOCKING_REGION; if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait"); grpc_error *error = GRPC_ERROR_NONE; -- GitLab