diff --git a/src/core/channel/child_channel.c b/src/core/channel/child_channel.c index a2f3c54290a4a8b4a13691c54e3a6f11744c197f..bdd414711730b3b3ed50ffbc373632f9aee90e5c 100644 --- a/src/core/channel/child_channel.c +++ b/src/core/channel/child_channel.c @@ -154,9 +154,18 @@ static void lb_destroy_channel_elem(grpc_channel_element *elem) { } const grpc_channel_filter grpc_child_channel_top_filter = { - lb_start_transport_op, lb_channel_op, sizeof(lb_call_data), - lb_init_call_elem, lb_destroy_call_elem, sizeof(lb_channel_data), - lb_init_channel_elem, lb_destroy_channel_elem, "child-channel", + lb_start_transport_op, + lb_channel_op, + + sizeof(lb_call_data), + lb_init_call_elem, + lb_destroy_call_elem, + + sizeof(lb_channel_data), + lb_init_channel_elem, + lb_destroy_channel_elem, + + "child-channel", }; /* grpc_child_channel proper */ diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c index a1c3938a33ce7eb89dfa3101993533c8173e1df4..62cb959c16a0436ea0ce08a71246813de515a180 100644 --- a/src/core/iomgr/pollset_multipoller_with_epoll.c +++ b/src/core/iomgr/pollset_multipoller_with_epoll.c @@ -140,10 +140,6 @@ static int multipoll_with_epoll_pollset_maybe_work( gpr_mu_lock(&pollset->mu); pollset->counter -= 1; - /* TODO(klempner): This should signal once per event rather than broadcast, - * although it probably doesn't matter because threads will generally be - * blocked in epoll_wait rather than being blocked on the cv. */ - gpr_cv_broadcast(&pollset->cv); return 1; } diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c index 4d36107ab0fed41df0beb74d2367dcdae70e36d5..b263001739fb4b65c7167fccd73c29c4a29f6090 100644 --- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c +++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c @@ -110,9 +110,6 @@ static int multipoll_with_poll_pollset_maybe_work( size_t i, np, nf, nd; pollset_hdr *h; - if (pollset->counter) { - return 0; - } h = pollset->data.ptr; if (gpr_time_cmp(deadline, gpr_inf_future) == 0) { timeout = -1; @@ -163,7 +160,7 @@ static int multipoll_with_poll_pollset_maybe_work( end_polling(pollset); return 0; } - pollset->counter = 1; + pollset->counter++; gpr_mu_unlock(&pollset->mu); for (i = 1; i < np; i++) { @@ -197,8 +194,7 @@ static int multipoll_with_poll_pollset_maybe_work( grpc_pollset_kick_post_poll(&pollset->kick_state); gpr_mu_lock(&pollset->mu); - pollset->counter = 0; - gpr_cv_broadcast(&pollset->cv); + pollset->counter--; return 1; } diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index be4289fbf632e1a1bfc70cb23a244d49947990b4..4a545c673bc11b63b8f6906bf7f676e087cc822a 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -97,7 +97,6 @@ static void become_unary_pollset(grpc_pollset *pollset, grpc_fd *fd); void grpc_pollset_init(grpc_pollset *pollset) { gpr_mu_init(&pollset->mu); - gpr_cv_init(&pollset->cv); grpc_pollset_kick_init(&pollset->kick_state); pollset->in_flight_cbs = 0; pollset->shutting_down = 0; @@ -107,14 +106,12 @@ void grpc_pollset_init(grpc_pollset *pollset) { void grpc_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { gpr_mu_lock(&pollset->mu); pollset->vtable->add_fd(pollset, fd); - gpr_cv_broadcast(&pollset->cv); gpr_mu_unlock(&pollset->mu); } void grpc_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) { gpr_mu_lock(&pollset->mu); pollset->vtable->del_fd(pollset, fd); - gpr_cv_broadcast(&pollset->cv); gpr_mu_unlock(&pollset->mu); } @@ -165,7 +162,6 @@ void grpc_pollset_destroy(grpc_pollset *pollset) { pollset->vtable->destroy(pollset); grpc_pollset_kick_destroy(&pollset->kick_state); gpr_mu_destroy(&pollset->mu); - gpr_cv_destroy(&pollset->cv); } /* @@ -268,7 +264,6 @@ static void unary_poll_do_promote(void *args, int success) { } } - gpr_cv_broadcast(&pollset->cv); gpr_mu_unlock(&pollset->mu); if (do_shutdown_cb) { @@ -334,20 +329,15 @@ static int unary_poll_pollset_maybe_work(grpc_pollset *pollset, int timeout; int r; - if (pollset->counter) { - return 0; - } if (pollset->in_flight_cbs) { /* Give do_promote priority so we don't starve it out */ - gpr_mu_unlock(&pollset->mu); - gpr_mu_lock(&pollset->mu); - return 0; + return 1; } fd = pollset->data.ptr; if (grpc_fd_is_orphaned(fd)) { grpc_fd_unref(fd); become_empty_pollset(pollset); - return 0; + return 1; } if (gpr_time_cmp(deadline, gpr_inf_future) == 0) { timeout = -1; @@ -366,7 +356,7 @@ static int unary_poll_pollset_maybe_work(grpc_pollset *pollset, pfd[0].revents = 0; pfd[1].fd = fd->fd; pfd[1].revents = 0; - pollset->counter = 1; + pollset->counter++; gpr_mu_unlock(&pollset->mu); pfd[1].events = grpc_fd_begin_poll(fd, pollset, POLLIN, POLLOUT, &fd_watcher); @@ -399,8 +389,7 @@ static int unary_poll_pollset_maybe_work(grpc_pollset *pollset, grpc_pollset_kick_post_poll(&pollset->kick_state); gpr_mu_lock(&pollset->mu); - pollset->counter = 0; - gpr_cv_broadcast(&pollset->cv); + pollset->counter--; return 1; } diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h index 1a92e151acb22b9731621afe9c07c5cf2c8efe65..ff9e119252fd7c43e783eeef4d36e640699f4d50 100644 --- a/src/core/iomgr/pollset_posix.h +++ b/src/core/iomgr/pollset_posix.h @@ -52,7 +52,6 @@ typedef struct grpc_pollset { few fds, and an epoll() based implementation for many fds */ const grpc_pollset_vtable *vtable; gpr_mu mu; - gpr_cv cv; grpc_pollset_kick_state kick_state; int counter; int in_flight_cbs; @@ -75,7 +74,6 @@ struct grpc_pollset_vtable { }; #define GRPC_POLLSET_MU(pollset) (&(pollset)->mu) -#define GRPC_POLLSET_CV(pollset) (&(pollset)->cv) /* Add an fd to a pollset */ void grpc_pollset_add_fd(grpc_pollset *pollset, struct grpc_fd *fd); diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c index 5af0685f9d97818dee8255028af3db176155174f..8484bb1ff06019ac1245865368ad30b04e8e3e51 100644 --- a/src/core/iomgr/pollset_windows.c +++ b/src/core/iomgr/pollset_windows.c @@ -48,7 +48,6 @@ void grpc_pollset_init(grpc_pollset *pollset) { gpr_mu_init(&pollset->mu); - gpr_cv_init(&pollset->cv); } void grpc_pollset_shutdown(grpc_pollset *pollset, @@ -59,7 +58,6 @@ void grpc_pollset_shutdown(grpc_pollset *pollset, void grpc_pollset_destroy(grpc_pollset *pollset) { gpr_mu_destroy(&pollset->mu); - gpr_cv_destroy(&pollset->cv); } int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) { diff --git a/src/core/iomgr/pollset_windows.h b/src/core/iomgr/pollset_windows.h index e1115bac4ff0fc74425c451f59caa8f07f761aa3..acd82d0a0a420c1ed652fb65db4bf797332d3af1 100644 --- a/src/core/iomgr/pollset_windows.h +++ b/src/core/iomgr/pollset_windows.h @@ -47,10 +47,8 @@ typedef struct grpc_pollset { gpr_mu mu; - gpr_cv cv; } grpc_pollset; #define GRPC_POLLSET_MU(pollset) (&(pollset)->mu) -#define GRPC_POLLSET_CV(pollset) (&(pollset)->cv) #endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_WINDOWS_H */ diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index 8c9ca48a059776510b4f62372d111104a02dee2d..f0fa25712a4058562ff87eff0be776b024da5a94 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -59,9 +59,6 @@ typedef struct event { /* Completion queue structure */ struct grpc_completion_queue { - /* TODO(ctiller): see if this can be removed */ - int allow_polling; - /* When refs drops to zero, we are in shutdown mode, and will be destroyable once all queued events are drained */ gpr_refcount refs; @@ -85,7 +82,6 @@ grpc_completion_queue *grpc_completion_queue_create(void) { gpr_ref_init(&cc->refs, 1); gpr_ref_init(&cc->owning_refs, 1); grpc_pollset_init(&cc->pollset); - cc->allow_polling = 1; return cc; } @@ -106,10 +102,6 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc) { } } -void grpc_completion_queue_dont_poll_test_only(grpc_completion_queue *cc) { - cc->allow_polling = 0; -} - /* Create and append an event to the queue. Returns the event so that its data members can be filled in. Requires GRPC_POLLSET_MU(&cc->pollset) locked. */ @@ -133,7 +125,6 @@ static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type, ev->bucket_prev = cc->buckets[bucket]->bucket_prev; ev->bucket_next->bucket_prev = ev->bucket_prev->bucket_next = ev; } - gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset)); grpc_pollset_kick(&cc->pollset); return ev; } @@ -151,7 +142,6 @@ static void end_op_locked(grpc_completion_queue *cc, GPR_ASSERT(!cc->shutdown); GPR_ASSERT(cc->shutdown_called); cc->shutdown = 1; - gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset)); } } @@ -205,11 +195,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, ev = create_shutdown_event(); break; } - if (cc->allow_polling && grpc_pollset_work(&cc->pollset, deadline)) { - continue; - } - if (gpr_cv_wait(GRPC_POLLSET_CV(&cc->pollset), - GRPC_POLLSET_MU(&cc->pollset), deadline)) { + if (!grpc_pollset_work(&cc->pollset, deadline)) { gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; @@ -267,11 +253,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, ev = create_shutdown_event(); break; } - if (cc->allow_polling && grpc_pollset_work(&cc->pollset, deadline)) { - continue; - } - if (gpr_cv_wait(GRPC_POLLSET_CV(&cc->pollset), - GRPC_POLLSET_MU(&cc->pollset), deadline)) { + if (!grpc_pollset_work(&cc->pollset, deadline)) { gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; @@ -297,7 +279,6 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) { gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); GPR_ASSERT(!cc->shutdown); cc->shutdown = 1; - gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset)); gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); } } diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h index 7b6fad98fdf84f57d4d9e886d05ead00e84ea7d5..28a4874f67650a4344483a68e1670452b7ebdc18 100644 --- a/src/core/surface/completion_queue.h +++ b/src/core/surface/completion_queue.h @@ -50,9 +50,6 @@ void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call); void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call, int success); -/* disable polling for some tests */ -void grpc_completion_queue_dont_poll_test_only(grpc_completion_queue *cc); - grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc); void grpc_cq_hack_spin_pollset(grpc_completion_queue *cc); diff --git a/test/core/end2end/tests/cancel_in_a_vacuum.c b/test/core/end2end/tests/cancel_in_a_vacuum.c index 1b235b78e9950aace949e15d509a662bd54c3fbb..c380b79b944082c5b8d94b5cebda545e361c596c 100644 --- a/test/core/end2end/tests/cancel_in_a_vacuum.c +++ b/test/core/end2end/tests/cancel_in_a_vacuum.c @@ -46,6 +46,8 @@ enum { TIMEOUT = 200000 }; +static void *tag(gpr_intptr t) { return (void *)t; } + static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, const char *test_name, grpc_channel_args *client_args, @@ -73,7 +75,8 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; - grpc_server_shutdown(f->server); + grpc_server_shutdown_and_notify(f->server, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); f->server = NULL; } diff --git a/test/core/surface/completion_queue_test.c b/test/core/surface/completion_queue_test.c index e26f379bfc35fc59feb10b2644db59cbf5a08e72..bca517349c604f989085a9adbb3b9eb260922361 100644 --- a/test/core/surface/completion_queue_test.c +++ b/test/core/surface/completion_queue_test.c @@ -225,8 +225,6 @@ static void test_threading(int producers, int consumers) { gpr_log(GPR_INFO, "%s: %d producers, %d consumers", "test_threading", producers, consumers); - grpc_completion_queue_dont_poll_test_only(cc); - /* start all threads: they will wait for phase1 */ for (i = 0; i < producers + consumers; i++) { gpr_thd_id id;