From 97c2d6a269472a8a6e56d9e3d88f89bd27aff5ac Mon Sep 17 00:00:00 2001
From: Sree Kuchibhotla <sreek@google.com>
Date: Sat, 14 May 2016 16:33:16 -0700
Subject: [PATCH] Remove grpc_fd_watcher and related code from ev_epoll_posix.c

---
 src/core/lib/iomgr/ev_epoll_posix.c | 271 ++++------------------------
 1 file changed, 38 insertions(+), 233 deletions(-)

diff --git a/src/core/lib/iomgr/ev_epoll_posix.c b/src/core/lib/iomgr/ev_epoll_posix.c
index 5776b6c1cf..920be1951f 100644
--- a/src/core/lib/iomgr/ev_epoll_posix.c
+++ b/src/core/lib/iomgr/ev_epoll_posix.c
@@ -59,17 +59,6 @@
  * FD declarations
  */
 
-/* TODO(sreek) : Check if grpc_fd_watcher is needed (and if so, check if we can
- * share this between ev_poll_posix.h and ev_epoll_posix versions */
-
-typedef struct grpc_fd_watcher {
-  struct grpc_fd_watcher *next;
-  struct grpc_fd_watcher *prev;
-  grpc_pollset *pollset;
-  grpc_pollset_worker *worker;
-  grpc_fd *fd;
-} grpc_fd_watcher;
-
 struct grpc_fd {
   int fd;
   /* refst format:
@@ -84,32 +73,6 @@ struct grpc_fd {
   int closed;
   int released;
 
-  /* The watcher list.
-
-     The following watcher related fields are protected by watcher_mu.
-
-     An fd_watcher is an ephemeral object created when an fd wants to
-     begin polling, and destroyed after the poll.
-
-     It denotes the fd's interest in whether to read poll or write poll
-     or both or neither on this fd.
-
-     If a watcher is asked to poll for reads or writes, the read_watcher
-     or write_watcher fields are set respectively. A watcher may be asked
-     to poll for both, in which case both fields will be set.
-
-     read_watcher and write_watcher may be NULL if no watcher has been
-     asked to poll for reads or writes.
-
-     If an fd_watcher is not asked to poll for reads or writes, it's added
-     to a linked list of inactive watchers, rooted at inactive_watcher_root.
-     If at a later time there becomes need of a poller to poll, one of
-     the inactive pollers may be kicked out of their poll loops to take
-     that responsibility. */
-  grpc_fd_watcher inactive_watcher_root;
-  grpc_fd_watcher *read_watcher;
-  grpc_fd_watcher *write_watcher;
-
   grpc_closure *read_closure;
   grpc_closure *write_closure;
 
@@ -120,27 +83,6 @@ struct grpc_fd {
   grpc_iomgr_object iomgr_object;
 };
 
-/* Begin polling on an fd.
-   Registers that the given pollset is interested in this fd - so that if read
-   or writability interest changes, the pollset can be kicked to pick up that
-   new interest.
-   Return value is:
-     (fd_needs_read? read_mask : 0) | (fd_needs_write? write_mask : 0)
-   i.e. a combination of read_mask and write_mask determined by the fd's current
-   interest in said events.
-   Polling strategies that do not need to alter their behavior depending on the
-   fd's current interest (such as epoll) do not need to call this function.
-   MUST NOT be called with a pollset lock taken */
-static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
-                              grpc_pollset_worker *worker, uint32_t read_mask,
-                              uint32_t write_mask, grpc_fd_watcher *rec);
-/* Complete polling previously started with fd_begin_poll
-   MUST NOT be called with a pollset lock taken
-   if got_read or got_write are 1, also does the become_{readable,writable} as
-   appropriate. */
-static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *rec,
-                        int got_read, int got_write);
-
 /* Return 1 if this fd is orphaned, 0 otherwise */
 static bool fd_is_orphaned(grpc_fd *fd);
 
@@ -307,10 +249,7 @@ static grpc_fd *alloc_fd(int fd) {
   r->read_closure = CLOSURE_NOT_READY;
   r->write_closure = CLOSURE_NOT_READY;
   r->fd = fd;
-  r->inactive_watcher_root.next = r->inactive_watcher_root.prev =
-      &r->inactive_watcher_root;
   r->freelist_next = NULL;
-  r->read_watcher = r->write_watcher = NULL;
   r->on_done_closure = NULL;
   r->closed = 0;
   r->released = 0;
@@ -387,43 +326,6 @@ static bool fd_is_orphaned(grpc_fd *fd) {
   return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
 }
 
-static void pollset_kick_locked(grpc_fd_watcher *watcher) {
-  gpr_mu_lock(&watcher->pollset->mu);
-  GPR_ASSERT(watcher->worker);
-  pollset_kick_ext(watcher->pollset, watcher->worker,
-                   GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP);
-  gpr_mu_unlock(&watcher->pollset->mu);
-}
-
-static void maybe_wake_one_watcher_locked(grpc_fd *fd) {
-  if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) {
-    pollset_kick_locked(fd->inactive_watcher_root.next);
-  } else if (fd->read_watcher) {
-    pollset_kick_locked(fd->read_watcher);
-  } else if (fd->write_watcher) {
-    pollset_kick_locked(fd->write_watcher);
-  }
-}
-
-static void wake_all_watchers_locked(grpc_fd *fd) {
-  grpc_fd_watcher *watcher;
-  for (watcher = fd->inactive_watcher_root.next;
-       watcher != &fd->inactive_watcher_root; watcher = watcher->next) {
-    pollset_kick_locked(watcher);
-  }
-  if (fd->read_watcher) {
-    pollset_kick_locked(fd->read_watcher);
-  }
-  if (fd->write_watcher && fd->write_watcher != fd->read_watcher) {
-    pollset_kick_locked(fd->write_watcher);
-  }
-}
-
-static int has_watchers(grpc_fd *fd) {
-  return fd->read_watcher != NULL || fd->write_watcher != NULL ||
-         fd->inactive_watcher_root.next != &fd->inactive_watcher_root;
-}
-
 static void close_fd_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
   fd->closed = 1;
   if (!fd->released) {
@@ -454,11 +356,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
   }
   gpr_mu_lock(&fd->mu);
   REF_BY(fd, 1, reason); /* remove active status, but keep referenced */
-  if (!has_watchers(fd)) {
-    close_fd_locked(exec_ctx, fd);
-  } else {
-    wake_all_watchers_locked(fd);
-  }
+  close_fd_locked(exec_ctx, fd);
   gpr_mu_unlock(&fd->mu);
   UNREF_BY(fd, 2, reason); /* drop the reference */
 }
@@ -489,7 +387,6 @@ static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
     /* already ready ==> queue the closure to run immediately */
     *st = CLOSURE_NOT_READY;
     grpc_exec_ctx_enqueue(exec_ctx, closure, !fd->shutdown, NULL);
-    maybe_wake_one_watcher_locked(fd);
   } else {
     /* upcallptr was set to a different closure.  This is an error! */
     gpr_log(GPR_ERROR,
@@ -540,111 +437,6 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
   gpr_mu_unlock(&fd->mu);
 }
 
-static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
-                              grpc_pollset_worker *worker, uint32_t read_mask,
-                              uint32_t write_mask, grpc_fd_watcher *watcher) {
-  uint32_t mask = 0;
-  grpc_closure *cur;
-  int requested;
-  /* keep track of pollers that have requested our events, in case they change
-   */
-  GRPC_FD_REF(fd, "poll");
-
-  gpr_mu_lock(&fd->mu);
-
-  /* if we are shutdown, then don't add to the watcher set */
-  if (fd->shutdown) {
-    watcher->fd = NULL;
-    watcher->pollset = NULL;
-    watcher->worker = NULL;
-    gpr_mu_unlock(&fd->mu);
-    GRPC_FD_UNREF(fd, "poll");
-    return 0;
-  }
-
-  /* if there is nobody polling for read, but we need to, then start doing so */
-  cur = fd->read_closure;
-  requested = cur != CLOSURE_READY;
-  if (read_mask && fd->read_watcher == NULL && requested) {
-    fd->read_watcher = watcher;
-    mask |= read_mask;
-  }
-  /* if there is nobody polling for write, but we need to, then start doing so
-   */
-  cur = fd->write_closure;
-  requested = cur != CLOSURE_READY;
-  if (write_mask && fd->write_watcher == NULL && requested) {
-    fd->write_watcher = watcher;
-    mask |= write_mask;
-  }
-  /* if not polling, remember this watcher in case we need someone to later */
-  if (mask == 0 && worker != NULL) {
-    watcher->next = &fd->inactive_watcher_root;
-    watcher->prev = watcher->next->prev;
-    watcher->next->prev = watcher->prev->next = watcher;
-  }
-  watcher->pollset = pollset;
-  watcher->worker = worker;
-  watcher->fd = fd;
-  gpr_mu_unlock(&fd->mu);
-
-  return mask;
-}
-
-static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher,
-                        int got_read, int got_write) {
-  int was_polling = 0;
-  int kick = 0;
-  grpc_fd *fd = watcher->fd;
-
-  if (fd == NULL) {
-    return;
-  }
-
-  gpr_mu_lock(&fd->mu);
-
-  if (watcher == fd->read_watcher) {
-    /* remove read watcher, kick if we still need a read */
-    was_polling = 1;
-    if (!got_read) {
-      kick = 1;
-    }
-    fd->read_watcher = NULL;
-  }
-  if (watcher == fd->write_watcher) {
-    /* remove write watcher, kick if we still need a write */
-    was_polling = 1;
-    if (!got_write) {
-      kick = 1;
-    }
-    fd->write_watcher = NULL;
-  }
-  if (!was_polling && watcher->worker != NULL) {
-    /* remove from inactive list */
-    watcher->next->prev = watcher->prev;
-    watcher->prev->next = watcher->next;
-  }
-  if (got_read) {
-    if (set_ready_locked(exec_ctx, fd, &fd->read_closure)) {
-      kick = 1;
-    }
-  }
-  if (got_write) {
-    if (set_ready_locked(exec_ctx, fd, &fd->write_closure)) {
-      kick = 1;
-    }
-  }
-  if (kick) {
-    maybe_wake_one_watcher_locked(fd);
-  }
-  if (fd_is_orphaned(fd) && !has_watchers(fd) && !fd->closed) {
-    close_fd_locked(exec_ctx, fd);
-  }
-  gpr_mu_unlock(&fd->mu);
-
-  GRPC_FD_UNREF(fd, "poll");
-}
-
 /*******************************************************************************
  * pollset_posix.c
  */
@@ -1085,32 +877,45 @@ static void finally_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
   epoll_hdr *h = pollset->data.ptr;
   struct epoll_event ev;
   int err;
-  grpc_fd_watcher watcher;
-
-  /* We pretend to be polling whilst adding an fd to keep the fd from being
-     closed during the add. This may result in a spurious wakeup being assigned
-     to this pollset whilst adding, but that should be benign. */
-  /* TODO (sreek). This fd_begin_poll() really seem to accomplish adding
-   * GRPC_FD_REF() (i.e adding a refcount to the fd) and checking that the
-   * fd is not shutting down (in which case watcher.fd will be NULL and no
-   * refcount is added). The ref count is added only durng hte duration of
-   * adding it to the epoll set (after which fd_end_poll would be called and
-   * the fd's ref count is decremented by 1. So do we still need fd_begin_poll
-   * ??? */
-  GPR_ASSERT(fd_begin_poll(fd, pollset, NULL, 0, 0, &watcher) == 0);
-  if (watcher.fd != NULL) {
-    ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
-    ev.data.ptr = fd;
-    err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev);
-    if (err < 0) {
-      /* FDs may be added to a pollset multiple times, so EEXIST is normal. */
-      if (errno != EEXIST) {
-        gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s", fd->fd,
-                strerror(errno));
-      }
+
+  /* Hold a ref to the fd to keep it from being closed during the add. This may
+     result in a spurious wakeup being assigned to this pollset whilst adding,
+     but that should be benign. */
+  /* TODO: (sreek): Understand how a spurious wake up migh be assinged to this
+   * pollset..and how holding a reference will prevent the fd from being closed
+   * (and perhaps more importantly, see how can an fd be closed while being
+   * added to the epollset */
+  GRPC_FD_REF(fd, "add fd");
+
+  gpr_mu_lock(&fd->mu);
+  if (fd->shutdown) {
+    gpr_mu_unlock(&fd->mu);
+    GRPC_FD_UNREF(fd, "add fd");
+    return;
+  }
+  gpr_mu_unlock(&fd->mu);
+
+  ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
+  ev.data.ptr = fd;
+  err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev);
+  if (err < 0) {
+    /* FDs may be added to a pollset multiple times, so EEXIST is normal. */
+    if (errno != EEXIST) {
+      gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s", fd->fd,
+              strerror(errno));
     }
   }
-  fd_end_poll(exec_ctx, &watcher, 0, 0);
+
+  /* The fd might have been orphaned while we were adding it to the epoll set.
+     Close the fd in such a case (which will also take care of removing it from
+     the epoll set */
+  gpr_mu_lock(&fd->mu);
+  if (fd_is_orphaned(fd) && !fd->closed) {
+    close_fd_locked(exec_ctx, fd);
+  }
+  gpr_mu_unlock(&fd->mu);
+
+  GRPC_FD_UNREF(fd, "add fd");
 }
 
 /* Creates an epoll fd and initializes the pollset */
-- 
GitLab