diff --git a/src/core/lib/iomgr/async_execution_lock.c b/src/core/lib/iomgr/async_execution_lock.c index 62ece4cc8c7f74d8f251ca32ec0764adf217d79a..71608b2bfe5b7d9ce6c9ecb515712c60906bf1a7 100644 --- a/src/core/lib/iomgr/async_execution_lock.c +++ b/src/core/lib/iomgr/async_execution_lock.c @@ -40,11 +40,16 @@ #define NO_CONSUMER ((gpr_atm)1) +static void bad_action(grpc_exec_ctx *exec_ctx, void *arg) { + GPR_UNREACHABLE_CODE(return ); +} + void grpc_aelock_init(grpc_aelock *lock, grpc_workqueue *optional_workqueue) { lock->optional_workqueue = optional_workqueue; gpr_atm_no_barrier_store(&lock->head, NO_CONSUMER); - lock->tail = &lock->stub; - gpr_atm_no_barrier_store(&lock->stub.next, 0); + gpr_atm_no_barrier_store(&lock->tombstone.next, 0); + lock->tombstone.action = bad_action; + lock->tail = &lock->tombstone; } void grpc_aelock_destroy(grpc_aelock *lock) { @@ -56,15 +61,35 @@ static void finish(grpc_exec_ctx *exec_ctx, grpc_aelock *lock) { grpc_aelock_qnode *tail = lock->tail; grpc_aelock_qnode *next = (grpc_aelock_qnode *)gpr_atm_acq_load(&tail->next); - if (next == NULL) { - if (gpr_atm_rel_cas(&lock->head, (gpr_atm)&lock->stub, NO_CONSUMER)) { - return; + if (tail == &lock->tombstone) { + if (next == NULL) { + if (gpr_atm_rel_cas(&lock->head, (gpr_atm)&lock->tombstone, + NO_CONSUMER)) { + return; + } + } else { + lock->tail = next; + tail = next; + next = (grpc_aelock_qnode *)gpr_atm_acq_load(&tail->next); } - } else { + } + if (next != NULL) { lock->tail = next; - - next->action(exec_ctx, next->arg); - gpr_free(next); + tail->action(exec_ctx, tail->arg); + gpr_free(tail); + } else { + grpc_aelock_qnode *head = + (grpc_aelock_qnode *)gpr_atm_acq_load(&lock->head); + if (head != tail) { + // TODO(ctiller): consider sleeping? + continue; + } + gpr_atm_no_barrier_store(&lock->tombstone.next, 0); + while (!gpr_atm_rel_cas(&lock->head, (gpr_atm)head, + (gpr_atm)&lock->tombstone)) { + head = (grpc_aelock_qnode *)gpr_atm_acq_load(&lock->head); + } + gpr_atm_rel_store(&head->next, (gpr_atm)&lock->tombstone); } } } @@ -72,11 +97,11 @@ static void finish(grpc_exec_ctx *exec_ctx, grpc_aelock *lock) { void grpc_aelock_execute(grpc_exec_ctx *exec_ctx, grpc_aelock *lock, grpc_aelock_action action, void *arg, size_t sizeof_arg) { - gpr_atm cur; + gpr_atm head; retry_top: - cur = gpr_atm_acq_load(&lock->head); - if (cur == NO_CONSUMER) { - if (!gpr_atm_rel_cas(&lock->head, NO_CONSUMER, (gpr_atm)&lock->stub)) { + head = gpr_atm_acq_load(&lock->head); + if (head == NO_CONSUMER) { + if (!gpr_atm_rel_cas(&lock->head, NO_CONSUMER, (gpr_atm)&lock->tombstone)) { goto retry_top; } action(exec_ctx, arg); @@ -86,18 +111,19 @@ retry_top: grpc_aelock_qnode *n = gpr_malloc(sizeof(*n) + sizeof_arg); n->action = action; - gpr_atm_no_barrier_store(&n->next, 0); if (sizeof_arg > 0) { memcpy(n + 1, arg, sizeof_arg); n->arg = n + 1; } else { n->arg = arg; } - while (!gpr_atm_rel_cas(&lock->head, cur, (gpr_atm)n)) { + gpr_atm_no_barrier_store(&n->next, 0); + while (!gpr_atm_rel_cas(&lock->head, head, (gpr_atm)n)) { retry_queue_load: - cur = gpr_atm_acq_load(&lock->head); - if (cur == NO_CONSUMER) { - if (!gpr_atm_rel_cas(&lock->head, NO_CONSUMER, (gpr_atm)&lock->stub)) { + head = gpr_atm_acq_load(&lock->head); + if (head == NO_CONSUMER) { + if (!gpr_atm_rel_cas(&lock->head, NO_CONSUMER, + (gpr_atm)&lock->tombstone)) { goto retry_queue_load; } gpr_free(n); @@ -106,5 +132,5 @@ retry_top: return; // early out } } - gpr_atm_no_barrier_store(&((grpc_aelock_qnode *)cur)->next, (gpr_atm)n); + gpr_atm_rel_store(&((grpc_aelock_qnode *)head)->next, (gpr_atm)n); } diff --git a/src/core/lib/iomgr/async_execution_lock.h b/src/core/lib/iomgr/async_execution_lock.h index cb4de61349b6d575ad3695c631b9559078c2cdef..cd1c4811fe7599e9a481f3620815fb4695c6b726 100644 --- a/src/core/lib/iomgr/async_execution_lock.h +++ b/src/core/lib/iomgr/async_execution_lock.h @@ -52,7 +52,7 @@ typedef struct grpc_aelock { // grpc_aelock_qnode* gpr_atm head; grpc_aelock_qnode *tail; - grpc_aelock_qnode stub; + grpc_aelock_qnode tombstone; } grpc_aelock; void grpc_aelock_init(grpc_aelock *lock, grpc_workqueue *optional_workqueue); diff --git a/test/core/iomgr/async_execution_lock_test.c b/test/core/iomgr/async_execution_lock_test.c index 405f89362c669ecbd79dc5bebefbc3c652c64156..d7f1823412a7479fffa664450f4a503812c67893 100644 --- a/test/core/iomgr/async_execution_lock_test.c +++ b/test/core/iomgr/async_execution_lock_test.c @@ -65,6 +65,11 @@ static void test_execute_one(void) { grpc_aelock_destroy(&lock); } +typedef struct { + size_t ctr; + grpc_aelock *lock; +} thd_args; + typedef struct { size_t *ctr; size_t value; @@ -72,31 +77,39 @@ typedef struct { static void check_one(grpc_exec_ctx *exec_ctx, void *a) { ex_args *args = a; + // gpr_log(GPR_DEBUG, "*%p=%d; step %d", args->ctr, *args->ctr, args->value); GPR_ASSERT(*args->ctr == args->value - 1); *args->ctr = args->value; } -static void execute_many_loop(void *lock) { +static void execute_many_loop(void *a) { + thd_args *args = a; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - for (size_t i = 0; i < 100; i++) { - size_t ctr = 0; - for (size_t j = 1; j <= 1000; j++) { - ex_args args = {&ctr, j}; - grpc_aelock_execute(&exec_ctx, lock, check_one, &args, sizeof(args)); + size_t n = 1; + for (size_t i = 0; i < 10; i++) { + for (size_t j = 0; j < 1000; j++) { + ex_args c = {&args->ctr, n++}; + grpc_aelock_execute(&exec_ctx, args->lock, check_one, &c, sizeof(c)); grpc_exec_ctx_flush(&exec_ctx); } - gpr_sleep_until(GRPC_TIMEOUT_MILLIS_TO_DEADLINE(1)); + gpr_sleep_until(GRPC_TIMEOUT_MILLIS_TO_DEADLINE(100)); } + grpc_exec_ctx_finish(&exec_ctx); } static void test_execute_many(void) { + gpr_log(GPR_DEBUG, "test_execute_many"); + grpc_aelock lock; gpr_thd_id thds[100]; + thd_args ta[GPR_ARRAY_SIZE(thds)]; grpc_aelock_init(&lock, NULL); for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) { gpr_thd_options options = gpr_thd_options_default(); gpr_thd_options_set_joinable(&options); - GPR_ASSERT(gpr_thd_new(&thds[i], execute_many_loop, &lock, &options)); + ta[i].ctr = 0; + ta[i].lock = &lock; + GPR_ASSERT(gpr_thd_new(&thds[i], execute_many_loop, &ta[i], &options)); } for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) { gpr_thd_join(thds[i]);