From d0a8ae126625856aa84ba151fc5584b60097a65e Mon Sep 17 00:00:00 2001 From: Craig Tiller <ctiller@google.com> Date: Thu, 18 Feb 2016 08:01:19 -0800 Subject: [PATCH] Move worker into pollset --- src/core/iomgr/pollset.h | 2 +- src/core/iomgr/pollset_posix.c | 38 ++++++++++--------- .../security/google_default_credentials.c | 2 +- src/core/surface/completion_queue.c | 12 +++--- test/core/httpcli/httpcli_test.c | 4 +- test/core/httpcli/httpscli_test.c | 4 +- test/core/iomgr/endpoint_tests.c | 2 +- test/core/iomgr/fd_posix_test.c | 8 ++-- test/core/iomgr/tcp_client_posix_test.c | 6 +-- test/core/iomgr/tcp_posix_test.c | 12 +++--- test/core/iomgr/tcp_server_posix_test.c | 2 +- test/core/security/oauth2_utils.c | 2 +- test/core/util/port_posix.c | 4 +- test/core/util/test_tcp_server.c | 2 +- 14 files changed, 52 insertions(+), 48 deletions(-) diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h index c6b0214dea..0c0efad760 100644 --- a/src/core/iomgr/pollset.h +++ b/src/core/iomgr/pollset.h @@ -83,7 +83,7 @@ void grpc_pollset_destroy(grpc_pollset *pollset); pollset lock */ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker *worker, gpr_timespec now, + grpc_pollset_worker **worker, gpr_timespec now, gpr_timespec deadline); /* Break one polling thread out of polling work for this pollset. diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index 1063727248..ee7e9f48f4 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -246,8 +246,11 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { } void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker *worker, gpr_timespec now, + grpc_pollset_worker **worker_hdl, gpr_timespec now, gpr_timespec deadline) { + grpc_pollset_worker worker; + *worker_hdl = &worker; + /* pollset->mu already held */ int added_worker = 0; int locked = 1; @@ -255,16 +258,16 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, int keep_polling = 0; GPR_TIMER_BEGIN("grpc_pollset_work", 0); /* this must happen before we (potentially) drop pollset->mu */ - worker->next = worker->prev = NULL; - worker->reevaluate_polling_on_wakeup = 0; + worker.next = worker.prev = NULL; + worker.reevaluate_polling_on_wakeup = 0; if (pollset->local_wakeup_cache != NULL) { - worker->wakeup_fd = pollset->local_wakeup_cache; - pollset->local_wakeup_cache = worker->wakeup_fd->next; + worker.wakeup_fd = pollset->local_wakeup_cache; + pollset->local_wakeup_cache = worker.wakeup_fd->next; } else { - worker->wakeup_fd = gpr_malloc(sizeof(*worker->wakeup_fd)); - grpc_wakeup_fd_init(&worker->wakeup_fd->fd); + worker.wakeup_fd = gpr_malloc(sizeof(*worker.wakeup_fd)); + grpc_wakeup_fd_init(&worker.wakeup_fd->fd); } - worker->kicked_specifically = 0; + worker.kicked_specifically = 0; /* If there's work waiting for the pollset to be idle, and the pollset is idle, then do that work */ if (!grpc_pollset_has_workers(pollset) && @@ -293,13 +296,13 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, keep_polling = 0; if (!pollset->kicked_without_pollers) { if (!added_worker) { - push_front_worker(pollset, worker); + push_front_worker(pollset, &worker); added_worker = 1; - gpr_tls_set(&g_current_thread_worker, (intptr_t)worker); + gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker); } gpr_tls_set(&g_current_thread_poller, (intptr_t)pollset); GPR_TIMER_BEGIN("maybe_work_and_unlock", 0); - pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, worker, + pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, &worker, deadline, now); GPR_TIMER_END("maybe_work_and_unlock", 0); locked = 0; @@ -321,10 +324,10 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, /* If we're forced to re-evaluate polling (via grpc_pollset_kick with GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force a loop */ - if (worker->reevaluate_polling_on_wakeup) { - worker->reevaluate_polling_on_wakeup = 0; + if (worker.reevaluate_polling_on_wakeup) { + worker.reevaluate_polling_on_wakeup = 0; pollset->kicked_without_pollers = 0; - if (queued_work || worker->kicked_specifically) { + if (queued_work || worker.kicked_specifically) { /* If there's queued work on the list, then set the deadline to be immediate so we get back out of the polling loop quickly */ deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC); @@ -333,12 +336,12 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } } if (added_worker) { - remove_worker(pollset, worker); + remove_worker(pollset, &worker); gpr_tls_set(&g_current_thread_worker, 0); } /* release wakeup fd to the local pool */ - worker->wakeup_fd->next = pollset->local_wakeup_cache; - pollset->local_wakeup_cache = worker->wakeup_fd; + worker.wakeup_fd->next = pollset->local_wakeup_cache; + pollset->local_wakeup_cache = worker.wakeup_fd; /* check shutdown conditions */ if (pollset->shutting_down) { if (grpc_pollset_has_workers(pollset)) { @@ -360,6 +363,7 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, gpr_mu_lock(&pollset->mu); } } + *worker_hdl = NULL; GPR_TIMER_END("grpc_pollset_work", 0); } diff --git a/src/core/security/google_default_credentials.c b/src/core/security/google_default_credentials.c index f3ac14568a..cc9c958298 100644 --- a/src/core/security/google_default_credentials.c +++ b/src/core/security/google_default_credentials.c @@ -122,7 +122,7 @@ static int is_stack_running_on_compute_engine(void) { called once for the lifetime of the process by the default credentials. */ gpr_mu_lock(GRPC_POLLSET_MU(&detector.pollset)); while (!detector.is_done) { - grpc_pollset_worker worker; + grpc_pollset_worker *worker = NULL; grpc_pollset_work(&exec_ctx, &detector.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), gpr_inf_future(GPR_CLOCK_MONOTONIC)); diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index de295ab941..376feb1bbe 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -50,7 +50,7 @@ #include <grpc/support/time.h> typedef struct { - grpc_pollset_worker *worker; + grpc_pollset_worker **worker; void *tag; } plucker; @@ -252,7 +252,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, pluck_worker = NULL; for (i = 0; i < cc->num_pluckers; i++) { if (cc->pluckers[i].tag == tag) { - pluck_worker = cc->pluckers[i].worker; + pluck_worker = *cc->pluckers[i].worker; break; } } @@ -275,7 +275,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, gpr_timespec deadline, void *reserved) { grpc_event ret; - grpc_pollset_worker worker; + grpc_pollset_worker *worker = NULL; int first_loop = 1; gpr_timespec now; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; @@ -348,7 +348,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, } static int add_plucker(grpc_completion_queue *cc, void *tag, - grpc_pollset_worker *worker) { + grpc_pollset_worker **worker) { if (cc->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) { return 0; } @@ -359,7 +359,7 @@ static int add_plucker(grpc_completion_queue *cc, void *tag, } static void del_plucker(grpc_completion_queue *cc, void *tag, - grpc_pollset_worker *worker) { + grpc_pollset_worker **worker) { int i; for (i = 0; i < cc->num_pluckers; i++) { if (cc->pluckers[i].tag == tag && cc->pluckers[i].worker == worker) { @@ -376,7 +376,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, grpc_event ret; grpc_cq_completion *c; grpc_cq_completion *prev; - grpc_pollset_worker worker; + grpc_pollset_worker *worker = NULL; gpr_timespec now; int first_loop = 1; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; diff --git a/test/core/httpcli/httpcli_test.c b/test/core/httpcli/httpcli_test.c index 651ef1fa3b..e954a920b0 100644 --- a/test/core/httpcli/httpcli_test.c +++ b/test/core/httpcli/httpcli_test.c @@ -89,7 +89,7 @@ static void test_get(int port) { on_finish, (void *)42); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (!g_done) { - grpc_pollset_worker worker; + grpc_pollset_worker *worker = NULL; grpc_pollset_work(&exec_ctx, &g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), n_seconds_time(20)); gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); @@ -120,7 +120,7 @@ static void test_post(int port) { n_seconds_time(15), on_finish, (void *)42); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (!g_done) { - grpc_pollset_worker worker; + grpc_pollset_worker *worker = NULL; grpc_pollset_work(&exec_ctx, &g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), n_seconds_time(20)); gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); diff --git a/test/core/httpcli/httpscli_test.c b/test/core/httpcli/httpscli_test.c index db41be17e7..9f31eae278 100644 --- a/test/core/httpcli/httpscli_test.c +++ b/test/core/httpcli/httpscli_test.c @@ -90,7 +90,7 @@ static void test_get(int port) { on_finish, (void *)42); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (!g_done) { - grpc_pollset_worker worker; + grpc_pollset_worker *worker = NULL; grpc_pollset_work(&exec_ctx, &g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), n_seconds_time(20)); gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); @@ -122,7 +122,7 @@ static void test_post(int port) { n_seconds_time(15), on_finish, (void *)42); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (!g_done) { - grpc_pollset_worker worker; + grpc_pollset_worker *worker = NULL; grpc_pollset_work(&exec_ctx, &g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), n_seconds_time(20)); gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); diff --git a/test/core/iomgr/endpoint_tests.c b/test/core/iomgr/endpoint_tests.c index 1b6a78da9a..6ba67df3b1 100644 --- a/test/core/iomgr/endpoint_tests.c +++ b/test/core/iomgr/endpoint_tests.c @@ -234,7 +234,7 @@ static void read_and_write_test(grpc_endpoint_test_config config, gpr_mu_lock(GRPC_POLLSET_MU(g_pollset)); while (!state.read_done || !state.write_done) { - grpc_pollset_worker worker; + grpc_pollset_worker *worker = NULL; GPR_ASSERT(gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), deadline) < 0); grpc_pollset_work(&exec_ctx, g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), deadline); diff --git a/test/core/iomgr/fd_posix_test.c b/test/core/iomgr/fd_posix_test.c index 347a86af10..07761b6576 100644 --- a/test/core/iomgr/fd_posix_test.c +++ b/test/core/iomgr/fd_posix_test.c @@ -252,7 +252,7 @@ static void server_wait_and_shutdown(server *sv) { gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (!sv->done) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_pollset_worker worker; + grpc_pollset_worker *worker = NULL; grpc_pollset_work(&exec_ctx, &g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), gpr_inf_future(GPR_CLOCK_MONOTONIC)); @@ -366,7 +366,7 @@ static void client_start(grpc_exec_ctx *exec_ctx, client *cl, int port) { static void client_wait_and_shutdown(client *cl) { gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (!cl->done) { - grpc_pollset_worker worker; + grpc_pollset_worker *worker = NULL; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_pollset_work(&exec_ctx, &g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), @@ -467,7 +467,7 @@ static void test_grpc_fd_change(void) { /* And now wait for it to run. */ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (a.cb_that_ran == NULL) { - grpc_pollset_worker worker; + grpc_pollset_worker *worker = NULL; grpc_pollset_work(&exec_ctx, &g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), gpr_inf_future(GPR_CLOCK_MONOTONIC)); @@ -491,7 +491,7 @@ static void test_grpc_fd_change(void) { gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (b.cb_that_ran == NULL) { - grpc_pollset_worker worker; + grpc_pollset_worker *worker = NULL; grpc_pollset_work(&exec_ctx, &g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), gpr_inf_future(GPR_CLOCK_MONOTONIC)); diff --git a/test/core/iomgr/tcp_client_posix_test.c b/test/core/iomgr/tcp_client_posix_test.c index b57478059f..cdd5b096fb 100644 --- a/test/core/iomgr/tcp_client_posix_test.c +++ b/test/core/iomgr/tcp_client_posix_test.c @@ -121,7 +121,7 @@ void test_succeeds(void) { gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (g_connections_complete == connections_complete_before) { - grpc_pollset_worker worker; + grpc_pollset_worker *worker = NULL; grpc_pollset_work(&exec_ctx, &g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)); @@ -161,7 +161,7 @@ void test_fails(void) { /* wait for the connection callback to finish */ while (g_connections_complete == connections_complete_before) { - grpc_pollset_worker worker; + grpc_pollset_worker *worker = NULL; gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); gpr_timespec polling_deadline = test_deadline(); if (!grpc_timer_check(&exec_ctx, now, &polling_deadline)) { @@ -228,7 +228,7 @@ void test_times_out(void) { /* Make sure the event doesn't trigger early */ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); for (;;) { - grpc_pollset_worker worker; + grpc_pollset_worker *worker = NULL; gpr_timespec now = gpr_now(connect_deadline.clock_type); gpr_timespec continue_verifying_time = gpr_time_from_seconds(5, GPR_TIMESPAN); diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c index d290c6bc3a..20b0b9e13b 100644 --- a/test/core/iomgr/tcp_posix_test.c +++ b/test/core/iomgr/tcp_posix_test.c @@ -190,7 +190,7 @@ static void read_test(size_t num_bytes, size_t slice_size) { gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (state.read_bytes < state.target_read_bytes) { - grpc_pollset_worker worker; + grpc_pollset_worker *worker = NULL; grpc_pollset_work(&exec_ctx, &g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), deadline); gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); @@ -236,7 +236,7 @@ static void large_read_test(size_t slice_size) { gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (state.read_bytes < state.target_read_bytes) { - grpc_pollset_worker worker; + grpc_pollset_worker *worker = NULL; grpc_pollset_work(&exec_ctx, &g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), deadline); gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); @@ -303,7 +303,7 @@ void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) { GPR_ASSERT(fcntl(fd, F_SETFL, flags & ~O_NONBLOCK) == 0); for (;;) { - grpc_pollset_worker worker; + grpc_pollset_worker *worker = NULL; gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); grpc_pollset_work(&exec_ctx, &g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), @@ -365,7 +365,7 @@ static void write_test(size_t num_bytes, size_t slice_size) { drain_socket_blocking(sv[0], num_bytes, num_bytes); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); for (;;) { - grpc_pollset_worker worker; + grpc_pollset_worker *worker = NULL; if (state.write_done) { break; } @@ -425,7 +425,7 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) { gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (state.read_bytes < state.target_read_bytes) { - grpc_pollset_worker worker; + grpc_pollset_worker *worker = NULL; grpc_pollset_work(&exec_ctx, &g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), deadline); gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); @@ -439,7 +439,7 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) { grpc_tcp_destroy_and_release_fd(&exec_ctx, ep, &fd, &fd_released_cb); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (!fd_released_done) { - grpc_pollset_worker worker; + grpc_pollset_worker *worker = NULL; grpc_pollset_work(&exec_ctx, &g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), deadline); } diff --git a/test/core/iomgr/tcp_server_posix_test.c b/test/core/iomgr/tcp_server_posix_test.c index 272d97bfcb..aaab0fc359 100644 --- a/test/core/iomgr/tcp_server_posix_test.c +++ b/test/core/iomgr/tcp_server_posix_test.c @@ -183,7 +183,7 @@ static void tcp_connect(grpc_exec_ctx *exec_ctx, const struct sockaddr *remote, gpr_log(GPR_DEBUG, "wait"); while (g_nconnects == nconnects_before && gpr_time_cmp(deadline, gpr_now(deadline.clock_type)) > 0) { - grpc_pollset_worker worker; + grpc_pollset_worker *worker = NULL; grpc_pollset_work(exec_ctx, &g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), deadline); gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); diff --git a/test/core/security/oauth2_utils.c b/test/core/security/oauth2_utils.c index 55ac31e62c..4dd595df95 100644 --- a/test/core/security/oauth2_utils.c +++ b/test/core/security/oauth2_utils.c @@ -95,7 +95,7 @@ char *grpc_test_fetch_oauth2_token_with_credentials( gpr_mu_lock(GRPC_POLLSET_MU(&request.pollset)); while (!request.is_done) { - grpc_pollset_worker worker; + grpc_pollset_worker *worker = NULL; grpc_pollset_work(&exec_ctx, &request.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), gpr_inf_future(GPR_CLOCK_MONOTONIC)); diff --git a/test/core/util/port_posix.c b/test/core/util/port_posix.c index 4b31f810e5..ad21fe0f53 100644 --- a/test/core/util/port_posix.c +++ b/test/core/util/port_posix.c @@ -114,7 +114,7 @@ static void free_port_using_server(char *server, int port) { &pr); gpr_mu_lock(GRPC_POLLSET_MU(&pr.pollset)); while (!pr.done) { - grpc_pollset_worker worker; + grpc_pollset_worker *worker = NULL; grpc_pollset_work(&exec_ctx, &pr.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1)); @@ -280,7 +280,7 @@ static int pick_port_using_server(char *server) { grpc_exec_ctx_finish(&exec_ctx); gpr_mu_lock(GRPC_POLLSET_MU(&pr.pollset)); while (pr.port == -1) { - grpc_pollset_worker worker; + grpc_pollset_worker *worker = NULL; grpc_pollset_work(&exec_ctx, &pr.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1)); diff --git a/test/core/util/test_tcp_server.c b/test/core/util/test_tcp_server.c index 66470c0288..e99d5dcffd 100644 --- a/test/core/util/test_tcp_server.c +++ b/test/core/util/test_tcp_server.c @@ -85,7 +85,7 @@ void test_tcp_server_start(test_tcp_server *server, int port) { } void test_tcp_server_poll(test_tcp_server *server, int seconds) { - grpc_pollset_worker worker; + grpc_pollset_worker *worker = NULL; gpr_timespec deadline = gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(seconds, GPR_TIMESPAN)); -- GitLab