diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
index 9c8133d2d4c3b59cf319fbc75e76799cc00bc3c7..63615ea25f74f9232259860257b0d66e8e41d3f4 100644
--- a/src/core/iomgr/fd_posix.c
+++ b/src/core/iomgr/fd_posix.c
@@ -96,8 +96,10 @@ static grpc_fd *alloc_fd(int fd) {
   gpr_atm_rel_store(&r->writest, NOT_READY);
   gpr_atm_rel_store(&r->shutdown, 0);
   r->fd = fd;
-  r->watcher_root.next = r->watcher_root.prev = &r->watcher_root;
+  r->inactive_watcher_root.next = r->inactive_watcher_root.prev =
+      &r->inactive_watcher_root;
   r->freelist_next = NULL;
+  r->read_watcher = r->write_watcher = NULL;
   return r;
 }
 
@@ -147,14 +149,34 @@ int grpc_fd_is_orphaned(grpc_fd *fd) {
   return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
 }
 
-static void wake_watchers(grpc_fd *fd) {
-  grpc_fd_watcher *watcher;
+static void maybe_wake_one_watcher_locked(grpc_fd *fd) {
+  if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) {
+    grpc_pollset_force_kick(fd->inactive_watcher_root.next->pollset);
+  } else if (fd->read_watcher) {
+    grpc_pollset_force_kick(fd->read_watcher->pollset);
+  } else if (fd->write_watcher) {
+    grpc_pollset_force_kick(fd->write_watcher->pollset);
+  }
+}
+
+static void maybe_wake_one_watcher(grpc_fd *fd) {
   gpr_mu_lock(&fd->watcher_mu);
-  for (watcher = fd->watcher_root.next; watcher != &fd->watcher_root;
-       watcher = watcher->next) {
+  maybe_wake_one_watcher_locked(fd);
+  gpr_mu_unlock(&fd->watcher_mu);
+}
+
+static void wake_all_watchers(grpc_fd *fd) {
+  grpc_fd_watcher *watcher;
+  for (watcher = fd->inactive_watcher_root.next;
+       watcher != &fd->inactive_watcher_root; watcher = watcher->next) {
     grpc_pollset_force_kick(watcher->pollset);
   }
-  gpr_mu_unlock(&fd->watcher_mu);
+  if (fd->read_watcher) {
+    grpc_pollset_force_kick(fd->read_watcher->pollset);
+  }
+  if (fd->write_watcher && fd->write_watcher != fd->read_watcher) {
+    grpc_pollset_force_kick(fd->write_watcher->pollset);
+  }
 }
 
 void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data) {
@@ -162,7 +184,7 @@ void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data) {
   fd->on_done_user_data = user_data;
   shutdown(fd->fd, SHUT_RDWR);
   ref_by(fd, 1); /* remove active status, but keep referenced */
-  wake_watchers(fd);
+  wake_all_watchers(fd);
   unref_by(fd, 2); /* drop the reference */
 }
 
@@ -204,7 +226,7 @@ static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_iomgr_closure *closure,
          set_ready call.  NOTE: we don't have an ABA problem here,
          since we should never have concurrent calls to the same
          notify_on function. */
-      wake_watchers(fd);
+      maybe_wake_one_watcher(fd);
       return;
     }
     /* swap was unsuccessful due to an intervening set_ready call.
@@ -290,29 +312,65 @@ void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_closure *closure) {
 gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
                               gpr_uint32 read_mask, gpr_uint32 write_mask,
                               grpc_fd_watcher *watcher) {
+  gpr_uint32 mask = 0;
   /* keep track of pollers that have requested our events, in case they change
    */
   grpc_fd_ref(fd);
 
   gpr_mu_lock(&fd->watcher_mu);
-  watcher->next = &fd->watcher_root;
-  watcher->prev = watcher->next->prev;
-  watcher->next->prev = watcher->prev->next = watcher;
+  /* if there is nobody polling for read, but we need to, then start doing so */
+  if (!fd->read_watcher && gpr_atm_acq_load(&fd->readst) > READY) {
+    fd->read_watcher = watcher;
+    mask |= read_mask;
+  }
+  /* if there is nobody polling for write, but we need to, then start doing so
+   */
+  if (!fd->write_watcher && gpr_atm_acq_load(&fd->writest) > READY) {
+    fd->write_watcher = watcher;
+    mask |= write_mask;
+  }
+  /* if not polling, remember this watcher in case we need someone to later */
+  if (mask == 0) {
+    watcher->next = &fd->inactive_watcher_root;
+    watcher->prev = watcher->next->prev;
+    watcher->next->prev = watcher->prev->next = watcher;
+  }
   watcher->pollset = pollset;
   watcher->fd = fd;
   gpr_mu_unlock(&fd->watcher_mu);
 
-  return (gpr_atm_acq_load(&fd->readst) != READY ? read_mask : 0) |
-         (gpr_atm_acq_load(&fd->writest) != READY ? write_mask : 0);
+  return mask;
 }
 
-void grpc_fd_end_poll(grpc_fd_watcher *watcher) {
-  gpr_mu_lock(&watcher->fd->watcher_mu);
-  watcher->next->prev = watcher->prev;
-  watcher->prev->next = watcher->next;
-  gpr_mu_unlock(&watcher->fd->watcher_mu);
+void grpc_fd_end_poll(grpc_fd_watcher *watcher, int got_read, int got_write) {
+  int was_polling = 0;
+  int kick = 0;
+  grpc_fd *fd = watcher->fd;
+
+  gpr_mu_lock(&fd->watcher_mu);
+  if (watcher == fd->read_watcher) {
+    /* remove read watcher, kick if we still need a read */
+    was_polling = 1;
+    kick = kick || !got_read;
+    fd->read_watcher = NULL;
+  }
+  if (watcher == fd->write_watcher) {
+    /* remove write watcher, kick if we still need a write */
+    was_polling = 1;
+    kick = kick || !got_write;
+    fd->write_watcher = NULL;
+  }
+  if (!was_polling) {
+    /* remove from inactive list */
+    watcher->next->prev = watcher->prev;
+    watcher->prev->next = watcher->next;
+  }
+  if (kick) {
+    maybe_wake_one_watcher_locked(fd);
+  }
+  gpr_mu_unlock(&fd->watcher_mu);
 
-  grpc_fd_unref(watcher->fd);
+  grpc_fd_unref(fd);
 }
 
 void grpc_fd_become_readable(grpc_fd *fd, int allow_synchronous_callback) {
diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h
index be21f2b55f8ed899d3839b41ffa8deb07dceb996..cfc533b7f562c39aa0615efc2f46bed53892e98f 100644
--- a/src/core/iomgr/fd_posix.h
+++ b/src/core/iomgr/fd_posix.h
@@ -66,8 +66,32 @@ struct grpc_fd {
   gpr_mu set_state_mu;
   gpr_atm shutdown;
 
+  /* The watcher list.
+
+     The following watcher related fields are protected by watcher_mu.
+
+     An fd_watcher is an ephemeral object created when an fd wants to
+     begin polling, and destroyed after the poll.
+
+     It denotes the fd's interest in whether to read poll or write poll
+     or both or neither on this fd.
+
+     If a watcher is asked to poll for reads or writes, the read_watcher
+     or write_watcher fields are set respectively. A watcher may be asked
+     to poll for both, in which case both fields will be set.
+
+     read_watcher and write_watcher may be NULL if no watcher has been
+     asked to poll for reads or writes.
+
+     If an fd_watcher is not asked to poll for reads or writes, it's added
+     to a linked list of inactive watchers, rooted at inactive_watcher_root.
+     If at a later time there becomes need of a poller to poll, one of
+     the inactive pollers may be kicked out of their poll loops to take
+     that responsibility. */
   gpr_mu watcher_mu;
-  grpc_fd_watcher watcher_root;
+  grpc_fd_watcher inactive_watcher_root;
+  grpc_fd_watcher *read_watcher;
+  grpc_fd_watcher *write_watcher;
 
   gpr_atm readst;
   gpr_atm writest;
@@ -103,7 +127,7 @@ gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
                               gpr_uint32 read_mask, gpr_uint32 write_mask,
                               grpc_fd_watcher *rec);
 /* Complete polling previously started with grpc_fd_begin_poll */
-void grpc_fd_end_poll(grpc_fd_watcher *rec);
+void grpc_fd_end_poll(grpc_fd_watcher *rec, int got_read, int got_write);
 
 /* Return 1 if this fd is orphaned, 0 otherwise */
 int grpc_fd_is_orphaned(grpc_fd *fd);
diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
index 25b7cfda1a16d5cf5e5d609167eb0ffa5ca90b19..4d36107ab0fed41df0beb74d2367dcdae70e36d5 100644
--- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c
+++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
@@ -98,7 +98,7 @@ static void end_polling(grpc_pollset *pollset) {
   pollset_hdr *h;
   h = pollset->data.ptr;
   for (i = 1; i < h->pfd_count; i++) {
-    grpc_fd_end_poll(&h->watchers[i]);
+    grpc_fd_end_poll(&h->watchers[i], h->pfds[i].revents & POLLIN, h->pfds[i].revents & POLLOUT);
   }
 }
 
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index f496ac5bfa7f98420b13b0ce57ff6408c1091059..826c792990eb31a3293818cdb5196bcb60be3044 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -420,10 +420,12 @@ static int unary_poll_pollset_maybe_work(grpc_pollset *pollset,
   pfd[1].events =
       grpc_fd_begin_poll(fd, pollset, POLLIN, POLLOUT, &fd_watcher);
 
-  r = poll(pfd, GPR_ARRAY_SIZE(pfd), timeout);
+  /* poll fd count (argument 2) is shortened by one if we have no events
+     to poll on - such that it only includes the kicker */
+  r = poll(pfd, GPR_ARRAY_SIZE(pfd) - (pfd[1].events == 0), timeout);
   GRPC_TIMER_MARK(GRPC_PTAG_POLL_FINISHED, r);
 
-  grpc_fd_end_poll(&fd_watcher);
+  grpc_fd_end_poll(&fd_watcher, pfd[1].revents & POLLIN, pfd[1].revents & POLLOUT);
 
   if (r < 0) {
     if (errno != EINTR) {
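
A minimal usage sketch of the revised contract, modeled on the unary poller above (the helper name poll_one_fd, its kick_fd/timeout_ms parameters, and the standalone framing are illustrative assumptions, not part of this patch): grpc_fd_begin_poll now hands back only the events this watcher should poll for, and grpc_fd_end_poll must be told which of them actually fired so a replacement watcher can be kicked when a still-wanted event went unobserved.

/* Sketch only: shows how a poller is expected to drive the new API.
   Assumes the gRPC core tree is on the include path; poll_one_fd,
   kick_fd and timeout_ms are hypothetical, for illustration. */
#include <poll.h>
#include "src/core/iomgr/fd_posix.h"

static void poll_one_fd(grpc_fd *fd, grpc_pollset *pollset, int kick_fd,
                        int timeout_ms) {
  grpc_fd_watcher watcher;
  struct pollfd pfd[2];

  pfd[0].fd = kick_fd; /* kicker: always polled so the loop can be woken */
  pfd[0].events = POLLIN;
  pfd[0].revents = 0;
  pfd[1].fd = fd->fd;
  pfd[1].revents = 0;

  /* May return 0 if other watchers already cover this fd's reads and
     writes; in that case this watcher is parked on the inactive list. */
  pfd[1].events = grpc_fd_begin_poll(fd, pollset, POLLIN, POLLOUT, &watcher);

  /* As in the patch: drop the fd from the poll set when we were given
     nothing to watch, leaving only the kicker (error handling elided). */
  poll(pfd, 2 - (pfd[1].events == 0), timeout_ms);

  /* Report which events actually fired; if a read or write is still
     wanted, end_poll kicks another watcher to pick it up. */
  grpc_fd_end_poll(&watcher, pfd[1].revents & POLLIN,
                   pfd[1].revents & POLLOUT);
}

The got_read/got_write arguments are what keep the single read_watcher/write_watcher scheme live: a watcher that was covering reads but returns without POLLIN hands that responsibility back, and maybe_wake_one_watcher_locked wakes an inactive (or the other active) watcher to take over.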