diff --git a/Makefile b/Makefile index 85f38cb0cfb0c84b7e3a6ed379ac0c95b9c96388..51c7786f0ffc8d59cd3f3f5692bfab377b2656bd 100644 --- a/Makefile +++ b/Makefile @@ -1353,8 +1353,7 @@ LIBGRPC_SRC = \ src/core/iomgr/fd_posix.c \ src/core/iomgr/iomgr.c \ src/core/iomgr/iomgr_posix.c \ - src/core/iomgr/pollset_kick_posix.c \ - src/core/iomgr/pollset_kick_eventfd.c \ + src/core/iomgr/pollset_kick.c \ src/core/iomgr/pollset_multipoller_with_poll_posix.c \ src/core/iomgr/pollset_posix.c \ src/core/iomgr/resolve_address_posix.c \ @@ -1366,6 +1365,10 @@ LIBGRPC_SRC = \ src/core/iomgr/tcp_posix.c \ src/core/iomgr/tcp_server_posix.c \ src/core/iomgr/time_averaged_stats.c \ + src/core/iomgr/wakeup_fd.c \ + src/core/iomgr/wakeup_fd_eventfd.c \ + src/core/iomgr/wakeup_fd_nospecial.c \ + src/core/iomgr/wakeup_fd_pipe.c \ src/core/statistics/census_init.c \ src/core/statistics/census_log.c \ src/core/statistics/census_rpc_stats.c \ @@ -1472,8 +1475,7 @@ src/core/iomgr/endpoint_pair_posix.c: $(OPENSSL_DEP) src/core/iomgr/fd_posix.c: $(OPENSSL_DEP) src/core/iomgr/iomgr.c: $(OPENSSL_DEP) src/core/iomgr/iomgr_posix.c: $(OPENSSL_DEP) -src/core/iomgr/pollset_kick_posix.c: $(OPENSSL_DEP) -src/core/iomgr/pollset_kick_eventfd.c: $(OPENSSL_DEP) +src/core/iomgr/pollset_kick.c: $(OPENSSL_DEP) src/core/iomgr/pollset_multipoller_with_poll_posix.c: $(OPENSSL_DEP) src/core/iomgr/pollset_posix.c: $(OPENSSL_DEP) src/core/iomgr/resolve_address_posix.c: $(OPENSSL_DEP) @@ -1485,6 +1487,10 @@ src/core/iomgr/tcp_client_posix.c: $(OPENSSL_DEP) src/core/iomgr/tcp_posix.c: $(OPENSSL_DEP) src/core/iomgr/tcp_server_posix.c: $(OPENSSL_DEP) src/core/iomgr/time_averaged_stats.c: $(OPENSSL_DEP) +src/core/iomgr/wakeup_fd.c: $(OPENSSL_DEP) +src/core/iomgr/wakeup_fd_eventfd.c: $(OPENSSL_DEP) +src/core/iomgr/wakeup_fd_nospecial.c: $(OPENSSL_DEP) +src/core/iomgr/wakeup_fd_pipe.c: $(OPENSSL_DEP) src/core/statistics/census_init.c: $(OPENSSL_DEP) src/core/statistics/census_log.c: $(OPENSSL_DEP) src/core/statistics/census_rpc_stats.c: $(OPENSSL_DEP) @@ -1608,8 +1614,7 @@ objs/$(CONFIG)/src/core/iomgr/endpoint_pair_posix.o: objs/$(CONFIG)/src/core/iomgr/fd_posix.o: objs/$(CONFIG)/src/core/iomgr/iomgr.o: objs/$(CONFIG)/src/core/iomgr/iomgr_posix.o: -objs/$(CONFIG)/src/core/iomgr/pollset_kick_posix.o: -objs/$(CONFIG)/src/core/iomgr/pollset_kick_eventfd.o: +objs/$(CONFIG)/src/core/iomgr/pollset_kick.o: objs/$(CONFIG)/src/core/iomgr/pollset_multipoller_with_poll_posix.o: objs/$(CONFIG)/src/core/iomgr/pollset_posix.o: objs/$(CONFIG)/src/core/iomgr/resolve_address_posix.o: @@ -1621,6 +1626,10 @@ objs/$(CONFIG)/src/core/iomgr/tcp_client_posix.o: objs/$(CONFIG)/src/core/iomgr/tcp_posix.o: objs/$(CONFIG)/src/core/iomgr/tcp_server_posix.o: objs/$(CONFIG)/src/core/iomgr/time_averaged_stats.o: +objs/$(CONFIG)/src/core/iomgr/wakeup_fd.o: +objs/$(CONFIG)/src/core/iomgr/wakeup_fd_eventfd.o: +objs/$(CONFIG)/src/core/iomgr/wakeup_fd_nospecial.o: +objs/$(CONFIG)/src/core/iomgr/wakeup_fd_pipe.o: objs/$(CONFIG)/src/core/statistics/census_init.o: objs/$(CONFIG)/src/core/statistics/census_log.o: objs/$(CONFIG)/src/core/statistics/census_rpc_stats.o: @@ -1764,8 +1773,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/iomgr/fd_posix.c \ src/core/iomgr/iomgr.c \ src/core/iomgr/iomgr_posix.c \ - src/core/iomgr/pollset_kick_posix.c \ - src/core/iomgr/pollset_kick_eventfd.c \ + src/core/iomgr/pollset_kick.c \ src/core/iomgr/pollset_multipoller_with_poll_posix.c \ src/core/iomgr/pollset_posix.c \ src/core/iomgr/resolve_address_posix.c \ @@ -1777,6 +1785,10 @@ LIBGRPC_UNSECURE_SRC = \ src/core/iomgr/tcp_posix.c \ src/core/iomgr/tcp_server_posix.c \ src/core/iomgr/time_averaged_stats.c \ + src/core/iomgr/wakeup_fd.c \ + src/core/iomgr/wakeup_fd_eventfd.c \ + src/core/iomgr/wakeup_fd_nospecial.c \ + src/core/iomgr/wakeup_fd_pipe.c \ src/core/statistics/census_init.c \ src/core/statistics/census_log.c \ src/core/statistics/census_rpc_stats.c \ @@ -1883,8 +1895,7 @@ objs/$(CONFIG)/src/core/iomgr/endpoint_pair_posix.o: objs/$(CONFIG)/src/core/iomgr/fd_posix.o: objs/$(CONFIG)/src/core/iomgr/iomgr.o: objs/$(CONFIG)/src/core/iomgr/iomgr_posix.o: -objs/$(CONFIG)/src/core/iomgr/pollset_kick_posix.o: -objs/$(CONFIG)/src/core/iomgr/pollset_kick_eventfd.o: +objs/$(CONFIG)/src/core/iomgr/pollset_kick.o: objs/$(CONFIG)/src/core/iomgr/pollset_multipoller_with_poll_posix.o: objs/$(CONFIG)/src/core/iomgr/pollset_posix.o: objs/$(CONFIG)/src/core/iomgr/resolve_address_posix.o: @@ -1896,6 +1907,10 @@ objs/$(CONFIG)/src/core/iomgr/tcp_client_posix.o: objs/$(CONFIG)/src/core/iomgr/tcp_posix.o: objs/$(CONFIG)/src/core/iomgr/tcp_server_posix.o: objs/$(CONFIG)/src/core/iomgr/time_averaged_stats.o: +objs/$(CONFIG)/src/core/iomgr/wakeup_fd.o: +objs/$(CONFIG)/src/core/iomgr/wakeup_fd_eventfd.o: +objs/$(CONFIG)/src/core/iomgr/wakeup_fd_nospecial.o: +objs/$(CONFIG)/src/core/iomgr/wakeup_fd_pipe.o: objs/$(CONFIG)/src/core/statistics/census_init.o: objs/$(CONFIG)/src/core/statistics/census_log.o: objs/$(CONFIG)/src/core/statistics/census_rpc_stats.o: diff --git a/build.json b/build.json index f6d7f5189306f8b154ea6fb43666a1ec26de9418..17187ecfc2d8ed6fc2bfe6e9cd204e32b5da6f93 100644 --- a/build.json +++ b/build.json @@ -47,8 +47,6 @@ "src/core/iomgr/iomgr_posix.h", "src/core/iomgr/pollset.h", "src/core/iomgr/pollset_kick.h", - "src/core/iomgr/pollset_kick_posix.h", - "src/core/iomgr/pollset_kick_eventfd.h", "src/core/iomgr/pollset_posix.h", "src/core/iomgr/resolve_address.h", "src/core/iomgr/sockaddr.h", @@ -60,6 +58,7 @@ "src/core/iomgr/tcp_posix.h", "src/core/iomgr/tcp_server.h", "src/core/iomgr/time_averaged_stats.h", + "src/core/iomgr/wakeup_fd.h", "src/core/statistics/census_interface.h", "src/core/statistics/census_log.h", "src/core/statistics/census_rpc_stats.h", @@ -124,8 +123,7 @@ "src/core/iomgr/fd_posix.c", "src/core/iomgr/iomgr.c", "src/core/iomgr/iomgr_posix.c", - "src/core/iomgr/pollset_kick_posix.c", - "src/core/iomgr/pollset_kick_eventfd.c", + "src/core/iomgr/pollset_kick.c", "src/core/iomgr/pollset_multipoller_with_poll_posix.c", "src/core/iomgr/pollset_posix.c", "src/core/iomgr/resolve_address_posix.c", @@ -137,6 +135,10 @@ "src/core/iomgr/tcp_posix.c", "src/core/iomgr/tcp_server_posix.c", "src/core/iomgr/time_averaged_stats.c", + "src/core/iomgr/wakeup_fd.c", + "src/core/iomgr/wakeup_fd_eventfd.c", + "src/core/iomgr/wakeup_fd_nospecial.c", + "src/core/iomgr/wakeup_fd_pipe.c", "src/core/statistics/census_init.c", "src/core/statistics/census_log.c", "src/core/statistics/census_rpc_stats.c", diff --git a/include/grpc/support/port_platform.h b/include/grpc/support/port_platform.h index 488f1aeadbbbe4029adc299fcfd40a47e9c36b84..d6723289ad08a2da17589f714dacfe4078458ab7 100644 --- a/include/grpc/support/port_platform.h +++ b/include/grpc/support/port_platform.h @@ -68,6 +68,7 @@ #define GPR_GCC_ATOMIC 1 #define GPR_LINUX 1 #define GPR_POSIX_MULTIPOLL_WITH_POLL 1 +#define GPR_POSIX_HAS_SPECIAL_WAKEUP_FD 1 #define GPR_LINUX_EVENTFD 1 #define GPR_POSIX_SOCKET 1 #define GPR_POSIX_SOCKETADDR 1 diff --git a/src/core/iomgr/pollset_kick_posix.c b/src/core/iomgr/pollset_kick.c similarity index 60% rename from src/core/iomgr/pollset_kick_posix.c rename to src/core/iomgr/pollset_kick.c index 4e3ee25dc7e9f12f277e4b388538cd5dfc54312f..f863207cf1ab146f84a5c2ae634750201733608e 100644 --- a/src/core/iomgr/pollset_kick_posix.c +++ b/src/core/iomgr/pollset_kick.c @@ -31,28 +31,28 @@ * */ -#include "src/core/iomgr/pollset_kick_posix.h" +#include "src/core/iomgr/pollset_kick.h" #include <errno.h> #include <string.h> #include <unistd.h> -#include "src/core/iomgr/pollset_kick_eventfd.h" #include "src/core/iomgr/socket_utils_posix.h" +#include "src/core/iomgr/wakeup_fd.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> -/* This implementation is based on a freelist of pipes. */ +/* This implementation is based on a freelist of wakeup fds, with extra logic to + * handle kicks while there is no attached fd. */ -#define GRPC_MAX_CACHED_PIPES 50 -#define GRPC_PIPE_LOW_WATERMARK 25 +#define GRPC_MAX_CACHED_WFDS 50 +#define GRPC_WFD_LOW_WATERMARK 25 static grpc_kick_fd_info *fd_freelist = NULL; static int fd_freelist_count = 0; static gpr_mu fd_freelist_mu; -static const grpc_pollset_kick_vtable *kick_vtable = NULL; -static grpc_kick_fd_info *allocate_pipe(void) { +static grpc_kick_fd_info *allocate_wfd(void) { grpc_kick_fd_info *info; gpr_mu_lock(&fd_freelist_mu); if (fd_freelist != NULL) { @@ -61,30 +61,30 @@ static grpc_kick_fd_info *allocate_pipe(void) { --fd_freelist_count; } else { info = gpr_malloc(sizeof(*info)); - kick_vtable->create(info); + grpc_wakeup_fd_create(&info->wakeup_fd); info->next = NULL; } gpr_mu_unlock(&fd_freelist_mu); return info; } -static void destroy_pipe(void) { +static void destroy_wfd(void) { /* assumes fd_freelist_mu is held */ grpc_kick_fd_info *current = fd_freelist; fd_freelist = fd_freelist->next; fd_freelist_count--; - kick_vtable->destroy(current); + grpc_wakeup_fd_destroy(¤t->wakeup_fd); gpr_free(current); } -static void free_pipe(grpc_kick_fd_info *fd_info) { +static void free_wfd(grpc_kick_fd_info *fd_info) { gpr_mu_lock(&fd_freelist_mu); fd_info->next = fd_freelist; fd_freelist = fd_info; fd_freelist_count++; - if (fd_freelist_count > GRPC_MAX_CACHED_PIPES) { - while (fd_freelist_count > GRPC_PIPE_LOW_WATERMARK) { - destroy_pipe(); + if (fd_freelist_count > GRPC_MAX_CACHED_WFDS) { + while (fd_freelist_count > GRPC_WFD_LOW_WATERMARK) { + destroy_wfd(); } } gpr_mu_unlock(&fd_freelist_mu); @@ -108,18 +108,18 @@ int grpc_pollset_kick_pre_poll(grpc_pollset_kick_state *kick_state) { gpr_mu_unlock(&kick_state->mu); return -1; } - kick_state->fd_info = allocate_pipe(); + kick_state->fd_info = allocate_wfd(); gpr_mu_unlock(&kick_state->mu); - return kick_state->fd_info->read_fd; + return GRPC_WAKEUP_FD_FD(&kick_state->fd_info->wakeup_fd); } void grpc_pollset_kick_consume(grpc_pollset_kick_state *kick_state) { - kick_vtable->consume(kick_state->fd_info); + grpc_wakeup_fd_consume_wakeup(&kick_state->fd_info->wakeup_fd); } void grpc_pollset_kick_post_poll(grpc_pollset_kick_state *kick_state) { gpr_mu_lock(&kick_state->mu); - free_pipe(kick_state->fd_info); + free_wfd(kick_state->fd_info); kick_state->fd_info = NULL; gpr_mu_unlock(&kick_state->mu); } @@ -127,81 +127,23 @@ void grpc_pollset_kick_post_poll(grpc_pollset_kick_state *kick_state) { void grpc_pollset_kick_kick(grpc_pollset_kick_state *kick_state) { gpr_mu_lock(&kick_state->mu); if (kick_state->fd_info != NULL) { - kick_vtable->kick(kick_state->fd_info); + grpc_wakeup_fd_wakeup(&kick_state->fd_info->wakeup_fd); } else { kick_state->kicked = 1; } gpr_mu_unlock(&kick_state->mu); } -static void pipe_create(grpc_kick_fd_info *fd_info) { - int pipefd[2]; - /* TODO(klempner): Make this nonfatal */ - GPR_ASSERT(0 == pipe(pipefd)); - GPR_ASSERT(grpc_set_socket_nonblocking(pipefd[0], 1)); - GPR_ASSERT(grpc_set_socket_nonblocking(pipefd[1], 1)); - fd_info->read_fd = pipefd[0]; - fd_info->write_fd = pipefd[1]; -} - -static void pipe_consume(grpc_kick_fd_info *fd_info) { - char buf[128]; - int r; - - for (;;) { - r = read(fd_info->read_fd, buf, sizeof(buf)); - if (r > 0) continue; - if (r == 0) return; - switch (errno) { - case EAGAIN: - return; - case EINTR: - continue; - default: - gpr_log(GPR_ERROR, "error reading pipe: %s", strerror(errno)); - return; - } - } -} - -static void pipe_kick(grpc_kick_fd_info *fd_info) { - char c = 0; - while (write(fd_info->write_fd, &c, 1) != 1 && errno == EINTR) - ; -} - -static void pipe_destroy(grpc_kick_fd_info *fd_info) { - close(fd_info->read_fd); - close(fd_info->write_fd); -} - -static const grpc_pollset_kick_vtable pipe_kick_vtable = { - pipe_create, pipe_consume, pipe_kick, pipe_destroy -}; - -static void global_init_common(void) { - fd_freelist = NULL; - gpr_mu_init(&fd_freelist_mu); -} - -void grpc_pollset_kick_global_init_posix(void) { - global_init_common(); - kick_vtable = &pipe_kick_vtable; +void grpc_pollset_kick_global_init_fallback_fd(void) { + grpc_wakeup_fd_global_init_force_fallback(); } void grpc_pollset_kick_global_init(void) { - global_init_common(); - kick_vtable = grpc_pollset_kick_eventfd_init(); - if (kick_vtable == NULL) { - kick_vtable = &pipe_kick_vtable; - } + grpc_wakeup_fd_global_init(); } void grpc_pollset_kick_global_destroy(void) { - while (fd_freelist != NULL) { - destroy_pipe(); - } - gpr_mu_destroy(&fd_freelist_mu); + grpc_wakeup_fd_global_destroy(); } diff --git a/src/core/iomgr/pollset_kick.h b/src/core/iomgr/pollset_kick.h index 4459a31b4ff3b3c29a38851d20cb2a64570d3047..5e909312611fc7e2c02dffda7819730b01fd068d 100644 --- a/src/core/iomgr/pollset_kick.h +++ b/src/core/iomgr/pollset_kick.h @@ -34,27 +34,33 @@ #ifndef __GRPC_INTERNAL_IOMGR_POLLSET_KICK_H_ #define __GRPC_INTERNAL_IOMGR_POLLSET_KICK_H_ -#include <grpc/support/port_platform.h> +#include "src/core/iomgr/wakeup_fd.h" +#include <grpc/support/sync.h> /* This is an abstraction around the typical pipe mechanism for waking up a thread sitting in a poll() style call. */ -#ifdef GPR_POSIX_SOCKET -#include "src/core/iomgr/pollset_kick_posix.h" -#else -#error "No pollset kick support on platform" -#endif +typedef struct grpc_kick_fd_info { + grpc_wakeup_fd_info wakeup_fd; + struct grpc_kick_fd_info *next; +} grpc_kick_fd_info; + +typedef struct grpc_pollset_kick_state { + gpr_mu mu; + int kicked; + struct grpc_kick_fd_info *fd_info; +} grpc_pollset_kick_state; void grpc_pollset_kick_global_init(void); void grpc_pollset_kick_global_destroy(void); -/* Guarantees a pure posix implementation rather than a specialized one, if - * applicable. Intended for testing. */ -void grpc_pollset_kick_global_init_posix(void); - void grpc_pollset_kick_init(grpc_pollset_kick_state *kick_state); void grpc_pollset_kick_destroy(grpc_pollset_kick_state *kick_state); +/* Guarantees a pure posix implementation rather than a specialized one, if + * applicable. Intended for testing. */ +void grpc_pollset_kick_global_init_fallback_fd(void); + /* Must be called before entering poll(). If return value is -1, this consumed an existing kick. Otherwise the return value is an FD to add to the poll set. */ diff --git a/src/core/iomgr/wakeup_fd.c b/src/core/iomgr/wakeup_fd.c new file mode 100644 index 0000000000000000000000000000000000000000..b81707f2a020b5cec57f618c62863e03aa0904c0 --- /dev/null +++ b/src/core/iomgr/wakeup_fd.c @@ -0,0 +1,70 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/iomgr/wakeup_fd.h" +#include "src/core/iomgr/wakeup_fd_pipe.h" +#include <stddef.h> + +static const grpc_wakeup_fd_vtable *wakeup_fd_vtable = NULL; + +void grpc_wakeup_fd_global_init(void) { + if (specialized_wakeup_fd_vtable.check_availability()) { + wakeup_fd_vtable = &specialized_wakeup_fd_vtable; + } else { + wakeup_fd_vtable = &pipe_wakeup_fd_vtable; + } +} + +void grpc_wakeup_fd_global_init_force_fallback(void) { + wakeup_fd_vtable = &pipe_wakeup_fd_vtable; +} + +void grpc_wakeup_fd_global_destroy(void) { + wakeup_fd_vtable = NULL; +} + +void grpc_wakeup_fd_create(grpc_wakeup_fd_info *fd_info) { + wakeup_fd_vtable->create(fd_info); +} + +void grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd_info *fd_info) { + wakeup_fd_vtable->consume(fd_info); +} + +void grpc_wakeup_fd_wakeup(grpc_wakeup_fd_info *fd_info) { + wakeup_fd_vtable->wakeup(fd_info); +} + +void grpc_wakeup_fd_destroy(grpc_wakeup_fd_info *fd_info) { + wakeup_fd_vtable->destroy(fd_info); +} diff --git a/src/core/iomgr/wakeup_fd.h b/src/core/iomgr/wakeup_fd.h new file mode 100644 index 0000000000000000000000000000000000000000..225291ca601e72083444bf7e459ad2dbcc44ef52 --- /dev/null +++ b/src/core/iomgr/wakeup_fd.h @@ -0,0 +1,102 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +/* + * wakeup_fd abstracts the concept of a file descriptor for the purpose of + * waking up a thread in select()/poll()/epoll_wait()/etc. + + * The poll() family of system calls provide a way for a thread to block until + * there is activity on one (or more) of a set of file descriptors. An + * application may wish to wake up this thread to do non file related work. The + * typical way to do this is to add a pipe to the set of file descriptors, then + * write to the pipe to wake up the thread in poll(). + * + * Linux has a lighter weight eventfd specifically designed for this purpose. + * wakeup_fd abstracts the difference between the two. + * + * Setup: + * 1. Before calling anything, call global_init() at least once. + * 1. Call grpc_wakeup_fd_create() to get a wakeup_fd. + * 2. Add the result of GRPC_WAKEUP_FD_FD to the set of monitored file + * descriptors for the poll() style API you are using. Monitor the file + * descriptor for readability. + * 3. To tear down, call grpc_wakeup_fd_destroy(). This closes the underlying + * file descriptor. + * + * Usage: + * 1. To wake up a polling thread, call grpc_wakeup_fd_wakeup() on a wakeup_fd + * it is monitoring. + * 2. If the polling thread was awakened by a wakeup_fd event, call + * grpc_wakeup_fd_consume_wakeup() on it. + */ +#ifndef __GRPC_INTERNAL_IOMGR_WAKEUP_FD_H_ +#define __GRPC_INTERNAL_IOMGR_WAKEUP_FD_H_ + +typedef struct grpc_wakeup_fd_info grpc_wakeup_fd_info; + +void grpc_wakeup_fd_global_init(void); +void grpc_wakeup_fd_global_destroy(void); + + +void grpc_wakeup_fd_create(grpc_wakeup_fd_info *fd_info); +void grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd_info *fd_info); +void grpc_wakeup_fd_wakeup(grpc_wakeup_fd_info *fd_info); +void grpc_wakeup_fd_destroy(grpc_wakeup_fd_info *fd_info); + +#define GRPC_WAKEUP_FD_FD(fd_info) ((fd_info)->read_fd) + +/* Force using the fallback implementation. This is intended for testing + * purposes only.*/ +void grpc_wakeup_fd_global_init_force_fallback(void); + +/* Private structures; don't access their fields directly outside of wakeup fd + * code. */ +struct grpc_wakeup_fd_info { + int read_fd; + int write_fd; +}; + +typedef struct grpc_wakeup_fd_vtable { + void (*create)(grpc_wakeup_fd_info *fd_info); + void (*consume)(grpc_wakeup_fd_info *fd_info); + void (*wakeup)(grpc_wakeup_fd_info *fd_info); + void (*destroy)(grpc_wakeup_fd_info *fd_info); + /* Must be called before calling any other functions */ + int (*check_availability)(void); +} grpc_wakeup_fd_vtable; + +/* Defined in some specialized implementation's .c file, or by + * wakeup_fd_nospecial.c if no such implementation exists. */ +extern const grpc_wakeup_fd_vtable specialized_wakeup_fd_vtable; + +#endif /* __GRPC_INTERNAL_IOMGR_WAKEUP_FD_H_ */ diff --git a/src/core/iomgr/pollset_kick_eventfd.c b/src/core/iomgr/wakeup_fd_eventfd.c similarity index 76% rename from src/core/iomgr/pollset_kick_eventfd.c rename to src/core/iomgr/wakeup_fd_eventfd.c index 301ebad875862ef0ec7a18a54c058b8dcef1ec90..4c495e675a6f6b061fa73e49d57fcce0744d9ef5 100644 --- a/src/core/iomgr/pollset_kick_eventfd.c +++ b/src/core/iomgr/wakeup_fd_eventfd.c @@ -31,17 +31,18 @@ * */ -#include "src/core/iomgr/pollset_kick_eventfd.h" +#include <grpc/support/port_platform.h> #ifdef GPR_LINUX_EVENTFD + #include <errno.h> #include <sys/eventfd.h> #include <unistd.h> -#include <grpc/support/port_platform.h> +#include "src/core/iomgr/wakeup_fd.h" #include <grpc/support/log.h> -static void eventfd_create(grpc_kick_fd_info *fd_info) { +static void eventfd_create(grpc_wakeup_fd_info *fd_info) { int efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); /* TODO(klempner): Handle failure more gracefully */ GPR_ASSERT(efd >= 0); @@ -49,7 +50,7 @@ static void eventfd_create(grpc_kick_fd_info *fd_info) { fd_info->write_fd = -1; } -static void eventfd_consume(grpc_kick_fd_info *fd_info) { +static void eventfd_consume(grpc_wakeup_fd_info *fd_info) { eventfd_t value; int err; do { @@ -57,29 +58,25 @@ static void eventfd_consume(grpc_kick_fd_info *fd_info) { } while (err < 0 && errno == EINTR); } -static void eventfd_kick(grpc_kick_fd_info *fd_info) { +static void eventfd_wakeup(grpc_wakeup_fd_info *fd_info) { int err; do { err = eventfd_write(fd_info->read_fd, 1); } while (err < 0 && errno == EINTR); } -static void eventfd_destroy(grpc_kick_fd_info *fd_info) { +static void eventfd_destroy(grpc_wakeup_fd_info *fd_info) { close(fd_info->read_fd); } -static const grpc_pollset_kick_vtable eventfd_kick_vtable = { - eventfd_create, eventfd_consume, eventfd_kick, eventfd_destroy -}; - -const grpc_pollset_kick_vtable *grpc_pollset_kick_eventfd_init(void) { - /* TODO(klempner): Check that eventfd works */ - return &eventfd_kick_vtable; +static int eventfd_check_availability(void) { + /* TODO(klempner): Actually check if eventfd is available */ + return 1; } -#else /* GPR_LINUX_EVENTFD not defined */ -const grpc_pollset_kick_vtable *grpc_pollset_kick_eventfd_init(void) { - return NULL; -} +const grpc_wakeup_fd_vtable specialized_wakeup_fd_vtable = { + eventfd_create, eventfd_consume, eventfd_wakeup, eventfd_destroy, + eventfd_check_availability +}; #endif /* GPR_LINUX_EVENTFD */ diff --git a/src/core/iomgr/pollset_kick_posix.h b/src/core/iomgr/wakeup_fd_nospecial.c similarity index 68% rename from src/core/iomgr/pollset_kick_posix.h rename to src/core/iomgr/wakeup_fd_nospecial.c index 5eb499876071b6ea65608d7f815fe2c65691d48c..21e8074d50ea495889b85163bfe76700c60327d1 100644 --- a/src/core/iomgr/pollset_kick_posix.h +++ b/src/core/iomgr/wakeup_fd_nospecial.c @@ -31,28 +31,23 @@ * */ -#ifndef __GRPC_INTERNAL_IOMGR_POLLSET_KICK_POSIX_H_ -#define __GRPC_INTERNAL_IOMGR_POLLSET_KICK_POSIX_H_ +/* + * This is a dummy file to provide an invalid specialized_wakeup_fd_vtable on + * systems without anything better than pipe. + */ + +#include <grpc/support/port_platform.h> -#include <grpc/support/sync.h> +#ifndef GPR_POSIX_HAS_SPECIAL_WAKEUP_FD -typedef struct grpc_kick_fd_info { - int read_fd; - int write_fd; - struct grpc_kick_fd_info *next; -} grpc_kick_fd_info; +#include "src/core/iomgr/wakeup_fd.h" -typedef struct grpc_pollset_kick_vtable { - void (*create)(struct grpc_kick_fd_info *fd_info); - void (*consume)(struct grpc_kick_fd_info *fd_info); - void (*kick)(struct grpc_kick_fd_info *fd_info); - void (*destroy)(struct grpc_kick_fd_info *fd_info); -} grpc_pollset_kick_vtable; +static int check_availability_invalid(void) { + return 0; +} -typedef struct grpc_pollset_kick_state { - gpr_mu mu; - int kicked; - struct grpc_kick_fd_info *fd_info; -} grpc_pollset_kick_state; +const grpc_wakeup_fd_vtable specialized_wakeup_fd_vtable = { + NULL, NULL, NULL, NULL, check_availability_invalid +}; -#endif /* __GRPC_INTERNAL_IOMGR_POLLSET_KICK_POSIX_H_ */ +#endif /* GPR_POSIX_HAS_SPECIAL_WAKEUP */ diff --git a/src/core/iomgr/wakeup_fd_pipe.c b/src/core/iomgr/wakeup_fd_pipe.c new file mode 100644 index 0000000000000000000000000000000000000000..f8a0aeb606c3880282d2d3d67a0faf78cbaf2108 --- /dev/null +++ b/src/core/iomgr/wakeup_fd_pipe.c @@ -0,0 +1,93 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +/* TODO(klempner): Allow this code to be disabled. */ +#include "src/core/iomgr/wakeup_fd.h" + +#include <errno.h> +#include <string.h> +#include <unistd.h> + +#include "src/core/iomgr/socket_utils_posix.h" +#include <grpc/support/log.h> + +static void pipe_create(grpc_wakeup_fd_info *fd_info) { + int pipefd[2]; + /* TODO(klempner): Make this nonfatal */ + GPR_ASSERT(0 == pipe(pipefd)); + GPR_ASSERT(grpc_set_socket_nonblocking(pipefd[0], 1)); + GPR_ASSERT(grpc_set_socket_nonblocking(pipefd[1], 1)); + fd_info->read_fd = pipefd[0]; + fd_info->write_fd = pipefd[1]; +} + +static void pipe_consume(grpc_wakeup_fd_info *fd_info) { + char buf[128]; + int r; + + for (;;) { + r = read(fd_info->read_fd, buf, sizeof(buf)); + if (r > 0) continue; + if (r == 0) return; + switch (errno) { + case EAGAIN: + return; + case EINTR: + continue; + default: + gpr_log(GPR_ERROR, "error reading pipe: %s", strerror(errno)); + return; + } + } +} + +static void pipe_wakeup(grpc_wakeup_fd_info *fd_info) { + char c = 0; + while (write(fd_info->write_fd, &c, 1) != 1 && errno == EINTR) + ; +} + +static void pipe_destroy(grpc_wakeup_fd_info *fd_info) { + close(fd_info->read_fd); + close(fd_info->write_fd); +} + +static int pipe_check_availability(void) { + /* Assume that pipes are always available. */ + return 1; +} + +const grpc_wakeup_fd_vtable pipe_wakeup_fd_vtable = { + pipe_create, pipe_consume, pipe_wakeup, pipe_destroy, pipe_check_availability +}; + diff --git a/src/core/iomgr/pollset_kick_eventfd.h b/src/core/iomgr/wakeup_fd_pipe.h similarity index 81% rename from src/core/iomgr/pollset_kick_eventfd.h rename to src/core/iomgr/wakeup_fd_pipe.h index f06f7f65ec3c7e9b2f35cc06ad8d4d2e8b2a447e..8e2ed85885b4b5d5c5e89e2953fdba2f667f7b5b 100644 --- a/src/core/iomgr/pollset_kick_eventfd.h +++ b/src/core/iomgr/wakeup_fd_pipe.h @@ -31,12 +31,11 @@ * */ -#ifndef __GRPC_INTERNAL_IOMGR_POLLSET_KICK_EVENTFD_H_ -#define __GRPC_INTERNAL_IOMGR_POLLSET_KICK_EVENTFD_H_ +#ifndef __GRPC_INTERNAL_IOMGR_WAKEUP_FD_PIPE_H_ +#define __GRPC_INTERNAL_IOMGR_WAKEUP_FD_PIPE_H_ -#include "src/core/iomgr/pollset_kick_posix.h" +#include "src/core/iomgr/wakeup_fd.h" -/* Tries to enable eventfd support, returns a kick vtable if successful. */ -const grpc_pollset_kick_vtable *grpc_pollset_kick_eventfd_init(void); +extern grpc_wakeup_fd_vtable pipe_wakeup_fd_vtable; -#endif /* __GRPC_INTERNAL_IOMGR_POLLSET_KICK_EVENTFD_H_ */ +#endif /* __GRPC_INTERNAL_IOMGR_WAKEUP_FD_PIPE_H_ */ diff --git a/test/core/iomgr/poll_kick_test.c b/test/core/iomgr/poll_kick_test.c index b1d134d746f9317abcb2951039b298ac69e88073..3c6d815c9de79fca9ea3327600fb554b608f8fc7 100644 --- a/test/core/iomgr/poll_kick_test.c +++ b/test/core/iomgr/poll_kick_test.c @@ -122,7 +122,7 @@ int main(int argc, char **argv) { run_tests(); grpc_pollset_kick_global_destroy(); - grpc_pollset_kick_global_init_posix(); + grpc_pollset_kick_global_init_fallback_fd(); run_tests(); grpc_pollset_kick_global_destroy(); return 0; diff --git a/vsprojects/vs2013/grpc.vcxproj b/vsprojects/vs2013/grpc.vcxproj index cdfe2b391eade64b14d42d4d44238dbb21cfa8b8..8cec7944395cdd521667c73bd9c6d659f4469355 100644 --- a/vsprojects/vs2013/grpc.vcxproj +++ b/vsprojects/vs2013/grpc.vcxproj @@ -120,8 +120,6 @@ <ClInclude Include="..\..\src\core\iomgr\iomgr_posix.h" /> <ClInclude Include="..\..\src\core\iomgr\pollset.h" /> <ClInclude Include="..\..\src\core\iomgr\pollset_kick.h" /> - <ClInclude Include="..\..\src\core\iomgr\pollset_kick_posix.h" /> - <ClInclude Include="..\..\src\core\iomgr\pollset_kick_eventfd.h" /> <ClInclude Include="..\..\src\core\iomgr\pollset_posix.h" /> <ClInclude Include="..\..\src\core\iomgr\resolve_address.h" /> <ClInclude Include="..\..\src\core\iomgr\sockaddr.h" /> @@ -133,6 +131,7 @@ <ClInclude Include="..\..\src\core\iomgr\tcp_posix.h" /> <ClInclude Include="..\..\src\core\iomgr\tcp_server.h" /> <ClInclude Include="..\..\src\core\iomgr\time_averaged_stats.h" /> + <ClInclude Include="..\..\src\core\iomgr\wakeup_fd.h" /> <ClInclude Include="..\..\src\core\statistics\census_interface.h" /> <ClInclude Include="..\..\src\core\statistics\census_log.h" /> <ClInclude Include="..\..\src\core\statistics\census_rpc_stats.h" /> @@ -249,9 +248,7 @@ </ClCompile> <ClCompile Include="..\..\src\core\iomgr\iomgr_posix.c"> </ClCompile> - <ClCompile Include="..\..\src\core\iomgr\pollset_kick_posix.c"> - </ClCompile> - <ClCompile Include="..\..\src\core\iomgr\pollset_kick_eventfd.c"> + <ClCompile Include="..\..\src\core\iomgr\pollset_kick.c"> </ClCompile> <ClCompile Include="..\..\src\core\iomgr\pollset_multipoller_with_poll_posix.c"> </ClCompile> @@ -275,6 +272,14 @@ </ClCompile> <ClCompile Include="..\..\src\core\iomgr\time_averaged_stats.c"> </ClCompile> + <ClCompile Include="..\..\src\core\iomgr\wakeup_fd.c"> + </ClCompile> + <ClCompile Include="..\..\src\core\iomgr\wakeup_fd_eventfd.c"> + </ClCompile> + <ClCompile Include="..\..\src\core\iomgr\wakeup_fd_nospecial.c"> + </ClCompile> + <ClCompile Include="..\..\src\core\iomgr\wakeup_fd_pipe.c"> + </ClCompile> <ClCompile Include="..\..\src\core\statistics\census_init.c"> </ClCompile> <ClCompile Include="..\..\src\core\statistics\census_log.c"> diff --git a/vsprojects/vs2013/grpc_unsecure.vcxproj b/vsprojects/vs2013/grpc_unsecure.vcxproj index cdfe2b391eade64b14d42d4d44238dbb21cfa8b8..8cec7944395cdd521667c73bd9c6d659f4469355 100644 --- a/vsprojects/vs2013/grpc_unsecure.vcxproj +++ b/vsprojects/vs2013/grpc_unsecure.vcxproj @@ -120,8 +120,6 @@ <ClInclude Include="..\..\src\core\iomgr\iomgr_posix.h" /> <ClInclude Include="..\..\src\core\iomgr\pollset.h" /> <ClInclude Include="..\..\src\core\iomgr\pollset_kick.h" /> - <ClInclude Include="..\..\src\core\iomgr\pollset_kick_posix.h" /> - <ClInclude Include="..\..\src\core\iomgr\pollset_kick_eventfd.h" /> <ClInclude Include="..\..\src\core\iomgr\pollset_posix.h" /> <ClInclude Include="..\..\src\core\iomgr\resolve_address.h" /> <ClInclude Include="..\..\src\core\iomgr\sockaddr.h" /> @@ -133,6 +131,7 @@ <ClInclude Include="..\..\src\core\iomgr\tcp_posix.h" /> <ClInclude Include="..\..\src\core\iomgr\tcp_server.h" /> <ClInclude Include="..\..\src\core\iomgr\time_averaged_stats.h" /> + <ClInclude Include="..\..\src\core\iomgr\wakeup_fd.h" /> <ClInclude Include="..\..\src\core\statistics\census_interface.h" /> <ClInclude Include="..\..\src\core\statistics\census_log.h" /> <ClInclude Include="..\..\src\core\statistics\census_rpc_stats.h" /> @@ -249,9 +248,7 @@ </ClCompile> <ClCompile Include="..\..\src\core\iomgr\iomgr_posix.c"> </ClCompile> - <ClCompile Include="..\..\src\core\iomgr\pollset_kick_posix.c"> - </ClCompile> - <ClCompile Include="..\..\src\core\iomgr\pollset_kick_eventfd.c"> + <ClCompile Include="..\..\src\core\iomgr\pollset_kick.c"> </ClCompile> <ClCompile Include="..\..\src\core\iomgr\pollset_multipoller_with_poll_posix.c"> </ClCompile> @@ -275,6 +272,14 @@ </ClCompile> <ClCompile Include="..\..\src\core\iomgr\time_averaged_stats.c"> </ClCompile> + <ClCompile Include="..\..\src\core\iomgr\wakeup_fd.c"> + </ClCompile> + <ClCompile Include="..\..\src\core\iomgr\wakeup_fd_eventfd.c"> + </ClCompile> + <ClCompile Include="..\..\src\core\iomgr\wakeup_fd_nospecial.c"> + </ClCompile> + <ClCompile Include="..\..\src\core\iomgr\wakeup_fd_pipe.c"> + </ClCompile> <ClCompile Include="..\..\src\core\statistics\census_init.c"> </ClCompile> <ClCompile Include="..\..\src\core\statistics\census_log.c">