Skip to content
Snippets Groups Projects
Commit d0a8ae12 authored by Craig Tiller's avatar Craig Tiller
Browse files

Move worker into pollset

parent 311445fd
Branches
Tags
No related merge requests found
......@@ -83,7 +83,7 @@ void grpc_pollset_destroy(grpc_pollset *pollset);
pollset
lock */
void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker *worker, gpr_timespec now,
grpc_pollset_worker **worker, gpr_timespec now,
gpr_timespec deadline);
/* Break one polling thread out of polling work for this pollset.
......
......
......@@ -246,8 +246,11 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
}
void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker *worker, gpr_timespec now,
grpc_pollset_worker **worker_hdl, gpr_timespec now,
gpr_timespec deadline) {
grpc_pollset_worker worker;
*worker_hdl = &worker;
/* pollset->mu already held */
int added_worker = 0;
int locked = 1;
......@@ -255,16 +258,16 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
int keep_polling = 0;
GPR_TIMER_BEGIN("grpc_pollset_work", 0);
/* this must happen before we (potentially) drop pollset->mu */
worker->next = worker->prev = NULL;
worker->reevaluate_polling_on_wakeup = 0;
worker.next = worker.prev = NULL;
worker.reevaluate_polling_on_wakeup = 0;
if (pollset->local_wakeup_cache != NULL) {
worker->wakeup_fd = pollset->local_wakeup_cache;
pollset->local_wakeup_cache = worker->wakeup_fd->next;
worker.wakeup_fd = pollset->local_wakeup_cache;
pollset->local_wakeup_cache = worker.wakeup_fd->next;
} else {
worker->wakeup_fd = gpr_malloc(sizeof(*worker->wakeup_fd));
grpc_wakeup_fd_init(&worker->wakeup_fd->fd);
worker.wakeup_fd = gpr_malloc(sizeof(*worker.wakeup_fd));
grpc_wakeup_fd_init(&worker.wakeup_fd->fd);
}
worker->kicked_specifically = 0;
worker.kicked_specifically = 0;
/* If there's work waiting for the pollset to be idle, and the
pollset is idle, then do that work */
if (!grpc_pollset_has_workers(pollset) &&
......@@ -293,13 +296,13 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
keep_polling = 0;
if (!pollset->kicked_without_pollers) {
if (!added_worker) {
push_front_worker(pollset, worker);
push_front_worker(pollset, &worker);
added_worker = 1;
gpr_tls_set(&g_current_thread_worker, (intptr_t)worker);
gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
}
gpr_tls_set(&g_current_thread_poller, (intptr_t)pollset);
GPR_TIMER_BEGIN("maybe_work_and_unlock", 0);
pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, worker,
pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, &worker,
deadline, now);
GPR_TIMER_END("maybe_work_and_unlock", 0);
locked = 0;
......@@ -321,10 +324,10 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
/* If we're forced to re-evaluate polling (via grpc_pollset_kick with
GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force
a loop */
if (worker->reevaluate_polling_on_wakeup) {
worker->reevaluate_polling_on_wakeup = 0;
if (worker.reevaluate_polling_on_wakeup) {
worker.reevaluate_polling_on_wakeup = 0;
pollset->kicked_without_pollers = 0;
if (queued_work || worker->kicked_specifically) {
if (queued_work || worker.kicked_specifically) {
/* If there's queued work on the list, then set the deadline to be
immediate so we get back out of the polling loop quickly */
deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC);
......@@ -333,12 +336,12 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
}
if (added_worker) {
remove_worker(pollset, worker);
remove_worker(pollset, &worker);
gpr_tls_set(&g_current_thread_worker, 0);
}
/* release wakeup fd to the local pool */
worker->wakeup_fd->next = pollset->local_wakeup_cache;
pollset->local_wakeup_cache = worker->wakeup_fd;
worker.wakeup_fd->next = pollset->local_wakeup_cache;
pollset->local_wakeup_cache = worker.wakeup_fd;
/* check shutdown conditions */
if (pollset->shutting_down) {
if (grpc_pollset_has_workers(pollset)) {
......@@ -360,6 +363,7 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
gpr_mu_lock(&pollset->mu);
}
}
*worker_hdl = NULL;
GPR_TIMER_END("grpc_pollset_work", 0);
}
......
......
......@@ -122,7 +122,7 @@ static int is_stack_running_on_compute_engine(void) {
called once for the lifetime of the process by the default credentials. */
gpr_mu_lock(GRPC_POLLSET_MU(&detector.pollset));
while (!detector.is_done) {
grpc_pollset_worker worker;
grpc_pollset_worker *worker = NULL;
grpc_pollset_work(&exec_ctx, &detector.pollset, &worker,
gpr_now(GPR_CLOCK_MONOTONIC),
gpr_inf_future(GPR_CLOCK_MONOTONIC));
......
......
......@@ -50,7 +50,7 @@
#include <grpc/support/time.h>
typedef struct {
grpc_pollset_worker *worker;
grpc_pollset_worker **worker;
void *tag;
} plucker;
......@@ -252,7 +252,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
pluck_worker = NULL;
for (i = 0; i < cc->num_pluckers; i++) {
if (cc->pluckers[i].tag == tag) {
pluck_worker = cc->pluckers[i].worker;
pluck_worker = *cc->pluckers[i].worker;
break;
}
}
......@@ -275,7 +275,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
gpr_timespec deadline, void *reserved) {
grpc_event ret;
grpc_pollset_worker worker;
grpc_pollset_worker *worker = NULL;
int first_loop = 1;
gpr_timespec now;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
......@@ -348,7 +348,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
}
static int add_plucker(grpc_completion_queue *cc, void *tag,
grpc_pollset_worker *worker) {
grpc_pollset_worker **worker) {
if (cc->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) {
return 0;
}
......@@ -359,7 +359,7 @@ static int add_plucker(grpc_completion_queue *cc, void *tag,
}
static void del_plucker(grpc_completion_queue *cc, void *tag,
grpc_pollset_worker *worker) {
grpc_pollset_worker **worker) {
int i;
for (i = 0; i < cc->num_pluckers; i++) {
if (cc->pluckers[i].tag == tag && cc->pluckers[i].worker == worker) {
......@@ -376,7 +376,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
grpc_event ret;
grpc_cq_completion *c;
grpc_cq_completion *prev;
grpc_pollset_worker worker;
grpc_pollset_worker *worker = NULL;
gpr_timespec now;
int first_loop = 1;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
......
......
......@@ -89,7 +89,7 @@ static void test_get(int port) {
on_finish, (void *)42);
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
while (!g_done) {
grpc_pollset_worker worker;
grpc_pollset_worker *worker = NULL;
grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
gpr_now(GPR_CLOCK_MONOTONIC), n_seconds_time(20));
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
......@@ -120,7 +120,7 @@ static void test_post(int port) {
n_seconds_time(15), on_finish, (void *)42);
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
while (!g_done) {
grpc_pollset_worker worker;
grpc_pollset_worker *worker = NULL;
grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
gpr_now(GPR_CLOCK_MONOTONIC), n_seconds_time(20));
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
......
......
......@@ -90,7 +90,7 @@ static void test_get(int port) {
on_finish, (void *)42);
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
while (!g_done) {
grpc_pollset_worker worker;
grpc_pollset_worker *worker = NULL;
grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
gpr_now(GPR_CLOCK_MONOTONIC), n_seconds_time(20));
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
......@@ -122,7 +122,7 @@ static void test_post(int port) {
n_seconds_time(15), on_finish, (void *)42);
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
while (!g_done) {
grpc_pollset_worker worker;
grpc_pollset_worker *worker = NULL;
grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
gpr_now(GPR_CLOCK_MONOTONIC), n_seconds_time(20));
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
......
......
......@@ -234,7 +234,7 @@ static void read_and_write_test(grpc_endpoint_test_config config,
gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
while (!state.read_done || !state.write_done) {
grpc_pollset_worker worker;
grpc_pollset_worker *worker = NULL;
GPR_ASSERT(gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), deadline) < 0);
grpc_pollset_work(&exec_ctx, g_pollset, &worker,
gpr_now(GPR_CLOCK_MONOTONIC), deadline);
......
......
......@@ -252,7 +252,7 @@ static void server_wait_and_shutdown(server *sv) {
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
while (!sv->done) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_pollset_worker worker;
grpc_pollset_worker *worker = NULL;
grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
gpr_now(GPR_CLOCK_MONOTONIC),
gpr_inf_future(GPR_CLOCK_MONOTONIC));
......@@ -366,7 +366,7 @@ static void client_start(grpc_exec_ctx *exec_ctx, client *cl, int port) {
static void client_wait_and_shutdown(client *cl) {
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
while (!cl->done) {
grpc_pollset_worker worker;
grpc_pollset_worker *worker = NULL;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
gpr_now(GPR_CLOCK_MONOTONIC),
......@@ -467,7 +467,7 @@ static void test_grpc_fd_change(void) {
/* And now wait for it to run. */
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
while (a.cb_that_ran == NULL) {
grpc_pollset_worker worker;
grpc_pollset_worker *worker = NULL;
grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
gpr_now(GPR_CLOCK_MONOTONIC),
gpr_inf_future(GPR_CLOCK_MONOTONIC));
......@@ -491,7 +491,7 @@ static void test_grpc_fd_change(void) {
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
while (b.cb_that_ran == NULL) {
grpc_pollset_worker worker;
grpc_pollset_worker *worker = NULL;
grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
gpr_now(GPR_CLOCK_MONOTONIC),
gpr_inf_future(GPR_CLOCK_MONOTONIC));
......
......
......@@ -121,7 +121,7 @@ void test_succeeds(void) {
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
while (g_connections_complete == connections_complete_before) {
grpc_pollset_worker worker;
grpc_pollset_worker *worker = NULL;
grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
gpr_now(GPR_CLOCK_MONOTONIC),
GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5));
......@@ -161,7 +161,7 @@ void test_fails(void) {
/* wait for the connection callback to finish */
while (g_connections_complete == connections_complete_before) {
grpc_pollset_worker worker;
grpc_pollset_worker *worker = NULL;
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_timespec polling_deadline = test_deadline();
if (!grpc_timer_check(&exec_ctx, now, &polling_deadline)) {
......@@ -228,7 +228,7 @@ void test_times_out(void) {
/* Make sure the event doesn't trigger early */
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
for (;;) {
grpc_pollset_worker worker;
grpc_pollset_worker *worker = NULL;
gpr_timespec now = gpr_now(connect_deadline.clock_type);
gpr_timespec continue_verifying_time =
gpr_time_from_seconds(5, GPR_TIMESPAN);
......
......
......@@ -190,7 +190,7 @@ static void read_test(size_t num_bytes, size_t slice_size) {
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
while (state.read_bytes < state.target_read_bytes) {
grpc_pollset_worker worker;
grpc_pollset_worker *worker = NULL;
grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
gpr_now(GPR_CLOCK_MONOTONIC), deadline);
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
......@@ -236,7 +236,7 @@ static void large_read_test(size_t slice_size) {
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
while (state.read_bytes < state.target_read_bytes) {
grpc_pollset_worker worker;
grpc_pollset_worker *worker = NULL;
grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
gpr_now(GPR_CLOCK_MONOTONIC), deadline);
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
......@@ -303,7 +303,7 @@ void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) {
GPR_ASSERT(fcntl(fd, F_SETFL, flags & ~O_NONBLOCK) == 0);
for (;;) {
grpc_pollset_worker worker;
grpc_pollset_worker *worker = NULL;
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
gpr_now(GPR_CLOCK_MONOTONIC),
......@@ -365,7 +365,7 @@ static void write_test(size_t num_bytes, size_t slice_size) {
drain_socket_blocking(sv[0], num_bytes, num_bytes);
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
for (;;) {
grpc_pollset_worker worker;
grpc_pollset_worker *worker = NULL;
if (state.write_done) {
break;
}
......@@ -425,7 +425,7 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) {
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
while (state.read_bytes < state.target_read_bytes) {
grpc_pollset_worker worker;
grpc_pollset_worker *worker = NULL;
grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
gpr_now(GPR_CLOCK_MONOTONIC), deadline);
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
......@@ -439,7 +439,7 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) {
grpc_tcp_destroy_and_release_fd(&exec_ctx, ep, &fd, &fd_released_cb);
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
while (!fd_released_done) {
grpc_pollset_worker worker;
grpc_pollset_worker *worker = NULL;
grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
gpr_now(GPR_CLOCK_MONOTONIC), deadline);
}
......
......
......@@ -183,7 +183,7 @@ static void tcp_connect(grpc_exec_ctx *exec_ctx, const struct sockaddr *remote,
gpr_log(GPR_DEBUG, "wait");
while (g_nconnects == nconnects_before &&
gpr_time_cmp(deadline, gpr_now(deadline.clock_type)) > 0) {
grpc_pollset_worker worker;
grpc_pollset_worker *worker = NULL;
grpc_pollset_work(exec_ctx, &g_pollset, &worker,
gpr_now(GPR_CLOCK_MONOTONIC), deadline);
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
......
......
......@@ -95,7 +95,7 @@ char *grpc_test_fetch_oauth2_token_with_credentials(
gpr_mu_lock(GRPC_POLLSET_MU(&request.pollset));
while (!request.is_done) {
grpc_pollset_worker worker;
grpc_pollset_worker *worker = NULL;
grpc_pollset_work(&exec_ctx, &request.pollset, &worker,
gpr_now(GPR_CLOCK_MONOTONIC),
gpr_inf_future(GPR_CLOCK_MONOTONIC));
......
......
......@@ -114,7 +114,7 @@ static void free_port_using_server(char *server, int port) {
&pr);
gpr_mu_lock(GRPC_POLLSET_MU(&pr.pollset));
while (!pr.done) {
grpc_pollset_worker worker;
grpc_pollset_worker *worker = NULL;
grpc_pollset_work(&exec_ctx, &pr.pollset, &worker,
gpr_now(GPR_CLOCK_MONOTONIC),
GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1));
......@@ -280,7 +280,7 @@ static int pick_port_using_server(char *server) {
grpc_exec_ctx_finish(&exec_ctx);
gpr_mu_lock(GRPC_POLLSET_MU(&pr.pollset));
while (pr.port == -1) {
grpc_pollset_worker worker;
grpc_pollset_worker *worker = NULL;
grpc_pollset_work(&exec_ctx, &pr.pollset, &worker,
gpr_now(GPR_CLOCK_MONOTONIC),
GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1));
......
......
......@@ -85,7 +85,7 @@ void test_tcp_server_start(test_tcp_server *server, int port) {
}
void test_tcp_server_poll(test_tcp_server *server, int seconds) {
grpc_pollset_worker worker;
grpc_pollset_worker *worker = NULL;
gpr_timespec deadline =
gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
gpr_time_from_seconds(seconds, GPR_TIMESPAN));
......
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment