diff --git a/src/core/lib/iomgr/async_execution_lock.c b/src/core/lib/iomgr/async_execution_lock.c index 71608b2bfe5b7d9ce6c9ecb515712c60906bf1a7..00cd27504b7742f827dfc5b5a34b6d40eaeefae2 100644 --- a/src/core/lib/iomgr/async_execution_lock.c +++ b/src/core/lib/iomgr/async_execution_lock.c @@ -67,23 +67,30 @@ static void finish(grpc_exec_ctx *exec_ctx, grpc_aelock *lock) { NO_CONSUMER)) { return; } + // TODO(ctiller): consider sleeping + continue; } else { + // skip the tombstone: we'll re-add it later lock->tail = next; tail = next; next = (grpc_aelock_qnode *)gpr_atm_acq_load(&tail->next); } } if (next != NULL) { + // found a node lock->tail = next; tail->action(exec_ctx, tail->arg); gpr_free(tail); } else { + // nothing there: might be in an incosistant state grpc_aelock_qnode *head = (grpc_aelock_qnode *)gpr_atm_acq_load(&lock->head); if (head != tail) { + // non-empty list: spin for a bit // TODO(ctiller): consider sleeping? continue; } + // must have swallowed tombstone above: re-add it gpr_atm_no_barrier_store(&lock->tombstone.next, 0); while (!gpr_atm_rel_cas(&lock->head, (gpr_atm)head, (gpr_atm)&lock->tombstone)) { @@ -117,7 +124,7 @@ retry_top: } else { n->arg = arg; } - gpr_atm_no_barrier_store(&n->next, 0); + gpr_atm_rel_store(&n->next, 0); while (!gpr_atm_rel_cas(&lock->head, head, (gpr_atm)n)) { retry_queue_load: head = gpr_atm_acq_load(&lock->head); @@ -132,5 +139,6 @@ retry_top: return; // early out } } - gpr_atm_rel_store(&((grpc_aelock_qnode *)head)->next, (gpr_atm)n); + GPR_ASSERT(gpr_atm_rel_cas(&((grpc_aelock_qnode *)head)->next, 0, (gpr_atm)n)); +// gpr_atm_rel_store(&((grpc_aelock_qnode *)head)->next, (gpr_atm)n); } diff --git a/test/core/iomgr/async_execution_lock_test.c b/test/core/iomgr/async_execution_lock_test.c index d7f1823412a7479fffa664450f4a503812c67893..8c9665a45871d4d0e25fc3d71ec63b623670de87 100644 --- a/test/core/iomgr/async_execution_lock_test.c +++ b/test/core/iomgr/async_execution_lock_test.c @@ -87,7 +87,7 @@ static void execute_many_loop(void *a) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; size_t n = 1; for (size_t i = 0; i < 10; i++) { - for (size_t j = 0; j < 1000; j++) { + for (size_t j = 0; j < 10000; j++) { ex_args c = {&args->ctr, n++}; grpc_aelock_execute(&exec_ctx, args->lock, check_one, &c, sizeof(c)); grpc_exec_ctx_flush(&exec_ctx);