diff --git a/BUILD b/BUILD index 2dc3b026d8909de178c64b3812c8c68bf2bb6084..e3ef97154123034d7cf2e43f412ed47e246d0c28 100644 --- a/BUILD +++ b/BUILD @@ -214,7 +214,6 @@ cc_library( "src/core/iomgr/iomgr_internal.h", "src/core/iomgr/iomgr_posix.h", "src/core/iomgr/pollset.h", - "src/core/iomgr/pollset_posix.h", "src/core/iomgr/pollset_set.h", "src/core/iomgr/pollset_set_posix.h", "src/core/iomgr/pollset_set_windows.h", @@ -353,9 +352,6 @@ cc_library( "src/core/iomgr/iomgr.c", "src/core/iomgr/iomgr_posix.c", "src/core/iomgr/iomgr_windows.c", - "src/core/iomgr/pollset_multipoller_with_epoll.c", - "src/core/iomgr/pollset_multipoller_with_poll_posix.c", - "src/core/iomgr/pollset_posix.c", "src/core/iomgr/pollset_set_posix.c", "src/core/iomgr/pollset_set_windows.c", "src/core/iomgr/pollset_windows.c", @@ -521,7 +517,6 @@ cc_library( "src/core/iomgr/iomgr_internal.h", "src/core/iomgr/iomgr_posix.h", "src/core/iomgr/pollset.h", - "src/core/iomgr/pollset_posix.h", "src/core/iomgr/pollset_set.h", "src/core/iomgr/pollset_set_posix.h", "src/core/iomgr/pollset_set_windows.h", @@ -640,9 +635,6 @@ cc_library( "src/core/iomgr/iomgr.c", "src/core/iomgr/iomgr_posix.c", "src/core/iomgr/iomgr_windows.c", - "src/core/iomgr/pollset_multipoller_with_epoll.c", - "src/core/iomgr/pollset_multipoller_with_poll_posix.c", - "src/core/iomgr/pollset_posix.c", "src/core/iomgr/pollset_set_posix.c", "src/core/iomgr/pollset_set_windows.c", "src/core/iomgr/pollset_windows.c", @@ -1319,9 +1311,6 @@ objc_library( "src/core/iomgr/iomgr.c", "src/core/iomgr/iomgr_posix.c", "src/core/iomgr/iomgr_windows.c", - "src/core/iomgr/pollset_multipoller_with_epoll.c", - "src/core/iomgr/pollset_multipoller_with_poll_posix.c", - "src/core/iomgr/pollset_posix.c", "src/core/iomgr/pollset_set_posix.c", "src/core/iomgr/pollset_set_windows.c", "src/core/iomgr/pollset_windows.c", @@ -1482,7 +1471,6 @@ objc_library( "src/core/iomgr/iomgr_internal.h", "src/core/iomgr/iomgr_posix.h", "src/core/iomgr/pollset.h", - "src/core/iomgr/pollset_posix.h", "src/core/iomgr/pollset_set.h", "src/core/iomgr/pollset_set_posix.h", "src/core/iomgr/pollset_set_windows.h", diff --git a/Makefile b/Makefile index 2d46699d81f7df3a9cae4dd4bf81e8bc41062dd2..6b713feac4c6c6e43a0b0effff4b6c9e5a9f6b4a 100644 --- a/Makefile +++ b/Makefile @@ -2386,9 +2386,6 @@ LIBGRPC_SRC = \ src/core/iomgr/iomgr.c \ src/core/iomgr/iomgr_posix.c \ src/core/iomgr/iomgr_windows.c \ - src/core/iomgr/pollset_multipoller_with_epoll.c \ - src/core/iomgr/pollset_multipoller_with_poll_posix.c \ - src/core/iomgr/pollset_posix.c \ src/core/iomgr/pollset_set_posix.c \ src/core/iomgr/pollset_set_windows.c \ src/core/iomgr/pollset_windows.c \ @@ -2673,9 +2670,6 @@ LIBGRPC_UNSECURE_SRC = \ src/core/iomgr/iomgr.c \ src/core/iomgr/iomgr_posix.c \ src/core/iomgr/iomgr_windows.c \ - src/core/iomgr/pollset_multipoller_with_epoll.c \ - src/core/iomgr/pollset_multipoller_with_poll_posix.c \ - src/core/iomgr/pollset_posix.c \ src/core/iomgr/pollset_set_posix.c \ src/core/iomgr/pollset_set_windows.c \ src/core/iomgr/pollset_windows.c \ diff --git a/binding.gyp b/binding.gyp index 89f4a8bbde27f2cc2b6023edd491cc122be8b855..5fb40d2e05ef362dc49883c4152b32bdc137546a 100644 --- a/binding.gyp +++ b/binding.gyp @@ -625,9 +625,6 @@ 'src/core/iomgr/iomgr.c', 'src/core/iomgr/iomgr_posix.c', 'src/core/iomgr/iomgr_windows.c', - 'src/core/iomgr/pollset_multipoller_with_epoll.c', - 'src/core/iomgr/pollset_multipoller_with_poll_posix.c', - 'src/core/iomgr/pollset_posix.c', 'src/core/iomgr/pollset_set_posix.c', 'src/core/iomgr/pollset_set_windows.c', 'src/core/iomgr/pollset_windows.c', diff --git a/build.yaml b/build.yaml index acbc8de4eaa79dc7e3f5ecb62f2e9ee7ddfcc2e1..6075f01c4b78697ff2dd9fc88fea4be21c2b8b4d 100644 --- a/build.yaml +++ b/build.yaml @@ -290,7 +290,6 @@ filegroups: - src/core/iomgr/iomgr_internal.h - src/core/iomgr/iomgr_posix.h - src/core/iomgr/pollset.h - - src/core/iomgr/pollset_posix.h - src/core/iomgr/pollset_set.h - src/core/iomgr/pollset_set_posix.h - src/core/iomgr/pollset_set_windows.h @@ -406,9 +405,6 @@ filegroups: - src/core/iomgr/iomgr.c - src/core/iomgr/iomgr_posix.c - src/core/iomgr/iomgr_windows.c - - src/core/iomgr/pollset_multipoller_with_epoll.c - - src/core/iomgr/pollset_multipoller_with_poll_posix.c - - src/core/iomgr/pollset_posix.c - src/core/iomgr/pollset_set_posix.c - src/core/iomgr/pollset_set_windows.c - src/core/iomgr/pollset_windows.c diff --git a/gRPC.podspec b/gRPC.podspec index cce0d53aad15394a8555e3643e5577ea1359de6b..173145d1b22d1bae2369b251a45773845d08ef71 100644 --- a/gRPC.podspec +++ b/gRPC.podspec @@ -218,7 +218,6 @@ Pod::Spec.new do |s| 'src/core/iomgr/iomgr_internal.h', 'src/core/iomgr/iomgr_posix.h', 'src/core/iomgr/pollset.h', - 'src/core/iomgr/pollset_posix.h', 'src/core/iomgr/pollset_set.h', 'src/core/iomgr/pollset_set_posix.h', 'src/core/iomgr/pollset_set_windows.h', @@ -370,9 +369,6 @@ Pod::Spec.new do |s| 'src/core/iomgr/iomgr.c', 'src/core/iomgr/iomgr_posix.c', 'src/core/iomgr/iomgr_windows.c', - 'src/core/iomgr/pollset_multipoller_with_epoll.c', - 'src/core/iomgr/pollset_multipoller_with_poll_posix.c', - 'src/core/iomgr/pollset_posix.c', 'src/core/iomgr/pollset_set_posix.c', 'src/core/iomgr/pollset_set_windows.c', 'src/core/iomgr/pollset_windows.c', @@ -529,7 +525,6 @@ Pod::Spec.new do |s| 'src/core/iomgr/iomgr_internal.h', 'src/core/iomgr/iomgr_posix.h', 'src/core/iomgr/pollset.h', - 'src/core/iomgr/pollset_posix.h', 'src/core/iomgr/pollset_set.h', 'src/core/iomgr/pollset_set_posix.h', 'src/core/iomgr/pollset_set_windows.h', diff --git a/grpc.gemspec b/grpc.gemspec index 314e2179669c1565764c1a07d3680d169b7afb23..d632f8f62734e25316f8ca1deaa42cf5e72e5df8 100755 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -214,7 +214,6 @@ Gem::Specification.new do |s| s.files += %w( src/core/iomgr/iomgr_internal.h ) s.files += %w( src/core/iomgr/iomgr_posix.h ) s.files += %w( src/core/iomgr/pollset.h ) - s.files += %w( src/core/iomgr/pollset_posix.h ) s.files += %w( src/core/iomgr/pollset_set.h ) s.files += %w( src/core/iomgr/pollset_set_posix.h ) s.files += %w( src/core/iomgr/pollset_set_windows.h ) @@ -353,9 +352,6 @@ Gem::Specification.new do |s| s.files += %w( src/core/iomgr/iomgr.c ) s.files += %w( src/core/iomgr/iomgr_posix.c ) s.files += %w( src/core/iomgr/iomgr_windows.c ) - s.files += %w( src/core/iomgr/pollset_multipoller_with_epoll.c ) - s.files += %w( src/core/iomgr/pollset_multipoller_with_poll_posix.c ) - s.files += %w( src/core/iomgr/pollset_posix.c ) s.files += %w( src/core/iomgr/pollset_set_posix.c ) s.files += %w( src/core/iomgr/pollset_set_windows.c ) s.files += %w( src/core/iomgr/pollset_windows.c ) diff --git a/package.json b/package.json index 661839f1c6496a8e2cb73d270f45b9f1bfedb5f1..24d7d850b26f7aa0d4ad913a638d9832d4158362 100644 --- a/package.json +++ b/package.json @@ -159,7 +159,6 @@ "src/core/iomgr/iomgr_internal.h", "src/core/iomgr/iomgr_posix.h", "src/core/iomgr/pollset.h", - "src/core/iomgr/pollset_posix.h", "src/core/iomgr/pollset_set.h", "src/core/iomgr/pollset_set_posix.h", "src/core/iomgr/pollset_set_windows.h", @@ -298,9 +297,6 @@ "src/core/iomgr/iomgr.c", "src/core/iomgr/iomgr_posix.c", "src/core/iomgr/iomgr_windows.c", - "src/core/iomgr/pollset_multipoller_with_epoll.c", - "src/core/iomgr/pollset_multipoller_with_poll_posix.c", - "src/core/iomgr/pollset_posix.c", "src/core/iomgr/pollset_set_posix.c", "src/core/iomgr/pollset_set_windows.c", "src/core/iomgr/pollset_windows.c", diff --git a/src/core/iomgr/ev_poll_and_epoll_posix.c b/src/core/iomgr/ev_poll_and_epoll_posix.c index 9b2fc2056ddd94b27e8eb8f3d4f371872972f33f..e9ed7144c440a00a4d05211efefdbd8b6deb60f3 100644 --- a/src/core/iomgr/ev_poll_and_epoll_posix.c +++ b/src/core/iomgr/ev_poll_and_epoll_posix.c @@ -49,14 +49,27 @@ #include "src/core/iomgr/ev_poll_and_epoll_posix.h" #include <assert.h> +#include <errno.h> +#include <poll.h> +#include <string.h> #include <sys/socket.h> #include <unistd.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/string_util.h> +#include <grpc/support/tls.h> #include <grpc/support/useful.h> +#include "src/core/iomgr/iomgr_internal.h" +#include "src/core/iomgr/wakeup_fd_posix.h" +#include "src/core/profiling/timers.h" +#include "src/core/support/block_annotate.h" + +/******************************************************************************* + * FD declarations + */ + typedef struct grpc_fd_watcher { struct grpc_fd_watcher *next; struct grpc_fd_watcher *prev; @@ -166,6 +179,112 @@ void grpc_fd_global_shutdown(void); #define CLOSURE_NOT_READY ((grpc_closure *)0) #define CLOSURE_READY ((grpc_closure *)1) +/******************************************************************************* + * pollset declarations + */ + +typedef struct grpc_pollset_vtable grpc_pollset_vtable; + +typedef struct grpc_cached_wakeup_fd { + grpc_wakeup_fd fd; + struct grpc_cached_wakeup_fd *next; +} grpc_cached_wakeup_fd; + +struct grpc_pollset_worker { + grpc_cached_wakeup_fd *wakeup_fd; + int reevaluate_polling_on_wakeup; + int kicked_specifically; + struct grpc_pollset_worker *next; + struct grpc_pollset_worker *prev; +}; + +struct grpc_pollset { + /* pollsets under posix can mutate representation as fds are added and + removed. + For example, we may choose a poll() based implementation on linux for + few fds, and an epoll() based implementation for many fds */ + const grpc_pollset_vtable *vtable; + gpr_mu *mu; + grpc_pollset_worker root_worker; + int in_flight_cbs; + int shutting_down; + int called_shutdown; + int kicked_without_pollers; + grpc_closure *shutdown_done; + grpc_closure_list idle_jobs; + union { + int fd; + void *ptr; + } data; + /* Local cache of eventfds for workers */ + grpc_cached_wakeup_fd *local_wakeup_cache; +}; + +struct grpc_pollset_vtable { + void (*add_fd)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + struct grpc_fd *fd, int and_unlock_pollset); + void (*maybe_work_and_unlock)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_pollset_worker *worker, + gpr_timespec deadline, gpr_timespec now); + void (*finish_shutdown)(grpc_pollset *pollset); + void (*destroy)(grpc_pollset *pollset); +}; + +/* Add an fd to a pollset */ +void grpc_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + struct grpc_fd *fd); + +/* Returns the fd to listen on for kicks */ +int grpc_kick_read_fd(grpc_pollset *p); +/* Call after polling has been kicked to leave the kicked state */ +void grpc_kick_drain(grpc_pollset *p); + +/* Convert a timespec to milliseconds: + - very small or negative poll times are clamped to zero to do a + non-blocking poll (which becomes spin polling) + - other small values are rounded up to one millisecond + - longer than a millisecond polls are rounded up to the next nearest + millisecond to avoid spinning + - infinite timeouts are converted to -1 */ +int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline, + gpr_timespec now); + +/* Allow kick to wakeup the currently polling worker */ +#define GRPC_POLLSET_CAN_KICK_SELF 1 +/* Force the wakee to repoll when awoken */ +#define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2 +/* As per grpc_pollset_kick, with an extended set of flags (defined above) + -- mostly for fd_posix's use. */ +void grpc_pollset_kick_ext(grpc_pollset *p, + grpc_pollset_worker *specific_worker, + uint32_t flags); + +/* turn a pollset into a multipoller: platform specific */ +typedef void (*grpc_platform_become_multipoller_type)(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset, + struct grpc_fd **fds, + size_t fd_count); +extern grpc_platform_become_multipoller_type grpc_platform_become_multipoller; + +void grpc_poll_become_multipoller(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset, struct grpc_fd **fds, + size_t fd_count); + +/* Return 1 if the pollset has active threads in grpc_pollset_work (pollset must + * be locked) */ +int grpc_pollset_has_workers(grpc_pollset *pollset); + +void grpc_remove_fd_from_all_epoll_sets(int fd); + +/* override to allow tests to hook poll() usage */ +typedef int (*grpc_poll_function_type)(struct pollfd *, nfds_t, int); +extern grpc_poll_function_type grpc_poll_function; +extern grpc_wakeup_fd grpc_global_wakeup_fd; + +/******************************************************************************* + * fd_posix.c + */ + /* We need to keep a freelist not because of any concerns of malloc performance * but instead so that implementations with multiple threads in (for example) * epoll_wait deal with the race between pollset removal and incoming poll @@ -563,4 +682,1054 @@ void grpc_fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { set_ready(exec_ctx, fd, &fd->write_closure); } +/******************************************************************************* + * pollset_posix.c + */ + +GPR_TLS_DECL(g_current_thread_poller); +GPR_TLS_DECL(g_current_thread_worker); + +/** Default poll() function - a pointer so that it can be overridden by some + * tests */ +grpc_poll_function_type grpc_poll_function = poll; + +/** The alarm system needs to be able to wakeup 'some poller' sometimes + * (specifically when a new alarm needs to be triggered earlier than the next + * alarm 'epoch'). + * This wakeup_fd gives us something to alert on when such a case occurs. */ +grpc_wakeup_fd grpc_global_wakeup_fd; + +static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) { + worker->prev->next = worker->next; + worker->next->prev = worker->prev; +} + +int grpc_pollset_has_workers(grpc_pollset *p) { + return p->root_worker.next != &p->root_worker; +} + +static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) { + if (grpc_pollset_has_workers(p)) { + grpc_pollset_worker *w = p->root_worker.next; + remove_worker(p, w); + return w; + } else { + return NULL; + } +} + +static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) { + worker->next = &p->root_worker; + worker->prev = worker->next->prev; + worker->prev->next = worker->next->prev = worker; +} + +static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) { + worker->prev = &p->root_worker; + worker->next = worker->prev->next; + worker->prev->next = worker->next->prev = worker; +} + +size_t grpc_pollset_size(void) { return sizeof(grpc_pollset); } + +void grpc_pollset_kick_ext(grpc_pollset *p, + grpc_pollset_worker *specific_worker, + uint32_t flags) { + GPR_TIMER_BEGIN("grpc_pollset_kick_ext", 0); + + /* pollset->mu already held */ + if (specific_worker != NULL) { + if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) { + GPR_TIMER_BEGIN("grpc_pollset_kick_ext.broadcast", 0); + GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0); + for (specific_worker = p->root_worker.next; + specific_worker != &p->root_worker; + specific_worker = specific_worker->next) { + grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); + } + p->kicked_without_pollers = 1; + GPR_TIMER_END("grpc_pollset_kick_ext.broadcast", 0); + } else if (gpr_tls_get(&g_current_thread_worker) != + (intptr_t)specific_worker) { + GPR_TIMER_MARK("different_thread_worker", 0); + if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) { + specific_worker->reevaluate_polling_on_wakeup = 1; + } + specific_worker->kicked_specifically = 1; + grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); + } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) { + GPR_TIMER_MARK("kick_yoself", 0); + if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) { + specific_worker->reevaluate_polling_on_wakeup = 1; + } + specific_worker->kicked_specifically = 1; + grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); + } + } else if (gpr_tls_get(&g_current_thread_poller) != (intptr_t)p) { + GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0); + GPR_TIMER_MARK("kick_anonymous", 0); + specific_worker = pop_front_worker(p); + if (specific_worker != NULL) { + if (gpr_tls_get(&g_current_thread_worker) == (intptr_t)specific_worker) { + GPR_TIMER_MARK("kick_anonymous_not_self", 0); + push_back_worker(p, specific_worker); + specific_worker = pop_front_worker(p); + if ((flags & GRPC_POLLSET_CAN_KICK_SELF) == 0 && + gpr_tls_get(&g_current_thread_worker) == + (intptr_t)specific_worker) { + push_back_worker(p, specific_worker); + specific_worker = NULL; + } + } + if (specific_worker != NULL) { + GPR_TIMER_MARK("finally_kick", 0); + push_back_worker(p, specific_worker); + grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); + } + } else { + GPR_TIMER_MARK("kicked_no_pollers", 0); + p->kicked_without_pollers = 1; + } + } + + GPR_TIMER_END("grpc_pollset_kick_ext", 0); +} + +void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) { + grpc_pollset_kick_ext(p, specific_worker, 0); +} + +/* global state management */ + +void grpc_pollset_global_init(void) { + gpr_tls_init(&g_current_thread_poller); + gpr_tls_init(&g_current_thread_worker); + grpc_wakeup_fd_global_init(); + grpc_wakeup_fd_init(&grpc_global_wakeup_fd); +} + +void grpc_pollset_global_shutdown(void) { + grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd); + gpr_tls_destroy(&g_current_thread_poller); + gpr_tls_destroy(&g_current_thread_worker); + grpc_wakeup_fd_global_destroy(); +} + +void grpc_kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); } + +/* main interface */ + +static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null); + +void grpc_pollset_init(grpc_pollset *pollset, gpr_mu *mu) { + pollset->mu = mu; + pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker; + pollset->in_flight_cbs = 0; + pollset->shutting_down = 0; + pollset->called_shutdown = 0; + pollset->kicked_without_pollers = 0; + pollset->idle_jobs.head = pollset->idle_jobs.tail = NULL; + pollset->local_wakeup_cache = NULL; + pollset->kicked_without_pollers = 0; + become_basic_pollset(pollset, NULL); +} + +void grpc_pollset_destroy(grpc_pollset *pollset) { + GPR_ASSERT(pollset->in_flight_cbs == 0); + GPR_ASSERT(!grpc_pollset_has_workers(pollset)); + GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail); + pollset->vtable->destroy(pollset); + while (pollset->local_wakeup_cache) { + grpc_cached_wakeup_fd *next = pollset->local_wakeup_cache->next; + grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd); + gpr_free(pollset->local_wakeup_cache); + pollset->local_wakeup_cache = next; + } +} + +void grpc_pollset_reset(grpc_pollset *pollset) { + GPR_ASSERT(pollset->shutting_down); + GPR_ASSERT(pollset->in_flight_cbs == 0); + GPR_ASSERT(!grpc_pollset_has_workers(pollset)); + GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail); + pollset->vtable->destroy(pollset); + pollset->shutting_down = 0; + pollset->called_shutdown = 0; + pollset->kicked_without_pollers = 0; + become_basic_pollset(pollset, NULL); +} + +void grpc_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_fd *fd) { + gpr_mu_lock(pollset->mu); + pollset->vtable->add_fd(exec_ctx, pollset, fd, 1); +/* the following (enabled only in debug) will reacquire and then release + our lock - meaning that if the unlocking flag passed to add_fd above is + not respected, the code will deadlock (in a way that we have a chance of + debugging) */ +#ifndef NDEBUG + gpr_mu_lock(pollset->mu); + gpr_mu_unlock(pollset->mu); +#endif +} + +static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { + GPR_ASSERT(grpc_closure_list_empty(pollset->idle_jobs)); + pollset->vtable->finish_shutdown(pollset); + grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, true, NULL); +} + +void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_pollset_worker **worker_hdl, gpr_timespec now, + gpr_timespec deadline) { + grpc_pollset_worker worker; + *worker_hdl = &worker; + + /* pollset->mu already held */ + int added_worker = 0; + int locked = 1; + int queued_work = 0; + int keep_polling = 0; + GPR_TIMER_BEGIN("grpc_pollset_work", 0); + /* this must happen before we (potentially) drop pollset->mu */ + worker.next = worker.prev = NULL; + worker.reevaluate_polling_on_wakeup = 0; + if (pollset->local_wakeup_cache != NULL) { + worker.wakeup_fd = pollset->local_wakeup_cache; + pollset->local_wakeup_cache = worker.wakeup_fd->next; + } else { + worker.wakeup_fd = gpr_malloc(sizeof(*worker.wakeup_fd)); + grpc_wakeup_fd_init(&worker.wakeup_fd->fd); + } + worker.kicked_specifically = 0; + /* If there's work waiting for the pollset to be idle, and the + pollset is idle, then do that work */ + if (!grpc_pollset_has_workers(pollset) && + !grpc_closure_list_empty(pollset->idle_jobs)) { + GPR_TIMER_MARK("grpc_pollset_work.idle_jobs", 0); + grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL); + goto done; + } + /* If we're shutting down then we don't execute any extended work */ + if (pollset->shutting_down) { + GPR_TIMER_MARK("grpc_pollset_work.shutting_down", 0); + goto done; + } + /* Give do_promote priority so we don't starve it out */ + if (pollset->in_flight_cbs) { + GPR_TIMER_MARK("grpc_pollset_work.in_flight_cbs", 0); + gpr_mu_unlock(pollset->mu); + locked = 0; + goto done; + } + /* Start polling, and keep doing so while we're being asked to + re-evaluate our pollers (this allows poll() based pollers to + ensure they don't miss wakeups) */ + keep_polling = 1; + while (keep_polling) { + keep_polling = 0; + if (!pollset->kicked_without_pollers) { + if (!added_worker) { + push_front_worker(pollset, &worker); + added_worker = 1; + gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker); + } + gpr_tls_set(&g_current_thread_poller, (intptr_t)pollset); + GPR_TIMER_BEGIN("maybe_work_and_unlock", 0); + pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, &worker, + deadline, now); + GPR_TIMER_END("maybe_work_and_unlock", 0); + locked = 0; + gpr_tls_set(&g_current_thread_poller, 0); + } else { + GPR_TIMER_MARK("grpc_pollset_work.kicked_without_pollers", 0); + pollset->kicked_without_pollers = 0; + } + /* Finished execution - start cleaning up. + Note that we may arrive here from outside the enclosing while() loop. + In that case we won't loop though as we haven't added worker to the + worker list, which means nobody could ask us to re-evaluate polling). */ + done: + if (!locked) { + queued_work |= grpc_exec_ctx_flush(exec_ctx); + gpr_mu_lock(pollset->mu); + locked = 1; + } + /* If we're forced to re-evaluate polling (via grpc_pollset_kick with + GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force + a loop */ + if (worker.reevaluate_polling_on_wakeup) { + worker.reevaluate_polling_on_wakeup = 0; + pollset->kicked_without_pollers = 0; + if (queued_work || worker.kicked_specifically) { + /* If there's queued work on the list, then set the deadline to be + immediate so we get back out of the polling loop quickly */ + deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC); + } + keep_polling = 1; + } + } + if (added_worker) { + remove_worker(pollset, &worker); + gpr_tls_set(&g_current_thread_worker, 0); + } + /* release wakeup fd to the local pool */ + worker.wakeup_fd->next = pollset->local_wakeup_cache; + pollset->local_wakeup_cache = worker.wakeup_fd; + /* check shutdown conditions */ + if (pollset->shutting_down) { + if (grpc_pollset_has_workers(pollset)) { + grpc_pollset_kick(pollset, NULL); + } else if (!pollset->called_shutdown && pollset->in_flight_cbs == 0) { + pollset->called_shutdown = 1; + gpr_mu_unlock(pollset->mu); + finish_shutdown(exec_ctx, pollset); + grpc_exec_ctx_flush(exec_ctx); + /* Continuing to access pollset here is safe -- it is the caller's + * responsibility to not destroy when it has outstanding calls to + * grpc_pollset_work. + * TODO(dklempner): Can we refactor the shutdown logic to avoid this? */ + gpr_mu_lock(pollset->mu); + } else if (!grpc_closure_list_empty(pollset->idle_jobs)) { + grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL); + gpr_mu_unlock(pollset->mu); + grpc_exec_ctx_flush(exec_ctx); + gpr_mu_lock(pollset->mu); + } + } + *worker_hdl = NULL; + GPR_TIMER_END("grpc_pollset_work", 0); +} + +void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_closure *closure) { + GPR_ASSERT(!pollset->shutting_down); + pollset->shutting_down = 1; + pollset->shutdown_done = closure; + grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); + if (!grpc_pollset_has_workers(pollset)) { + grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL); + } + if (!pollset->called_shutdown && pollset->in_flight_cbs == 0 && + !grpc_pollset_has_workers(pollset)) { + pollset->called_shutdown = 1; + finish_shutdown(exec_ctx, pollset); + } +} + +int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline, + gpr_timespec now) { + gpr_timespec timeout; + static const int64_t max_spin_polling_us = 10; + if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) { + return -1; + } + if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros( + max_spin_polling_us, + GPR_TIMESPAN))) <= 0) { + return 0; + } + timeout = gpr_time_sub(deadline, now); + return gpr_time_to_millis(gpr_time_add( + timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN))); +} + +/* + * basic_pollset - a vtable that provides polling for zero or one file + * descriptor via poll() + */ + +typedef struct grpc_unary_promote_args { + const grpc_pollset_vtable *original_vtable; + grpc_pollset *pollset; + grpc_fd *fd; + grpc_closure promotion_closure; +} grpc_unary_promote_args; + +static void basic_do_promote(grpc_exec_ctx *exec_ctx, void *args, + bool success) { + grpc_unary_promote_args *up_args = args; + const grpc_pollset_vtable *original_vtable = up_args->original_vtable; + grpc_pollset *pollset = up_args->pollset; + grpc_fd *fd = up_args->fd; + + /* + * This is quite tricky. There are a number of cases to keep in mind here: + * 1. fd may have been orphaned + * 2. The pollset may no longer be a unary poller (and we can't let case #1 + * leak to other pollset types!) + * 3. pollset's fd (which may have changed) may have been orphaned + * 4. The pollset may be shutting down. + */ + + gpr_mu_lock(pollset->mu); + /* First we need to ensure that nobody is polling concurrently */ + GPR_ASSERT(!grpc_pollset_has_workers(pollset)); + + gpr_free(up_args); + /* At this point the pollset may no longer be a unary poller. In that case + * we should just call the right add function and be done. */ + /* TODO(klempner): If we're not careful this could cause infinite recursion. + * That's not a problem for now because empty_pollset has a trivial poller + * and we don't have any mechanism to unbecome multipoller. */ + pollset->in_flight_cbs--; + if (pollset->shutting_down) { + /* We don't care about this pollset anymore. */ + if (pollset->in_flight_cbs == 0 && !pollset->called_shutdown) { + pollset->called_shutdown = 1; + finish_shutdown(exec_ctx, pollset); + } + } else if (grpc_fd_is_orphaned(fd)) { + /* Don't try to add it to anything, we'll drop our ref on it below */ + } else if (pollset->vtable != original_vtable) { + pollset->vtable->add_fd(exec_ctx, pollset, fd, 0); + } else if (fd != pollset->data.ptr) { + grpc_fd *fds[2]; + fds[0] = pollset->data.ptr; + fds[1] = fd; + + if (fds[0] && !grpc_fd_is_orphaned(fds[0])) { + grpc_platform_become_multipoller(exec_ctx, pollset, fds, + GPR_ARRAY_SIZE(fds)); + GRPC_FD_UNREF(fds[0], "basicpoll"); + } else { + /* old fd is orphaned and we haven't cleaned it up until now, so remain a + * unary poller */ + /* Note that it is possible that fds[1] is also orphaned at this point. + * That's okay, we'll correct it at the next add or poll. */ + if (fds[0]) GRPC_FD_UNREF(fds[0], "basicpoll"); + pollset->data.ptr = fd; + GRPC_FD_REF(fd, "basicpoll"); + } + } + + gpr_mu_unlock(pollset->mu); + + /* Matching ref in basic_pollset_add_fd */ + GRPC_FD_UNREF(fd, "basicpoll_add"); +} + +static void basic_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_fd *fd, int and_unlock_pollset) { + grpc_unary_promote_args *up_args; + GPR_ASSERT(fd); + if (fd == pollset->data.ptr) goto exit; + + if (!grpc_pollset_has_workers(pollset)) { + /* Fast path -- no in flight cbs */ + /* TODO(klempner): Comment this out and fix any test failures or establish + * they are due to timing issues */ + grpc_fd *fds[2]; + fds[0] = pollset->data.ptr; + fds[1] = fd; + + if (fds[0] == NULL) { + pollset->data.ptr = fd; + GRPC_FD_REF(fd, "basicpoll"); + } else if (!grpc_fd_is_orphaned(fds[0])) { + grpc_platform_become_multipoller(exec_ctx, pollset, fds, + GPR_ARRAY_SIZE(fds)); + GRPC_FD_UNREF(fds[0], "basicpoll"); + } else { + /* old fd is orphaned and we haven't cleaned it up until now, so remain a + * unary poller */ + GRPC_FD_UNREF(fds[0], "basicpoll"); + pollset->data.ptr = fd; + GRPC_FD_REF(fd, "basicpoll"); + } + goto exit; + } + + /* Now we need to promote. This needs to happen when we're not polling. Since + * this may be called from poll, the wait needs to happen asynchronously. */ + GRPC_FD_REF(fd, "basicpoll_add"); + pollset->in_flight_cbs++; + up_args = gpr_malloc(sizeof(*up_args)); + up_args->fd = fd; + up_args->original_vtable = pollset->vtable; + up_args->pollset = pollset; + up_args->promotion_closure.cb = basic_do_promote; + up_args->promotion_closure.cb_arg = up_args; + + grpc_closure_list_add(&pollset->idle_jobs, &up_args->promotion_closure, 1); + grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); + +exit: + if (and_unlock_pollset) { + gpr_mu_unlock(pollset->mu); + } +} + +static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset, + grpc_pollset_worker *worker, + gpr_timespec deadline, + gpr_timespec now) { +#define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR) +#define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR) + + struct pollfd pfd[3]; + grpc_fd *fd; + grpc_fd_watcher fd_watcher; + int timeout; + int r; + nfds_t nfds; + + fd = pollset->data.ptr; + if (fd && grpc_fd_is_orphaned(fd)) { + GRPC_FD_UNREF(fd, "basicpoll"); + fd = pollset->data.ptr = NULL; + } + timeout = grpc_poll_deadline_to_millis_timeout(deadline, now); + pfd[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd); + pfd[0].events = POLLIN; + pfd[0].revents = 0; + pfd[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd->fd); + pfd[1].events = POLLIN; + pfd[1].revents = 0; + nfds = 2; + if (fd) { + pfd[2].fd = fd->fd; + pfd[2].revents = 0; + GRPC_FD_REF(fd, "basicpoll_begin"); + gpr_mu_unlock(pollset->mu); + pfd[2].events = (short)grpc_fd_begin_poll(fd, pollset, worker, POLLIN, + POLLOUT, &fd_watcher); + if (pfd[2].events != 0) { + nfds++; + } + } else { + gpr_mu_unlock(pollset->mu); + } + + /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid + even going into the blocking annotation if possible */ + /* poll fd count (argument 2) is shortened by one if we have no events + to poll on - such that it only includes the kicker */ + GPR_TIMER_BEGIN("poll", 0); + GRPC_SCHEDULING_START_BLOCKING_REGION; + r = grpc_poll_function(pfd, nfds, timeout); + GRPC_SCHEDULING_END_BLOCKING_REGION; + GPR_TIMER_END("poll", 0); + + if (r < 0) { + if (errno != EINTR) { + gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno)); + } + if (fd) { + grpc_fd_end_poll(exec_ctx, &fd_watcher, 0, 0); + } + } else if (r == 0) { + if (fd) { + grpc_fd_end_poll(exec_ctx, &fd_watcher, 0, 0); + } + } else { + if (pfd[0].revents & POLLIN_CHECK) { + grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd); + } + if (pfd[1].revents & POLLIN_CHECK) { + grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd); + } + if (nfds > 2) { + grpc_fd_end_poll(exec_ctx, &fd_watcher, pfd[2].revents & POLLIN_CHECK, + pfd[2].revents & POLLOUT_CHECK); + } else if (fd) { + grpc_fd_end_poll(exec_ctx, &fd_watcher, 0, 0); + } + } + + if (fd) { + GRPC_FD_UNREF(fd, "basicpoll_begin"); + } +} + +static void basic_pollset_destroy(grpc_pollset *pollset) { + if (pollset->data.ptr != NULL) { + GRPC_FD_UNREF(pollset->data.ptr, "basicpoll"); + pollset->data.ptr = NULL; + } +} + +static const grpc_pollset_vtable basic_pollset = { + basic_pollset_add_fd, basic_pollset_maybe_work_and_unlock, + basic_pollset_destroy, basic_pollset_destroy}; + +static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null) { + pollset->vtable = &basic_pollset; + pollset->data.ptr = fd_or_null; + if (fd_or_null != NULL) { + GRPC_FD_REF(fd_or_null, "basicpoll"); + } +} + +/******************************************************************************* + * pollset_multipoller_with_poll_posix.c + */ + +typedef struct { + /* all polled fds */ + size_t fd_count; + size_t fd_capacity; + grpc_fd **fds; + /* fds that have been removed from the pollset explicitly */ + size_t del_count; + size_t del_capacity; + grpc_fd **dels; +} poll_hdr; + +static void multipoll_with_poll_pollset_add_fd(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset, + grpc_fd *fd, + int and_unlock_pollset) { + size_t i; + poll_hdr *h = pollset->data.ptr; + /* TODO(ctiller): this is O(num_fds^2); maybe switch to a hash set here */ + for (i = 0; i < h->fd_count; i++) { + if (h->fds[i] == fd) goto exit; + } + if (h->fd_count == h->fd_capacity) { + h->fd_capacity = GPR_MAX(h->fd_capacity + 8, h->fd_count * 3 / 2); + h->fds = gpr_realloc(h->fds, sizeof(grpc_fd *) * h->fd_capacity); + } + h->fds[h->fd_count++] = fd; + GRPC_FD_REF(fd, "multipoller"); +exit: + if (and_unlock_pollset) { + gpr_mu_unlock(pollset->mu); + } +} + +static void multipoll_with_poll_pollset_maybe_work_and_unlock( + grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, + gpr_timespec deadline, gpr_timespec now) { +#define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR) +#define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR) + + int timeout; + int r; + size_t i, j, fd_count; + nfds_t pfd_count; + poll_hdr *h; + /* TODO(ctiller): inline some elements to avoid an allocation */ + grpc_fd_watcher *watchers; + struct pollfd *pfds; + + h = pollset->data.ptr; + timeout = grpc_poll_deadline_to_millis_timeout(deadline, now); + /* TODO(ctiller): perform just one malloc here if we exceed the inline case */ + pfds = gpr_malloc(sizeof(*pfds) * (h->fd_count + 2)); + watchers = gpr_malloc(sizeof(*watchers) * (h->fd_count + 2)); + fd_count = 0; + pfd_count = 2; + pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd); + pfds[0].events = POLLIN; + pfds[0].revents = 0; + pfds[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd->fd); + pfds[1].events = POLLIN; + pfds[1].revents = 0; + for (i = 0; i < h->fd_count; i++) { + int remove = grpc_fd_is_orphaned(h->fds[i]); + for (j = 0; !remove && j < h->del_count; j++) { + if (h->fds[i] == h->dels[j]) remove = 1; + } + if (remove) { + GRPC_FD_UNREF(h->fds[i], "multipoller"); + } else { + h->fds[fd_count++] = h->fds[i]; + watchers[pfd_count].fd = h->fds[i]; + pfds[pfd_count].fd = h->fds[i]->fd; + pfds[pfd_count].revents = 0; + pfd_count++; + } + } + for (j = 0; j < h->del_count; j++) { + GRPC_FD_UNREF(h->dels[j], "multipoller_del"); + } + h->del_count = 0; + h->fd_count = fd_count; + gpr_mu_unlock(pollset->mu); + + for (i = 2; i < pfd_count; i++) { + pfds[i].events = (short)grpc_fd_begin_poll(watchers[i].fd, pollset, worker, + POLLIN, POLLOUT, &watchers[i]); + } + + /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid + even going into the blocking annotation if possible */ + GRPC_SCHEDULING_START_BLOCKING_REGION; + r = grpc_poll_function(pfds, pfd_count, timeout); + GRPC_SCHEDULING_END_BLOCKING_REGION; + + if (r < 0) { + if (errno != EINTR) { + gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno)); + } + for (i = 2; i < pfd_count; i++) { + grpc_fd_end_poll(exec_ctx, &watchers[i], 0, 0); + } + } else if (r == 0) { + for (i = 2; i < pfd_count; i++) { + grpc_fd_end_poll(exec_ctx, &watchers[i], 0, 0); + } + } else { + if (pfds[0].revents & POLLIN_CHECK) { + grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd); + } + if (pfds[1].revents & POLLIN_CHECK) { + grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd); + } + for (i = 2; i < pfd_count; i++) { + if (watchers[i].fd == NULL) { + grpc_fd_end_poll(exec_ctx, &watchers[i], 0, 0); + continue; + } + grpc_fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN_CHECK, + pfds[i].revents & POLLOUT_CHECK); + } + } + + gpr_free(pfds); + gpr_free(watchers); +} + +static void multipoll_with_poll_pollset_finish_shutdown(grpc_pollset *pollset) { + size_t i; + poll_hdr *h = pollset->data.ptr; + for (i = 0; i < h->fd_count; i++) { + GRPC_FD_UNREF(h->fds[i], "multipoller"); + } + for (i = 0; i < h->del_count; i++) { + GRPC_FD_UNREF(h->dels[i], "multipoller_del"); + } + h->fd_count = 0; + h->del_count = 0; +} + +static void multipoll_with_poll_pollset_destroy(grpc_pollset *pollset) { + poll_hdr *h = pollset->data.ptr; + multipoll_with_poll_pollset_finish_shutdown(pollset); + gpr_free(h->fds); + gpr_free(h->dels); + gpr_free(h); +} + +static const grpc_pollset_vtable multipoll_with_poll_pollset = { + multipoll_with_poll_pollset_add_fd, + multipoll_with_poll_pollset_maybe_work_and_unlock, + multipoll_with_poll_pollset_finish_shutdown, + multipoll_with_poll_pollset_destroy}; + +void grpc_poll_become_multipoller(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset, grpc_fd **fds, + size_t nfds) { + size_t i; + poll_hdr *h = gpr_malloc(sizeof(poll_hdr)); + pollset->vtable = &multipoll_with_poll_pollset; + pollset->data.ptr = h; + h->fd_count = nfds; + h->fd_capacity = nfds; + h->fds = gpr_malloc(nfds * sizeof(grpc_fd *)); + h->del_count = 0; + h->del_capacity = 0; + h->dels = NULL; + for (i = 0; i < nfds; i++) { + h->fds[i] = fds[i]; + GRPC_FD_REF(fds[i], "multipoller"); + } +} + +/******************************************************************************* + * pollset_multipoller_with_epoll_posix.c + */ + +#ifdef GPR_LINUX_MULTIPOLL_WITH_EPOLL + +#include <errno.h> +#include <poll.h> +#include <string.h> +#include <sys/epoll.h> +#include <unistd.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/useful.h> + +#include "src/core/iomgr/ev_posix.h" +#include "src/core/profiling/timers.h" +#include "src/core/support/block_annotate.h" + +struct epoll_fd_list { + int *epoll_fds; + size_t count; + size_t capacity; +}; + +static struct epoll_fd_list epoll_fd_global_list; +static gpr_once init_epoll_fd_list_mu = GPR_ONCE_INIT; +static gpr_mu epoll_fd_list_mu; + +static void init_mu(void) { gpr_mu_init(&epoll_fd_list_mu); } + +static void add_epoll_fd_to_global_list(int epoll_fd) { + gpr_once_init(&init_epoll_fd_list_mu, init_mu); + + gpr_mu_lock(&epoll_fd_list_mu); + if (epoll_fd_global_list.count == epoll_fd_global_list.capacity) { + epoll_fd_global_list.capacity = + GPR_MAX((size_t)8, epoll_fd_global_list.capacity * 2); + epoll_fd_global_list.epoll_fds = + gpr_realloc(epoll_fd_global_list.epoll_fds, + epoll_fd_global_list.capacity * sizeof(int)); + } + epoll_fd_global_list.epoll_fds[epoll_fd_global_list.count++] = epoll_fd; + gpr_mu_unlock(&epoll_fd_list_mu); +} + +static void remove_epoll_fd_from_global_list(int epoll_fd) { + gpr_mu_lock(&epoll_fd_list_mu); + GPR_ASSERT(epoll_fd_global_list.count > 0); + for (size_t i = 0; i < epoll_fd_global_list.count; i++) { + if (epoll_fd == epoll_fd_global_list.epoll_fds[i]) { + epoll_fd_global_list.epoll_fds[i] = + epoll_fd_global_list.epoll_fds[--(epoll_fd_global_list.count)]; + break; + } + } + gpr_mu_unlock(&epoll_fd_list_mu); +} + +void grpc_remove_fd_from_all_epoll_sets(int fd) { + int err; + gpr_once_init(&init_epoll_fd_list_mu, init_mu); + gpr_mu_lock(&epoll_fd_list_mu); + if (epoll_fd_global_list.count == 0) { + gpr_mu_unlock(&epoll_fd_list_mu); + return; + } + for (size_t i = 0; i < epoll_fd_global_list.count; i++) { + err = epoll_ctl(epoll_fd_global_list.epoll_fds[i], EPOLL_CTL_DEL, fd, NULL); + if (err < 0 && errno != ENOENT) { + gpr_log(GPR_ERROR, "epoll_ctl del for %d failed: %s", fd, + strerror(errno)); + } + } + gpr_mu_unlock(&epoll_fd_list_mu); +} + +typedef struct { + grpc_pollset *pollset; + grpc_fd *fd; + grpc_closure closure; +} delayed_add; + +typedef struct { int epoll_fd; } epoll_hdr; + +static void finally_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_fd *fd) { + epoll_hdr *h = pollset->data.ptr; + struct epoll_event ev; + int err; + grpc_fd_watcher watcher; + + /* We pretend to be polling whilst adding an fd to keep the fd from being + closed during the add. This may result in a spurious wakeup being assigned + to this pollset whilst adding, but that should be benign. */ + GPR_ASSERT(grpc_fd_begin_poll(fd, pollset, NULL, 0, 0, &watcher) == 0); + if (watcher.fd != NULL) { + ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET); + ev.data.ptr = fd; + err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev); + if (err < 0) { + /* FDs may be added to a pollset multiple times, so EEXIST is normal. */ + if (errno != EEXIST) { + gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s", fd->fd, + strerror(errno)); + } + } + } + grpc_fd_end_poll(exec_ctx, &watcher, 0, 0); +} + +static void perform_delayed_add(grpc_exec_ctx *exec_ctx, void *arg, + bool iomgr_status) { + delayed_add *da = arg; + + if (!grpc_fd_is_orphaned(da->fd)) { + finally_add_fd(exec_ctx, da->pollset, da->fd); + } + + gpr_mu_lock(da->pollset->mu); + da->pollset->in_flight_cbs--; + if (da->pollset->shutting_down) { + /* We don't care about this pollset anymore. */ + if (da->pollset->in_flight_cbs == 0 && !da->pollset->called_shutdown) { + da->pollset->called_shutdown = 1; + grpc_exec_ctx_enqueue(exec_ctx, da->pollset->shutdown_done, true, NULL); + } + } + gpr_mu_unlock(da->pollset->mu); + + GRPC_FD_UNREF(da->fd, "delayed_add"); + + gpr_free(da); +} + +static void multipoll_with_epoll_pollset_add_fd(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset, + grpc_fd *fd, + int and_unlock_pollset) { + if (and_unlock_pollset) { + gpr_mu_unlock(pollset->mu); + finally_add_fd(exec_ctx, pollset, fd); + } else { + delayed_add *da = gpr_malloc(sizeof(*da)); + da->pollset = pollset; + da->fd = fd; + GRPC_FD_REF(fd, "delayed_add"); + grpc_closure_init(&da->closure, perform_delayed_add, da); + pollset->in_flight_cbs++; + grpc_exec_ctx_enqueue(exec_ctx, &da->closure, true, NULL); + } +} + +/* TODO(klempner): We probably want to turn this down a bit */ +#define GRPC_EPOLL_MAX_EVENTS 1000 + +static void multipoll_with_epoll_pollset_maybe_work_and_unlock( + grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, + gpr_timespec deadline, gpr_timespec now) { + struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS]; + int ep_rv; + int poll_rv; + epoll_hdr *h = pollset->data.ptr; + int timeout_ms; + struct pollfd pfds[2]; + + /* If you want to ignore epoll's ability to sanely handle parallel pollers, + * for a more apples-to-apples performance comparison with poll, add a + * if (pollset->counter != 0) { return 0; } + * here. + */ + + gpr_mu_unlock(pollset->mu); + + timeout_ms = grpc_poll_deadline_to_millis_timeout(deadline, now); + + pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd->fd); + pfds[0].events = POLLIN; + pfds[0].revents = 0; + pfds[1].fd = h->epoll_fd; + pfds[1].events = POLLIN; + pfds[1].revents = 0; + + /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid + even going into the blocking annotation if possible */ + GPR_TIMER_BEGIN("poll", 0); + GRPC_SCHEDULING_START_BLOCKING_REGION; + poll_rv = grpc_poll_function(pfds, 2, timeout_ms); + GRPC_SCHEDULING_END_BLOCKING_REGION; + GPR_TIMER_END("poll", 0); + + if (poll_rv < 0) { + if (errno != EINTR) { + gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno)); + } + } else if (poll_rv == 0) { + /* do nothing */ + } else { + if (pfds[0].revents) { + grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd); + } + if (pfds[1].revents) { + do { + /* The following epoll_wait never blocks; it has a timeout of 0 */ + ep_rv = epoll_wait(h->epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0); + if (ep_rv < 0) { + if (errno != EINTR) { + gpr_log(GPR_ERROR, "epoll_wait() failed: %s", strerror(errno)); + } + } else { + int i; + for (i = 0; i < ep_rv; ++i) { + grpc_fd *fd = ep_ev[i].data.ptr; + /* TODO(klempner): We might want to consider making err and pri + * separate events */ + int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP); + int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI); + int write_ev = ep_ev[i].events & EPOLLOUT; + if (fd == NULL) { + grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd); + } else { + if (read_ev || cancel) { + grpc_fd_become_readable(exec_ctx, fd); + } + if (write_ev || cancel) { + grpc_fd_become_writable(exec_ctx, fd); + } + } + } + } + } while (ep_rv == GRPC_EPOLL_MAX_EVENTS); + } + } +} + +static void multipoll_with_epoll_pollset_finish_shutdown( + grpc_pollset *pollset) {} + +static void multipoll_with_epoll_pollset_destroy(grpc_pollset *pollset) { + epoll_hdr *h = pollset->data.ptr; + close(h->epoll_fd); + remove_epoll_fd_from_global_list(h->epoll_fd); + gpr_free(h); +} + +static const grpc_pollset_vtable multipoll_with_epoll_pollset = { + multipoll_with_epoll_pollset_add_fd, + multipoll_with_epoll_pollset_maybe_work_and_unlock, + multipoll_with_epoll_pollset_finish_shutdown, + multipoll_with_epoll_pollset_destroy}; + +static void epoll_become_multipoller(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset, grpc_fd **fds, + size_t nfds) { + size_t i; + epoll_hdr *h = gpr_malloc(sizeof(epoll_hdr)); + struct epoll_event ev; + int err; + + pollset->vtable = &multipoll_with_epoll_pollset; + pollset->data.ptr = h; + h->epoll_fd = epoll_create1(EPOLL_CLOEXEC); + if (h->epoll_fd < 0) { + /* TODO(klempner): Fall back to poll here, especially on ENOSYS */ + gpr_log(GPR_ERROR, "epoll_create1 failed: %s", strerror(errno)); + abort(); + } + add_epoll_fd_to_global_list(h->epoll_fd); + + ev.events = (uint32_t)(EPOLLIN | EPOLLET); + ev.data.ptr = NULL; + err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, + GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), &ev); + if (err < 0) { + gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s", + GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), + strerror(errno)); + } + + for (i = 0; i < nfds; i++) { + multipoll_with_epoll_pollset_add_fd(exec_ctx, pollset, fds[i], 0); + } +} + +grpc_platform_become_multipoller_type grpc_platform_become_multipoller = + epoll_become_multipoller; + +#else /* GPR_LINUX_MULTIPOLL_WITH_EPOLL */ + +void grpc_remove_fd_from_all_epoll_sets(int fd) {} + +#endif /* GPR_LINUX_MULTIPOLL_WITH_EPOLL */ + #endif diff --git a/src/core/iomgr/ev_poll_and_epoll_posix.h b/src/core/iomgr/ev_poll_and_epoll_posix.h index e70807a21c9aea3a7109b67a47756151ccdc1a15..72a2576415181c8c2893535730847bede31de0bb 100644 --- a/src/core/iomgr/ev_poll_and_epoll_posix.h +++ b/src/core/iomgr/ev_poll_and_epoll_posix.h @@ -31,7 +31,9 @@ * */ - #ifndef GRPC_INTERNAL_CORE_IOMGR_EV_POLL_AND_EPOLL_POSIX_H - #define GRPC_INTERNAL_CORE_IOMGR_EV_POLL_AND_EPOLL_POSIX_H +#ifndef GRPC_INTERNAL_CORE_IOMGR_EV_POLL_AND_EPOLL_POSIX_H +#define GRPC_INTERNAL_CORE_IOMGR_EV_POLL_AND_EPOLL_POSIX_H - #endif // GRPC_INTERNAL_CORE_IOMGR_EV_POLL_AND_EPOLL_POSIX_H +#include "src/core/iomgr/ev_posix.h" + +#endif // GRPC_INTERNAL_CORE_IOMGR_EV_POLL_AND_EPOLL_POSIX_H diff --git a/src/core/iomgr/ev_posix.h b/src/core/iomgr/ev_posix.h index 4df1331f1f2947f4e04b2b8c451bd1e34b315f17..03e2b0c01c6c6c0126a049e3176942cc402f9266 100644 --- a/src/core/iomgr/ev_posix.h +++ b/src/core/iomgr/ev_posix.h @@ -35,12 +35,13 @@ #define GRPC_INTERNAL_CORE_IOMGR_EV_POSIX_H #include "src/core/iomgr/exec_ctx.h" +#include "src/core/iomgr/pollset.h" typedef struct grpc_fd grpc_fd; -typedef struct grpc_pollset grpc_pollset; -typedef struct grpc_pollset_worker grpc_pollset_worker; typedef struct grpc_event_engine_vtable { + size_t pollset_size; + grpc_fd *(*fd_create)(int fd, const char *name); int (*fd_wrapped_fd)(grpc_fd *fd); void (*fd_orphan)(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done, @@ -50,6 +51,17 @@ typedef struct grpc_event_engine_vtable { grpc_closure *closure); void (*fd_notify_on_write)(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure); + + void (*pollset_init)(grpc_pollset *pollset, gpr_mu *mu); + void (*pollset_shutdown)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_closure *closure); + void (*pollset_reset)(grpc_pollset *pollset); + void (*pollset_destroy)(grpc_pollset *pollset); + void (*pollset_work)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_pollset_worker **worker, gpr_timespec now, + gpr_timespec deadline); + void (*pollset_kick)(grpc_pollset *pollset, + grpc_pollset_worker *specific_worker); } grpc_event_engine_vtable; extern const grpc_event_engine_vtable *grpc_event_engine; diff --git a/src/core/iomgr/iomgr_posix.c b/src/core/iomgr/iomgr_posix.c index f1094f5a48e440077a90bafade0858ddb7714040..6e8e09a8514bf5df4269571c03e21844b69eb726 100644 --- a/src/core/iomgr/iomgr_posix.c +++ b/src/core/iomgr/iomgr_posix.c @@ -35,18 +35,18 @@ #ifdef GPR_POSIX_SOCKET -#include "src/core/iomgr/iomgr_posix.h" #include "src/core/debug/trace.h" #include "src/core/iomgr/ev_posix.h" +#include "src/core/iomgr/iomgr_posix.h" #include "src/core/iomgr/tcp_posix.h" void grpc_iomgr_platform_init(void) { - grpc_fd_global_init(); + grpc_event_engine_init(); grpc_register_tracer("tcp", &grpc_tcp_trace); } void grpc_iomgr_platform_flush(void) {} -void grpc_iomgr_platform_shutdown(void) { grpc_fd_global_shutdown(); } +void grpc_iomgr_platform_shutdown(void) { grpc_event_engine_shutdown(); } #endif /* GRPC_POSIX_SOCKET */ diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h index 5e185cf5f3233e439504837acefdcb998a8d301a..39e4ff2440837f7b2878c6d0407f7ef74f7df945 100644 --- a/src/core/iomgr/pollset.h +++ b/src/core/iomgr/pollset.h @@ -49,18 +49,8 @@ - a completion queue might keep a pollset with an entry for each transport that is servicing a call that it's tracking */ -<<<<<<< HEAD -#ifdef GPR_POSIX_SOCKET -#include "src/core/iomgr/ev_posix.h" -#endif - -#ifdef GPR_WIN32 -#include "src/core/iomgr/pollset_windows.h" -#endif -======= typedef struct grpc_pollset grpc_pollset; typedef struct grpc_pollset_worker grpc_pollset_worker; ->>>>>>> c605c62b30ca15c83a7c4e98386062c62de0d36d size_t grpc_pollset_size(void); void grpc_pollset_init(grpc_pollset *pollset, gpr_mu *mu); diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c deleted file mode 100644 index 27884166dc757881c5a81bbee14c8ff50225b77b..0000000000000000000000000000000000000000 --- a/src/core/iomgr/pollset_multipoller_with_epoll.c +++ /dev/null @@ -1,324 +0,0 @@ -/* - * - * Copyright 2015-2016, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include <grpc/support/port_platform.h> - -#ifdef GPR_LINUX_MULTIPOLL_WITH_EPOLL - -#include <errno.h> -#include <poll.h> -#include <string.h> -#include <sys/epoll.h> -#include <unistd.h> - -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <grpc/support/useful.h> - -#include "src/core/iomgr/ev_posix.h" -#include "src/core/profiling/timers.h" -#include "src/core/support/block_annotate.h" - -struct epoll_fd_list { - int *epoll_fds; - size_t count; - size_t capacity; -}; - -static struct epoll_fd_list epoll_fd_global_list; -static gpr_once init_epoll_fd_list_mu = GPR_ONCE_INIT; -static gpr_mu epoll_fd_list_mu; - -static void init_mu(void) { gpr_mu_init(&epoll_fd_list_mu); } - -static void add_epoll_fd_to_global_list(int epoll_fd) { - gpr_once_init(&init_epoll_fd_list_mu, init_mu); - - gpr_mu_lock(&epoll_fd_list_mu); - if (epoll_fd_global_list.count == epoll_fd_global_list.capacity) { - epoll_fd_global_list.capacity = - GPR_MAX((size_t)8, epoll_fd_global_list.capacity * 2); - epoll_fd_global_list.epoll_fds = - gpr_realloc(epoll_fd_global_list.epoll_fds, - epoll_fd_global_list.capacity * sizeof(int)); - } - epoll_fd_global_list.epoll_fds[epoll_fd_global_list.count++] = epoll_fd; - gpr_mu_unlock(&epoll_fd_list_mu); -} - -static void remove_epoll_fd_from_global_list(int epoll_fd) { - gpr_mu_lock(&epoll_fd_list_mu); - GPR_ASSERT(epoll_fd_global_list.count > 0); - for (size_t i = 0; i < epoll_fd_global_list.count; i++) { - if (epoll_fd == epoll_fd_global_list.epoll_fds[i]) { - epoll_fd_global_list.epoll_fds[i] = - epoll_fd_global_list.epoll_fds[--(epoll_fd_global_list.count)]; - break; - } - } - gpr_mu_unlock(&epoll_fd_list_mu); -} - -void grpc_remove_fd_from_all_epoll_sets(int fd) { - int err; - gpr_once_init(&init_epoll_fd_list_mu, init_mu); - gpr_mu_lock(&epoll_fd_list_mu); - if (epoll_fd_global_list.count == 0) { - gpr_mu_unlock(&epoll_fd_list_mu); - return; - } - for (size_t i = 0; i < epoll_fd_global_list.count; i++) { - err = epoll_ctl(epoll_fd_global_list.epoll_fds[i], EPOLL_CTL_DEL, fd, NULL); - if (err < 0 && errno != ENOENT) { - gpr_log(GPR_ERROR, "epoll_ctl del for %d failed: %s", fd, - strerror(errno)); - } - } - gpr_mu_unlock(&epoll_fd_list_mu); -} - -typedef struct { - grpc_pollset *pollset; - grpc_fd *fd; - grpc_closure closure; -} delayed_add; - -typedef struct { int epoll_fd; } pollset_hdr; - -static void finally_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_fd *fd) { - pollset_hdr *h = pollset->data.ptr; - struct epoll_event ev; - int err; - grpc_fd_watcher watcher; - - /* We pretend to be polling whilst adding an fd to keep the fd from being - closed during the add. This may result in a spurious wakeup being assigned - to this pollset whilst adding, but that should be benign. */ - GPR_ASSERT(grpc_fd_begin_poll(fd, pollset, NULL, 0, 0, &watcher) == 0); - if (watcher.fd != NULL) { - ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET); - ev.data.ptr = fd; - err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev); - if (err < 0) { - /* FDs may be added to a pollset multiple times, so EEXIST is normal. */ - if (errno != EEXIST) { - gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s", fd->fd, - strerror(errno)); - } - } - } - grpc_fd_end_poll(exec_ctx, &watcher, 0, 0); -} - -static void perform_delayed_add(grpc_exec_ctx *exec_ctx, void *arg, - bool iomgr_status) { - delayed_add *da = arg; - - if (!grpc_fd_is_orphaned(da->fd)) { - finally_add_fd(exec_ctx, da->pollset, da->fd); - } - - gpr_mu_lock(da->pollset->mu); - da->pollset->in_flight_cbs--; - if (da->pollset->shutting_down) { - /* We don't care about this pollset anymore. */ - if (da->pollset->in_flight_cbs == 0 && !da->pollset->called_shutdown) { - da->pollset->called_shutdown = 1; - grpc_exec_ctx_enqueue(exec_ctx, da->pollset->shutdown_done, true, NULL); - } - } - gpr_mu_unlock(da->pollset->mu); - - GRPC_FD_UNREF(da->fd, "delayed_add"); - - gpr_free(da); -} - -static void multipoll_with_epoll_pollset_add_fd(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset, - grpc_fd *fd, - int and_unlock_pollset) { - if (and_unlock_pollset) { - gpr_mu_unlock(pollset->mu); - finally_add_fd(exec_ctx, pollset, fd); - } else { - delayed_add *da = gpr_malloc(sizeof(*da)); - da->pollset = pollset; - da->fd = fd; - GRPC_FD_REF(fd, "delayed_add"); - grpc_closure_init(&da->closure, perform_delayed_add, da); - pollset->in_flight_cbs++; - grpc_exec_ctx_enqueue(exec_ctx, &da->closure, true, NULL); - } -} - -/* TODO(klempner): We probably want to turn this down a bit */ -#define GRPC_EPOLL_MAX_EVENTS 1000 - -static void multipoll_with_epoll_pollset_maybe_work_and_unlock( - grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, - gpr_timespec deadline, gpr_timespec now) { - struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS]; - int ep_rv; - int poll_rv; - pollset_hdr *h = pollset->data.ptr; - int timeout_ms; - struct pollfd pfds[2]; - - /* If you want to ignore epoll's ability to sanely handle parallel pollers, - * for a more apples-to-apples performance comparison with poll, add a - * if (pollset->counter != 0) { return 0; } - * here. - */ - - gpr_mu_unlock(pollset->mu); - - timeout_ms = grpc_poll_deadline_to_millis_timeout(deadline, now); - - pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd->fd); - pfds[0].events = POLLIN; - pfds[0].revents = 0; - pfds[1].fd = h->epoll_fd; - pfds[1].events = POLLIN; - pfds[1].revents = 0; - - /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid - even going into the blocking annotation if possible */ - GPR_TIMER_BEGIN("poll", 0); - GRPC_SCHEDULING_START_BLOCKING_REGION; - poll_rv = grpc_poll_function(pfds, 2, timeout_ms); - GRPC_SCHEDULING_END_BLOCKING_REGION; - GPR_TIMER_END("poll", 0); - - if (poll_rv < 0) { - if (errno != EINTR) { - gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno)); - } - } else if (poll_rv == 0) { - /* do nothing */ - } else { - if (pfds[0].revents) { - grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd); - } - if (pfds[1].revents) { - do { - /* The following epoll_wait never blocks; it has a timeout of 0 */ - ep_rv = epoll_wait(h->epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0); - if (ep_rv < 0) { - if (errno != EINTR) { - gpr_log(GPR_ERROR, "epoll_wait() failed: %s", strerror(errno)); - } - } else { - int i; - for (i = 0; i < ep_rv; ++i) { - grpc_fd *fd = ep_ev[i].data.ptr; - /* TODO(klempner): We might want to consider making err and pri - * separate events */ - int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP); - int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI); - int write_ev = ep_ev[i].events & EPOLLOUT; - if (fd == NULL) { - grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd); - } else { - if (read_ev || cancel) { - grpc_fd_become_readable(exec_ctx, fd); - } - if (write_ev || cancel) { - grpc_fd_become_writable(exec_ctx, fd); - } - } - } - } - } while (ep_rv == GRPC_EPOLL_MAX_EVENTS); - } - } -} - -static void multipoll_with_epoll_pollset_finish_shutdown( - grpc_pollset *pollset) {} - -static void multipoll_with_epoll_pollset_destroy(grpc_pollset *pollset) { - pollset_hdr *h = pollset->data.ptr; - close(h->epoll_fd); - remove_epoll_fd_from_global_list(h->epoll_fd); - gpr_free(h); -} - -static const grpc_pollset_vtable multipoll_with_epoll_pollset = { - multipoll_with_epoll_pollset_add_fd, - multipoll_with_epoll_pollset_maybe_work_and_unlock, - multipoll_with_epoll_pollset_finish_shutdown, - multipoll_with_epoll_pollset_destroy}; - -static void epoll_become_multipoller(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset, grpc_fd **fds, - size_t nfds) { - size_t i; - pollset_hdr *h = gpr_malloc(sizeof(pollset_hdr)); - struct epoll_event ev; - int err; - - pollset->vtable = &multipoll_with_epoll_pollset; - pollset->data.ptr = h; - h->epoll_fd = epoll_create1(EPOLL_CLOEXEC); - if (h->epoll_fd < 0) { - /* TODO(klempner): Fall back to poll here, especially on ENOSYS */ - gpr_log(GPR_ERROR, "epoll_create1 failed: %s", strerror(errno)); - abort(); - } - add_epoll_fd_to_global_list(h->epoll_fd); - - ev.events = (uint32_t)(EPOLLIN | EPOLLET); - ev.data.ptr = NULL; - err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, - GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), &ev); - if (err < 0) { - gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s", - GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), - strerror(errno)); - } - - for (i = 0; i < nfds; i++) { - multipoll_with_epoll_pollset_add_fd(exec_ctx, pollset, fds[i], 0); - } -} - -grpc_platform_become_multipoller_type grpc_platform_become_multipoller = - epoll_become_multipoller; - -#else /* GPR_LINUX_MULTIPOLL_WITH_EPOLL */ - -void grpc_remove_fd_from_all_epoll_sets(int fd) {} - -#endif /* GPR_LINUX_MULTIPOLL_WITH_EPOLL */ diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c deleted file mode 100644 index ef3d8c26132ad889db900bcbc8704a3cad56569d..0000000000000000000000000000000000000000 --- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c +++ /dev/null @@ -1,230 +0,0 @@ -/* - * - * Copyright 2015-2016, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include <grpc/support/port_platform.h> - -#ifdef GPR_POSIX_SOCKET - -#include "src/core/iomgr/pollset_posix.h" - -#include <errno.h> -#include <poll.h> -#include <stdlib.h> -#include <string.h> - -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <grpc/support/useful.h> - -#include "src/core/iomgr/ev_posix.h" -#include "src/core/iomgr/iomgr_internal.h" -#include "src/core/support/block_annotate.h" - -typedef struct { - /* all polled fds */ - size_t fd_count; - size_t fd_capacity; - grpc_fd **fds; - /* fds that have been removed from the pollset explicitly */ - size_t del_count; - size_t del_capacity; - grpc_fd **dels; -} pollset_hdr; - -static void multipoll_with_poll_pollset_add_fd(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset, - grpc_fd *fd, - int and_unlock_pollset) { - size_t i; - pollset_hdr *h = pollset->data.ptr; - /* TODO(ctiller): this is O(num_fds^2); maybe switch to a hash set here */ - for (i = 0; i < h->fd_count; i++) { - if (h->fds[i] == fd) goto exit; - } - if (h->fd_count == h->fd_capacity) { - h->fd_capacity = GPR_MAX(h->fd_capacity + 8, h->fd_count * 3 / 2); - h->fds = gpr_realloc(h->fds, sizeof(grpc_fd *) * h->fd_capacity); - } - h->fds[h->fd_count++] = fd; - GRPC_FD_REF(fd, "multipoller"); -exit: - if (and_unlock_pollset) { - gpr_mu_unlock(pollset->mu); - } -} - -static void multipoll_with_poll_pollset_maybe_work_and_unlock( - grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, - gpr_timespec deadline, gpr_timespec now) { -#define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR) -#define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR) - - int timeout; - int r; - size_t i, j, fd_count; - nfds_t pfd_count; - pollset_hdr *h; - /* TODO(ctiller): inline some elements to avoid an allocation */ - grpc_fd_watcher *watchers; - struct pollfd *pfds; - - h = pollset->data.ptr; - timeout = grpc_poll_deadline_to_millis_timeout(deadline, now); - /* TODO(ctiller): perform just one malloc here if we exceed the inline case */ - pfds = gpr_malloc(sizeof(*pfds) * (h->fd_count + 2)); - watchers = gpr_malloc(sizeof(*watchers) * (h->fd_count + 2)); - fd_count = 0; - pfd_count = 2; - pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd); - pfds[0].events = POLLIN; - pfds[0].revents = 0; - pfds[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd->fd); - pfds[1].events = POLLIN; - pfds[1].revents = 0; - for (i = 0; i < h->fd_count; i++) { - int remove = grpc_fd_is_orphaned(h->fds[i]); - for (j = 0; !remove && j < h->del_count; j++) { - if (h->fds[i] == h->dels[j]) remove = 1; - } - if (remove) { - GRPC_FD_UNREF(h->fds[i], "multipoller"); - } else { - h->fds[fd_count++] = h->fds[i]; - watchers[pfd_count].fd = h->fds[i]; - pfds[pfd_count].fd = h->fds[i]->fd; - pfds[pfd_count].revents = 0; - pfd_count++; - } - } - for (j = 0; j < h->del_count; j++) { - GRPC_FD_UNREF(h->dels[j], "multipoller_del"); - } - h->del_count = 0; - h->fd_count = fd_count; - gpr_mu_unlock(pollset->mu); - - for (i = 2; i < pfd_count; i++) { - pfds[i].events = (short)grpc_fd_begin_poll(watchers[i].fd, pollset, worker, - POLLIN, POLLOUT, &watchers[i]); - } - - /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid - even going into the blocking annotation if possible */ - GRPC_SCHEDULING_START_BLOCKING_REGION; - r = grpc_poll_function(pfds, pfd_count, timeout); - GRPC_SCHEDULING_END_BLOCKING_REGION; - - if (r < 0) { - if (errno != EINTR) { - gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno)); - } - for (i = 2; i < pfd_count; i++) { - grpc_fd_end_poll(exec_ctx, &watchers[i], 0, 0); - } - } else if (r == 0) { - for (i = 2; i < pfd_count; i++) { - grpc_fd_end_poll(exec_ctx, &watchers[i], 0, 0); - } - } else { - if (pfds[0].revents & POLLIN_CHECK) { - grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd); - } - if (pfds[1].revents & POLLIN_CHECK) { - grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd); - } - for (i = 2; i < pfd_count; i++) { - if (watchers[i].fd == NULL) { - grpc_fd_end_poll(exec_ctx, &watchers[i], 0, 0); - continue; - } - grpc_fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN_CHECK, - pfds[i].revents & POLLOUT_CHECK); - } - } - - gpr_free(pfds); - gpr_free(watchers); -} - -static void multipoll_with_poll_pollset_finish_shutdown(grpc_pollset *pollset) { - size_t i; - pollset_hdr *h = pollset->data.ptr; - for (i = 0; i < h->fd_count; i++) { - GRPC_FD_UNREF(h->fds[i], "multipoller"); - } - for (i = 0; i < h->del_count; i++) { - GRPC_FD_UNREF(h->dels[i], "multipoller_del"); - } - h->fd_count = 0; - h->del_count = 0; -} - -static void multipoll_with_poll_pollset_destroy(grpc_pollset *pollset) { - pollset_hdr *h = pollset->data.ptr; - multipoll_with_poll_pollset_finish_shutdown(pollset); - gpr_free(h->fds); - gpr_free(h->dels); - gpr_free(h); -} - -static const grpc_pollset_vtable multipoll_with_poll_pollset = { - multipoll_with_poll_pollset_add_fd, - multipoll_with_poll_pollset_maybe_work_and_unlock, - multipoll_with_poll_pollset_finish_shutdown, - multipoll_with_poll_pollset_destroy}; - -void grpc_poll_become_multipoller(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset, grpc_fd **fds, - size_t nfds) { - size_t i; - pollset_hdr *h = gpr_malloc(sizeof(pollset_hdr)); - pollset->vtable = &multipoll_with_poll_pollset; - pollset->data.ptr = h; - h->fd_count = nfds; - h->fd_capacity = nfds; - h->fds = gpr_malloc(nfds * sizeof(grpc_fd *)); - h->del_count = 0; - h->del_capacity = 0; - h->dels = NULL; - for (i = 0; i < nfds; i++) { - h->fds[i] = fds[i]; - GRPC_FD_REF(fds[i], "multipoller"); - } -} - -#endif /* GPR_POSIX_SOCKET */ - -#ifdef GPR_POSIX_MULTIPOLL_WITH_POLL -grpc_platform_become_multipoller_type grpc_platform_become_multipoller = - grpc_poll_become_multipoller; -#endif diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c deleted file mode 100644 index 20200113cdbd2d55d90cfcabfc2d18a7da7b22e4..0000000000000000000000000000000000000000 --- a/src/core/iomgr/pollset_posix.c +++ /dev/null @@ -1,633 +0,0 @@ -/* - * - * Copyright 2015-2016, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include <grpc/support/port_platform.h> - -#ifdef GPR_POSIX_SOCKET - -#include "src/core/iomgr/pollset_posix.h" - -#include <errno.h> -#include <stdlib.h> -#include <string.h> -#include <unistd.h> - -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <grpc/support/thd.h> -#include <grpc/support/tls.h> -#include <grpc/support/useful.h> - -#include "src/core/iomgr/ev_posix.h" -#include "src/core/iomgr/iomgr_internal.h" -#include "src/core/iomgr/socket_utils_posix.h" -#include "src/core/profiling/timers.h" -#include "src/core/support/block_annotate.h" - -GPR_TLS_DECL(g_current_thread_poller); -GPR_TLS_DECL(g_current_thread_worker); - -/** Default poll() function - a pointer so that it can be overridden by some - * tests */ -grpc_poll_function_type grpc_poll_function = poll; - -/** The alarm system needs to be able to wakeup 'some poller' sometimes - * (specifically when a new alarm needs to be triggered earlier than the next - * alarm 'epoch'). - * This wakeup_fd gives us something to alert on when such a case occurs. */ -grpc_wakeup_fd grpc_global_wakeup_fd; - -static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) { - worker->prev->next = worker->next; - worker->next->prev = worker->prev; -} - -int grpc_pollset_has_workers(grpc_pollset *p) { - return p->root_worker.next != &p->root_worker; -} - -static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) { - if (grpc_pollset_has_workers(p)) { - grpc_pollset_worker *w = p->root_worker.next; - remove_worker(p, w); - return w; - } else { - return NULL; - } -} - -static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) { - worker->next = &p->root_worker; - worker->prev = worker->next->prev; - worker->prev->next = worker->next->prev = worker; -} - -static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) { - worker->prev = &p->root_worker; - worker->next = worker->prev->next; - worker->prev->next = worker->next->prev = worker; -} - -size_t grpc_pollset_size(void) { return sizeof(grpc_pollset); } - -void grpc_pollset_kick_ext(grpc_pollset *p, - grpc_pollset_worker *specific_worker, - uint32_t flags) { - GPR_TIMER_BEGIN("grpc_pollset_kick_ext", 0); - - /* pollset->mu already held */ - if (specific_worker != NULL) { - if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) { - GPR_TIMER_BEGIN("grpc_pollset_kick_ext.broadcast", 0); - GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0); - for (specific_worker = p->root_worker.next; - specific_worker != &p->root_worker; - specific_worker = specific_worker->next) { - grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); - } - p->kicked_without_pollers = 1; - GPR_TIMER_END("grpc_pollset_kick_ext.broadcast", 0); - } else if (gpr_tls_get(&g_current_thread_worker) != - (intptr_t)specific_worker) { - GPR_TIMER_MARK("different_thread_worker", 0); - if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) { - specific_worker->reevaluate_polling_on_wakeup = 1; - } - specific_worker->kicked_specifically = 1; - grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); - } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) { - GPR_TIMER_MARK("kick_yoself", 0); - if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) { - specific_worker->reevaluate_polling_on_wakeup = 1; - } - specific_worker->kicked_specifically = 1; - grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); - } - } else if (gpr_tls_get(&g_current_thread_poller) != (intptr_t)p) { - GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0); - GPR_TIMER_MARK("kick_anonymous", 0); - specific_worker = pop_front_worker(p); - if (specific_worker != NULL) { - if (gpr_tls_get(&g_current_thread_worker) == (intptr_t)specific_worker) { - GPR_TIMER_MARK("kick_anonymous_not_self", 0); - push_back_worker(p, specific_worker); - specific_worker = pop_front_worker(p); - if ((flags & GRPC_POLLSET_CAN_KICK_SELF) == 0 && - gpr_tls_get(&g_current_thread_worker) == - (intptr_t)specific_worker) { - push_back_worker(p, specific_worker); - specific_worker = NULL; - } - } - if (specific_worker != NULL) { - GPR_TIMER_MARK("finally_kick", 0); - push_back_worker(p, specific_worker); - grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); - } - } else { - GPR_TIMER_MARK("kicked_no_pollers", 0); - p->kicked_without_pollers = 1; - } - } - - GPR_TIMER_END("grpc_pollset_kick_ext", 0); -} - -void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) { - grpc_pollset_kick_ext(p, specific_worker, 0); -} - -/* global state management */ - -void grpc_pollset_global_init(void) { - gpr_tls_init(&g_current_thread_poller); - gpr_tls_init(&g_current_thread_worker); - grpc_wakeup_fd_global_init(); - grpc_wakeup_fd_init(&grpc_global_wakeup_fd); -} - -void grpc_pollset_global_shutdown(void) { - grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd); - gpr_tls_destroy(&g_current_thread_poller); - gpr_tls_destroy(&g_current_thread_worker); - grpc_wakeup_fd_global_destroy(); -} - -void grpc_kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); } - -/* main interface */ - -static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null); - -void grpc_pollset_init(grpc_pollset *pollset, gpr_mu *mu) { - pollset->mu = mu; - pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker; - pollset->in_flight_cbs = 0; - pollset->shutting_down = 0; - pollset->called_shutdown = 0; - pollset->kicked_without_pollers = 0; - pollset->idle_jobs.head = pollset->idle_jobs.tail = NULL; - pollset->local_wakeup_cache = NULL; - pollset->kicked_without_pollers = 0; - become_basic_pollset(pollset, NULL); -} - -void grpc_pollset_destroy(grpc_pollset *pollset) { - GPR_ASSERT(pollset->in_flight_cbs == 0); - GPR_ASSERT(!grpc_pollset_has_workers(pollset)); - GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail); - pollset->vtable->destroy(pollset); - while (pollset->local_wakeup_cache) { - grpc_cached_wakeup_fd *next = pollset->local_wakeup_cache->next; - grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd); - gpr_free(pollset->local_wakeup_cache); - pollset->local_wakeup_cache = next; - } -} - -void grpc_pollset_reset(grpc_pollset *pollset) { - GPR_ASSERT(pollset->shutting_down); - GPR_ASSERT(pollset->in_flight_cbs == 0); - GPR_ASSERT(!grpc_pollset_has_workers(pollset)); - GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail); - pollset->vtable->destroy(pollset); - pollset->shutting_down = 0; - pollset->called_shutdown = 0; - pollset->kicked_without_pollers = 0; - become_basic_pollset(pollset, NULL); -} - -void grpc_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_fd *fd) { - gpr_mu_lock(pollset->mu); - pollset->vtable->add_fd(exec_ctx, pollset, fd, 1); -/* the following (enabled only in debug) will reacquire and then release - our lock - meaning that if the unlocking flag passed to add_fd above is - not respected, the code will deadlock (in a way that we have a chance of - debugging) */ -#ifndef NDEBUG - gpr_mu_lock(pollset->mu); - gpr_mu_unlock(pollset->mu); -#endif -} - -static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { - GPR_ASSERT(grpc_closure_list_empty(pollset->idle_jobs)); - pollset->vtable->finish_shutdown(pollset); - grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, true, NULL); -} - -void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker **worker_hdl, gpr_timespec now, - gpr_timespec deadline) { - grpc_pollset_worker worker; - *worker_hdl = &worker; - - /* pollset->mu already held */ - int added_worker = 0; - int locked = 1; - int queued_work = 0; - int keep_polling = 0; - GPR_TIMER_BEGIN("grpc_pollset_work", 0); - /* this must happen before we (potentially) drop pollset->mu */ - worker.next = worker.prev = NULL; - worker.reevaluate_polling_on_wakeup = 0; - if (pollset->local_wakeup_cache != NULL) { - worker.wakeup_fd = pollset->local_wakeup_cache; - pollset->local_wakeup_cache = worker.wakeup_fd->next; - } else { - worker.wakeup_fd = gpr_malloc(sizeof(*worker.wakeup_fd)); - grpc_wakeup_fd_init(&worker.wakeup_fd->fd); - } - worker.kicked_specifically = 0; - /* If there's work waiting for the pollset to be idle, and the - pollset is idle, then do that work */ - if (!grpc_pollset_has_workers(pollset) && - !grpc_closure_list_empty(pollset->idle_jobs)) { - GPR_TIMER_MARK("grpc_pollset_work.idle_jobs", 0); - grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL); - goto done; - } - /* If we're shutting down then we don't execute any extended work */ - if (pollset->shutting_down) { - GPR_TIMER_MARK("grpc_pollset_work.shutting_down", 0); - goto done; - } - /* Give do_promote priority so we don't starve it out */ - if (pollset->in_flight_cbs) { - GPR_TIMER_MARK("grpc_pollset_work.in_flight_cbs", 0); - gpr_mu_unlock(pollset->mu); - locked = 0; - goto done; - } - /* Start polling, and keep doing so while we're being asked to - re-evaluate our pollers (this allows poll() based pollers to - ensure they don't miss wakeups) */ - keep_polling = 1; - while (keep_polling) { - keep_polling = 0; - if (!pollset->kicked_without_pollers) { - if (!added_worker) { - push_front_worker(pollset, &worker); - added_worker = 1; - gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker); - } - gpr_tls_set(&g_current_thread_poller, (intptr_t)pollset); - GPR_TIMER_BEGIN("maybe_work_and_unlock", 0); - pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, &worker, - deadline, now); - GPR_TIMER_END("maybe_work_and_unlock", 0); - locked = 0; - gpr_tls_set(&g_current_thread_poller, 0); - } else { - GPR_TIMER_MARK("grpc_pollset_work.kicked_without_pollers", 0); - pollset->kicked_without_pollers = 0; - } - /* Finished execution - start cleaning up. - Note that we may arrive here from outside the enclosing while() loop. - In that case we won't loop though as we haven't added worker to the - worker list, which means nobody could ask us to re-evaluate polling). */ - done: - if (!locked) { - queued_work |= grpc_exec_ctx_flush(exec_ctx); - gpr_mu_lock(pollset->mu); - locked = 1; - } - /* If we're forced to re-evaluate polling (via grpc_pollset_kick with - GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force - a loop */ - if (worker.reevaluate_polling_on_wakeup) { - worker.reevaluate_polling_on_wakeup = 0; - pollset->kicked_without_pollers = 0; - if (queued_work || worker.kicked_specifically) { - /* If there's queued work on the list, then set the deadline to be - immediate so we get back out of the polling loop quickly */ - deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC); - } - keep_polling = 1; - } - } - if (added_worker) { - remove_worker(pollset, &worker); - gpr_tls_set(&g_current_thread_worker, 0); - } - /* release wakeup fd to the local pool */ - worker.wakeup_fd->next = pollset->local_wakeup_cache; - pollset->local_wakeup_cache = worker.wakeup_fd; - /* check shutdown conditions */ - if (pollset->shutting_down) { - if (grpc_pollset_has_workers(pollset)) { - grpc_pollset_kick(pollset, NULL); - } else if (!pollset->called_shutdown && pollset->in_flight_cbs == 0) { - pollset->called_shutdown = 1; - gpr_mu_unlock(pollset->mu); - finish_shutdown(exec_ctx, pollset); - grpc_exec_ctx_flush(exec_ctx); - /* Continuing to access pollset here is safe -- it is the caller's - * responsibility to not destroy when it has outstanding calls to - * grpc_pollset_work. - * TODO(dklempner): Can we refactor the shutdown logic to avoid this? */ - gpr_mu_lock(pollset->mu); - } else if (!grpc_closure_list_empty(pollset->idle_jobs)) { - grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL); - gpr_mu_unlock(pollset->mu); - grpc_exec_ctx_flush(exec_ctx); - gpr_mu_lock(pollset->mu); - } - } - *worker_hdl = NULL; - GPR_TIMER_END("grpc_pollset_work", 0); -} - -void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_closure *closure) { - GPR_ASSERT(!pollset->shutting_down); - pollset->shutting_down = 1; - pollset->shutdown_done = closure; - grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); - if (!grpc_pollset_has_workers(pollset)) { - grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL); - } - if (!pollset->called_shutdown && pollset->in_flight_cbs == 0 && - !grpc_pollset_has_workers(pollset)) { - pollset->called_shutdown = 1; - finish_shutdown(exec_ctx, pollset); - } -} - -int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline, - gpr_timespec now) { - gpr_timespec timeout; - static const int64_t max_spin_polling_us = 10; - if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) { - return -1; - } - if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros( - max_spin_polling_us, - GPR_TIMESPAN))) <= 0) { - return 0; - } - timeout = gpr_time_sub(deadline, now); - return gpr_time_to_millis(gpr_time_add( - timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN))); -} - -/* - * basic_pollset - a vtable that provides polling for zero or one file - * descriptor via poll() - */ - -typedef struct grpc_unary_promote_args { - const grpc_pollset_vtable *original_vtable; - grpc_pollset *pollset; - grpc_fd *fd; - grpc_closure promotion_closure; -} grpc_unary_promote_args; - -static void basic_do_promote(grpc_exec_ctx *exec_ctx, void *args, - bool success) { - grpc_unary_promote_args *up_args = args; - const grpc_pollset_vtable *original_vtable = up_args->original_vtable; - grpc_pollset *pollset = up_args->pollset; - grpc_fd *fd = up_args->fd; - - /* - * This is quite tricky. There are a number of cases to keep in mind here: - * 1. fd may have been orphaned - * 2. The pollset may no longer be a unary poller (and we can't let case #1 - * leak to other pollset types!) - * 3. pollset's fd (which may have changed) may have been orphaned - * 4. The pollset may be shutting down. - */ - - gpr_mu_lock(pollset->mu); - /* First we need to ensure that nobody is polling concurrently */ - GPR_ASSERT(!grpc_pollset_has_workers(pollset)); - - gpr_free(up_args); - /* At this point the pollset may no longer be a unary poller. In that case - * we should just call the right add function and be done. */ - /* TODO(klempner): If we're not careful this could cause infinite recursion. - * That's not a problem for now because empty_pollset has a trivial poller - * and we don't have any mechanism to unbecome multipoller. */ - pollset->in_flight_cbs--; - if (pollset->shutting_down) { - /* We don't care about this pollset anymore. */ - if (pollset->in_flight_cbs == 0 && !pollset->called_shutdown) { - pollset->called_shutdown = 1; - finish_shutdown(exec_ctx, pollset); - } - } else if (grpc_fd_is_orphaned(fd)) { - /* Don't try to add it to anything, we'll drop our ref on it below */ - } else if (pollset->vtable != original_vtable) { - pollset->vtable->add_fd(exec_ctx, pollset, fd, 0); - } else if (fd != pollset->data.ptr) { - grpc_fd *fds[2]; - fds[0] = pollset->data.ptr; - fds[1] = fd; - - if (fds[0] && !grpc_fd_is_orphaned(fds[0])) { - grpc_platform_become_multipoller(exec_ctx, pollset, fds, - GPR_ARRAY_SIZE(fds)); - GRPC_FD_UNREF(fds[0], "basicpoll"); - } else { - /* old fd is orphaned and we haven't cleaned it up until now, so remain a - * unary poller */ - /* Note that it is possible that fds[1] is also orphaned at this point. - * That's okay, we'll correct it at the next add or poll. */ - if (fds[0]) GRPC_FD_UNREF(fds[0], "basicpoll"); - pollset->data.ptr = fd; - GRPC_FD_REF(fd, "basicpoll"); - } - } - - gpr_mu_unlock(pollset->mu); - - /* Matching ref in basic_pollset_add_fd */ - GRPC_FD_UNREF(fd, "basicpoll_add"); -} - -static void basic_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_fd *fd, int and_unlock_pollset) { - grpc_unary_promote_args *up_args; - GPR_ASSERT(fd); - if (fd == pollset->data.ptr) goto exit; - - if (!grpc_pollset_has_workers(pollset)) { - /* Fast path -- no in flight cbs */ - /* TODO(klempner): Comment this out and fix any test failures or establish - * they are due to timing issues */ - grpc_fd *fds[2]; - fds[0] = pollset->data.ptr; - fds[1] = fd; - - if (fds[0] == NULL) { - pollset->data.ptr = fd; - GRPC_FD_REF(fd, "basicpoll"); - } else if (!grpc_fd_is_orphaned(fds[0])) { - grpc_platform_become_multipoller(exec_ctx, pollset, fds, - GPR_ARRAY_SIZE(fds)); - GRPC_FD_UNREF(fds[0], "basicpoll"); - } else { - /* old fd is orphaned and we haven't cleaned it up until now, so remain a - * unary poller */ - GRPC_FD_UNREF(fds[0], "basicpoll"); - pollset->data.ptr = fd; - GRPC_FD_REF(fd, "basicpoll"); - } - goto exit; - } - - /* Now we need to promote. This needs to happen when we're not polling. Since - * this may be called from poll, the wait needs to happen asynchronously. */ - GRPC_FD_REF(fd, "basicpoll_add"); - pollset->in_flight_cbs++; - up_args = gpr_malloc(sizeof(*up_args)); - up_args->fd = fd; - up_args->original_vtable = pollset->vtable; - up_args->pollset = pollset; - up_args->promotion_closure.cb = basic_do_promote; - up_args->promotion_closure.cb_arg = up_args; - - grpc_closure_list_add(&pollset->idle_jobs, &up_args->promotion_closure, 1); - grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); - -exit: - if (and_unlock_pollset) { - gpr_mu_unlock(pollset->mu); - } -} - -static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset, - grpc_pollset_worker *worker, - gpr_timespec deadline, - gpr_timespec now) { -#define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR) -#define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR) - - struct pollfd pfd[3]; - grpc_fd *fd; - grpc_fd_watcher fd_watcher; - int timeout; - int r; - nfds_t nfds; - - fd = pollset->data.ptr; - if (fd && grpc_fd_is_orphaned(fd)) { - GRPC_FD_UNREF(fd, "basicpoll"); - fd = pollset->data.ptr = NULL; - } - timeout = grpc_poll_deadline_to_millis_timeout(deadline, now); - pfd[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd); - pfd[0].events = POLLIN; - pfd[0].revents = 0; - pfd[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd->fd); - pfd[1].events = POLLIN; - pfd[1].revents = 0; - nfds = 2; - if (fd) { - pfd[2].fd = fd->fd; - pfd[2].revents = 0; - GRPC_FD_REF(fd, "basicpoll_begin"); - gpr_mu_unlock(pollset->mu); - pfd[2].events = (short)grpc_fd_begin_poll(fd, pollset, worker, POLLIN, - POLLOUT, &fd_watcher); - if (pfd[2].events != 0) { - nfds++; - } - } else { - gpr_mu_unlock(pollset->mu); - } - - /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid - even going into the blocking annotation if possible */ - /* poll fd count (argument 2) is shortened by one if we have no events - to poll on - such that it only includes the kicker */ - GPR_TIMER_BEGIN("poll", 0); - GRPC_SCHEDULING_START_BLOCKING_REGION; - r = grpc_poll_function(pfd, nfds, timeout); - GRPC_SCHEDULING_END_BLOCKING_REGION; - GPR_TIMER_END("poll", 0); - - if (r < 0) { - if (errno != EINTR) { - gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno)); - } - if (fd) { - grpc_fd_end_poll(exec_ctx, &fd_watcher, 0, 0); - } - } else if (r == 0) { - if (fd) { - grpc_fd_end_poll(exec_ctx, &fd_watcher, 0, 0); - } - } else { - if (pfd[0].revents & POLLIN_CHECK) { - grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd); - } - if (pfd[1].revents & POLLIN_CHECK) { - grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd); - } - if (nfds > 2) { - grpc_fd_end_poll(exec_ctx, &fd_watcher, pfd[2].revents & POLLIN_CHECK, - pfd[2].revents & POLLOUT_CHECK); - } else if (fd) { - grpc_fd_end_poll(exec_ctx, &fd_watcher, 0, 0); - } - } - - if (fd) { - GRPC_FD_UNREF(fd, "basicpoll_begin"); - } -} - -static void basic_pollset_destroy(grpc_pollset *pollset) { - if (pollset->data.ptr != NULL) { - GRPC_FD_UNREF(pollset->data.ptr, "basicpoll"); - pollset->data.ptr = NULL; - } -} - -static const grpc_pollset_vtable basic_pollset = { - basic_pollset_add_fd, basic_pollset_maybe_work_and_unlock, - basic_pollset_destroy, basic_pollset_destroy}; - -static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null) { - pollset->vtable = &basic_pollset; - pollset->data.ptr = fd_or_null; - if (fd_or_null != NULL) { - GRPC_FD_REF(fd_or_null, "basicpoll"); - } -} - -#endif /* GPR_POSIX_POLLSET */ diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h deleted file mode 100644 index 3cb3a06c274ffff2306c5f36a9d3dec5883ad28c..0000000000000000000000000000000000000000 --- a/src/core/iomgr/pollset_posix.h +++ /dev/null @@ -1,151 +0,0 @@ -/* - * - * Copyright 2015-2016, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H -#define GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H - -#if 0 -#include <poll.h> - -#include <grpc/support/sync.h> - -#include "src/core/iomgr/exec_ctx.h" -#include "src/core/iomgr/iomgr.h" -#include "src/core/iomgr/pollset.h" -#include "src/core/iomgr/wakeup_fd_posix.h" - -typedef struct grpc_pollset_vtable grpc_pollset_vtable; - -/* forward declare only in this file to avoid leaking impl details via - pollset.h; real users of grpc_fd should always include 'fd_posix.h' and not - use the struct tag */ -struct grpc_fd; - -typedef struct grpc_cached_wakeup_fd { - grpc_wakeup_fd fd; - struct grpc_cached_wakeup_fd *next; -} grpc_cached_wakeup_fd; - -struct grpc_pollset_worker { - grpc_cached_wakeup_fd *wakeup_fd; - int reevaluate_polling_on_wakeup; - int kicked_specifically; - struct grpc_pollset_worker *next; - struct grpc_pollset_worker *prev; -}; - -struct grpc_pollset { - /* pollsets under posix can mutate representation as fds are added and - removed. - For example, we may choose a poll() based implementation on linux for - few fds, and an epoll() based implementation for many fds */ - const grpc_pollset_vtable *vtable; - gpr_mu *mu; - grpc_pollset_worker root_worker; - int in_flight_cbs; - int shutting_down; - int called_shutdown; - int kicked_without_pollers; - grpc_closure *shutdown_done; - grpc_closure_list idle_jobs; - union { - int fd; - void *ptr; - } data; - /* Local cache of eventfds for workers */ - grpc_cached_wakeup_fd *local_wakeup_cache; -}; - -struct grpc_pollset_vtable { - void (*add_fd)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - struct grpc_fd *fd, int and_unlock_pollset); - void (*maybe_work_and_unlock)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker *worker, - gpr_timespec deadline, gpr_timespec now); - void (*finish_shutdown)(grpc_pollset *pollset); - void (*destroy)(grpc_pollset *pollset); -}; - -/* Add an fd to a pollset */ -void grpc_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - struct grpc_fd *fd); - -/* Returns the fd to listen on for kicks */ -int grpc_kick_read_fd(grpc_pollset *p); -/* Call after polling has been kicked to leave the kicked state */ -void grpc_kick_drain(grpc_pollset *p); - -/* Convert a timespec to milliseconds: - - very small or negative poll times are clamped to zero to do a - non-blocking poll (which becomes spin polling) - - other small values are rounded up to one millisecond - - longer than a millisecond polls are rounded up to the next nearest - millisecond to avoid spinning - - infinite timeouts are converted to -1 */ -int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline, - gpr_timespec now); - -/* Allow kick to wakeup the currently polling worker */ -#define GRPC_POLLSET_CAN_KICK_SELF 1 -/* Force the wakee to repoll when awoken */ -#define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2 -/* As per grpc_pollset_kick, with an extended set of flags (defined above) - -- mostly for fd_posix's use. */ -void grpc_pollset_kick_ext(grpc_pollset *p, - grpc_pollset_worker *specific_worker, - uint32_t flags); - -/* turn a pollset into a multipoller: platform specific */ -typedef void (*grpc_platform_become_multipoller_type)(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset, - struct grpc_fd **fds, - size_t fd_count); -extern grpc_platform_become_multipoller_type grpc_platform_become_multipoller; - -void grpc_poll_become_multipoller(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset, struct grpc_fd **fds, - size_t fd_count); - -/* Return 1 if the pollset has active threads in grpc_pollset_work (pollset must - * be locked) */ -int grpc_pollset_has_workers(grpc_pollset *pollset); - -void grpc_remove_fd_from_all_epoll_sets(int fd); - -/* override to allow tests to hook poll() usage */ -typedef int (*grpc_poll_function_type)(struct pollfd *, nfds_t, int); -extern grpc_poll_function_type grpc_poll_function; -extern grpc_wakeup_fd grpc_global_wakeup_fd; -#endif - -#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H */ diff --git a/src/core/iomgr/pollset_set_posix.h b/src/core/iomgr/pollset_set_posix.h index 4c583b603b73bd67dec2c210622c16f7a3cdea1c..a8f78314ed226f331293cd9c4f38c7bb050f0de2 100644 --- a/src/core/iomgr/pollset_set_posix.h +++ b/src/core/iomgr/pollset_set_posix.h @@ -34,12 +34,8 @@ #ifndef GRPC_INTERNAL_CORE_IOMGR_POLLSET_SET_POSIX_H #define GRPC_INTERNAL_CORE_IOMGR_POLLSET_SET_POSIX_H -<<<<<<< HEAD #include <grpc/support/sync.h> #include "src/core/iomgr/ev_posix.h" -======= -#include "src/core/iomgr/fd_posix.h" ->>>>>>> c605c62b30ca15c83a7c4e98386062c62de0d36d typedef struct grpc_pollset_set { gpr_mu mu; diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index d18aff0058e95ea37a30f9224523d8d76dae550d..40efb0f2970de9e6f98da3c717bc221641c6e60f 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -141,9 +141,6 @@ CORE_SOURCE_FILES = [ 'src/core/iomgr/iomgr.c', 'src/core/iomgr/iomgr_posix.c', 'src/core/iomgr/iomgr_windows.c', - 'src/core/iomgr/pollset_multipoller_with_epoll.c', - 'src/core/iomgr/pollset_multipoller_with_poll_posix.c', - 'src/core/iomgr/pollset_posix.c', 'src/core/iomgr/pollset_set_posix.c', 'src/core/iomgr/pollset_set_windows.c', 'src/core/iomgr/pollset_windows.c', diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index edd6d41aac8e2db156de38f692e4882ce623af71..a3e7d4925619b1503470e8e81a16e6d06c6ac59a 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -833,7 +833,6 @@ src/core/iomgr/iomgr.h \ src/core/iomgr/iomgr_internal.h \ src/core/iomgr/iomgr_posix.h \ src/core/iomgr/pollset.h \ -src/core/iomgr/pollset_posix.h \ src/core/iomgr/pollset_set.h \ src/core/iomgr/pollset_set_posix.h \ src/core/iomgr/pollset_set_windows.h \ @@ -972,9 +971,6 @@ src/core/iomgr/iocp_windows.c \ src/core/iomgr/iomgr.c \ src/core/iomgr/iomgr_posix.c \ src/core/iomgr/iomgr_windows.c \ -src/core/iomgr/pollset_multipoller_with_epoll.c \ -src/core/iomgr/pollset_multipoller_with_poll_posix.c \ -src/core/iomgr/pollset_posix.c \ src/core/iomgr/pollset_set_posix.c \ src/core/iomgr/pollset_set_windows.c \ src/core/iomgr/pollset_windows.c \ diff --git a/tools/run_tests/sources_and_headers.json b/tools/run_tests/sources_and_headers.json index 160d06ad6444dbac7fed41bb093022e3b08e1f24..64c2b68756c75015472a4eee8ba05145b49c190d 100644 --- a/tools/run_tests/sources_and_headers.json +++ b/tools/run_tests/sources_and_headers.json @@ -3030,7 +3030,6 @@ "src/core/iomgr/iomgr_internal.h", "src/core/iomgr/iomgr_posix.h", "src/core/iomgr/pollset.h", - "src/core/iomgr/pollset_posix.h", "src/core/iomgr/pollset_set.h", "src/core/iomgr/pollset_set_posix.h", "src/core/iomgr/pollset_set_windows.h", @@ -3232,10 +3231,6 @@ "src/core/iomgr/iomgr_posix.h", "src/core/iomgr/iomgr_windows.c", "src/core/iomgr/pollset.h", - "src/core/iomgr/pollset_multipoller_with_epoll.c", - "src/core/iomgr/pollset_multipoller_with_poll_posix.c", - "src/core/iomgr/pollset_posix.c", - "src/core/iomgr/pollset_posix.h", "src/core/iomgr/pollset_set.h", "src/core/iomgr/pollset_set_posix.c", "src/core/iomgr/pollset_set_posix.h", @@ -3562,7 +3557,6 @@ "src/core/iomgr/iomgr_internal.h", "src/core/iomgr/iomgr_posix.h", "src/core/iomgr/pollset.h", - "src/core/iomgr/pollset_posix.h", "src/core/iomgr/pollset_set.h", "src/core/iomgr/pollset_set_posix.h", "src/core/iomgr/pollset_set_windows.h", @@ -3748,10 +3742,6 @@ "src/core/iomgr/iomgr_posix.h", "src/core/iomgr/iomgr_windows.c", "src/core/iomgr/pollset.h", - "src/core/iomgr/pollset_multipoller_with_epoll.c", - "src/core/iomgr/pollset_multipoller_with_poll_posix.c", - "src/core/iomgr/pollset_posix.c", - "src/core/iomgr/pollset_posix.h", "src/core/iomgr/pollset_set.h", "src/core/iomgr/pollset_set_posix.c", "src/core/iomgr/pollset_set_posix.h", diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj b/vsprojects/vcxproj/grpc/grpc.vcxproj index 2986005782d1866a866250a12b25f7f6a367ca86..90a2558570a8ec0f183d3aee72d39b4ad025eb71 100644 --- a/vsprojects/vcxproj/grpc/grpc.vcxproj +++ b/vsprojects/vcxproj/grpc/grpc.vcxproj @@ -342,7 +342,6 @@ <ClInclude Include="$(SolutionDir)\..\src\core\iomgr\iomgr_internal.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\iomgr\iomgr_posix.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\iomgr\pollset.h" /> - <ClInclude Include="$(SolutionDir)\..\src\core\iomgr\pollset_posix.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\iomgr\pollset_set.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\iomgr\pollset_set_posix.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\iomgr\pollset_set_windows.h" /> @@ -551,12 +550,6 @@ </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\iomgr\iomgr_windows.c"> </ClCompile> - <ClCompile Include="$(SolutionDir)\..\src\core\iomgr\pollset_multipoller_with_epoll.c"> - </ClCompile> - <ClCompile Include="$(SolutionDir)\..\src\core\iomgr\pollset_multipoller_with_poll_posix.c"> - </ClCompile> - <ClCompile Include="$(SolutionDir)\..\src\core\iomgr\pollset_posix.c"> - </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\iomgr\pollset_set_posix.c"> </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\iomgr\pollset_set_windows.c"> diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters index 1805a21f85cfb94a7d81a085bcf2bdc51d5e56c8..69e22de0515884ddedc1bfe87dc05df96d8af88f 100644 --- a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters +++ b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters @@ -205,15 +205,6 @@ <ClCompile Include="$(SolutionDir)\..\src\core\iomgr\iomgr_windows.c"> <Filter>src\core\iomgr</Filter> </ClCompile> - <ClCompile Include="$(SolutionDir)\..\src\core\iomgr\pollset_multipoller_with_epoll.c"> - <Filter>src\core\iomgr</Filter> - </ClCompile> - <ClCompile Include="$(SolutionDir)\..\src\core\iomgr\pollset_multipoller_with_poll_posix.c"> - <Filter>src\core\iomgr</Filter> - </ClCompile> - <ClCompile Include="$(SolutionDir)\..\src\core\iomgr\pollset_posix.c"> - <Filter>src\core\iomgr</Filter> - </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\iomgr\pollset_set_posix.c"> <Filter>src\core\iomgr</Filter> </ClCompile> @@ -692,9 +683,6 @@ <ClInclude Include="$(SolutionDir)\..\src\core\iomgr\pollset.h"> <Filter>src\core\iomgr</Filter> </ClInclude> - <ClInclude Include="$(SolutionDir)\..\src\core\iomgr\pollset_posix.h"> - <Filter>src\core\iomgr</Filter> - </ClInclude> <ClInclude Include="$(SolutionDir)\..\src\core\iomgr\pollset_set.h"> <Filter>src\core\iomgr</Filter> </ClInclude> diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj index f754facc8d6960a5436ef848d514e4b9c655b970..6d1aa094a1f4ff1b5545646f0625382b3f63ee8a 100644 --- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj +++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj @@ -318,7 +318,6 @@ <ClInclude Include="$(SolutionDir)\..\src\core\iomgr\iomgr_internal.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\iomgr\iomgr_posix.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\iomgr\pollset.h" /> - <ClInclude Include="$(SolutionDir)\..\src\core\iomgr\pollset_posix.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\iomgr\pollset_set.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\iomgr\pollset_set_posix.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\iomgr\pollset_set_windows.h" /> @@ -487,12 +486,6 @@ </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\iomgr\iomgr_windows.c"> </ClCompile> - <ClCompile Include="$(SolutionDir)\..\src\core\iomgr\pollset_multipoller_with_epoll.c"> - </ClCompile> - <ClCompile Include="$(SolutionDir)\..\src\core\iomgr\pollset_multipoller_with_poll_posix.c"> - </ClCompile> - <ClCompile Include="$(SolutionDir)\..\src\core\iomgr\pollset_posix.c"> - </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\iomgr\pollset_set_posix.c"> </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\iomgr\pollset_set_windows.c"> diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters index 3a5c3fb6fb348d0bb40fc17cde57ef887b6c2f5e..c672405e0f9d39aa6131af524b65c3fa55bb2131 100644 --- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters +++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters @@ -145,15 +145,6 @@ <ClCompile Include="$(SolutionDir)\..\src\core\iomgr\iomgr_windows.c"> <Filter>src\core\iomgr</Filter> </ClCompile> - <ClCompile Include="$(SolutionDir)\..\src\core\iomgr\pollset_multipoller_with_epoll.c"> - <Filter>src\core\iomgr</Filter> - </ClCompile> - <ClCompile Include="$(SolutionDir)\..\src\core\iomgr\pollset_multipoller_with_poll_posix.c"> - <Filter>src\core\iomgr</Filter> - </ClCompile> - <ClCompile Include="$(SolutionDir)\..\src\core\iomgr\pollset_posix.c"> - <Filter>src\core\iomgr</Filter> - </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\iomgr\pollset_set_posix.c"> <Filter>src\core\iomgr</Filter> </ClCompile> @@ -587,9 +578,6 @@ <ClInclude Include="$(SolutionDir)\..\src\core\iomgr\pollset.h"> <Filter>src\core\iomgr</Filter> </ClInclude> - <ClInclude Include="$(SolutionDir)\..\src\core\iomgr\pollset_posix.h"> - <Filter>src\core\iomgr</Filter> - </ClInclude> <ClInclude Include="$(SolutionDir)\..\src\core\iomgr\pollset_set.h"> <Filter>src\core\iomgr</Filter> </ClInclude>