Skip to content
Snippets Groups Projects
Commit 44883302 authored by Craig Tiller's avatar Craig Tiller
Browse files

Remove condition variable from pollset

parent 0f29c4d3
No related branches found
No related tags found
No related merge requests found
......@@ -154,9 +154,18 @@ static void lb_destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_child_channel_top_filter = {
lb_start_transport_op, lb_channel_op, sizeof(lb_call_data),
lb_init_call_elem, lb_destroy_call_elem, sizeof(lb_channel_data),
lb_init_channel_elem, lb_destroy_channel_elem, "child-channel",
lb_start_transport_op,
lb_channel_op,
sizeof(lb_call_data),
lb_init_call_elem,
lb_destroy_call_elem,
sizeof(lb_channel_data),
lb_init_channel_elem,
lb_destroy_channel_elem,
"child-channel",
};
/* grpc_child_channel proper */
......
......@@ -140,10 +140,6 @@ static int multipoll_with_epoll_pollset_maybe_work(
gpr_mu_lock(&pollset->mu);
pollset->counter -= 1;
/* TODO(klempner): This should signal once per event rather than broadcast,
* although it probably doesn't matter because threads will generally be
* blocked in epoll_wait rather than being blocked on the cv. */
gpr_cv_broadcast(&pollset->cv);
return 1;
}
......
......@@ -110,9 +110,6 @@ static int multipoll_with_poll_pollset_maybe_work(
size_t i, np, nf, nd;
pollset_hdr *h;
if (pollset->counter) {
return 0;
}
h = pollset->data.ptr;
if (gpr_time_cmp(deadline, gpr_inf_future) == 0) {
timeout = -1;
......@@ -163,7 +160,7 @@ static int multipoll_with_poll_pollset_maybe_work(
end_polling(pollset);
return 0;
}
pollset->counter = 1;
pollset->counter++;
gpr_mu_unlock(&pollset->mu);
for (i = 1; i < np; i++) {
......@@ -197,8 +194,7 @@ static int multipoll_with_poll_pollset_maybe_work(
grpc_pollset_kick_post_poll(&pollset->kick_state);
gpr_mu_lock(&pollset->mu);
pollset->counter = 0;
gpr_cv_broadcast(&pollset->cv);
pollset->counter--;
return 1;
}
......
......@@ -97,7 +97,6 @@ static void become_unary_pollset(grpc_pollset *pollset, grpc_fd *fd);
void grpc_pollset_init(grpc_pollset *pollset) {
gpr_mu_init(&pollset->mu);
gpr_cv_init(&pollset->cv);
grpc_pollset_kick_init(&pollset->kick_state);
pollset->in_flight_cbs = 0;
pollset->shutting_down = 0;
......@@ -107,14 +106,12 @@ void grpc_pollset_init(grpc_pollset *pollset) {
void grpc_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) {
gpr_mu_lock(&pollset->mu);
pollset->vtable->add_fd(pollset, fd);
gpr_cv_broadcast(&pollset->cv);
gpr_mu_unlock(&pollset->mu);
}
void grpc_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) {
gpr_mu_lock(&pollset->mu);
pollset->vtable->del_fd(pollset, fd);
gpr_cv_broadcast(&pollset->cv);
gpr_mu_unlock(&pollset->mu);
}
......@@ -165,7 +162,6 @@ void grpc_pollset_destroy(grpc_pollset *pollset) {
pollset->vtable->destroy(pollset);
grpc_pollset_kick_destroy(&pollset->kick_state);
gpr_mu_destroy(&pollset->mu);
gpr_cv_destroy(&pollset->cv);
}
/*
......@@ -268,7 +264,6 @@ static void unary_poll_do_promote(void *args, int success) {
}
}
gpr_cv_broadcast(&pollset->cv);
gpr_mu_unlock(&pollset->mu);
if (do_shutdown_cb) {
......@@ -334,20 +329,15 @@ static int unary_poll_pollset_maybe_work(grpc_pollset *pollset,
int timeout;
int r;
if (pollset->counter) {
return 0;
}
if (pollset->in_flight_cbs) {
/* Give do_promote priority so we don't starve it out */
gpr_mu_unlock(&pollset->mu);
gpr_mu_lock(&pollset->mu);
return 0;
return 1;
}
fd = pollset->data.ptr;
if (grpc_fd_is_orphaned(fd)) {
grpc_fd_unref(fd);
become_empty_pollset(pollset);
return 0;
return 1;
}
if (gpr_time_cmp(deadline, gpr_inf_future) == 0) {
timeout = -1;
......@@ -366,7 +356,7 @@ static int unary_poll_pollset_maybe_work(grpc_pollset *pollset,
pfd[0].revents = 0;
pfd[1].fd = fd->fd;
pfd[1].revents = 0;
pollset->counter = 1;
pollset->counter++;
gpr_mu_unlock(&pollset->mu);
pfd[1].events = grpc_fd_begin_poll(fd, pollset, POLLIN, POLLOUT, &fd_watcher);
......@@ -399,8 +389,7 @@ static int unary_poll_pollset_maybe_work(grpc_pollset *pollset,
grpc_pollset_kick_post_poll(&pollset->kick_state);
gpr_mu_lock(&pollset->mu);
pollset->counter = 0;
gpr_cv_broadcast(&pollset->cv);
pollset->counter--;
return 1;
}
......
......@@ -52,7 +52,6 @@ typedef struct grpc_pollset {
few fds, and an epoll() based implementation for many fds */
const grpc_pollset_vtable *vtable;
gpr_mu mu;
gpr_cv cv;
grpc_pollset_kick_state kick_state;
int counter;
int in_flight_cbs;
......@@ -75,7 +74,6 @@ struct grpc_pollset_vtable {
};
#define GRPC_POLLSET_MU(pollset) (&(pollset)->mu)
#define GRPC_POLLSET_CV(pollset) (&(pollset)->cv)
/* Add an fd to a pollset */
void grpc_pollset_add_fd(grpc_pollset *pollset, struct grpc_fd *fd);
......
......@@ -48,7 +48,6 @@
void grpc_pollset_init(grpc_pollset *pollset) {
gpr_mu_init(&pollset->mu);
gpr_cv_init(&pollset->cv);
}
void grpc_pollset_shutdown(grpc_pollset *pollset,
......@@ -59,7 +58,6 @@ void grpc_pollset_shutdown(grpc_pollset *pollset,
void grpc_pollset_destroy(grpc_pollset *pollset) {
gpr_mu_destroy(&pollset->mu);
gpr_cv_destroy(&pollset->cv);
}
int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) {
......
......@@ -47,10 +47,8 @@
typedef struct grpc_pollset {
gpr_mu mu;
gpr_cv cv;
} grpc_pollset;
#define GRPC_POLLSET_MU(pollset) (&(pollset)->mu)
#define GRPC_POLLSET_CV(pollset) (&(pollset)->cv)
#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_WINDOWS_H */
......@@ -59,9 +59,6 @@ typedef struct event {
/* Completion queue structure */
struct grpc_completion_queue {
/* TODO(ctiller): see if this can be removed */
int allow_polling;
/* When refs drops to zero, we are in shutdown mode, and will be destroyable
once all queued events are drained */
gpr_refcount refs;
......@@ -85,7 +82,6 @@ grpc_completion_queue *grpc_completion_queue_create(void) {
gpr_ref_init(&cc->refs, 1);
gpr_ref_init(&cc->owning_refs, 1);
grpc_pollset_init(&cc->pollset);
cc->allow_polling = 1;
return cc;
}
......@@ -106,10 +102,6 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc) {
}
}
void grpc_completion_queue_dont_poll_test_only(grpc_completion_queue *cc) {
cc->allow_polling = 0;
}
/* Create and append an event to the queue. Returns the event so that its data
members can be filled in.
Requires GRPC_POLLSET_MU(&cc->pollset) locked. */
......@@ -133,7 +125,6 @@ static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type,
ev->bucket_prev = cc->buckets[bucket]->bucket_prev;
ev->bucket_next->bucket_prev = ev->bucket_prev->bucket_next = ev;
}
gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset));
grpc_pollset_kick(&cc->pollset);
return ev;
}
......@@ -151,7 +142,6 @@ static void end_op_locked(grpc_completion_queue *cc,
GPR_ASSERT(!cc->shutdown);
GPR_ASSERT(cc->shutdown_called);
cc->shutdown = 1;
gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset));
}
}
......@@ -205,11 +195,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
ev = create_shutdown_event();
break;
}
if (cc->allow_polling && grpc_pollset_work(&cc->pollset, deadline)) {
continue;
}
if (gpr_cv_wait(GRPC_POLLSET_CV(&cc->pollset),
GRPC_POLLSET_MU(&cc->pollset), deadline)) {
if (!grpc_pollset_work(&cc->pollset, deadline)) {
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
......@@ -267,11 +253,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
ev = create_shutdown_event();
break;
}
if (cc->allow_polling && grpc_pollset_work(&cc->pollset, deadline)) {
continue;
}
if (gpr_cv_wait(GRPC_POLLSET_CV(&cc->pollset),
GRPC_POLLSET_MU(&cc->pollset), deadline)) {
if (!grpc_pollset_work(&cc->pollset, deadline)) {
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
......@@ -297,7 +279,6 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
GPR_ASSERT(!cc->shutdown);
cc->shutdown = 1;
gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset));
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
}
}
......
......@@ -50,9 +50,6 @@ void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call);
void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call,
int success);
/* disable polling for some tests */
void grpc_completion_queue_dont_poll_test_only(grpc_completion_queue *cc);
grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc);
void grpc_cq_hack_spin_pollset(grpc_completion_queue *cc);
......
......@@ -46,6 +46,8 @@
enum { TIMEOUT = 200000 };
static void *tag(gpr_intptr t) { return (void *)t; }
static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config,
const char *test_name,
grpc_channel_args *client_args,
......@@ -73,7 +75,8 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return;
grpc_server_shutdown(f->server);
grpc_server_shutdown_and_notify(f->server, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = NULL;
}
......
......@@ -225,8 +225,6 @@ static void test_threading(int producers, int consumers) {
gpr_log(GPR_INFO, "%s: %d producers, %d consumers", "test_threading", producers,
consumers);
grpc_completion_queue_dont_poll_test_only(cc);
/* start all threads: they will wait for phase1 */
for (i = 0; i < producers + consumers; i++) {
gpr_thd_id id;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment