Skip to content
Snippets Groups Projects
Commit 40664c71 authored by Craig Tiller's avatar Craig Tiller
Browse files

fix workqueue

parent 0b4c9011
No related branches found
No related tags found
No related merge requests found
......@@ -278,9 +278,9 @@ static grpc_fd *fd_create(int fd, const char *name) {
GRPC_LOG_IF_ERROR("fd_create",
grpc_wakeup_fd_init(&new_fd->workqueue_wakeup_fd));
new_fd->workqueue_scheduler.vtable = &workqueue_scheduler_vtable;
new_fd->workqueue_read_mu = GPR_SPINLOCK_INIT;
new_fd->workqueue_read_mu = GPR_SPINLOCK_INITIALIZER;
gpr_mpscq_init(&new_fd->workqueue_items);
gpr_atm_no_barrier_store(&new_fd->workqueue_item_count);
gpr_atm_no_barrier_store(&new_fd->workqueue_item_count, 0);
new_fd->freelist_next = NULL;
new_fd->on_done_closure = NULL;
......@@ -414,14 +414,14 @@ static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
GPR_TIMER_BEGIN("workqueue.enqueue", 0);
grpc_fd *fd = (grpc_fd *)(((char *)closure->scheduler) -
offsetof(grpc_fd, workqueue_scheduler));
REF_BY(fd, 2);
REF_BY(fd, 2, "workqueue_enqueue");
gpr_atm last = gpr_atm_no_barrier_fetch_add(&fd->workqueue_item_count, 1);
closure->error_data.error = error;
gpr_mpscq_push(&fd->workqueue_items, &closure->next_data.atm_next);
if (last == 0) {
workqueue_wakeup(fd);
}
UNREF_BY(fd, 2);
UNREF_BY(fd, 2, "workqueue_enqueue");
}
static void fd_invoke_workqueue(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
......@@ -430,15 +430,14 @@ static void fd_invoke_workqueue(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
gpr_mpscq_node *n = gpr_mpscq_pop(&fd->workqueue_items);
gpr_spinlock_unlock(&fd->workqueue_read_mu);
if (n != NULL) {
if (gpr_atm_full_fetch_add(&pi->workqueue_item_count, -1) > 1) {
if (gpr_atm_full_fetch_add(&fd->workqueue_item_count, -1) > 1) {
workqueue_wakeup(fd);
}
grpc_closure *c = (grpc_closure *)n;
grpc_error *error = c->error_data.error;
c->cb(exec_ctx, c->cb_arg, error);
GRPC_ERROR_UNREF(error);
return true;
} else if (gpr_atm_no_barrier_load(&pi->workqueue_item_count) > 0) {
} else if (gpr_atm_no_barrier_load(&fd->workqueue_item_count) > 0) {
/* n == NULL might mean there's work but it's not available to be popped
* yet - try to ensure another workqueue wakes up to check shortly if so
*/
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment