Skip to content
Snippets Groups Projects
Commit 40a3e64a authored by Jan Tattermusch's avatar Jan Tattermusch Committed by GitHub
Browse files

Merge pull request #7123 from sreecha/epoll_perf

Trace statements in epoll poller (enabled by default to debug the recent perf regression)
parents 3db76b92 34217248
No related branches found
No related tags found
No related merge requests found
...@@ -60,6 +60,13 @@ ...@@ -60,6 +60,13 @@
#include "src/core/lib/profiling/timers.h" #include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h" #include "src/core/lib/support/block_annotate.h"
/* TODO: sreek - Move this to init.c and initialize this like other tracers. */
static int grpc_polling_trace = 0; /* Disabled by default */

/* Logs a printf-style trace message at GPR_INFO level through gpr_log(), but
   only when grpc_polling_trace is non-zero.

   fmt: printf-style format string.
   ...: the arguments consumed by fmt. At least one argument is required,
        because __VA_ARGS__ follows a comma in the expansion.

   The body is wrapped in do { ... } while (0) so that the macro expands to
   exactly one statement. This keeps it safe inside an unbraced if/else: a bare
   'if' expansion would either fail to compile or capture the caller's 'else'. */
#define GRPC_POLLING_TRACE(fmt, ...)         \
  do {                                       \
    if (grpc_polling_trace) {                \
      gpr_log(GPR_INFO, (fmt), __VA_ARGS__); \
    }                                        \
  } while (0)
static int grpc_wakeup_signal = -1; static int grpc_wakeup_signal = -1;
static bool is_grpc_wakeup_signal_initialized = false; static bool is_grpc_wakeup_signal_initialized = false;
...@@ -195,7 +202,11 @@ typedef struct polling_island { ...@@ -195,7 +202,11 @@ typedef struct polling_island {
* Pollset Declarations * Pollset Declarations
*/ */
struct grpc_pollset_worker { struct grpc_pollset_worker {
pthread_t pt_id; /* Thread id of this worker */ /* Thread id of this worker */
pthread_t pt_id;
/* Used to prevent a worker from getting kicked multiple times */
gpr_atm is_kicked;
struct grpc_pollset_worker *next; struct grpc_pollset_worker *next;
struct grpc_pollset_worker *prev; struct grpc_pollset_worker *prev;
}; };
...@@ -1058,9 +1069,16 @@ static void pollset_global_shutdown(void) { ...@@ -1058,9 +1069,16 @@ static void pollset_global_shutdown(void) {
static grpc_error *pollset_worker_kick(grpc_pollset_worker *worker) { static grpc_error *pollset_worker_kick(grpc_pollset_worker *worker) {
grpc_error *err = GRPC_ERROR_NONE; grpc_error *err = GRPC_ERROR_NONE;
int err_num = pthread_kill(worker->pt_id, grpc_wakeup_signal);
if (err_num != 0) { /* Kick the worker only if it was not already kicked */
err = GRPC_OS_ERROR(err_num, "pthread_kill"); if (gpr_atm_no_barrier_cas(&worker->is_kicked, (gpr_atm)0, (gpr_atm)1)) {
GRPC_POLLING_TRACE(
"pollset_worker_kick: Kicking worker: %p (thread id: %ld)",
(void *)worker, worker->pt_id);
int err_num = pthread_kill(worker->pt_id, grpc_wakeup_signal);
if (err_num != 0) {
err = GRPC_OS_ERROR(err_num, "pthread_kill");
}
} }
return err; return err;
} }
...@@ -1104,7 +1122,6 @@ static grpc_error *pollset_kick(grpc_pollset *p, ...@@ -1104,7 +1122,6 @@ static grpc_error *pollset_kick(grpc_pollset *p,
GPR_TIMER_BEGIN("pollset_kick", 0); GPR_TIMER_BEGIN("pollset_kick", 0);
grpc_error *error = GRPC_ERROR_NONE; grpc_error *error = GRPC_ERROR_NONE;
const char *err_desc = "Kick Failure"; const char *err_desc = "Kick Failure";
grpc_pollset_worker *worker = specific_worker; grpc_pollset_worker *worker = specific_worker;
if (worker != NULL) { if (worker != NULL) {
if (worker == GRPC_POLLSET_KICK_BROADCAST) { if (worker == GRPC_POLLSET_KICK_BROADCAST) {
...@@ -1270,7 +1287,8 @@ static void pollset_reset(grpc_pollset *pollset) { ...@@ -1270,7 +1287,8 @@ static void pollset_reset(grpc_pollset *pollset) {
#define GRPC_EPOLL_MAX_EVENTS 1000 #define GRPC_EPOLL_MAX_EVENTS 1000
/* Note: sig_mask contains the signal mask to use *during* epoll_wait() */ /* Note: sig_mask contains the signal mask to use *during* epoll_wait() */
static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
grpc_pollset *pollset, int timeout_ms, grpc_pollset *pollset,
grpc_pollset_worker *worker, int timeout_ms,
sigset_t *sig_mask, grpc_error **error) { sigset_t *sig_mask, grpc_error **error) {
struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS]; struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
int epoll_fd = -1; int epoll_fd = -1;
...@@ -1298,6 +1316,8 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, ...@@ -1298,6 +1316,8 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
} }
PI_ADD_REF(pollset->polling_island, "ps"); PI_ADD_REF(pollset->polling_island, "ps");
GRPC_POLLING_TRACE("pollset_work: pollset: %p created new pi: %p",
(void *)pollset, (void *)pollset->polling_island);
} }
pi = polling_island_maybe_get_latest(pollset->polling_island); pi = polling_island_maybe_get_latest(pollset->polling_island);
...@@ -1331,6 +1351,9 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, ...@@ -1331,6 +1351,9 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
} else { } else {
/* We were interrupted. Save an iteration by doing a zero timeout /* We were interrupted. Save an iteration by doing a zero timeout
epoll_wait to see if there are any other events of interest */ epoll_wait to see if there are any other events of interest */
GRPC_POLLING_TRACE(
"pollset_work: pollset: %p, worker: %p received kick",
(void *)pollset, (void *)worker);
ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0); ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
} }
} }
...@@ -1347,6 +1370,10 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, ...@@ -1347,6 +1370,10 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd), grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd),
err_desc); err_desc);
} else if (data_ptr == &polling_island_wakeup_fd) { } else if (data_ptr == &polling_island_wakeup_fd) {
GRPC_POLLING_TRACE(
"pollset_work: pollset: %p, worker: %p polling island (epoll_fd: "
"%d) got merged",
(void *)pollset, (void *)worker, epoll_fd);
/* This means that our polling island is merged with a different /* This means that our polling island is merged with a different
island. We do not have to do anything here since the subsequent call island. We do not have to do anything here since the subsequent call
to the function pollset_work_and_unlock() will pick up the correct to the function pollset_work_and_unlock() will pick up the correct
...@@ -1394,6 +1421,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, ...@@ -1394,6 +1421,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker worker; grpc_pollset_worker worker;
worker.next = worker.prev = NULL; worker.next = worker.prev = NULL;
worker.pt_id = pthread_self(); worker.pt_id = pthread_self();
gpr_atm_no_barrier_store(&worker.is_kicked, (gpr_atm)0);
*worker_hdl = &worker; *worker_hdl = &worker;
...@@ -1409,18 +1437,20 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, ...@@ -1409,18 +1437,20 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
pollset->kicked_without_pollers = 0; pollset->kicked_without_pollers = 0;
} else if (!pollset->shutting_down) { } else if (!pollset->shutting_down) {
/* We use the posix-signal with number 'grpc_wakeup_signal' for waking up /* We use the posix-signal with number 'grpc_wakeup_signal' for waking up
(i.e 'kicking') a worker in the pollset. (i.e 'kicking') a worker in the pollset. A 'kick' is a way to inform the
A 'kick' is a way to inform that worker that there is some pending work worker that there is some pending work that needs immediate attention
that needs immediate attention (like an event on the completion queue, (like an event on the completion queue, or a polling island merge that
or a polling island merge that results in a new epoll-fd to wait on) and results in a new epoll-fd to wait on) and that the worker should not
that the worker should not spend time waiting in epoll_pwait(). spend time waiting in epoll_pwait().
A kick can come at anytime (i.e before/during or after the worker calls A worker can be kicked anytime from the point it is added to the pollset
epoll_pwait()) but in all cases we have to make sure that when a worker via push_front_worker() (or push_back_worker()) to the point it is
gets a kick, it does not spend time in epoll_pwait(). In other words, one removed via remove_worker().
kick should result in skipping/exiting of one epoll_pwait(); If the worker is kicked before/during it calls epoll_pwait(), it should
immediately exit from epoll_pwait(). If the worker is kicked after it
To accomplish this, we mask 'grpc_wakeup_signal' on this worker at all returns from epoll_pwait(), then nothing really needs to be done.
To accomplish this, we mask 'grpc_wakeup_signal' on this thread at all
times *except* when it is in epoll_pwait(). This way, the worker never times *except* when it is in epoll_pwait(). This way, the worker never
misses acting on a kick */ misses acting on a kick */
...@@ -1442,11 +1472,14 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, ...@@ -1442,11 +1472,14 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
push_front_worker(pollset, &worker); /* Add worker to pollset */ push_front_worker(pollset, &worker); /* Add worker to pollset */
pollset_work_and_unlock(exec_ctx, pollset, timeout_ms, &g_orig_sigmask, pollset_work_and_unlock(exec_ctx, pollset, &worker, timeout_ms,
&error); &g_orig_sigmask, &error);
grpc_exec_ctx_flush(exec_ctx); grpc_exec_ctx_flush(exec_ctx);
gpr_mu_lock(&pollset->mu); gpr_mu_lock(&pollset->mu);
/* Note: There is no need to reset worker.is_kicked to 0 since we are no
longer going to use this worker */
remove_worker(pollset, &worker); remove_worker(pollset, &worker);
} }
...@@ -1506,17 +1539,38 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, ...@@ -1506,17 +1539,38 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
pi_new = fd->polling_island; pi_new = fd->polling_island;
if (pi_new == NULL) { if (pi_new == NULL) {
pi_new = polling_island_create(fd, &error); pi_new = polling_island_create(fd, &error);
GRPC_POLLING_TRACE(
"pollset_add_fd: Created new polling island. pi_new: %p (fd: %d, "
"pollset: %p)",
(void *)pi_new, fd->fd, (void *)pollset);
} }
} else if (fd->polling_island == NULL) { } else if (fd->polling_island == NULL) {
pi_new = polling_island_lock(pollset->polling_island); pi_new = polling_island_lock(pollset->polling_island);
polling_island_add_fds_locked(pi_new, &fd, 1, true, &error); polling_island_add_fds_locked(pi_new, &fd, 1, true, &error);
gpr_mu_unlock(&pi_new->mu); gpr_mu_unlock(&pi_new->mu);
GRPC_POLLING_TRACE(
"pollset_add_fd: fd->pi was NULL. pi_new: %p (fd: %d, pollset: %p, "
"pollset->pi: %p)",
(void *)pi_new, fd->fd, (void *)pollset,
(void *)pollset->polling_island);
} else if (pollset->polling_island == NULL) { } else if (pollset->polling_island == NULL) {
pi_new = polling_island_lock(fd->polling_island); pi_new = polling_island_lock(fd->polling_island);
gpr_mu_unlock(&pi_new->mu); gpr_mu_unlock(&pi_new->mu);
GRPC_POLLING_TRACE(
"pollset_add_fd: pollset->pi was NULL. pi_new: %p (fd: %d, pollset: "
"%p, fd->pi: %p",
(void *)pi_new, fd->fd, (void *)pollset, (void *)fd->polling_island);
} else { } else {
pi_new = polling_island_merge(fd->polling_island, pollset->polling_island, pi_new = polling_island_merge(fd->polling_island, pollset->polling_island,
&error); &error);
GRPC_POLLING_TRACE(
"pollset_add_fd: polling islands merged. pi_new: %p (fd: %d, pollset: "
"%p, fd->pi: %p, pollset->pi: %p)",
(void *)pi_new, fd->fd, (void *)pollset, (void *)fd->polling_island,
(void *)pollset->polling_island);
} }
/* At this point, pi_new is the polling island that both fd->polling_island /* At this point, pi_new is the polling island that both fd->polling_island
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment