From 86037cd0e7a1e26d77be46bedc45d6d6167d98a7 Mon Sep 17 00:00:00 2001 From: Craig Tiller <ctiller@google.com> Date: Fri, 2 Sep 2016 19:58:43 -0700 Subject: [PATCH] fixes --- src/core/lib/iomgr/combiner.c | 22 +++++++--- src/core/lib/iomgr/error.h | 8 +++- src/core/lib/iomgr/tcp_posix.c | 2 +- src/core/lib/iomgr/workqueue_posix.c | 62 +++++++++++++++------------- 4 files changed, 57 insertions(+), 37 deletions(-) diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c index 981d3348ce..721db6337e 100644 --- a/src/core/lib/iomgr/combiner.c +++ b/src/core/lib/iomgr/combiner.c @@ -112,7 +112,8 @@ void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { } } -static void queue_on_exec_ctx(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { +static void push_last_on_exec_ctx(grpc_exec_ctx *exec_ctx, + grpc_combiner *lock) { lock->next_combiner_on_this_exec_ctx = NULL; if (exec_ctx->active_combiner == NULL) { exec_ctx->active_combiner = exec_ctx->last_combiner = lock; @@ -122,6 +123,15 @@ static void queue_on_exec_ctx(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { } } +static void push_first_on_exec_ctx(grpc_exec_ctx *exec_ctx, + grpc_combiner *lock) { + lock->next_combiner_on_this_exec_ctx = exec_ctx->active_combiner; + exec_ctx->active_combiner = lock; + if (lock->next_combiner_on_this_exec_ctx == NULL) { + exec_ctx->last_combiner = lock; + } +} + void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, grpc_closure *cl, grpc_error *error, bool covered_by_poller) { @@ -140,7 +150,7 @@ void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, if (last == 1) { // code will be written when the exec_ctx calls // grpc_combiner_continue_exec_ctx - queue_on_exec_ctx(exec_ctx, lock); + push_last_on_exec_ctx(exec_ctx, lock); } GPR_TIMER_END("combiner.execute", 0); } @@ -155,7 +165,7 @@ static void move_next(grpc_exec_ctx *exec_ctx) { static void offload(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_combiner *lock = arg; - queue_on_exec_ctx(exec_ctx, lock); + push_last_on_exec_ctx(exec_ctx, lock); } static void queue_offload(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { @@ -230,10 +240,11 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { } GPR_TIMER_MARK("unref", 0); + move_next(exec_ctx); + lock->time_to_execute_final_list = false; gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -2); GRPC_COMBINER_TRACE( gpr_log(GPR_DEBUG, "C:%p finish old_state=%" PRIdPTR, lock, old_state)); - lock->time_to_execute_final_list = false; switch (old_state) { default: // we have multiple queued work items: just continue executing them @@ -245,11 +256,9 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { } break; case 3: // had one count, one unorphaned --> unlocked unorphaned - move_next(exec_ctx); GPR_TIMER_END("combiner.continue_exec_ctx", 0); return true; case 2: // and one count, one orphaned --> unlocked and orphaned - move_next(exec_ctx); really_destroy(exec_ctx, lock); GPR_TIMER_END("combiner.continue_exec_ctx", 0); return true; @@ -260,6 +269,7 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { GPR_TIMER_END("combiner.continue_exec_ctx", 0); GPR_UNREACHABLE_CODE(return true); } + push_first_on_exec_ctx(exec_ctx, lock); GPR_TIMER_END("combiner.continue_exec_ctx", 0); return true; } diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h index 6c769accdb..2ab3ef9f40 100644 --- a/src/core/lib/iomgr/error.h +++ b/src/core/lib/iomgr/error.h @@ -123,9 +123,13 @@ typedef enum { GRPC_ERROR_TIME_CREATED, } grpc_error_times; +/// The following "special" errors can be propagated without allocating memory. +/// They are always even so that other code (particularly combiner locks) can +/// safely use the lower bit for themselves. + #define GRPC_ERROR_NONE ((grpc_error *)NULL) -#define GRPC_ERROR_OOM ((grpc_error *)1) -#define GRPC_ERROR_CANCELLED ((grpc_error *)2) +#define GRPC_ERROR_OOM ((grpc_error *)2) +#define GRPC_ERROR_CANCELLED ((grpc_error *)4) const char *grpc_error_string(grpc_error *error); void grpc_error_free_string(const char *str); diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index ffdc7c7b42..00fd77679a 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -279,7 +279,7 @@ static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, tcp->finished_edge = false; grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure); } else { - grpc_exec_ctx_sched(exec_ctx, &tcp->read_closure, GRPC_ERROR_NONE, grpc_fd_get_workqueue(tcp->em_fd)); + grpc_exec_ctx_sched(exec_ctx, &tcp->read_closure, GRPC_ERROR_NONE, NULL); } } diff --git a/src/core/lib/iomgr/workqueue_posix.c b/src/core/lib/iomgr/workqueue_posix.c index 6c27c3b41e..c7d4fc6423 100644 --- a/src/core/lib/iomgr/workqueue_posix.c +++ b/src/core/lib/iomgr/workqueue_posix.c @@ -76,7 +76,8 @@ static void workqueue_destroy(grpc_exec_ctx *exec_ctx, static void workqueue_orphan(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { - if (gpr_atm_full_fetch_add(&workqueue->state, -1) == 1) { + gpr_atm last = gpr_atm_full_fetch_add(&workqueue->state, -1); + if (last == 1) { workqueue_destroy(exec_ctx, workqueue); } } @@ -143,37 +144,40 @@ static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { gpr_free(workqueue); } else { error = grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd); - gpr_mpscq_node *n = gpr_mpscq_pop(&workqueue->queue); - if (error == GRPC_ERROR_NONE) { - grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd, - &workqueue->read_closure); - } else { + if (error != GRPC_ERROR_NONE) { /* recurse to get error handling */ on_readable(exec_ctx, arg, error); - } - if (n == NULL) { - /* try again - queue in an inconsistant state */ - wakeup(exec_ctx, workqueue); } else { - switch (gpr_atm_full_fetch_add(&workqueue->state, -2)) { - case 3: // had one count, one unorphaned --> done, unorphaned - break; - case 2: // had one count, one orphaned --> done, orphaned - workqueue_destroy(exec_ctx, workqueue); - break; - case 1: - case 0: - // these values are illegal - representing an already done or - // deleted workqueue - GPR_UNREACHABLE_CODE(break); - default: - // schedule a wakeup since there's more to do - wakeup(exec_ctx, workqueue); + gpr_mpscq_node *n = gpr_mpscq_pop(&workqueue->queue); + if (n == NULL) { + /* try again - queue in an ephemerally inconsistent state */ + wakeup(exec_ctx, workqueue); + grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd, + &workqueue->read_closure); + } else { + gpr_atm last = gpr_atm_full_fetch_add(&workqueue->state, -2); + switch (last) { + default: + // schedule a wakeup since there's more to do + wakeup(exec_ctx, workqueue); + break; + case 3: // had one count, one unorphaned --> done, unorphaned + break; + case 2: // had one count, one orphaned --> done, orphaned + workqueue_destroy(exec_ctx, workqueue); + break; + case 1: + case 0: + // these values are illegal - representing an already done or + // deleted workqueue + GPR_UNREACHABLE_CODE(break); + } + grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd, + &workqueue->read_closure); + grpc_closure *cl = (grpc_closure *)n; + grpc_error *clerr = cl->error_data.error; + grpc_closure_run(exec_ctx, cl, clerr); } - grpc_closure *cl = (grpc_closure *)n; - grpc_error *clerr = cl->error_data.error; - cl->cb(exec_ctx, cl->cb_arg, clerr); - GRPC_ERROR_UNREF(clerr); } } @@ -183,6 +187,7 @@ static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, grpc_closure *closure, grpc_error *error) { GPR_TIMER_BEGIN("workqueue.enqueue", 0); + GRPC_WORKQUEUE_REF(workqueue, "enqueue"); gpr_atm last = gpr_atm_full_fetch_add(&workqueue->state, 2); GPR_ASSERT(last & 1); closure->error_data.error = error; @@ -190,6 +195,7 @@ void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, if (last == 1) { wakeup(exec_ctx, workqueue); } + GRPC_WORKQUEUE_UNREF(exec_ctx, workqueue, "enqueue"); GPR_TIMER_END("workqueue.enqueue", 0); } -- GitLab