diff --git a/BUILD b/BUILD index 1898b4231b40ccf5ffdbf6e30cd2412bcdf7dafc..e0fc55e5f7e4ed27ee966d63ac3ca3e78d8832ed 100644 --- a/BUILD +++ b/BUILD @@ -194,6 +194,7 @@ cc_library( "src/core/iomgr/endpoint.h", "src/core/iomgr/endpoint_pair.h", "src/core/iomgr/ev_poll_and_epoll_posix.h", + "src/core/iomgr/ev_poll_posix.h", "src/core/iomgr/ev_posix.h", "src/core/iomgr/exec_ctx.h", "src/core/iomgr/executor.h", @@ -331,6 +332,7 @@ cc_library( "src/core/iomgr/endpoint_pair_posix.c", "src/core/iomgr/endpoint_pair_windows.c", "src/core/iomgr/ev_poll_and_epoll_posix.c", + "src/core/iomgr/ev_poll_posix.c", "src/core/iomgr/ev_posix.c", "src/core/iomgr/exec_ctx.c", "src/core/iomgr/executor.c", @@ -520,6 +522,7 @@ cc_library( "src/core/iomgr/endpoint.h", "src/core/iomgr/endpoint_pair.h", "src/core/iomgr/ev_poll_and_epoll_posix.h", + "src/core/iomgr/ev_poll_posix.h", "src/core/iomgr/ev_posix.h", "src/core/iomgr/exec_ctx.h", "src/core/iomgr/executor.h", @@ -644,6 +647,7 @@ cc_library( "src/core/iomgr/endpoint_pair_posix.c", "src/core/iomgr/endpoint_pair_windows.c", "src/core/iomgr/ev_poll_and_epoll_posix.c", + "src/core/iomgr/ev_poll_posix.c", "src/core/iomgr/ev_posix.c", "src/core/iomgr/exec_ctx.c", "src/core/iomgr/executor.c", @@ -1304,6 +1308,7 @@ objc_library( "src/core/iomgr/endpoint_pair_posix.c", "src/core/iomgr/endpoint_pair_windows.c", "src/core/iomgr/ev_poll_and_epoll_posix.c", + "src/core/iomgr/ev_poll_posix.c", "src/core/iomgr/ev_posix.c", "src/core/iomgr/exec_ctx.c", "src/core/iomgr/executor.c", @@ -1474,6 +1479,7 @@ objc_library( "src/core/iomgr/endpoint.h", "src/core/iomgr/endpoint_pair.h", "src/core/iomgr/ev_poll_and_epoll_posix.h", + "src/core/iomgr/ev_poll_posix.h", "src/core/iomgr/ev_posix.h", "src/core/iomgr/exec_ctx.h", "src/core/iomgr/executor.h", diff --git a/Makefile b/Makefile index 2cbb34d85e2174330e8ff55487d90734009981db..11b2bfbb4db6c6dea4b759c14c3ef4563a419afc 100644 --- a/Makefile +++ b/Makefile @@ -2381,6 +2381,7 @@ LIBGRPC_SRC = \ src/core/iomgr/endpoint_pair_posix.c \ src/core/iomgr/endpoint_pair_windows.c \ src/core/iomgr/ev_poll_and_epoll_posix.c \ + src/core/iomgr/ev_poll_posix.c \ src/core/iomgr/ev_posix.c \ src/core/iomgr/exec_ctx.c \ src/core/iomgr/executor.c \ @@ -2690,6 +2691,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/iomgr/endpoint_pair_posix.c \ src/core/iomgr/endpoint_pair_windows.c \ src/core/iomgr/ev_poll_and_epoll_posix.c \ + src/core/iomgr/ev_poll_posix.c \ src/core/iomgr/ev_posix.c \ src/core/iomgr/exec_ctx.c \ src/core/iomgr/executor.c \ diff --git a/binding.gyp b/binding.gyp index ee2b361f1422a8420a9b28f4fcaa617faa7780e3..18df01225751d4890f890870ac0b63b59d37a2ed 100644 --- a/binding.gyp +++ b/binding.gyp @@ -598,6 +598,7 @@ 'src/core/iomgr/endpoint_pair_posix.c', 'src/core/iomgr/endpoint_pair_windows.c', 'src/core/iomgr/ev_poll_and_epoll_posix.c', + 'src/core/iomgr/ev_poll_posix.c', 'src/core/iomgr/ev_posix.c', 'src/core/iomgr/exec_ctx.c', 'src/core/iomgr/executor.c', diff --git a/build.yaml b/build.yaml index e7afc03db40bbee98a34e407c599271849611ca2..d19bf67d523e2fa4f97d990c9cf530830ffb8f1f 100644 --- a/build.yaml +++ b/build.yaml @@ -284,6 +284,7 @@ filegroups: - src/core/iomgr/endpoint.h - src/core/iomgr/endpoint_pair.h - src/core/iomgr/ev_poll_and_epoll_posix.h + - src/core/iomgr/ev_poll_posix.h - src/core/iomgr/ev_posix.h - src/core/iomgr/exec_ctx.h - src/core/iomgr/executor.h @@ -401,6 +402,7 @@ filegroups: - src/core/iomgr/endpoint_pair_posix.c - src/core/iomgr/endpoint_pair_windows.c - src/core/iomgr/ev_poll_and_epoll_posix.c + - src/core/iomgr/ev_poll_posix.c - src/core/iomgr/ev_posix.c - src/core/iomgr/exec_ctx.c - src/core/iomgr/executor.c diff --git a/config.m4 b/config.m4 index ac2c2cb1abc9917c57cfe74c3d7cf5d232eaca46..25d3cb0e9fc2dbb62354ffa352275356123c5ebe 100644 --- a/config.m4 +++ b/config.m4 @@ -120,6 +120,7 @@ if test "$PHP_GRPC" != "no"; then src/core/iomgr/endpoint_pair_posix.c \ src/core/iomgr/endpoint_pair_windows.c \ src/core/iomgr/ev_poll_and_epoll_posix.c \ + src/core/iomgr/ev_poll_posix.c \ src/core/iomgr/ev_posix.c \ src/core/iomgr/exec_ctx.c \ src/core/iomgr/executor.c \ diff --git a/gRPC.podspec b/gRPC.podspec index 0f56518d6bc16bfa7fffbc0dfdcea59dc6ce41a1..e3db166881006bad27e176f5e8a9fad05102428f 100644 --- a/gRPC.podspec +++ b/gRPC.podspec @@ -198,6 +198,7 @@ Pod::Spec.new do |s| 'src/core/iomgr/endpoint.h', 'src/core/iomgr/endpoint_pair.h', 'src/core/iomgr/ev_poll_and_epoll_posix.h', + 'src/core/iomgr/ev_poll_posix.h', 'src/core/iomgr/ev_posix.h', 'src/core/iomgr/exec_ctx.h', 'src/core/iomgr/executor.h', @@ -348,6 +349,7 @@ Pod::Spec.new do |s| 'src/core/iomgr/endpoint_pair_posix.c', 'src/core/iomgr/endpoint_pair_windows.c', 'src/core/iomgr/ev_poll_and_epoll_posix.c', + 'src/core/iomgr/ev_poll_posix.c', 'src/core/iomgr/ev_posix.c', 'src/core/iomgr/exec_ctx.c', 'src/core/iomgr/executor.c', @@ -515,6 +517,7 @@ Pod::Spec.new do |s| 'src/core/iomgr/endpoint.h', 'src/core/iomgr/endpoint_pair.h', 'src/core/iomgr/ev_poll_and_epoll_posix.h', + 'src/core/iomgr/ev_poll_posix.h', 'src/core/iomgr/ev_posix.h', 'src/core/iomgr/exec_ctx.h', 'src/core/iomgr/executor.h', diff --git a/grpc.gemspec b/grpc.gemspec index de41947c0955840d2dd8747f7413756d9d1aee57..4f95d4cc9aea9588994615254e1f1ec3013d5681 100755 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -194,6 +194,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/iomgr/endpoint.h ) s.files += %w( src/core/iomgr/endpoint_pair.h ) s.files += %w( src/core/iomgr/ev_poll_and_epoll_posix.h ) + s.files += %w( src/core/iomgr/ev_poll_posix.h ) s.files += %w( src/core/iomgr/ev_posix.h ) s.files += %w( src/core/iomgr/exec_ctx.h ) s.files += %w( src/core/iomgr/executor.h ) @@ -331,6 +332,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/iomgr/endpoint_pair_posix.c ) s.files += %w( src/core/iomgr/endpoint_pair_windows.c ) s.files += %w( src/core/iomgr/ev_poll_and_epoll_posix.c ) + s.files += %w( src/core/iomgr/ev_poll_posix.c ) s.files += %w( src/core/iomgr/ev_posix.c ) s.files += %w( src/core/iomgr/exec_ctx.c ) s.files += %w( src/core/iomgr/executor.c ) diff --git a/package.json b/package.json index 1741d0cf20224ed31a7538f7a58c686f0bea505f..e6c18bf1e12fee6bb700b46629dbb6dd88409a70 100644 --- a/package.json +++ b/package.json @@ -138,6 +138,7 @@ "src/core/iomgr/endpoint.h", "src/core/iomgr/endpoint_pair.h", "src/core/iomgr/ev_poll_and_epoll_posix.h", + "src/core/iomgr/ev_poll_posix.h", "src/core/iomgr/ev_posix.h", "src/core/iomgr/exec_ctx.h", "src/core/iomgr/executor.h", @@ -275,6 +276,7 @@ "src/core/iomgr/endpoint_pair_posix.c", "src/core/iomgr/endpoint_pair_windows.c", "src/core/iomgr/ev_poll_and_epoll_posix.c", + "src/core/iomgr/ev_poll_posix.c", "src/core/iomgr/ev_posix.c", "src/core/iomgr/exec_ctx.c", "src/core/iomgr/executor.c", diff --git a/package.xml b/package.xml index 316cc34519fd53577fa7d0136e8a8e81f79fd6fe..28287c6fa7d48a205db48d170ac2cd500279dc19 100644 --- a/package.xml +++ b/package.xml @@ -198,6 +198,7 @@ <file baseinstalldir="/" name="src/core/iomgr/endpoint.h" role="src" /> <file baseinstalldir="/" name="src/core/iomgr/endpoint_pair.h" role="src" /> <file baseinstalldir="/" name="src/core/iomgr/ev_poll_and_epoll_posix.h" role="src" /> + <file baseinstalldir="/" name="src/core/iomgr/ev_poll_posix.h" role="src" /> <file baseinstalldir="/" name="src/core/iomgr/ev_posix.h" role="src" /> <file baseinstalldir="/" name="src/core/iomgr/exec_ctx.h" role="src" /> <file baseinstalldir="/" name="src/core/iomgr/executor.h" role="src" /> @@ -335,6 +336,7 @@ <file baseinstalldir="/" name="src/core/iomgr/endpoint_pair_posix.c" role="src" /> <file baseinstalldir="/" name="src/core/iomgr/endpoint_pair_windows.c" role="src" /> <file baseinstalldir="/" name="src/core/iomgr/ev_poll_and_epoll_posix.c" role="src" /> + <file baseinstalldir="/" name="src/core/iomgr/ev_poll_posix.c" role="src" /> <file baseinstalldir="/" name="src/core/iomgr/ev_posix.c" role="src" /> <file baseinstalldir="/" name="src/core/iomgr/exec_ctx.c" role="src" /> <file baseinstalldir="/" name="src/core/iomgr/executor.c" role="src" /> diff --git a/src/core/iomgr/ev_poll_and_epoll_posix.c b/src/core/iomgr/ev_poll_and_epoll_posix.c index 72a8f9f96972a6d040821537c25c4eee86d1f196..7de8c665e239adb0f89ee8444e08df2305a01154 100644 --- a/src/core/iomgr/ev_poll_and_epoll_posix.c +++ b/src/core/iomgr/ev_poll_and_epoll_posix.c @@ -271,11 +271,6 @@ static int pollset_has_workers(grpc_pollset *pollset); static void remove_fd_from_all_epoll_sets(int fd); -/* override to allow tests to hook poll() usage */ -typedef int (*grpc_poll_function_type)(struct pollfd *, nfds_t, int); -extern grpc_poll_function_type grpc_poll_function; -extern grpc_wakeup_fd grpc_global_wakeup_fd; - /******************************************************************************* * pollset_set definitions */ @@ -706,10 +701,6 @@ static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { GPR_TLS_DECL(g_current_thread_poller); GPR_TLS_DECL(g_current_thread_worker); -/** Default poll() function - a pointer so that it can be overridden by some - * tests */ -grpc_poll_function_type grpc_poll_function = poll; - /** The alarm system needs to be able to wakeup 'some poller' sometimes * (specifically when a new alarm needs to be triggered earlier than the next * alarm 'epoch'). diff --git a/src/core/iomgr/ev_poll_posix.c b/src/core/iomgr/ev_poll_posix.c index 3693e137299bbe6cc15ea4f2172f4d3a3222524f..e2b0b08ac34b37d97d7d461ff2dbe2c997c9e099 100644 --- a/src/core/iomgr/ev_poll_posix.c +++ b/src/core/iomgr/ev_poll_posix.c @@ -121,8 +121,6 @@ struct grpc_fd { grpc_closure *read_closure; grpc_closure *write_closure; - struct grpc_fd *freelist_next; - grpc_closure *on_done_closure; grpc_iomgr_object iomgr_object; @@ -152,13 +150,6 @@ static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *rec, /* Return 1 if this fd is orphaned, 0 otherwise */ static bool fd_is_orphaned(grpc_fd *fd); -/* Notification from the poller to an fd that it has become readable or - writable. - If allow_synchronous_callback is 1, allow running the fd callback inline - in this callstack, otherwise register an asynchronous callback and return */ -static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd); -static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd); - /* Reference counting for fds */ /*#define GRPC_FD_REF_COUNT_DEBUG*/ #ifdef GRPC_FD_REF_COUNT_DEBUG @@ -174,9 +165,6 @@ static void fd_unref(grpc_fd *fd); #define GRPC_FD_UNREF(fd, reason) fd_unref(fd) #endif -static void fd_global_init(void); -static void fd_global_shutdown(void); - #define CLOSURE_NOT_READY ((grpc_closure *)0) #define CLOSURE_READY ((grpc_closure *)1) @@ -184,8 +172,6 @@ static void fd_global_shutdown(void); * pollset declarations */ -typedef struct grpc_pollset_vtable grpc_pollset_vtable; - typedef struct grpc_cached_wakeup_fd { grpc_wakeup_fd fd; struct grpc_cached_wakeup_fd *next; @@ -200,11 +186,6 @@ struct grpc_pollset_worker { }; struct grpc_pollset { - /* pollsets under posix can mutate representation as fds are added and - removed. - For example, we may choose a poll() based implementation on linux for - few fds, and an epoll() based implementation for many fds */ - const grpc_pollset_vtable *vtable; gpr_mu *mu; grpc_pollset_worker root_worker; int in_flight_cbs; @@ -213,10 +194,14 @@ struct grpc_pollset { int kicked_without_pollers; grpc_closure *shutdown_done; grpc_closure_list idle_jobs; - union { - int fd; - void *ptr; - } data; + /* all polled fds */ + size_t fd_count; + size_t fd_capacity; + grpc_fd **fds; + /* fds that have been removed from the pollset explicitly */ + size_t del_count; + size_t del_capacity; + grpc_fd **dels; /* Local cache of eventfds for workers */ grpc_cached_wakeup_fd *local_wakeup_cache; }; @@ -258,24 +243,10 @@ static void pollset_kick_ext(grpc_pollset *p, grpc_pollset_worker *specific_worker, uint32_t flags); -/* turn a pollset into a multipoller: platform specific */ -typedef void (*platform_become_multipoller_type)(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset, - struct grpc_fd **fds, - size_t fd_count); -static platform_become_multipoller_type platform_become_multipoller; - /* Return 1 if the pollset has active threads in pollset_work (pollset must * be locked) */ static int pollset_has_workers(grpc_pollset *pollset); -static void remove_fd_from_all_epoll_sets(int fd); - -/* override to allow tests to hook poll() usage */ -typedef int (*grpc_poll_function_type)(struct pollfd *, nfds_t, int); -extern grpc_poll_function_type grpc_poll_function; -extern grpc_wakeup_fd grpc_global_wakeup_fd; - /******************************************************************************* * pollset_set definitions */ @@ -300,67 +271,6 @@ struct grpc_pollset_set { * fd_posix.c */ -/* We need to keep a freelist not because of any concerns of malloc performance - * but instead so that implementations with multiple threads in (for example) - * epoll_wait deal with the race between pollset removal and incoming poll - * notifications. - * - * The problem is that the poller ultimately holds a reference to this - * object, so it is very difficult to know when is safe to free it, at least - * without some expensive synchronization. - * - * If we keep the object freelisted, in the worst case losing this race just - * becomes a spurious read notification on a reused fd. - */ -/* TODO(klempner): We could use some form of polling generation count to know - * when these are safe to free. */ -/* TODO(klempner): Consider disabling freelisting if we don't have multiple - * threads in poll on the same fd */ -/* TODO(klempner): Batch these allocations to reduce fragmentation */ -static grpc_fd *fd_freelist = NULL; -static gpr_mu fd_freelist_mu; - -static void freelist_fd(grpc_fd *fd) { - gpr_mu_lock(&fd_freelist_mu); - fd->freelist_next = fd_freelist; - fd_freelist = fd; - grpc_iomgr_unregister_object(&fd->iomgr_object); - gpr_mu_unlock(&fd_freelist_mu); -} - -static grpc_fd *alloc_fd(int fd) { - grpc_fd *r = NULL; - gpr_mu_lock(&fd_freelist_mu); - if (fd_freelist != NULL) { - r = fd_freelist; - fd_freelist = fd_freelist->freelist_next; - } - gpr_mu_unlock(&fd_freelist_mu); - if (r == NULL) { - r = gpr_malloc(sizeof(grpc_fd)); - gpr_mu_init(&r->mu); - } - - gpr_atm_rel_store(&r->refst, 1); - r->shutdown = 0; - r->read_closure = CLOSURE_NOT_READY; - r->write_closure = CLOSURE_NOT_READY; - r->fd = fd; - r->inactive_watcher_root.next = r->inactive_watcher_root.prev = - &r->inactive_watcher_root; - r->freelist_next = NULL; - r->read_watcher = r->write_watcher = NULL; - r->on_done_closure = NULL; - r->closed = 0; - r->released = 0; - return r; -} - -static void destroy(grpc_fd *fd) { - gpr_mu_destroy(&fd->mu); - gpr_free(fd); -} - #ifdef GRPC_FD_REF_COUNT_DEBUG #define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__) #define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__) @@ -390,27 +300,28 @@ static void unref_by(grpc_fd *fd, int n) { #endif old = gpr_atm_full_fetch_add(&fd->refst, -n); if (old == n) { - freelist_fd(fd); + gpr_mu_destroy(&fd->mu); + gpr_free(fd); } else { GPR_ASSERT(old > n); } } -static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); } - -static void fd_global_shutdown(void) { - gpr_mu_lock(&fd_freelist_mu); - gpr_mu_unlock(&fd_freelist_mu); - while (fd_freelist != NULL) { - grpc_fd *fd = fd_freelist; - fd_freelist = fd_freelist->freelist_next; - destroy(fd); - } - gpr_mu_destroy(&fd_freelist_mu); -} - static grpc_fd *fd_create(int fd, const char *name) { - grpc_fd *r = alloc_fd(fd); + grpc_fd *r = gpr_malloc(sizeof(*r)); + gpr_mu_init(&r->mu); + gpr_atm_rel_store(&r->refst, 1); + r->shutdown = 0; + r->read_closure = CLOSURE_NOT_READY; + r->write_closure = CLOSURE_NOT_READY; + r->fd = fd; + r->inactive_watcher_root.next = r->inactive_watcher_root.prev = + &r->inactive_watcher_root; + r->read_watcher = r->write_watcher = NULL; + r->on_done_closure = NULL; + r->closed = 0; + r->released = 0; + char *name2; gpr_asprintf(&name2, "%s fd=%d", name, fd); grpc_iomgr_register_object(&r->iomgr_object, name2); @@ -466,8 +377,6 @@ static void close_fd_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { fd->closed = 1; if (!fd->released) { close(fd->fd); - } else { - remove_fd_from_all_epoll_sets(fd->fd); } grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, true, NULL); } @@ -555,14 +464,6 @@ static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, } } -static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st) { - /* only one set_ready can be active at once (but there may be a racing - notify_on) */ - gpr_mu_lock(&fd->mu); - set_ready_locked(exec_ctx, fd, st); - gpr_mu_unlock(&fd->mu); -} - static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { gpr_mu_lock(&fd->mu); GPR_ASSERT(!fd->shutdown); @@ -691,14 +592,6 @@ static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher, GRPC_FD_UNREF(fd, "poll"); } -static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { - set_ready(exec_ctx, fd, &fd->read_closure); -} - -static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { - set_ready(exec_ctx, fd, &fd->write_closure); -} - /******************************************************************************* * pollset_posix.c */ @@ -706,16 +599,6 @@ static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { GPR_TLS_DECL(g_current_thread_poller); GPR_TLS_DECL(g_current_thread_worker); -/** Default poll() function - a pointer so that it can be overridden by some - * tests */ -grpc_poll_function_type grpc_poll_function = poll; - -/** The alarm system needs to be able to wakeup 'some poller' sometimes - * (specifically when a new alarm needs to be triggered earlier than the next - * alarm 'epoch'). - * This wakeup_fd gives us something to alert on when such a case occurs. */ -grpc_wakeup_fd grpc_global_wakeup_fd; - static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) { worker->prev->next = worker->next; worker->next->prev = worker->prev; @@ -835,8 +718,6 @@ static void kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); } /* main interface */ -static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null); - static void pollset_init(grpc_pollset *pollset, gpr_mu *mu) { pollset->mu = mu; pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker; @@ -847,20 +728,24 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu *mu) { pollset->idle_jobs.head = pollset->idle_jobs.tail = NULL; pollset->local_wakeup_cache = NULL; pollset->kicked_without_pollers = 0; - become_basic_pollset(pollset, NULL); + pollset->fd_count = 0; + pollset->del_count = 0; + pollset->fds = NULL; + pollset->dels = NULL; } static void pollset_destroy(grpc_pollset *pollset) { GPR_ASSERT(pollset->in_flight_cbs == 0); GPR_ASSERT(!pollset_has_workers(pollset)); GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail); - pollset->vtable->destroy(pollset); while (pollset->local_wakeup_cache) { grpc_cached_wakeup_fd *next = pollset->local_wakeup_cache->next; grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd); gpr_free(pollset->local_wakeup_cache); pollset->local_wakeup_cache = next; } + gpr_free(pollset->fds); + gpr_free(pollset->dels); } static void pollset_reset(grpc_pollset *pollset) { @@ -868,30 +753,44 @@ static void pollset_reset(grpc_pollset *pollset) { GPR_ASSERT(pollset->in_flight_cbs == 0); GPR_ASSERT(!pollset_has_workers(pollset)); GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail); - pollset->vtable->destroy(pollset); + GPR_ASSERT(pollset->fd_count == 0); + GPR_ASSERT(pollset->del_count == 0); pollset->shutting_down = 0; pollset->called_shutdown = 0; pollset->kicked_without_pollers = 0; - become_basic_pollset(pollset, NULL); } static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_fd *fd) { gpr_mu_lock(pollset->mu); - pollset->vtable->add_fd(exec_ctx, pollset, fd, 1); -/* the following (enabled only in debug) will reacquire and then release - our lock - meaning that if the unlocking flag passed to add_fd above is - not respected, the code will deadlock (in a way that we have a chance of - debugging) */ -#ifndef NDEBUG - gpr_mu_lock(pollset->mu); + size_t i; + /* TODO(ctiller): this is O(num_fds^2); maybe switch to a hash set here */ + for (i = 0; i < pollset->fd_count; i++) { + if (pollset->fds[i] == fd) goto exit; + } + if (pollset->fd_count == pollset->fd_capacity) { + pollset->fd_capacity = + GPR_MAX(pollset->fd_capacity + 8, pollset->fd_count * 3 / 2); + pollset->fds = + gpr_realloc(pollset->fds, sizeof(grpc_fd *) * pollset->fd_capacity); + } + pollset->fds[pollset->fd_count++] = fd; + GRPC_FD_REF(fd, "multipoller"); +exit: gpr_mu_unlock(pollset->mu); -#endif } static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { GPR_ASSERT(grpc_closure_list_empty(pollset->idle_jobs)); - pollset->vtable->finish_shutdown(pollset); + size_t i; + for (i = 0; i < pollset->fd_count; i++) { + GRPC_FD_UNREF(pollset->fds[i], "multipoller"); + } + for (i = 0; i < pollset->del_count; i++) { + GRPC_FD_UNREF(pollset->dels[i], "multipoller_del"); + } + pollset->fd_count = 0; + pollset->del_count = 0; grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, true, NULL); } @@ -952,8 +851,93 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } gpr_tls_set(&g_current_thread_poller, (intptr_t)pollset); GPR_TIMER_BEGIN("maybe_work_and_unlock", 0); - pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, &worker, - deadline, now); +#define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR) +#define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR) + + int timeout; + int r; + size_t i, j, fd_count; + nfds_t pfd_count; + /* TODO(ctiller): inline some elements to avoid an allocation */ + grpc_fd_watcher *watchers; + struct pollfd *pfds; + + timeout = poll_deadline_to_millis_timeout(deadline, now); + /* TODO(ctiller): perform just one malloc here if we exceed the inline + * case */ + pfds = gpr_malloc(sizeof(*pfds) * (pollset->fd_count + 2)); + watchers = gpr_malloc(sizeof(*watchers) * (pollset->fd_count + 2)); + fd_count = 0; + pfd_count = 2; + pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd); + pfds[0].events = POLLIN; + pfds[0].revents = 0; + pfds[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker.wakeup_fd->fd); + pfds[1].events = POLLIN; + pfds[1].revents = 0; + for (i = 0; i < pollset->fd_count; i++) { + int remove = fd_is_orphaned(pollset->fds[i]); + for (j = 0; !remove && j < pollset->del_count; j++) { + if (pollset->fds[i] == pollset->dels[j]) remove = 1; + } + if (remove) { + GRPC_FD_UNREF(pollset->fds[i], "multipoller"); + } else { + pollset->fds[fd_count++] = pollset->fds[i]; + watchers[pfd_count].fd = pollset->fds[i]; + pfds[pfd_count].fd = pollset->fds[i]->fd; + pfds[pfd_count].revents = 0; + pfd_count++; + } + } + for (j = 0; j < pollset->del_count; j++) { + GRPC_FD_UNREF(pollset->dels[j], "multipoller_del"); + } + pollset->del_count = 0; + pollset->fd_count = fd_count; + gpr_mu_unlock(pollset->mu); + + for (i = 2; i < pfd_count; i++) { + pfds[i].events = (short)fd_begin_poll(watchers[i].fd, pollset, &worker, + POLLIN, POLLOUT, &watchers[i]); + } + + /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid + even going into the blocking annotation if possible */ + GRPC_SCHEDULING_START_BLOCKING_REGION; + r = grpc_poll_function(pfds, pfd_count, timeout); + GRPC_SCHEDULING_END_BLOCKING_REGION; + + if (r < 0) { + if (errno != EINTR) { + gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno)); + } + for (i = 2; i < pfd_count; i++) { + fd_end_poll(exec_ctx, &watchers[i], 0, 0); + } + } else if (r == 0) { + for (i = 2; i < pfd_count; i++) { + fd_end_poll(exec_ctx, &watchers[i], 0, 0); + } + } else { + if (pfds[0].revents & POLLIN_CHECK) { + grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd); + } + if (pfds[1].revents & POLLIN_CHECK) { + grpc_wakeup_fd_consume_wakeup(&worker.wakeup_fd->fd); + } + for (i = 2; i < pfd_count; i++) { + if (watchers[i].fd == NULL) { + fd_end_poll(exec_ctx, &watchers[i], 0, 0); + continue; + } + fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN_CHECK, + pfds[i].revents & POLLOUT_CHECK); + } + } + + gpr_free(pfds); + gpr_free(watchers); GPR_TIMER_END("maybe_work_and_unlock", 0); locked = 0; gpr_tls_set(&g_current_thread_poller, 0); @@ -1050,408 +1034,6 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline, timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN))); } -/* - * basic_pollset - a vtable that provides polling for zero or one file - * descriptor via poll() - */ - -typedef struct grpc_unary_promote_args { - const grpc_pollset_vtable *original_vtable; - grpc_pollset *pollset; - grpc_fd *fd; - grpc_closure promotion_closure; -} grpc_unary_promote_args; - -static void basic_do_promote(grpc_exec_ctx *exec_ctx, void *args, - bool success) { - grpc_unary_promote_args *up_args = args; - const grpc_pollset_vtable *original_vtable = up_args->original_vtable; - grpc_pollset *pollset = up_args->pollset; - grpc_fd *fd = up_args->fd; - - /* - * This is quite tricky. There are a number of cases to keep in mind here: - * 1. fd may have been orphaned - * 2. The pollset may no longer be a unary poller (and we can't let case #1 - * leak to other pollset types!) - * 3. pollset's fd (which may have changed) may have been orphaned - * 4. The pollset may be shutting down. - */ - - gpr_mu_lock(pollset->mu); - /* First we need to ensure that nobody is polling concurrently */ - GPR_ASSERT(!pollset_has_workers(pollset)); - - gpr_free(up_args); - /* At this point the pollset may no longer be a unary poller. In that case - * we should just call the right add function and be done. */ - /* TODO(klempner): If we're not careful this could cause infinite recursion. - * That's not a problem for now because empty_pollset has a trivial poller - * and we don't have any mechanism to unbecome multipoller. */ - pollset->in_flight_cbs--; - if (pollset->shutting_down) { - /* We don't care about this pollset anymore. */ - if (pollset->in_flight_cbs == 0 && !pollset->called_shutdown) { - pollset->called_shutdown = 1; - finish_shutdown(exec_ctx, pollset); - } - } else if (fd_is_orphaned(fd)) { - /* Don't try to add it to anything, we'll drop our ref on it below */ - } else if (pollset->vtable != original_vtable) { - pollset->vtable->add_fd(exec_ctx, pollset, fd, 0); - } else if (fd != pollset->data.ptr) { - grpc_fd *fds[2]; - fds[0] = pollset->data.ptr; - fds[1] = fd; - - if (fds[0] && !fd_is_orphaned(fds[0])) { - platform_become_multipoller(exec_ctx, pollset, fds, GPR_ARRAY_SIZE(fds)); - GRPC_FD_UNREF(fds[0], "basicpoll"); - } else { - /* old fd is orphaned and we haven't cleaned it up until now, so remain a - * unary poller */ - /* Note that it is possible that fds[1] is also orphaned at this point. - * That's okay, we'll correct it at the next add or poll. */ - if (fds[0]) GRPC_FD_UNREF(fds[0], "basicpoll"); - pollset->data.ptr = fd; - GRPC_FD_REF(fd, "basicpoll"); - } - } - - gpr_mu_unlock(pollset->mu); - - /* Matching ref in basic_pollset_add_fd */ - GRPC_FD_UNREF(fd, "basicpoll_add"); -} - -static void basic_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_fd *fd, int and_unlock_pollset) { - grpc_unary_promote_args *up_args; - GPR_ASSERT(fd); - if (fd == pollset->data.ptr) goto exit; - - if (!pollset_has_workers(pollset)) { - /* Fast path -- no in flight cbs */ - /* TODO(klempner): Comment this out and fix any test failures or establish - * they are due to timing issues */ - grpc_fd *fds[2]; - fds[0] = pollset->data.ptr; - fds[1] = fd; - - if (fds[0] == NULL) { - pollset->data.ptr = fd; - GRPC_FD_REF(fd, "basicpoll"); - } else if (!fd_is_orphaned(fds[0])) { - platform_become_multipoller(exec_ctx, pollset, fds, GPR_ARRAY_SIZE(fds)); - GRPC_FD_UNREF(fds[0], "basicpoll"); - } else { - /* old fd is orphaned and we haven't cleaned it up until now, so remain a - * unary poller */ - GRPC_FD_UNREF(fds[0], "basicpoll"); - pollset->data.ptr = fd; - GRPC_FD_REF(fd, "basicpoll"); - } - goto exit; - } - - /* Now we need to promote. This needs to happen when we're not polling. Since - * this may be called from poll, the wait needs to happen asynchronously. */ - GRPC_FD_REF(fd, "basicpoll_add"); - pollset->in_flight_cbs++; - up_args = gpr_malloc(sizeof(*up_args)); - up_args->fd = fd; - up_args->original_vtable = pollset->vtable; - up_args->pollset = pollset; - up_args->promotion_closure.cb = basic_do_promote; - up_args->promotion_closure.cb_arg = up_args; - - grpc_closure_list_add(&pollset->idle_jobs, &up_args->promotion_closure, 1); - pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); - -exit: - if (and_unlock_pollset) { - gpr_mu_unlock(pollset->mu); - } -} - -static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset, - grpc_pollset_worker *worker, - gpr_timespec deadline, - gpr_timespec now) { -#define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR) -#define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR) - - struct pollfd pfd[3]; - grpc_fd *fd; - grpc_fd_watcher fd_watcher; - int timeout; - int r; - nfds_t nfds; - - fd = pollset->data.ptr; - if (fd && fd_is_orphaned(fd)) { - GRPC_FD_UNREF(fd, "basicpoll"); - fd = pollset->data.ptr = NULL; - } - timeout = poll_deadline_to_millis_timeout(deadline, now); - pfd[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd); - pfd[0].events = POLLIN; - pfd[0].revents = 0; - pfd[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd->fd); - pfd[1].events = POLLIN; - pfd[1].revents = 0; - nfds = 2; - if (fd) { - pfd[2].fd = fd->fd; - pfd[2].revents = 0; - GRPC_FD_REF(fd, "basicpoll_begin"); - gpr_mu_unlock(pollset->mu); - pfd[2].events = - (short)fd_begin_poll(fd, pollset, worker, POLLIN, POLLOUT, &fd_watcher); - if (pfd[2].events != 0) { - nfds++; - } - } else { - gpr_mu_unlock(pollset->mu); - } - - /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid - even going into the blocking annotation if possible */ - /* poll fd count (argument 2) is shortened by one if we have no events - to poll on - such that it only includes the kicker */ - GPR_TIMER_BEGIN("poll", 0); - GRPC_SCHEDULING_START_BLOCKING_REGION; - r = grpc_poll_function(pfd, nfds, timeout); - GRPC_SCHEDULING_END_BLOCKING_REGION; - GPR_TIMER_END("poll", 0); - - if (r < 0) { - if (errno != EINTR) { - gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno)); - } - if (fd) { - fd_end_poll(exec_ctx, &fd_watcher, 0, 0); - } - } else if (r == 0) { - if (fd) { - fd_end_poll(exec_ctx, &fd_watcher, 0, 0); - } - } else { - if (pfd[0].revents & POLLIN_CHECK) { - grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd); - } - if (pfd[1].revents & POLLIN_CHECK) { - grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd); - } - if (nfds > 2) { - fd_end_poll(exec_ctx, &fd_watcher, pfd[2].revents & POLLIN_CHECK, - pfd[2].revents & POLLOUT_CHECK); - } else if (fd) { - fd_end_poll(exec_ctx, &fd_watcher, 0, 0); - } - } - - if (fd) { - GRPC_FD_UNREF(fd, "basicpoll_begin"); - } -} - -static void basic_pollset_destroy(grpc_pollset *pollset) { - if (pollset->data.ptr != NULL) { - GRPC_FD_UNREF(pollset->data.ptr, "basicpoll"); - pollset->data.ptr = NULL; - } -} - -static const grpc_pollset_vtable basic_pollset = { - basic_pollset_add_fd, basic_pollset_maybe_work_and_unlock, - basic_pollset_destroy, basic_pollset_destroy}; - -static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null) { - pollset->vtable = &basic_pollset; - pollset->data.ptr = fd_or_null; - if (fd_or_null != NULL) { - GRPC_FD_REF(fd_or_null, "basicpoll"); - } -} - -/******************************************************************************* - * pollset_multipoller_with_poll_posix.c - */ - -typedef struct { - /* all polled fds */ - size_t fd_count; - size_t fd_capacity; - grpc_fd **fds; - /* fds that have been removed from the pollset explicitly */ - size_t del_count; - size_t del_capacity; - grpc_fd **dels; -} poll_hdr; - -static void multipoll_with_poll_pollset_add_fd(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset, - grpc_fd *fd, - int and_unlock_pollset) { - size_t i; - poll_hdr *h = pollset->data.ptr; - /* TODO(ctiller): this is O(num_fds^2); maybe switch to a hash set here */ - for (i = 0; i < h->fd_count; i++) { - if (h->fds[i] == fd) goto exit; - } - if (h->fd_count == h->fd_capacity) { - h->fd_capacity = GPR_MAX(h->fd_capacity + 8, h->fd_count * 3 / 2); - h->fds = gpr_realloc(h->fds, sizeof(grpc_fd *) * h->fd_capacity); - } - h->fds[h->fd_count++] = fd; - GRPC_FD_REF(fd, "multipoller"); -exit: - if (and_unlock_pollset) { - gpr_mu_unlock(pollset->mu); - } -} - -static void multipoll_with_poll_pollset_maybe_work_and_unlock( - grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, - gpr_timespec deadline, gpr_timespec now) { -#define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR) -#define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR) - - int timeout; - int r; - size_t i, j, fd_count; - nfds_t pfd_count; - poll_hdr *h; - /* TODO(ctiller): inline some elements to avoid an allocation */ - grpc_fd_watcher *watchers; - struct pollfd *pfds; - - h = pollset->data.ptr; - timeout = poll_deadline_to_millis_timeout(deadline, now); - /* TODO(ctiller): perform just one malloc here if we exceed the inline case */ - pfds = gpr_malloc(sizeof(*pfds) * (h->fd_count + 2)); - watchers = gpr_malloc(sizeof(*watchers) * (h->fd_count + 2)); - fd_count = 0; - pfd_count = 2; - pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd); - pfds[0].events = POLLIN; - pfds[0].revents = 0; - pfds[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd->fd); - pfds[1].events = POLLIN; - pfds[1].revents = 0; - for (i = 0; i < h->fd_count; i++) { - int remove = fd_is_orphaned(h->fds[i]); - for (j = 0; !remove && j < h->del_count; j++) { - if (h->fds[i] == h->dels[j]) remove = 1; - } - if (remove) { - GRPC_FD_UNREF(h->fds[i], "multipoller"); - } else { - h->fds[fd_count++] = h->fds[i]; - watchers[pfd_count].fd = h->fds[i]; - pfds[pfd_count].fd = h->fds[i]->fd; - pfds[pfd_count].revents = 0; - pfd_count++; - } - } - for (j = 0; j < h->del_count; j++) { - GRPC_FD_UNREF(h->dels[j], "multipoller_del"); - } - h->del_count = 0; - h->fd_count = fd_count; - gpr_mu_unlock(pollset->mu); - - for (i = 2; i < pfd_count; i++) { - pfds[i].events = (short)fd_begin_poll(watchers[i].fd, pollset, worker, - POLLIN, POLLOUT, &watchers[i]); - } - - /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid - even going into the blocking annotation if possible */ - GRPC_SCHEDULING_START_BLOCKING_REGION; - r = grpc_poll_function(pfds, pfd_count, timeout); - GRPC_SCHEDULING_END_BLOCKING_REGION; - - if (r < 0) { - if (errno != EINTR) { - gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno)); - } - for (i = 2; i < pfd_count; i++) { - fd_end_poll(exec_ctx, &watchers[i], 0, 0); - } - } else if (r == 0) { - for (i = 2; i < pfd_count; i++) { - fd_end_poll(exec_ctx, &watchers[i], 0, 0); - } - } else { - if (pfds[0].revents & POLLIN_CHECK) { - grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd); - } - if (pfds[1].revents & POLLIN_CHECK) { - grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd); - } - for (i = 2; i < pfd_count; i++) { - if (watchers[i].fd == NULL) { - fd_end_poll(exec_ctx, &watchers[i], 0, 0); - continue; - } - fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN_CHECK, - pfds[i].revents & POLLOUT_CHECK); - } - } - - gpr_free(pfds); - gpr_free(watchers); -} - -static void multipoll_with_poll_pollset_finish_shutdown(grpc_pollset *pollset) { - size_t i; - poll_hdr *h = pollset->data.ptr; - for (i = 0; i < h->fd_count; i++) { - GRPC_FD_UNREF(h->fds[i], "multipoller"); - } - for (i = 0; i < h->del_count; i++) { - GRPC_FD_UNREF(h->dels[i], "multipoller_del"); - } - h->fd_count = 0; - h->del_count = 0; -} - -static void multipoll_with_poll_pollset_destroy(grpc_pollset *pollset) { - poll_hdr *h = pollset->data.ptr; - multipoll_with_poll_pollset_finish_shutdown(pollset); - gpr_free(h->fds); - gpr_free(h->dels); - gpr_free(h); -} - -static const grpc_pollset_vtable multipoll_with_poll_pollset = { - multipoll_with_poll_pollset_add_fd, - multipoll_with_poll_pollset_maybe_work_and_unlock, - multipoll_with_poll_pollset_finish_shutdown, - multipoll_with_poll_pollset_destroy}; - -static void poll_become_multipoller(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset, grpc_fd **fds, - size_t nfds) { - size_t i; - poll_hdr *h = gpr_malloc(sizeof(poll_hdr)); - pollset->vtable = &multipoll_with_poll_pollset; - pollset->data.ptr = h; - h->fd_count = nfds; - h->fd_capacity = nfds; - h->fds = gpr_malloc(nfds * sizeof(grpc_fd *)); - h->del_count = 0; - h->del_capacity = 0; - h->dels = NULL; - for (i = 0; i < nfds; i++) { - h->fds[i] = fds[i]; - GRPC_FD_REF(fds[i], "multipoller"); - } -} - /******************************************************************************* * pollset_set_posix.c */ @@ -1599,10 +1181,7 @@ static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, * event engine binding */ -static void shutdown_engine(void) { - fd_global_shutdown(); - pollset_global_shutdown(); -} +static void shutdown_engine(void) { pollset_global_shutdown(); } static const grpc_event_engine_vtable vtable = { .pollset_size = sizeof(grpc_pollset), @@ -1637,7 +1216,6 @@ static const grpc_event_engine_vtable vtable = { }; const grpc_event_engine_vtable *grpc_init_poll_posix(void) { - fd_global_init(); pollset_global_init(); return &vtable; } diff --git a/src/core/iomgr/ev_posix.c b/src/core/iomgr/ev_posix.c index 127ab4b1812d18a6d4cafe1370223d43f2928667..d7681769c3daed0bdeea224ba391200da63eb60b 100644 --- a/src/core/iomgr/ev_posix.c +++ b/src/core/iomgr/ev_posix.c @@ -44,6 +44,16 @@ #include "src/core/iomgr/ev_poll_posix.h" #include "src/core/support/env.h" +/** Default poll() function - a pointer so that it can be overridden by some + * tests */ +grpc_poll_function_type grpc_poll_function = poll; + +/** The alarm system needs to be able to wakeup 'some poller' sometimes + * (specifically when a new alarm needs to be triggered earlier than the next + * alarm 'epoch'). + * This wakeup_fd gives us something to alert on when such a case occurs. */ +grpc_wakeup_fd grpc_global_wakeup_fd; + static const grpc_event_engine_vtable *g_event_engine; typedef const grpc_event_engine_vtable *(*event_engine_factory_fn)(void); diff --git a/src/core/iomgr/ev_posix.h b/src/core/iomgr/ev_posix.h index bfd216d3f31ebd947af7c569ab080ea90dea40ce..9d1f64652dc1f4908ced86c58429c77475ed3cc4 100644 --- a/src/core/iomgr/ev_posix.h +++ b/src/core/iomgr/ev_posix.h @@ -34,9 +34,12 @@ #ifndef GRPC_INTERNAL_CORE_IOMGR_EV_POSIX_H #define GRPC_INTERNAL_CORE_IOMGR_EV_POSIX_H +#include <poll.h> + #include "src/core/iomgr/exec_ctx.h" #include "src/core/iomgr/pollset.h" #include "src/core/iomgr/pollset_set.h" +#include "src/core/iomgr/wakeup_fd_posix.h" typedef struct grpc_fd grpc_fd; @@ -147,4 +150,9 @@ void grpc_pollset_set_add_fd(grpc_exec_ctx *exec_ctx, void grpc_pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pollset_set, grpc_fd *fd); +/* override to allow tests to hook poll() usage */ +typedef int (*grpc_poll_function_type)(struct pollfd *, nfds_t, int); +extern grpc_poll_function_type grpc_poll_function; +extern grpc_wakeup_fd grpc_global_wakeup_fd; + #endif // GRPC_INTERNAL_CORE_IOMGR_EV_POSIX_H diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 46b748ef366925d07fd08db0cf3ba9d29ba2ad7a..2629b82aff936a9a56e9b643149b6fe6b99b4f8a 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -114,6 +114,7 @@ CORE_SOURCE_FILES = [ 'src/core/iomgr/endpoint_pair_posix.c', 'src/core/iomgr/endpoint_pair_windows.c', 'src/core/iomgr/ev_poll_and_epoll_posix.c', + 'src/core/iomgr/ev_poll_posix.c', 'src/core/iomgr/ev_posix.c', 'src/core/iomgr/exec_ctx.c', 'src/core/iomgr/executor.c', diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 863f8113afa717f4eebea4c0cc44827f45e9f186..544106eefcf236c347bd01c4d464a8fdc5600b1f 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -812,6 +812,7 @@ src/core/iomgr/closure.h \ src/core/iomgr/endpoint.h \ src/core/iomgr/endpoint_pair.h \ src/core/iomgr/ev_poll_and_epoll_posix.h \ +src/core/iomgr/ev_poll_posix.h \ src/core/iomgr/ev_posix.h \ src/core/iomgr/exec_ctx.h \ src/core/iomgr/executor.h \ @@ -949,6 +950,7 @@ src/core/iomgr/endpoint.c \ src/core/iomgr/endpoint_pair_posix.c \ src/core/iomgr/endpoint_pair_windows.c \ src/core/iomgr/ev_poll_and_epoll_posix.c \ +src/core/iomgr/ev_poll_posix.c \ src/core/iomgr/ev_posix.c \ src/core/iomgr/exec_ctx.c \ src/core/iomgr/executor.c \ diff --git a/tools/run_tests/sources_and_headers.json b/tools/run_tests/sources_and_headers.json index e0c31db75e377e42e17a82d7d933d78784de7a1e..0d3d97a8294c8e1c9c362a7c631239b01dfb69bb 100644 --- a/tools/run_tests/sources_and_headers.json +++ b/tools/run_tests/sources_and_headers.json @@ -3773,6 +3773,7 @@ "src/core/iomgr/endpoint.h", "src/core/iomgr/endpoint_pair.h", "src/core/iomgr/ev_poll_and_epoll_posix.h", + "src/core/iomgr/ev_poll_posix.h", "src/core/iomgr/ev_posix.h", "src/core/iomgr/exec_ctx.h", "src/core/iomgr/executor.h", @@ -3973,6 +3974,8 @@ "src/core/iomgr/endpoint_pair_windows.c", "src/core/iomgr/ev_poll_and_epoll_posix.c", "src/core/iomgr/ev_poll_and_epoll_posix.h", + "src/core/iomgr/ev_poll_posix.c", + "src/core/iomgr/ev_poll_posix.h", "src/core/iomgr/ev_posix.c", "src/core/iomgr/ev_posix.h", "src/core/iomgr/exec_ctx.c", @@ -4325,6 +4328,7 @@ "src/core/iomgr/endpoint.h", "src/core/iomgr/endpoint_pair.h", "src/core/iomgr/ev_poll_and_epoll_posix.h", + "src/core/iomgr/ev_poll_posix.h", "src/core/iomgr/ev_posix.h", "src/core/iomgr/exec_ctx.h", "src/core/iomgr/executor.h", @@ -4509,6 +4513,8 @@ "src/core/iomgr/endpoint_pair_windows.c", "src/core/iomgr/ev_poll_and_epoll_posix.c", "src/core/iomgr/ev_poll_and_epoll_posix.h", + "src/core/iomgr/ev_poll_posix.c", + "src/core/iomgr/ev_poll_posix.h", "src/core/iomgr/ev_posix.c", "src/core/iomgr/ev_posix.h", "src/core/iomgr/exec_ctx.c", diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj b/vsprojects/vcxproj/grpc/grpc.vcxproj index 8e0a5e8b94c5e8986e385f6c3b83d682e04410dc..fe77cea9332b18b2fd491ba8931d92d5d25535f1 100644 --- a/vsprojects/vcxproj/grpc/grpc.vcxproj +++ b/vsprojects/vcxproj/grpc/grpc.vcxproj @@ -321,6 +321,7 @@ <ClInclude Include="$(SolutionDir)\..\src\core\iomgr\endpoint.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\iomgr\endpoint_pair.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\iomgr\ev_poll_and_epoll_posix.h" /> + <ClInclude Include="$(SolutionDir)\..\src\core\iomgr\ev_poll_posix.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\iomgr\ev_posix.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\iomgr\exec_ctx.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\iomgr\executor.h" /> @@ -501,6 +502,8 @@ </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\iomgr\ev_poll_and_epoll_posix.c"> </ClCompile> + <ClCompile Include="$(SolutionDir)\..\src\core\iomgr\ev_poll_posix.c"> + </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\iomgr\ev_posix.c"> </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\iomgr\exec_ctx.c"> diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters index 002379de624c0c65a998e8474578eb9c34130ea8..0ecb2c636a0964cd1aaedfe358bf7527c718f19b 100644 --- a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters +++ b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters @@ -124,6 +124,9 @@ <ClCompile Include="$(SolutionDir)\..\src\core\iomgr\ev_poll_and_epoll_posix.c"> <Filter>src\core\iomgr</Filter> </ClCompile> + <ClCompile Include="$(SolutionDir)\..\src\core\iomgr\ev_poll_posix.c"> + <Filter>src\core\iomgr</Filter> + </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\iomgr\ev_posix.c"> <Filter>src\core\iomgr</Filter> </ClCompile> @@ -632,6 +635,9 @@ <ClInclude Include="$(SolutionDir)\..\src\core\iomgr\ev_poll_and_epoll_posix.h"> <Filter>src\core\iomgr</Filter> </ClInclude> + <ClInclude Include="$(SolutionDir)\..\src\core\iomgr\ev_poll_posix.h"> + <Filter>src\core\iomgr</Filter> + </ClInclude> <ClInclude Include="$(SolutionDir)\..\src\core\iomgr\ev_posix.h"> <Filter>src\core\iomgr</Filter> </ClInclude> diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj index 770eae403b72e0975398ddf70301c2cf28a2e5bd..545aabb06131523165c65517a3b63123fe0c61cc 100644 --- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj +++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj @@ -311,6 +311,7 @@ <ClInclude Include="$(SolutionDir)\..\src\core\iomgr\endpoint.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\iomgr\endpoint_pair.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\iomgr\ev_poll_and_epoll_posix.h" /> + <ClInclude Include="$(SolutionDir)\..\src\core\iomgr\ev_poll_posix.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\iomgr\ev_posix.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\iomgr\exec_ctx.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\iomgr\executor.h" /> @@ -479,6 +480,8 @@ </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\iomgr\ev_poll_and_epoll_posix.c"> </ClCompile> + <ClCompile Include="$(SolutionDir)\..\src\core\iomgr\ev_poll_posix.c"> + </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\iomgr\ev_posix.c"> </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\iomgr\exec_ctx.c"> diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters index 09167ab0500e42de3d803eacf6a382c86f70f494..f6a69a307e301658930d0ea1b3a1ff7cafb767cd 100644 --- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters +++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters @@ -127,6 +127,9 @@ <ClCompile Include="$(SolutionDir)\..\src\core\iomgr\ev_poll_and_epoll_posix.c"> <Filter>src\core\iomgr</Filter> </ClCompile> + <ClCompile Include="$(SolutionDir)\..\src\core\iomgr\ev_poll_posix.c"> + <Filter>src\core\iomgr</Filter> + </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\iomgr\ev_posix.c"> <Filter>src\core\iomgr</Filter> </ClCompile> @@ -569,6 +572,9 @@ <ClInclude Include="$(SolutionDir)\..\src\core\iomgr\ev_poll_and_epoll_posix.h"> <Filter>src\core\iomgr</Filter> </ClInclude> + <ClInclude Include="$(SolutionDir)\..\src\core\iomgr\ev_poll_posix.h"> + <Filter>src\core\iomgr</Filter> + </ClInclude> <ClInclude Include="$(SolutionDir)\..\src\core\iomgr\ev_posix.h"> <Filter>src\core\iomgr</Filter> </ClInclude>