Skip to content
Snippets Groups Projects
Commit 061ef740 authored by Craig Tiller's avatar Craig Tiller
Browse files

Some fixes

parent d6887e0e
No related branches found
No related tags found
No related merge requests found
......@@ -1420,7 +1420,9 @@ static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx,
workqueue_maybe_wakeup(pi);
}
grpc_closure *c = (grpc_closure *)n;
grpc_closure_run(exec_ctx, c, c->error_data.error);
grpc_error *error = c->error_data.error;
c->cb(exec_ctx, c->cb_arg, error);
GRPC_ERROR_UNREF(error);
return true;
} else if (gpr_atm_no_barrier_load(&pi->workqueue_item_count) > 0) {
/* n == NULL might mean there's work but it's not available to be popped
......
......@@ -66,8 +66,10 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) {
exec_ctx->closure_list.head = exec_ctx->closure_list.tail = NULL;
while (c != NULL) {
grpc_closure *next = c->next_data.next;
grpc_error *error = c->error_data.error;
did_something = true;
grpc_closure_run(exec_ctx, c, c->error_data.error);
c->cb(exec_ctx, c->cb_arg, error);
GRPC_ERROR_UNREF(error);
c = next;
}
} else if (!grpc_combiner_continue_exec_ctx(exec_ctx)) {
......
......@@ -77,10 +77,18 @@ static void closure_exec_thread_func(void *ignored) {
gpr_mu_unlock(&g_executor.mu);
break;
} else {
grpc_closure_list_sched(&exec_ctx, &g_executor.closures);
grpc_closure *c = g_executor.closures.head;
grpc_closure_list_init(&g_executor.closures);
gpr_mu_unlock(&g_executor.mu);
while (c != NULL) {
grpc_closure *next = c->next_data.next;
grpc_error *error = c->error_data.error;
c->cb(&exec_ctx, c->cb_arg, error);
GRPC_ERROR_UNREF(error);
c = next;
}
grpc_exec_ctx_flush(&exec_ctx);
}
gpr_mu_unlock(&g_executor.mu);
grpc_exec_ctx_flush(&exec_ctx);
}
grpc_exec_ctx_finish(&exec_ctx);
}
......@@ -115,8 +123,6 @@ static void maybe_spawn_locked() {
static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
grpc_error *error) {
gpr_mu_lock(&g_executor.mu);
GPR_ASSERT(closure->scheduler == grpc_executor_scheduler);
closure->scheduler = grpc_schedule_on_exec_ctx;
if (g_executor.shutting_down == 0) {
grpc_closure_list_append(&g_executor.closures, closure, error);
maybe_spawn_locked();
......@@ -136,7 +142,15 @@ void grpc_executor_shutdown() {
* list below because we aren't accepting new work */
/* Execute pending callbacks, some may be performing cleanups */
grpc_closure_list_sched(&exec_ctx, &g_executor.closures);
grpc_closure *c = g_executor.closures.head;
grpc_closure_list_init(&g_executor.closures);
while (c != NULL) {
grpc_closure *next = c->next_data.next;
grpc_error *error = c->error_data.error;
c->cb(&exec_ctx, c->cb_arg, error);
GRPC_ERROR_UNREF(error);
c = next;
}
grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(grpc_closure_list_empty(g_executor.closures));
if (pending_join) {
......
......@@ -219,8 +219,8 @@ static void listen_cb(grpc_exec_ctx *exec_ctx, void *arg, /*=sv_arg*/
se->sv = sv;
se->em_fd = grpc_fd_create(fd, "listener");
grpc_pollset_add_fd(exec_ctx, g_pollset, se->em_fd);
se->session_read_closure.cb = session_read_cb;
se->session_read_closure.cb_arg = se;
grpc_closure_init(&se->session_read_closure, session_read_cb, se,
grpc_schedule_on_exec_ctx);
grpc_fd_notify_on_read(exec_ctx, se->em_fd, &se->session_read_closure);
grpc_fd_notify_on_read(exec_ctx, listen_em_fd, &sv->listen_closure);
......@@ -249,8 +249,8 @@ static int server_start(grpc_exec_ctx *exec_ctx, server *sv) {
sv->em_fd = grpc_fd_create(fd, "server");
grpc_pollset_add_fd(exec_ctx, g_pollset, sv->em_fd);
/* Register to be interested in reading from listen_fd. */
sv->listen_closure.cb = listen_cb;
sv->listen_closure.cb_arg = sv;
grpc_closure_init(&sv->listen_closure, listen_cb, sv,
grpc_schedule_on_exec_ctx);
grpc_fd_notify_on_read(exec_ctx, sv->em_fd, &sv->listen_closure);
return port;
......@@ -333,8 +333,8 @@ static void client_session_write(grpc_exec_ctx *exec_ctx, void *arg, /*client */
if (errno == EAGAIN) {
gpr_mu_lock(g_mu);
if (cl->client_write_cnt < CLIENT_TOTAL_WRITE_CNT) {
cl->write_closure.cb = client_session_write;
cl->write_closure.cb_arg = cl;
grpc_closure_init(&cl->write_closure, client_session_write, cl,
grpc_schedule_on_exec_ctx);
grpc_fd_notify_on_write(exec_ctx, cl->em_fd, &cl->write_closure);
cl->client_write_cnt++;
} else {
......@@ -459,10 +459,10 @@ static void test_grpc_fd_change(void) {
grpc_closure second_closure;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
first_closure.cb = first_read_callback;
first_closure.cb_arg = &a;
second_closure.cb = second_read_callback;
second_closure.cb_arg = &b;
grpc_closure_init(&first_closure, first_read_callback, &a,
grpc_schedule_on_exec_ctx);
grpc_closure_init(&second_closure, second_read_callback, &b,
grpc_schedule_on_exec_ctx);
init_change_data(&a);
init_change_data(&b);
......@@ -546,7 +546,8 @@ int main(int argc, char **argv) {
grpc_pollset_init(g_pollset, &g_mu);
test_grpc_fd();
test_grpc_fd_change();
grpc_closure_init(&destroyed, destroy_pollset, g_pollset, grpc_schedule_on_exec_ctx);
grpc_closure_init(&destroyed, destroy_pollset, g_pollset,
grpc_schedule_on_exec_ctx);
grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed);
grpc_exec_ctx_finish(&exec_ctx);
gpr_free(g_pollset);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment