Skip to content
Snippets Groups Projects
Commit 09df1d6e authored by Craig Tiller's avatar Craig Tiller
Browse files

Make workqueue lockfree, and distributing

parent 9b22a46d
No related branches found
No related tags found
No related merge requests found
...@@ -52,8 +52,7 @@ typedef struct grpc_combiner grpc_combiner; ...@@ -52,8 +52,7 @@ typedef struct grpc_combiner grpc_combiner;
grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue); grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue);
// Destroy the lock // Destroy the lock
void grpc_combiner_destroy(grpc_combiner *lock); void grpc_combiner_destroy(grpc_combiner *lock);
// Execute \a action within the lock. \a arg is the argument to pass to \a // Execute \a action within the lock.
// action and sizeof_arg is the sizeof(*arg), or 0 if arg is non-copyable.
void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
grpc_closure *closure, grpc_error *error); grpc_closure *closure, grpc_error *error);
......
...@@ -50,8 +50,6 @@ ...@@ -50,8 +50,6 @@
/* grpc_workqueue is forward declared in exec_ctx.h */ /* grpc_workqueue is forward declared in exec_ctx.h */
void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue);
//#define GRPC_WORKQUEUE_REFCOUNT_DEBUG //#define GRPC_WORKQUEUE_REFCOUNT_DEBUG
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG #ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
#define GRPC_WORKQUEUE_REF(p, r) \ #define GRPC_WORKQUEUE_REF(p, r) \
......
...@@ -52,8 +52,7 @@ grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx, ...@@ -52,8 +52,7 @@ grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx,
char name[32]; char name[32];
*workqueue = gpr_malloc(sizeof(grpc_workqueue)); *workqueue = gpr_malloc(sizeof(grpc_workqueue));
gpr_ref_init(&(*workqueue)->refs, 1); gpr_ref_init(&(*workqueue)->refs, 1);
gpr_mu_init(&(*workqueue)->mu); gpr_atm_no_barrier_store(&(*workqueue)->state, 1);
(*workqueue)->closure_list.head = (*workqueue)->closure_list.tail = NULL;
grpc_error *err = grpc_wakeup_fd_init(&(*workqueue)->wakeup_fd); grpc_error *err = grpc_wakeup_fd_init(&(*workqueue)->wakeup_fd);
if (err != GRPC_ERROR_NONE) { if (err != GRPC_ERROR_NONE) {
gpr_free(*workqueue); gpr_free(*workqueue);
...@@ -62,6 +61,7 @@ grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx, ...@@ -62,6 +61,7 @@ grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx,
sprintf(name, "workqueue:%p", (void *)(*workqueue)); sprintf(name, "workqueue:%p", (void *)(*workqueue));
(*workqueue)->wakeup_read_fd = grpc_fd_create( (*workqueue)->wakeup_read_fd = grpc_fd_create(
GRPC_WAKEUP_FD_GET_READ_FD(&(*workqueue)->wakeup_fd), name); GRPC_WAKEUP_FD_GET_READ_FD(&(*workqueue)->wakeup_fd), name);
gpr_mpscq_init(&(*workqueue)->queue);
grpc_closure_init(&(*workqueue)->read_closure, on_readable, *workqueue); grpc_closure_init(&(*workqueue)->read_closure, on_readable, *workqueue);
grpc_fd_notify_on_read(exec_ctx, (*workqueue)->wakeup_read_fd, grpc_fd_notify_on_read(exec_ctx, (*workqueue)->wakeup_read_fd,
&(*workqueue)->read_closure); &(*workqueue)->read_closure);
...@@ -70,10 +70,16 @@ grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx, ...@@ -70,10 +70,16 @@ grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx,
static void workqueue_destroy(grpc_exec_ctx *exec_ctx, static void workqueue_destroy(grpc_exec_ctx *exec_ctx,
grpc_workqueue *workqueue) { grpc_workqueue *workqueue) {
grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL);
grpc_fd_shutdown(exec_ctx, workqueue->wakeup_read_fd); grpc_fd_shutdown(exec_ctx, workqueue->wakeup_read_fd);
} }
static void workqueue_orphan(grpc_exec_ctx *exec_ctx,
grpc_workqueue *workqueue) {
if (gpr_atm_full_fetch_add(&workqueue->state, -1) == 1) {
workqueue_destroy(exec_ctx, workqueue);
}
}
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG #ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line, void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line,
const char *reason) { const char *reason) {
...@@ -96,31 +102,34 @@ void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, ...@@ -96,31 +102,34 @@ void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {
#endif #endif
if (gpr_unref(&workqueue->refs)) { if (gpr_unref(&workqueue->refs)) {
workqueue_destroy(exec_ctx, workqueue); workqueue_orphan(exec_ctx, workqueue);
} }
} }
void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { static void drain(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {
gpr_mu_lock(&workqueue->mu); abort();
grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL); }
gpr_mu_unlock(&workqueue->mu);
static void wakeup(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {
grpc_error *err = grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd);
if (!GRPC_LOG_IF_ERROR("wakeupfd_wakeup", err)) {
drain(exec_ctx, workqueue);
}
} }
static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
grpc_workqueue *workqueue = arg; grpc_workqueue *workqueue = arg;
if (error != GRPC_ERROR_NONE) { if (error != GRPC_ERROR_NONE) {
gpr_mu_destroy(&workqueue->mu);
/* HACK: let wakeup_fd code know that we stole the fd */ /* HACK: let wakeup_fd code know that we stole the fd */
workqueue->wakeup_fd.read_fd = 0; workqueue->wakeup_fd.read_fd = 0;
grpc_wakeup_fd_destroy(&workqueue->wakeup_fd); grpc_wakeup_fd_destroy(&workqueue->wakeup_fd);
grpc_fd_orphan(exec_ctx, workqueue->wakeup_read_fd, NULL, NULL, "destroy"); grpc_fd_orphan(exec_ctx, workqueue->wakeup_read_fd, NULL, NULL, "destroy");
drain(exec_ctx, workqueue);
gpr_free(workqueue); gpr_free(workqueue);
} else { } else {
gpr_mu_lock(&workqueue->mu);
grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL);
error = grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd); error = grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd);
gpr_mu_unlock(&workqueue->mu); gpr_mpscq_node *n = gpr_mpscq_pop(&workqueue->queue);
if (error == GRPC_ERROR_NONE) { if (error == GRPC_ERROR_NONE) {
grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd, grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd,
&workqueue->read_closure); &workqueue->read_closure);
...@@ -128,24 +137,41 @@ static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { ...@@ -128,24 +137,41 @@ static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
/* recurse to get error handling */ /* recurse to get error handling */
on_readable(exec_ctx, arg, error); on_readable(exec_ctx, arg, error);
} }
if (n == NULL) {
/* try again - queue in an inconsistant state */
wakeup(exec_ctx, workqueue);
} else {
switch (gpr_atm_full_fetch_add(&workqueue->state, -2)) {
case 3: // had one count, one unorphaned --> done, unorphaned
break;
case 2: // had one count, one orphaned --> done, orphaned
workqueue_destroy(exec_ctx, workqueue);
break;
case 1:
case 0:
// these values are illegal - representing an already done or
// deleted workqueue
GPR_UNREACHABLE_CODE(break);
default:
// schedule a wakeup since there's more to do
wakeup(exec_ctx, workqueue);
}
grpc_closure *cl = (grpc_closure *)n;
grpc_error *clerr = cl->error;
cl->cb(exec_ctx, cl->cb_arg, clerr);
GRPC_ERROR_UNREF(clerr);
}
} }
} }
void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
grpc_closure *closure, grpc_error *error) { grpc_closure *closure, grpc_error *error) {
grpc_error *push_error = GRPC_ERROR_NONE; gpr_atm last = gpr_atm_full_fetch_add(&workqueue->state, 2);
gpr_mu_lock(&workqueue->mu); GPR_ASSERT(last & 1);
if (grpc_closure_list_empty(workqueue->closure_list)) { gpr_mpscq_push(&workqueue->queue, &closure->next_data.atm_next);
push_error = grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd); if (last == 1) {
} wakeup(exec_ctx, workqueue);
grpc_closure_list_append(&workqueue->closure_list, closure, error);
if (push_error != GRPC_ERROR_NONE) {
const char *msg = grpc_error_string(push_error);
gpr_log(GPR_ERROR, "Failed to push to workqueue: %s", msg);
grpc_error_free_string(msg);
grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL);
} }
gpr_mu_unlock(&workqueue->mu);
} }
#endif /* GPR_POSIX_SOCKET */ #endif /* GPR_POSIX_SOCKET */
...@@ -35,14 +35,17 @@ ...@@ -35,14 +35,17 @@
#define GRPC_CORE_LIB_IOMGR_WORKQUEUE_POSIX_H #define GRPC_CORE_LIB_IOMGR_WORKQUEUE_POSIX_H
#include "src/core/lib/iomgr/wakeup_fd_posix.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/support/mpscq.h"
struct grpc_fd; struct grpc_fd;
struct grpc_workqueue { struct grpc_workqueue {
gpr_refcount refs; gpr_refcount refs;
gpr_mpscq queue;
gpr_mu mu; // state is:
grpc_closure_list closure_list; // lower bit - zero if orphaned
// other bits - number of items enqueued
gpr_atm state;
grpc_wakeup_fd wakeup_fd; grpc_wakeup_fd wakeup_fd;
struct grpc_fd *wakeup_read_fd; struct grpc_fd *wakeup_read_fd;
......
...@@ -42,8 +42,6 @@ ...@@ -42,8 +42,6 @@
// context, which is at least correct, if not performant or in the spirit of // context, which is at least correct, if not performant or in the spirit of
// workqueues. // workqueues.
void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {}
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG #ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line, void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line,
const char *reason) {} const char *reason) {}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment