From 48db18f9100cba50c1d0f528a64c315506d5d11e Mon Sep 17 00:00:00 2001 From: Craig Tiller <ctiller@google.com> Date: Mon, 2 Nov 2015 14:14:51 -0800 Subject: [PATCH] stream_op cleanup: iomgr closure, executor changes --- src/core/iomgr/closure.c | 23 ++++++++--------------- src/core/iomgr/closure.h | 19 ++++++++----------- src/core/iomgr/executor.c | 13 ++++--------- 3 files changed, 20 insertions(+), 35 deletions(-) diff --git a/src/core/iomgr/closure.c b/src/core/iomgr/closure.c index b4f1817de4..4aae52a454 100644 --- a/src/core/iomgr/closure.c +++ b/src/core/iomgr/closure.c @@ -39,18 +39,17 @@ void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb, void *cb_arg) { closure->cb = cb; closure->cb_arg = cb_arg; - closure->next = NULL; + closure->final_data = 0; } void grpc_closure_list_add(grpc_closure_list *closure_list, grpc_closure *closure, int success) { if (closure == NULL) return; - closure->next = NULL; - closure->success = success; + closure->final_data = (success != 0); if (closure_list->head == NULL) { closure_list->head = closure; } else { - closure_list->tail->next = closure; + closure_list->tail->final_data |= (gpr_uintptr)closure; } closure_list->tail = closure; } @@ -66,22 +65,12 @@ void grpc_closure_list_move(grpc_closure_list *src, grpc_closure_list *dst) { if (dst->head == NULL) { *dst = *src; } else { - dst->tail->next = src->head; + dst->tail->final_data |= (gpr_uintptr)src->head; dst->tail = src->tail; } src->head = src->tail = NULL; } -grpc_closure *grpc_closure_list_pop(grpc_closure_list *list) { - grpc_closure *head; - if (list->head == NULL) { - return NULL; - } - head = list->head; - list->head = list->head->next; - return head; -} - typedef struct { grpc_iomgr_cb_func cb; void *cb_arg; @@ -103,3 +92,7 @@ grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg) { grpc_closure_init(&wc->wrapper, closure_wrapper, wc); return &wc->wrapper; } + +grpc_closure *grpc_closure_next(grpc_closure *closure) { + return (grpc_closure *)(closure->final_data & ~(gpr_uintptr)1); +} diff --git a/src/core/iomgr/closure.h b/src/core/iomgr/closure.h index 7a9f7ccad0..a1d738bf5a 100644 --- a/src/core/iomgr/closure.h +++ b/src/core/iomgr/closure.h @@ -34,7 +34,7 @@ #ifndef GRPC_INTERNAL_CORE_IOMGR_CLOSURE_H #define GRPC_INTERNAL_CORE_IOMGR_CLOSURE_H -#include <stddef.h> +#include <grpc/support/port_platform.h> struct grpc_closure; typedef struct grpc_closure grpc_closure; @@ -64,13 +64,10 @@ struct grpc_closure { /** Arguments to be passed to "cb". */ void *cb_arg; - /** Internal. A boolean indication to "cb" on the state of the iomgr. - * For instance, closures created during a shutdown would have this field set - * to false. */ - int success; - - /**< Internal. Do not touch */ - struct grpc_closure *next; + /** Once enqueued, contains in the lower bit the success of the closure, + and in the upper bits the pointer to the next closure in the list. + Before enqueing for execution, this is usable for scratch data. */ + gpr_uintptr final_data; }; /** Initializes \a closure with \a cb and \a cb_arg. */ @@ -91,10 +88,10 @@ void grpc_closure_list_add(grpc_closure_list *list, grpc_closure *closure, /** append all closures from \a src to \a dst and empty \a src. */ void grpc_closure_list_move(grpc_closure_list *src, grpc_closure_list *dst); -/** pop (return and remove) the head closure from \a list. */ -grpc_closure *grpc_closure_list_pop(grpc_closure_list *list); - /** return whether \a list is empty. */ int grpc_closure_list_empty(grpc_closure_list list); +/** return the next pointer for a queued closure list */ +grpc_closure *grpc_closure_next(grpc_closure *closure); + #endif /* GRPC_INTERNAL_CORE_IOMGR_CLOSURE_H */ diff --git a/src/core/iomgr/executor.c b/src/core/iomgr/executor.c index 457e5cdbac..00c68f7828 100644 --- a/src/core/iomgr/executor.c +++ b/src/core/iomgr/executor.c @@ -63,8 +63,6 @@ void grpc_executor_init() { /* thread body */ static void closure_exec_thread_func(void *ignored) { - grpc_closure *closure; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; while (1) { gpr_mu_lock(&g_executor.mu); @@ -72,16 +70,16 @@ static void closure_exec_thread_func(void *ignored) { gpr_mu_unlock(&g_executor.mu); break; } - closure = grpc_closure_list_pop(&g_executor.closures); - if (closure == NULL) { + if (grpc_closure_list_empty(g_executor.closures)) { /* no more work, time to die */ GPR_ASSERT(g_executor.busy == 1); g_executor.busy = 0; gpr_mu_unlock(&g_executor.mu); break; + } else { + grpc_exec_ctx_enqueue_list(&exec_ctx, &g_executor.closures); } gpr_mu_unlock(&g_executor.mu); - closure->cb(&exec_ctx, closure->cb_arg, closure->success); grpc_exec_ctx_flush(&exec_ctx); } grpc_exec_ctx_finish(&exec_ctx); @@ -125,7 +123,6 @@ void grpc_executor_enqueue(grpc_closure *closure, int success) { void grpc_executor_shutdown() { int pending_join; - grpc_closure *closure; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; gpr_mu_lock(&g_executor.mu); @@ -136,9 +133,7 @@ void grpc_executor_shutdown() { * list below because we aren't accepting new work */ /* Execute pending callbacks, some may be performing cleanups */ - while ((closure = grpc_closure_list_pop(&g_executor.closures)) != NULL) { - closure->cb(&exec_ctx, closure->cb_arg, closure->success); - } + grpc_exec_ctx_enqueue_list(&exec_ctx, &g_executor.closures); grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(grpc_closure_list_empty(g_executor.closures)); if (pending_join) { -- GitLab