Skip to content
Snippets Groups Projects
Commit 97c2d6a2 authored by Sree Kuchibhotla's avatar Sree Kuchibhotla
Browse files

Remove grpc_fd_watcher and related code from ev_epoll_posix.c

parent 9ff57f67
No related branches found
No related tags found
No related merge requests found
......@@ -59,17 +59,6 @@
* FD declarations
*/
/* TODO(sreek) : Check if grpc_fd_watcher is needed (and if so, check if we can
* share this between ev_poll_posix.h and ev_epoll_posix versions */
typedef struct grpc_fd_watcher {
struct grpc_fd_watcher *next;
struct grpc_fd_watcher *prev;
grpc_pollset *pollset;
grpc_pollset_worker *worker;
grpc_fd *fd;
} grpc_fd_watcher;
struct grpc_fd {
int fd;
/* refst format:
......@@ -84,32 +73,6 @@ struct grpc_fd {
int closed;
int released;
/* The watcher list.
The following watcher related fields are protected by watcher_mu.
An fd_watcher is an ephemeral object created when an fd wants to
begin polling, and destroyed after the poll.
It denotes the fd's interest in whether to read poll or write poll
or both or neither on this fd.
If a watcher is asked to poll for reads or writes, the read_watcher
or write_watcher fields are set respectively. A watcher may be asked
to poll for both, in which case both fields will be set.
read_watcher and write_watcher may be NULL if no watcher has been
asked to poll for reads or writes.
If an fd_watcher is not asked to poll for reads or writes, it's added
to a linked list of inactive watchers, rooted at inactive_watcher_root.
If at a later time there becomes need of a poller to poll, one of
the inactive pollers may be kicked out of their poll loops to take
that responsibility. */
grpc_fd_watcher inactive_watcher_root;
grpc_fd_watcher *read_watcher;
grpc_fd_watcher *write_watcher;
grpc_closure *read_closure;
grpc_closure *write_closure;
......@@ -120,27 +83,6 @@ struct grpc_fd {
grpc_iomgr_object iomgr_object;
};
/* Begin polling on an fd.
Registers that the given pollset is interested in this fd - so that if read
or writability interest changes, the pollset can be kicked to pick up that
new interest.
Return value is:
(fd_needs_read? read_mask : 0) | (fd_needs_write? write_mask : 0)
i.e. a combination of read_mask and write_mask determined by the fd's current
interest in said events.
Polling strategies that do not need to alter their behavior depending on the
fd's current interest (such as epoll) do not need to call this function.
MUST NOT be called with a pollset lock taken */
static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
grpc_pollset_worker *worker, uint32_t read_mask,
uint32_t write_mask, grpc_fd_watcher *rec);
/* Complete polling previously started with fd_begin_poll
MUST NOT be called with a pollset lock taken
if got_read or got_write are 1, also does the become_{readable,writable} as
appropriate. */
static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *rec,
int got_read, int got_write);
/* Return 1 if this fd is orphaned, 0 otherwise */
static bool fd_is_orphaned(grpc_fd *fd);
......@@ -307,10 +249,7 @@ static grpc_fd *alloc_fd(int fd) {
r->read_closure = CLOSURE_NOT_READY;
r->write_closure = CLOSURE_NOT_READY;
r->fd = fd;
r->inactive_watcher_root.next = r->inactive_watcher_root.prev =
&r->inactive_watcher_root;
r->freelist_next = NULL;
r->read_watcher = r->write_watcher = NULL;
r->on_done_closure = NULL;
r->closed = 0;
r->released = 0;
......@@ -387,43 +326,6 @@ static bool fd_is_orphaned(grpc_fd *fd) {
return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
}
static void pollset_kick_locked(grpc_fd_watcher *watcher) {
gpr_mu_lock(&watcher->pollset->mu);
GPR_ASSERT(watcher->worker);
pollset_kick_ext(watcher->pollset, watcher->worker,
GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP);
gpr_mu_unlock(&watcher->pollset->mu);
}
static void maybe_wake_one_watcher_locked(grpc_fd *fd) {
if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) {
pollset_kick_locked(fd->inactive_watcher_root.next);
} else if (fd->read_watcher) {
pollset_kick_locked(fd->read_watcher);
} else if (fd->write_watcher) {
pollset_kick_locked(fd->write_watcher);
}
}
static void wake_all_watchers_locked(grpc_fd *fd) {
grpc_fd_watcher *watcher;
for (watcher = fd->inactive_watcher_root.next;
watcher != &fd->inactive_watcher_root; watcher = watcher->next) {
pollset_kick_locked(watcher);
}
if (fd->read_watcher) {
pollset_kick_locked(fd->read_watcher);
}
if (fd->write_watcher && fd->write_watcher != fd->read_watcher) {
pollset_kick_locked(fd->write_watcher);
}
}
static int has_watchers(grpc_fd *fd) {
return fd->read_watcher != NULL || fd->write_watcher != NULL ||
fd->inactive_watcher_root.next != &fd->inactive_watcher_root;
}
static void close_fd_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
fd->closed = 1;
if (!fd->released) {
......@@ -454,11 +356,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
}
gpr_mu_lock(&fd->mu);
REF_BY(fd, 1, reason); /* remove active status, but keep referenced */
if (!has_watchers(fd)) {
close_fd_locked(exec_ctx, fd);
} else {
wake_all_watchers_locked(fd);
}
close_fd_locked(exec_ctx, fd);
gpr_mu_unlock(&fd->mu);
UNREF_BY(fd, 2, reason); /* drop the reference */
}
......@@ -489,7 +387,6 @@ static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
/* already ready ==> queue the closure to run immediately */
*st = CLOSURE_NOT_READY;
grpc_exec_ctx_enqueue(exec_ctx, closure, !fd->shutdown, NULL);
maybe_wake_one_watcher_locked(fd);
} else {
/* upcallptr was set to a different closure. This is an error! */
gpr_log(GPR_ERROR,
......@@ -540,111 +437,6 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
gpr_mu_unlock(&fd->mu);
}
static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
grpc_pollset_worker *worker, uint32_t read_mask,
uint32_t write_mask, grpc_fd_watcher *watcher) {
uint32_t mask = 0;
grpc_closure *cur;
int requested;
/* keep track of pollers that have requested our events, in case they change
*/
GRPC_FD_REF(fd, "poll");
gpr_mu_lock(&fd->mu);
/* if we are shutdown, then don't add to the watcher set */
if (fd->shutdown) {
watcher->fd = NULL;
watcher->pollset = NULL;
watcher->worker = NULL;
gpr_mu_unlock(&fd->mu);
GRPC_FD_UNREF(fd, "poll");
return 0;
}
/* if there is nobody polling for read, but we need to, then start doing so */
cur = fd->read_closure;
requested = cur != CLOSURE_READY;
if (read_mask && fd->read_watcher == NULL && requested) {
fd->read_watcher = watcher;
mask |= read_mask;
}
/* if there is nobody polling for write, but we need to, then start doing so
*/
cur = fd->write_closure;
requested = cur != CLOSURE_READY;
if (write_mask && fd->write_watcher == NULL && requested) {
fd->write_watcher = watcher;
mask |= write_mask;
}
/* if not polling, remember this watcher in case we need someone to later */
if (mask == 0 && worker != NULL) {
watcher->next = &fd->inactive_watcher_root;
watcher->prev = watcher->next->prev;
watcher->next->prev = watcher->prev->next = watcher;
}
watcher->pollset = pollset;
watcher->worker = worker;
watcher->fd = fd;
gpr_mu_unlock(&fd->mu);
return mask;
}
static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher,
int got_read, int got_write) {
int was_polling = 0;
int kick = 0;
grpc_fd *fd = watcher->fd;
if (fd == NULL) {
return;
}
gpr_mu_lock(&fd->mu);
if (watcher == fd->read_watcher) {
/* remove read watcher, kick if we still need a read */
was_polling = 1;
if (!got_read) {
kick = 1;
}
fd->read_watcher = NULL;
}
if (watcher == fd->write_watcher) {
/* remove write watcher, kick if we still need a write */
was_polling = 1;
if (!got_write) {
kick = 1;
}
fd->write_watcher = NULL;
}
if (!was_polling && watcher->worker != NULL) {
/* remove from inactive list */
watcher->next->prev = watcher->prev;
watcher->prev->next = watcher->next;
}
if (got_read) {
if (set_ready_locked(exec_ctx, fd, &fd->read_closure)) {
kick = 1;
}
}
if (got_write) {
if (set_ready_locked(exec_ctx, fd, &fd->write_closure)) {
kick = 1;
}
}
if (kick) {
maybe_wake_one_watcher_locked(fd);
}
if (fd_is_orphaned(fd) && !has_watchers(fd) && !fd->closed) {
close_fd_locked(exec_ctx, fd);
}
gpr_mu_unlock(&fd->mu);
GRPC_FD_UNREF(fd, "poll");
}
/*******************************************************************************
* pollset_posix.c
*/
......@@ -1085,32 +877,45 @@ static void finally_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
epoll_hdr *h = pollset->data.ptr;
struct epoll_event ev;
int err;
grpc_fd_watcher watcher;
/* We pretend to be polling whilst adding an fd to keep the fd from being
closed during the add. This may result in a spurious wakeup being assigned
to this pollset whilst adding, but that should be benign. */
/* TODO (sreek). This fd_begin_poll() really seem to accomplish adding
* GRPC_FD_REF() (i.e adding a refcount to the fd) and checking that the
* fd is not shutting down (in which case watcher.fd will be NULL and no
* refcount is added). The ref count is added only durng hte duration of
* adding it to the epoll set (after which fd_end_poll would be called and
* the fd's ref count is decremented by 1. So do we still need fd_begin_poll
* ??? */
GPR_ASSERT(fd_begin_poll(fd, pollset, NULL, 0, 0, &watcher) == 0);
if (watcher.fd != NULL) {
ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
ev.data.ptr = fd;
err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev);
if (err < 0) {
/* FDs may be added to a pollset multiple times, so EEXIST is normal. */
if (errno != EEXIST) {
gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s", fd->fd,
strerror(errno));
}
/* Hold a ref to the fd to keep it from being closed during the add. This may
result in a spurious wakeup being assigned to this pollset whilst adding,
but that should be benign. */
/* TODO: (sreek): Understand how a spurious wake up migh be assinged to this
* pollset..and how holding a reference will prevent the fd from being closed
* (and perhaps more importantly, see how can an fd be closed while being
* added to the epollset */
GRPC_FD_REF(fd, "add fd");
gpr_mu_lock(&fd->mu);
if (fd->shutdown) {
gpr_mu_unlock(&fd->mu);
GRPC_FD_UNREF(fd, "add fd");
return;
}
gpr_mu_unlock(&fd->mu);
ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
ev.data.ptr = fd;
err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev);
if (err < 0) {
/* FDs may be added to a pollset multiple times, so EEXIST is normal. */
if (errno != EEXIST) {
gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s", fd->fd,
strerror(errno));
}
}
fd_end_poll(exec_ctx, &watcher, 0, 0);
/* The fd might have been orphaned while we were adding it to the epoll set.
Close the fd in such a case (which will also take care of removing it from
the epoll set */
gpr_mu_lock(&fd->mu);
if (fd_is_orphaned(fd) && !fd->closed) {
close_fd_locked(exec_ctx, fd);
}
gpr_mu_unlock(&fd->mu);
GRPC_FD_UNREF(fd, "add fd");
}
/* Creates an epoll fd and initializes the pollset */
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment