Skip to content
Snippets Groups Projects
Commit 69f90e63 authored by Craig Tiller's avatar Craig Tiller
Browse files

Working towards a non-blocking API test

parent 5148694b
No related branches found
No related tags found
No related merge requests found
...@@ -76,8 +76,8 @@ void grpc_pollset_destroy(grpc_pollset *pollset); ...@@ -76,8 +76,8 @@ void grpc_pollset_destroy(grpc_pollset *pollset);
Returns true if some work has been done, and false if the deadline Returns true if some work has been done, and false if the deadline
expired. */ expired. */
int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker, void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
gpr_timespec deadline); gpr_timespec now, gpr_timespec deadline);
/* Break one polling thread out of polling work for this pollset. /* Break one polling thread out of polling work for this pollset.
If specific_worker is GRPC_POLLSET_KICK_BROADCAST, kick ALL the workers. If specific_worker is GRPC_POLLSET_KICK_BROADCAST, kick ALL the workers.
......
...@@ -181,7 +181,7 @@ static void multipoll_with_epoll_pollset_maybe_work( ...@@ -181,7 +181,7 @@ static void multipoll_with_epoll_pollset_maybe_work(
pfds[1].events = POLLIN; pfds[1].events = POLLIN;
pfds[1].revents = 0; pfds[1].revents = 0;
poll_rv = poll(pfds, 2, timeout_ms); poll_rv = grpc_poll_function(pfds, 2, timeout_ms);
if (poll_rv < 0) { if (poll_rv < 0) {
if (errno != EINTR) { if (errno != EINTR) {
......
...@@ -144,7 +144,7 @@ static void multipoll_with_poll_pollset_maybe_work( ...@@ -144,7 +144,7 @@ static void multipoll_with_poll_pollset_maybe_work(
POLLOUT, &watchers[i]); POLLOUT, &watchers[i]);
} }
r = poll(pfds, pfd_count, timeout); r = grpc_poll_function(pfds, pfd_count, timeout);
for (i = 1; i < pfd_count; i++) { for (i = 1; i < pfd_count; i++) {
grpc_fd_end_poll(&watchers[i], pfds[i].revents & POLLIN, grpc_fd_end_poll(&watchers[i], pfds[i].revents & POLLIN,
......
...@@ -38,7 +38,6 @@ ...@@ -38,7 +38,6 @@
#include "src/core/iomgr/pollset_posix.h" #include "src/core/iomgr/pollset_posix.h"
#include <errno.h> #include <errno.h>
#include <poll.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <unistd.h> #include <unistd.h>
...@@ -57,6 +56,8 @@ ...@@ -57,6 +56,8 @@
GPR_TLS_DECL(g_current_thread_poller); GPR_TLS_DECL(g_current_thread_poller);
GPR_TLS_DECL(g_current_thread_worker); GPR_TLS_DECL(g_current_thread_worker);
grpc_poll_function_type grpc_poll_function = poll;
static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) { static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
worker->prev->next = worker->next; worker->prev->next = worker->next;
worker->next->prev = worker->prev; worker->next->prev = worker->prev;
...@@ -168,14 +169,11 @@ static void finish_shutdown(grpc_pollset *pollset) { ...@@ -168,14 +169,11 @@ static void finish_shutdown(grpc_pollset *pollset) {
pollset->shutdown_done_cb(pollset->shutdown_done_arg); pollset->shutdown_done_cb(pollset->shutdown_done_arg);
} }
int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker, void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
gpr_timespec deadline) { gpr_timespec deadline) {
/* pollset->mu already held */ /* pollset->mu already held */
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
int added_worker = 0; int added_worker = 0;
if (gpr_time_cmp(now, deadline) > 0) {
return 0;
}
/* this must happen before we (potentially) drop pollset->mu */ /* this must happen before we (potentially) drop pollset->mu */
worker->next = worker->prev = NULL; worker->next = worker->prev = NULL;
/* TODO(ctiller): pool these */ /* TODO(ctiller): pool these */
...@@ -217,7 +215,6 @@ done: ...@@ -217,7 +215,6 @@ done:
gpr_mu_lock(&pollset->mu); gpr_mu_lock(&pollset->mu);
} }
} }
return 1;
} }
void grpc_pollset_shutdown(grpc_pollset *pollset, void grpc_pollset_shutdown(grpc_pollset *pollset,
...@@ -456,7 +453,7 @@ static void basic_pollset_maybe_work(grpc_pollset *pollset, ...@@ -456,7 +453,7 @@ static void basic_pollset_maybe_work(grpc_pollset *pollset,
/* poll fd count (argument 2) is shortened by one if we have no events /* poll fd count (argument 2) is shortened by one if we have no events
to poll on - such that it only includes the kicker */ to poll on - such that it only includes the kicker */
r = poll(pfd, nfds, timeout); r = grpc_poll_function(pfd, nfds, timeout);
GRPC_TIMER_MARK(GRPC_PTAG_POLL_FINISHED, r); GRPC_TIMER_MARK(GRPC_PTAG_POLL_FINISHED, r);
if (fd) { if (fd) {
......
...@@ -34,6 +34,8 @@ ...@@ -34,6 +34,8 @@
#ifndef GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H #ifndef GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H
#define GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H #define GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H
#include <poll.h>
#include <grpc/support/sync.h> #include <grpc/support/sync.h>
#include "src/core/iomgr/wakeup_fd_posix.h" #include "src/core/iomgr/wakeup_fd_posix.h"
...@@ -117,4 +119,8 @@ void grpc_poll_become_multipoller(grpc_pollset *pollset, struct grpc_fd **fds, ...@@ -117,4 +119,8 @@ void grpc_poll_become_multipoller(grpc_pollset *pollset, struct grpc_fd **fds,
* be locked) */ * be locked) */
int grpc_pollset_has_workers(grpc_pollset *pollset); int grpc_pollset_has_workers(grpc_pollset *pollset);
/* override to allow tests to hook poll() usage */
typedef int (*grpc_poll_function_type)(struct pollfd *, nfds_t, int);
extern grpc_poll_function_type grpc_poll_function;
#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H */ #endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H */
...@@ -164,6 +164,8 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, ...@@ -164,6 +164,8 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
gpr_timespec deadline) { gpr_timespec deadline) {
grpc_event ret; grpc_event ret;
grpc_pollset_worker worker; grpc_pollset_worker worker;
int first_loop = 1;
gpr_timespec now;
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
...@@ -189,12 +191,15 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, ...@@ -189,12 +191,15 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
ret.type = GRPC_QUEUE_SHUTDOWN; ret.type = GRPC_QUEUE_SHUTDOWN;
break; break;
} }
if (!grpc_pollset_work(&cc->pollset, &worker, deadline)) { now = gpr_now(GPR_CLOCK_MONOTONIC);
if (!first_loop && gpr_time_cmp(now, deadline) >= 0) {
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
memset(&ret, 0, sizeof(ret)); memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT; ret.type = GRPC_QUEUE_TIMEOUT;
break; break;
} }
first_loop = 0;
grpc_pollset_work(&cc->pollset, &worker, now, deadline);
} }
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
GRPC_CQ_INTERNAL_UNREF(cc, "next"); GRPC_CQ_INTERNAL_UNREF(cc, "next");
...@@ -232,6 +237,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, ...@@ -232,6 +237,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
grpc_cq_completion *c; grpc_cq_completion *c;
grpc_cq_completion *prev; grpc_cq_completion *prev;
grpc_pollset_worker worker; grpc_pollset_worker worker;
int first_loop = 1;
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
...@@ -272,14 +278,16 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, ...@@ -272,14 +278,16 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
ret.type = GRPC_QUEUE_TIMEOUT; ret.type = GRPC_QUEUE_TIMEOUT;
break; break;
} }
if (!grpc_pollset_work(&cc->pollset, &worker, deadline)) { now = gpr_now(GPR_CLOCK_MONOTONIC);
if (!first_loop && gpr_time_cmp(now, deadline) >= 0) {
del_plucker(cc, tag, &worker); del_plucker(cc, tag, &worker);
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
memset(&ret, 0, sizeof(ret)); memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT; ret.type = GRPC_QUEUE_TIMEOUT;
break; break;
} }
del_plucker(cc, tag, &worker); first_loop = 0;
grpc_pollset_work(&cc->pollset, &worker, now, deadline);
} }
done: done:
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
......
This diff is collapsed.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment