diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c index 1ab8295e9ec644cfd44bfae31f495a5c1332e304..b696344eaba9330f7a78a83b90e6babbdb7d8844 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c @@ -103,10 +103,9 @@ static void fd_node_destroy(grpc_exec_ctx *exec_ctx, fd_node *fdn) { grpc_pollset_set_del_fd(exec_ctx, fdn->ev_driver->pollset_set, fdn->grpc_fd); /* c-ares library has closed the fd inside grpc_fd. This fd may be picked up immediately by another thread, and should not be closed by the following - grpc_fd_orphan. To prevent this fd from being closed by grpc_fd_orphan, - a fd pointer is provided. */ - int fd; - grpc_fd_orphan(exec_ctx, fdn->grpc_fd, NULL, &fd, "c-ares query finished"); + grpc_fd_orphan. */ + grpc_fd_orphan(exec_ctx, fdn->grpc_fd, NULL, NULL, true /* already_closed */, + "c-ares query finished"); gpr_free(fdn); } diff --git a/src/core/lib/iomgr/ev_epoll1_linux.c b/src/core/lib/iomgr/ev_epoll1_linux.c index b89b8af15a91d1e605edb70475dfa079de5107c9..8db4a92453b9e0d8ba507c6223355e545ed291ee 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.c +++ b/src/core/lib/iomgr/ev_epoll1_linux.c @@ -249,7 +249,7 @@ static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) { static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done, int *release_fd, - const char *reason) { + bool already_closed, const char *reason) { grpc_error *error = GRPC_ERROR_NONE; if (!grpc_lfev_is_shutdown(&fd->read_closure)) { @@ -260,7 +260,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, descriptor fd->fd (but we still own the grpc_fd structure). */ if (release_fd != NULL) { *release_fd = fd->fd; - } else { + } else if (!already_closed) { close(fd->fd); } diff --git a/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c b/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c index 5166dc2ac2ed2f7f8d352a5b0e8ca4db0be6ebfd..f2f3e1570461562c8ae2d243977867aa28070651 100644 --- a/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c +++ b/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c @@ -931,7 +931,7 @@ static int fd_wrapped_fd(grpc_fd *fd) { static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done, int *release_fd, - const char *reason) { + bool already_closed, const char *reason) { grpc_error *error = GRPC_ERROR_NONE; polling_island *unref_pi = NULL; @@ -952,8 +952,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, before doing this.) */ if (fd->po.pi != NULL) { polling_island *pi_latest = polling_island_lock(fd->po.pi); - polling_island_remove_fd_locked(pi_latest, fd, false /* is_fd_closed */, - &error); + polling_island_remove_fd_locked(pi_latest, fd, already_closed, &error); gpr_mu_unlock(&pi_latest->mu); unref_pi = fd->po.pi; diff --git a/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c b/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c index 1058f69a83da00be1cadd11bfb9429f645b21453..07c8eadf4f4d3bd0981b0040803e29f709f771cb 100644 --- a/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c +++ b/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c @@ -493,8 +493,8 @@ static int fd_wrapped_fd(grpc_fd *fd) { static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done, int *release_fd, - const char *reason) { - bool is_fd_closed = false; + bool already_closed, const char *reason) { + bool is_fd_closed = already_closed; grpc_error *error = GRPC_ERROR_NONE; epoll_set *unref_eps = NULL; @@ -505,7 +505,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, descriptor fd->fd (but we still own the grpc_fd structure). */ if (release_fd != NULL) { *release_fd = fd->fd; - } else { + } else if (!is_fd_closed) { close(fd->fd); is_fd_closed = true; } diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c index a2fa4ad9a5da87b0c7fff1162d443ee8e4328838..770d1fd0a9b67133fa131c24e22d512bc2d851fd 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.c +++ b/src/core/lib/iomgr/ev_epollex_linux.c @@ -380,8 +380,8 @@ static int fd_wrapped_fd(grpc_fd *fd) { static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done, int *release_fd, - const char *reason) { - bool is_fd_closed = false; + bool already_closed, const char *reason) { + bool is_fd_closed = already_closed; grpc_error *error = GRPC_ERROR_NONE; gpr_mu_lock(&fd->pollable.po.mu); @@ -392,7 +392,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, descriptor fd->fd (but we still own the grpc_fd structure). */ if (release_fd != NULL) { *release_fd = fd->fd; - } else { + } else if (!is_fd_closed) { close(fd->fd); is_fd_closed = true; } diff --git a/src/core/lib/iomgr/ev_epollsig_linux.c b/src/core/lib/iomgr/ev_epollsig_linux.c index a84c208e1a75ba785266fd0c506360ccbb98bc7b..11067e39590b6e8136d1ad46ac24bd576166d54a 100644 --- a/src/core/lib/iomgr/ev_epollsig_linux.c +++ b/src/core/lib/iomgr/ev_epollsig_linux.c @@ -854,7 +854,7 @@ static int fd_wrapped_fd(grpc_fd *fd) { static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done, int *release_fd, - const char *reason) { + bool already_closed, const char *reason) { grpc_error *error = GRPC_ERROR_NONE; polling_island *unref_pi = NULL; @@ -875,8 +875,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, before doing this.) */ if (fd->po.pi != NULL) { polling_island *pi_latest = polling_island_lock(fd->po.pi); - polling_island_remove_fd_locked(pi_latest, fd, false /* is_fd_closed */, - &error); + polling_island_remove_fd_locked(pi_latest, fd, already_closed, &error); gpr_mu_unlock(&pi_latest->mu); unref_pi = fd->po.pi; diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index 1f8d7eef26dd5f2fdcb7d08608cf4adde51eed7e..365aa583bb8df96dc5421da1256b282f5675fe5c 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -398,11 +398,14 @@ static int fd_wrapped_fd(grpc_fd *fd) { static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done, int *release_fd, - const char *reason) { + bool already_closed, const char *reason) { fd->on_done_closure = on_done; fd->released = release_fd != NULL; - if (fd->released) { + if (release_fd != NULL) { *release_fd = fd->fd; + fd->released = true; + } else if (already_closed) { + fd->released = true; } gpr_mu_lock(&fd->mu); REF_BY(fd, 1, reason); /* remove active status, but keep referenced */ diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index cff77e641c55c19b3eb9150110a85485c97ed8a0..91f8cd54821143bb26781f2df550b877b5a3a625 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -170,8 +170,9 @@ int grpc_fd_wrapped_fd(grpc_fd *fd) { } void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done, - int *release_fd, const char *reason) { - g_event_engine->fd_orphan(exec_ctx, fd, on_done, release_fd, reason); + int *release_fd, bool already_closed, const char *reason) { + g_event_engine->fd_orphan(exec_ctx, fd, on_done, release_fd, already_closed, + reason); } void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) { diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index 0de73338433edf7971a7070b7b995c15613f22b8..1108e46ef816d45e91bb6c2800d7440995c6d4fa 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -37,7 +37,7 @@ typedef struct grpc_event_engine_vtable { grpc_fd *(*fd_create)(int fd, const char *name); int (*fd_wrapped_fd)(grpc_fd *fd); void (*fd_orphan)(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done, - int *release_fd, const char *reason); + int *release_fd, bool already_closed, const char *reason); void (*fd_shutdown)(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why); void (*fd_notify_on_read)(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure); @@ -104,7 +104,7 @@ int grpc_fd_wrapped_fd(grpc_fd *fd); notify_on_write. MUST NOT be called with a pollset lock taken */ void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done, - int *release_fd, const char *reason); + int *release_fd, bool already_closed, const char *reason); /* Has grpc_fd_shutdown been called on an fd? */ bool grpc_fd_is_shutdown(grpc_fd *fd); diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c index 21e320a6e731e781afd60f9c3e8aae62a7b22e2d..a25fba45279aa38598bcef81773742cf9ccf0633 100644 --- a/src/core/lib/iomgr/tcp_client_posix.c +++ b/src/core/lib/iomgr/tcp_client_posix.c @@ -209,7 +209,8 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { finish: if (fd != NULL) { grpc_pollset_set_del_fd(exec_ctx, ac->interested_parties, fd); - grpc_fd_orphan(exec_ctx, fd, NULL, NULL, "tcp_client_orphan"); + grpc_fd_orphan(exec_ctx, fd, NULL, NULL, false /* already_closed */, + "tcp_client_orphan"); fd = NULL; } done = (--ac->refs == 0); @@ -295,7 +296,8 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, } if (errno != EWOULDBLOCK && errno != EINPROGRESS) { - grpc_fd_orphan(exec_ctx, fdobj, NULL, NULL, "tcp_client_connect_error"); + grpc_fd_orphan(exec_ctx, fdobj, NULL, NULL, false /* already_closed */, + "tcp_client_connect_error"); GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_OS_ERROR(errno, "connect")); goto done; } diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index b6dcd15cb08ded358dd59917a025aa27bf9a1cd3..2f543fd8a9ed6f5df626d6a6fa5fefafef50029d 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -156,7 +156,7 @@ static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { grpc_fd_orphan(exec_ctx, tcp->em_fd, tcp->release_fd_cb, tcp->release_fd, - "tcp_unref_orphan"); + false /* already_closed */, "tcp_unref_orphan"); grpc_slice_buffer_destroy_internal(exec_ctx, &tcp->last_read_buffer); grpc_resource_user_unref(exec_ctx, tcp->resource_user); gpr_free(tcp->peer_string); diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c index f304642951ebe6d1cac348fca2930621121fbe3b..0fc5c0fd867d92265c300536a3f0c055fa769d0a 100644 --- a/src/core/lib/iomgr/tcp_server_posix.c +++ b/src/core/lib/iomgr/tcp_server_posix.c @@ -166,7 +166,7 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { GRPC_CLOSURE_INIT(&sp->destroyed_closure, destroyed_port, s, grpc_schedule_on_exec_ctx); grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, NULL, - "tcp_listener_shutdown"); + false /* already_closed */, "tcp_listener_shutdown"); } gpr_mu_unlock(&s->mu); } else { diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c index 54e7f417a7c1590dcbec257189c4a9880fece1d5..88fa34cb7a66815fc9404dd9371d65aca57832ee 100644 --- a/src/core/lib/iomgr/udp_server.c +++ b/src/core/lib/iomgr/udp_server.c @@ -214,7 +214,7 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { sp->server->user_data); } grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, NULL, - "udp_listener_shutdown"); + false /* already_closed */, "udp_listener_shutdown"); } gpr_mu_unlock(&s->mu); } else { diff --git a/test/core/iomgr/ev_epollsig_linux_test.c b/test/core/iomgr/ev_epollsig_linux_test.c index 1d272fa406558de167342d486162cb7d4e0c4aeb..c702065d4d6ea34005c90eafcfdd8b8427562f26 100644 --- a/test/core/iomgr/ev_epollsig_linux_test.c +++ b/test/core/iomgr/ev_epollsig_linux_test.c @@ -79,7 +79,8 @@ static void test_fd_cleanup(grpc_exec_ctx *exec_ctx, test_fd *tfds, GRPC_ERROR_CREATE_FROM_STATIC_STRING("test_fd_cleanup")); grpc_exec_ctx_flush(exec_ctx); - grpc_fd_orphan(exec_ctx, tfds[i].fd, NULL, &release_fd, "test_fd_cleanup"); + grpc_fd_orphan(exec_ctx, tfds[i].fd, NULL, &release_fd, + false /* already_closed */, "test_fd_cleanup"); grpc_exec_ctx_flush(exec_ctx); GPR_ASSERT(release_fd == tfds[i].inner_fd); @@ -294,7 +295,8 @@ static void test_threading(void) { { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_fd_shutdown(&exec_ctx, shared.wakeup_desc, GRPC_ERROR_CANCELLED); - grpc_fd_orphan(&exec_ctx, shared.wakeup_desc, NULL, NULL, "done"); + grpc_fd_orphan(&exec_ctx, shared.wakeup_desc, NULL, NULL, + false /* already_closed */, "done"); grpc_pollset_shutdown(&exec_ctx, shared.pollset, GRPC_CLOSURE_CREATE(destroy_pollset, shared.pollset, grpc_schedule_on_exec_ctx)); diff --git a/test/core/iomgr/fd_posix_test.c b/test/core/iomgr/fd_posix_test.c index 02596450d225be3af3cf962e8ca6cf8eda3bb49c..85d5d9c07f3859ed562eadc3b59c4a6bf0d5dd00 100644 --- a/test/core/iomgr/fd_posix_test.c +++ b/test/core/iomgr/fd_posix_test.c @@ -114,7 +114,8 @@ static void session_shutdown_cb(grpc_exec_ctx *exec_ctx, void *arg, /*session */ bool success) { session *se = arg; server *sv = se->sv; - grpc_fd_orphan(exec_ctx, se->em_fd, NULL, NULL, "a"); + grpc_fd_orphan(exec_ctx, se->em_fd, NULL, NULL, false /* already_closed */, + "a"); gpr_free(se); /* Start to shutdown listen fd. */ grpc_fd_shutdown(exec_ctx, sv->em_fd, @@ -171,7 +172,8 @@ static void listen_shutdown_cb(grpc_exec_ctx *exec_ctx, void *arg /*server */, int success) { server *sv = arg; - grpc_fd_orphan(exec_ctx, sv->em_fd, NULL, NULL, "b"); + grpc_fd_orphan(exec_ctx, sv->em_fd, NULL, NULL, false /* already_closed */, + "b"); gpr_mu_lock(g_mu); sv->done = 1; @@ -291,7 +293,8 @@ static void client_init(client *cl) { static void client_session_shutdown_cb(grpc_exec_ctx *exec_ctx, void *arg /*client */, int success) { client *cl = arg; - grpc_fd_orphan(exec_ctx, cl->em_fd, NULL, NULL, "c"); + grpc_fd_orphan(exec_ctx, cl->em_fd, NULL, NULL, false /* already_closed */, + "c"); cl->done = 1; GPR_ASSERT( GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL))); @@ -511,7 +514,7 @@ static void test_grpc_fd_change(void) { GPR_ASSERT(b.cb_that_ran == second_read_callback); gpr_mu_unlock(g_mu); - grpc_fd_orphan(&exec_ctx, em_fd, NULL, NULL, "d"); + grpc_fd_orphan(&exec_ctx, em_fd, NULL, NULL, false /* already_closed */, "d"); grpc_exec_ctx_finish(&exec_ctx); destroy_change_data(&a); destroy_change_data(&b); diff --git a/test/core/iomgr/pollset_set_test.c b/test/core/iomgr/pollset_set_test.c index 6aedaf1081a611885aa9cff599ea47a3e15566b9..5750ac0f4b33afd1ce80268f09e56c1afc9838c4 100644 --- a/test/core/iomgr/pollset_set_test.c +++ b/test/core/iomgr/pollset_set_test.c @@ -137,7 +137,8 @@ static void cleanup_test_fds(grpc_exec_ctx *exec_ctx, test_fd *tfds, * grpc_wakeup_fd and we would like to destroy it ourselves (by calling * grpc_wakeup_fd_destroy). To prevent grpc_fd from calling close() on the * underlying fd, call it with a non-NULL 'release_fd' parameter */ - grpc_fd_orphan(exec_ctx, tfds[i].fd, NULL, &release_fd, "test_fd_cleanup"); + grpc_fd_orphan(exec_ctx, tfds[i].fd, NULL, &release_fd, + false /* already_closed */, "test_fd_cleanup"); grpc_exec_ctx_flush(exec_ctx); grpc_wakeup_fd_destroy(&tfds[i].wakeup_fd); diff --git a/test/cpp/microbenchmarks/bm_pollset.cc b/test/cpp/microbenchmarks/bm_pollset.cc index 683f4703c2aa1ef770f3c89957510500a7c66fb4..1fc1f2f83b1d5cfb49a7e60fc29b6cda4875f59b 100644 --- a/test/cpp/microbenchmarks/bm_pollset.cc +++ b/test/cpp/microbenchmarks/bm_pollset.cc @@ -149,7 +149,7 @@ static void BM_PollAddFd(benchmark::State& state) { grpc_pollset_add_fd(&exec_ctx, ps, fd); grpc_exec_ctx_flush(&exec_ctx); } - grpc_fd_orphan(&exec_ctx, fd, NULL, NULL, "xxx"); + grpc_fd_orphan(&exec_ctx, fd, NULL, NULL, false /* already_closed */, "xxx"); grpc_closure shutdown_ps_closure; GRPC_CLOSURE_INIT(&shutdown_ps_closure, shutdown_ps, ps, grpc_schedule_on_exec_ctx); @@ -247,7 +247,8 @@ static void BM_SingleThreadPollOneFd(benchmark::State& state) { while (!done) { GRPC_ERROR_UNREF(grpc_pollset_work(&exec_ctx, ps, NULL, now, deadline)); } - grpc_fd_orphan(&exec_ctx, wakeup, NULL, NULL, "done"); + grpc_fd_orphan(&exec_ctx, wakeup, NULL, NULL, false /* already_closed */, + "done"); wakeup_fd.read_fd = 0; grpc_closure shutdown_ps_closure; GRPC_CLOSURE_INIT(&shutdown_ps_closure, shutdown_ps, ps,