diff --git a/src/core/ext/client_channel/connector.c b/src/core/ext/client_channel/connector.c index 0582e5b3726fe518c848a22e0a3569145c73db3c..7a720fd1bd22063e8cce685c4238aeb957f0d5df 100644 --- a/src/core/ext/client_channel/connector.c +++ b/src/core/ext/client_channel/connector.c @@ -49,7 +49,7 @@ void grpc_connector_connect(grpc_exec_ctx* exec_ctx, grpc_connector* connector, connector->vtable->connect(exec_ctx, connector, in_args, out_args, notify); } -void grpc_connector_shutdown(grpc_exec_ctx* exec_ctx, - grpc_connector* connector) { - connector->vtable->shutdown(exec_ctx, connector); +void grpc_connector_shutdown(grpc_exec_ctx* exec_ctx, grpc_connector* connector, + grpc_error* why) { + connector->vtable->shutdown(exec_ctx, connector, why); } diff --git a/src/core/ext/client_channel/connector.h b/src/core/ext/client_channel/connector.h index 395f89b3b2c00314841a96e833f2fa8bad8357ba..9bff41f003f8795ac2e084579e6ef3cf959c8631 100644 --- a/src/core/ext/client_channel/connector.h +++ b/src/core/ext/client_channel/connector.h @@ -68,7 +68,8 @@ struct grpc_connector_vtable { void (*ref)(grpc_connector *connector); void (*unref)(grpc_exec_ctx *exec_ctx, grpc_connector *connector); /** Implementation of grpc_connector_shutdown */ - void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_connector *connector); + void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_connector *connector, + grpc_error *why); /** Implementation of grpc_connector_connect */ void (*connect)(grpc_exec_ctx *exec_ctx, grpc_connector *connector, const grpc_connect_in_args *in_args, @@ -83,7 +84,7 @@ void grpc_connector_connect(grpc_exec_ctx *exec_ctx, grpc_connector *connector, grpc_connect_out_args *out_args, grpc_closure *notify); /** Cancel any pending connection */ -void grpc_connector_shutdown(grpc_exec_ctx *exec_ctx, - grpc_connector *connector); +void grpc_connector_shutdown(grpc_exec_ctx *exec_ctx, grpc_connector *connector, + grpc_error *why); #endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_CONNECTOR_H */ diff --git a/src/core/ext/client_channel/http_connect_handshaker.c b/src/core/ext/client_channel/http_connect_handshaker.c index 622d236320f3a717440b2c8c22a8d18d230dd20b..58ab233f1b8e89b8af55e8265d8f4d93838c8e6c 100644 --- a/src/core/ext/client_channel/http_connect_handshaker.c +++ b/src/core/ext/client_channel/http_connect_handshaker.c @@ -123,7 +123,8 @@ static void handshake_failed_locked(grpc_exec_ctx* exec_ctx, // before destroying them, even if we know that there are no // pending read/write callbacks. This should be fixed, at which // point this can be removed. - grpc_endpoint_shutdown(exec_ctx, handshaker->args->endpoint); + grpc_endpoint_shutdown(exec_ctx, handshaker->args->endpoint, + GRPC_ERROR_REF(error)); // Not shutting down, so the handshake failed. Clean up before // invoking the callback. cleanup_args_for_failure_locked(exec_ctx, handshaker); @@ -251,15 +252,18 @@ static void http_connect_handshaker_destroy(grpc_exec_ctx* exec_ctx, } static void http_connect_handshaker_shutdown(grpc_exec_ctx* exec_ctx, - grpc_handshaker* handshaker_in) { + grpc_handshaker* handshaker_in, + grpc_error* why) { http_connect_handshaker* handshaker = (http_connect_handshaker*)handshaker_in; gpr_mu_lock(&handshaker->mu); if (!handshaker->shutdown) { handshaker->shutdown = true; - grpc_endpoint_shutdown(exec_ctx, handshaker->args->endpoint); + grpc_endpoint_shutdown(exec_ctx, handshaker->args->endpoint, + GRPC_ERROR_REF(why)); cleanup_args_for_failure_locked(exec_ctx, handshaker); } gpr_mu_unlock(&handshaker->mu); + GRPC_ERROR_UNREF(why); } static void http_connect_handshaker_do_handshake( diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c index c5041f6924f8773c00ef0d2bdb5d7d548ebc97c4..f1e4e079e21e522d17b267199ca050839ce24a0c 100644 --- a/src/core/ext/client_channel/subchannel.c +++ b/src/core/ext/client_channel/subchannel.c @@ -273,7 +273,8 @@ static void disconnect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { gpr_mu_lock(&c->mu); GPR_ASSERT(!c->disconnected); c->disconnected = true; - grpc_connector_shutdown(exec_ctx, c->connector); + grpc_connector_shutdown(exec_ctx, c->connector, + GRPC_ERROR_CREATE("Subchannel disconnected")); con = GET_CONNECTED_SUBCHANNEL(c, no_barrier); if (con != NULL) { GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, con, "connection"); diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.c b/src/core/ext/transport/chttp2/client/chttp2_connector.c index 013c96dc703345a319404dac7470a5085cb6d98e..d0a762a28033b30fc7953c8e72c93e9ed8f8f2ef 100644 --- a/src/core/ext/transport/chttp2/client/chttp2_connector.c +++ b/src/core/ext/transport/chttp2/client/chttp2_connector.c @@ -92,19 +92,21 @@ static void chttp2_connector_unref(grpc_exec_ctx *exec_ctx, } static void chttp2_connector_shutdown(grpc_exec_ctx *exec_ctx, - grpc_connector *con) { + grpc_connector *con, grpc_error *why) { chttp2_connector *c = (chttp2_connector *)con; gpr_mu_lock(&c->mu); c->shutdown = true; if (c->handshake_mgr != NULL) { - grpc_handshake_manager_shutdown(exec_ctx, c->handshake_mgr); + grpc_handshake_manager_shutdown(exec_ctx, c->handshake_mgr, + GRPC_ERROR_REF(why)); } // If handshaking is not yet in progress, shutdown the endpoint. // Otherwise, the handshaker will do this for us. if (!c->connecting && c->endpoint != NULL) { - grpc_endpoint_shutdown(exec_ctx, c->endpoint); + grpc_endpoint_shutdown(exec_ctx, c->endpoint, GRPC_ERROR_REF(why)); } gpr_mu_unlock(&c->mu); + GRPC_ERROR_UNREF(why); } static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, @@ -121,7 +123,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, // before destroying them, even if we know that there are no // pending read/write callbacks. This should be fixed, at which // point this can be removed. - grpc_endpoint_shutdown(exec_ctx, args->endpoint); + grpc_endpoint_shutdown(exec_ctx, args->endpoint, GRPC_ERROR_REF(error)); grpc_endpoint_destroy(exec_ctx, args->endpoint); grpc_channel_args_destroy(exec_ctx, args->args); grpc_slice_buffer_destroy_internal(exec_ctx, args->read_buffer); @@ -195,7 +197,9 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_closure *notify = c->notify; c->notify = NULL; grpc_closure_sched(exec_ctx, notify, error); - if (c->endpoint != NULL) grpc_endpoint_shutdown(exec_ctx, c->endpoint); + if (c->endpoint != NULL) { + grpc_endpoint_shutdown(exec_ctx, c->endpoint, GRPC_ERROR_REF(error)); + } gpr_mu_unlock(&c->mu); chttp2_connector_unref(exec_ctx, arg); } else { diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.c b/src/core/ext/transport/chttp2/server/chttp2_server.c index 56a1a0de9b3a6e23f58e4243657ebe3688eba770..ae2c3838ed9ad9e05e9e3fdea4eeedc9b40a9662 100644 --- a/src/core/ext/transport/chttp2/server/chttp2_server.c +++ b/src/core/ext/transport/chttp2/server/chttp2_server.c @@ -101,16 +101,19 @@ static void pending_handshake_manager_remove_locked( } static void pending_handshake_manager_shutdown_locked(grpc_exec_ctx *exec_ctx, - server_state *state) { + server_state *state, + grpc_error *why) { pending_handshake_manager_node *prev_node = NULL; for (pending_handshake_manager_node *node = state->pending_handshake_mgrs; node != NULL; node = node->next) { - grpc_handshake_manager_shutdown(exec_ctx, node->handshake_mgr); + grpc_handshake_manager_shutdown(exec_ctx, node->handshake_mgr, + GRPC_ERROR_REF(why)); gpr_free(prev_node); prev_node = node; } gpr_free(prev_node); state->pending_handshake_mgrs = NULL; + GRPC_ERROR_UNREF(why); } static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, @@ -129,7 +132,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, // before destroying them, even if we know that there are no // pending read/write callbacks. This should be fixed, at which // point this can be removed. - grpc_endpoint_shutdown(exec_ctx, args->endpoint); + grpc_endpoint_shutdown(exec_ctx, args->endpoint, GRPC_ERROR_NONE); grpc_endpoint_destroy(exec_ctx, args->endpoint); grpc_channel_args_destroy(exec_ctx, args->args); grpc_slice_buffer_destroy_internal(exec_ctx, args->read_buffer); @@ -210,7 +213,8 @@ static void tcp_server_shutdown_complete(grpc_exec_ctx *exec_ctx, void *arg, gpr_mu_lock(&state->mu); grpc_closure *destroy_done = state->server_destroy_listener_done; GPR_ASSERT(state->shutdown); - pending_handshake_manager_shutdown_locked(exec_ctx, state); + pending_handshake_manager_shutdown_locked(exec_ctx, state, + GRPC_ERROR_REF(error)); gpr_mu_unlock(&state->mu); // Flush queued work before destroying handshaker factory, since that // may do a synchronous unref. diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 2004bc6437ef7d0abfc3bdbfa27392e3a514e08f..15f486d6760ccf78d95359afe3ece787f81791d2 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -417,7 +417,7 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, t->closed = 1; connectivity_state_set(exec_ctx, t, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "close_transport"); - grpc_endpoint_shutdown(exec_ctx, t->ep); + grpc_endpoint_shutdown(exec_ctx, t->ep, GRPC_ERROR_REF(error)); /* flush writable stream list to avoid dangling references */ grpc_chttp2_stream *s; diff --git a/src/core/lib/channel/handshaker.c b/src/core/lib/channel/handshaker.c index c052ca538526861df51bf56c5cd69cbeb41c7c72..5bed2d041d86e10b21c866d10a73dad5215f508e 100644 --- a/src/core/lib/channel/handshaker.c +++ b/src/core/lib/channel/handshaker.c @@ -55,8 +55,8 @@ void grpc_handshaker_destroy(grpc_exec_ctx* exec_ctx, } void grpc_handshaker_shutdown(grpc_exec_ctx* exec_ctx, - grpc_handshaker* handshaker) { - handshaker->vtable->shutdown(exec_ctx, handshaker); + grpc_handshaker* handshaker, grpc_error* why) { + handshaker->vtable->shutdown(exec_ctx, handshaker, why); } void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx, @@ -141,14 +141,17 @@ void grpc_handshake_manager_destroy(grpc_exec_ctx* exec_ctx, } void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx, - grpc_handshake_manager* mgr) { + grpc_handshake_manager* mgr, + grpc_error* why) { gpr_mu_lock(&mgr->mu); // Shutdown the handshaker that's currently in progress, if any. if (!mgr->shutdown && mgr->index > 0) { mgr->shutdown = true; - grpc_handshaker_shutdown(exec_ctx, mgr->handshakers[mgr->index - 1]); + grpc_handshaker_shutdown(exec_ctx, mgr->handshakers[mgr->index - 1], + GRPC_ERROR_REF(why)); } gpr_mu_unlock(&mgr->mu); + GRPC_ERROR_UNREF(why); } // Helper function to call either the next handshaker or the @@ -197,7 +200,8 @@ static void call_next_handshaker(grpc_exec_ctx* exec_ctx, void* arg, static void on_timeout(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { grpc_handshake_manager* mgr = arg; if (error == GRPC_ERROR_NONE) { // Timer fired, rather than being cancelled. - grpc_handshake_manager_shutdown(exec_ctx, mgr); + grpc_handshake_manager_shutdown(exec_ctx, mgr, + GRPC_ERROR_CREATE("Handshake timed out")); } grpc_handshake_manager_unref(exec_ctx, mgr); } diff --git a/src/core/lib/channel/handshaker.h b/src/core/lib/channel/handshaker.h index 450b7adaee0baa45a327ba6b666b9f979f41b6ab..a8e3692add4eac4b1ab0baa6eb9c7f766a5bbda0 100644 --- a/src/core/lib/channel/handshaker.h +++ b/src/core/lib/channel/handshaker.h @@ -86,7 +86,8 @@ typedef struct { /// Shuts down the handshaker (e.g., to clean up when the operation is /// aborted in the middle). - void (*shutdown)(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker); + void (*shutdown)(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker, + grpc_error* why); /// Performs handshaking, modifying \a args as needed (e.g., to /// replace \a endpoint with a wrapped endpoint). @@ -111,7 +112,7 @@ void grpc_handshaker_init(const grpc_handshaker_vtable* vtable, void grpc_handshaker_destroy(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker); void grpc_handshaker_shutdown(grpc_exec_ctx* exec_ctx, - grpc_handshaker* handshaker); + grpc_handshaker* handshaker, grpc_error* why); void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker, grpc_tcp_server_acceptor* acceptor, @@ -141,7 +142,8 @@ void grpc_handshake_manager_destroy(grpc_exec_ctx* exec_ctx, /// The caller must still call grpc_handshake_manager_destroy() after /// calling this function. void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx, - grpc_handshake_manager* mgr); + grpc_handshake_manager* mgr, + grpc_error* why); /// Invokes handshakers in the order they were added. /// Takes ownership of \a endpoint, and then passes that ownership to diff --git a/src/core/lib/iomgr/endpoint.c b/src/core/lib/iomgr/endpoint.c index 2d300f456084e486947c03fe345ec791b420e0d0..bf6e98146a4e91d294cea4816bc056629c5ce268 100644 --- a/src/core/lib/iomgr/endpoint.c +++ b/src/core/lib/iomgr/endpoint.c @@ -54,8 +54,9 @@ void grpc_endpoint_add_to_pollset_set(grpc_exec_ctx* exec_ctx, ep->vtable->add_to_pollset_set(exec_ctx, ep, pollset_set); } -void grpc_endpoint_shutdown(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep) { - ep->vtable->shutdown(exec_ctx, ep); +void grpc_endpoint_shutdown(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, + grpc_error* why) { + ep->vtable->shutdown(exec_ctx, ep, why); } void grpc_endpoint_destroy(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep) { diff --git a/src/core/lib/iomgr/endpoint.h b/src/core/lib/iomgr/endpoint.h index 1609b64f2b097bb03b36c2706b30d88b96ce3123..740357ecc5480412a933388240265f8292b51ed9 100644 --- a/src/core/lib/iomgr/endpoint.h +++ b/src/core/lib/iomgr/endpoint.h @@ -57,7 +57,7 @@ struct grpc_endpoint_vtable { grpc_pollset *pollset); void (*add_to_pollset_set)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_pollset_set *pollset); - void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep); + void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_error *why); void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep); grpc_resource_user *(*get_resource_user)(grpc_endpoint *ep); char *(*get_peer)(grpc_endpoint *ep); @@ -96,7 +96,8 @@ void grpc_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, /* Causes any pending and future read/write callbacks to run immediately with success==0 */ -void grpc_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep); +void grpc_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, + grpc_error *why); void grpc_endpoint_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep); /* Add an endpoint to a pollset, so that when the pollset is polled, events from diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c index 39b4d9cbd7b1f93d6da248a14733767a1dbb2d16..51842fc208144478f32cec6cc9bde3f5164d7fbc 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.c +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -143,6 +143,7 @@ struct grpc_fd { /* Indicates that the fd is shutdown and that any pending read/write closures should fail */ bool shutdown; + grpc_error *shutdown_error; /* reason for shutdown: set iff shutdown==true */ /* The fd is either closed or we relinquished control of it. In either cases, this indicates that the 'fd' on this structure is no longer valid */ @@ -907,6 +908,7 @@ static void unref_by(grpc_fd *fd, int n) { fd->freelist_next = fd_freelist; fd_freelist = fd; grpc_iomgr_unregister_object(&fd->iomgr_object); + if (fd->shutdown) GRPC_ERROR_UNREF(fd->shutdown_error); gpr_mu_unlock(&fd_freelist_mu); } else { @@ -1058,11 +1060,11 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, GRPC_ERROR_UNREF(error); } -static grpc_error *fd_shutdown_error(bool shutdown) { - if (!shutdown) { +static grpc_error *fd_shutdown_error(grpc_fd *fd) { + if (!fd->shutdown) { return GRPC_ERROR_NONE; } else { - return GRPC_ERROR_CREATE("FD shutdown"); + return GRPC_ERROR_CREATE_REFERENCING("FD shutdown", &fd->shutdown_error, 1); } } @@ -1076,7 +1078,7 @@ static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, } else if (*st == CLOSURE_READY) { /* already ready ==> queue the closure to run immediately */ *st = CLOSURE_NOT_READY; - grpc_closure_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown)); + grpc_closure_sched(exec_ctx, closure, fd_shutdown_error(fd)); } else { /* upcallptr was set to a different closure. This is an error! */ gpr_log(GPR_ERROR, @@ -1098,7 +1100,7 @@ static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, return 0; } else { /* waiting ==> queue closure */ - grpc_closure_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown)); + grpc_closure_sched(exec_ctx, *st, fd_shutdown_error(fd)); *st = CLOSURE_NOT_READY; return 1; } @@ -1123,17 +1125,20 @@ static bool fd_is_shutdown(grpc_fd *fd) { } /* Might be called multiple times */ -static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { +static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) { gpr_mu_lock(&fd->po.mu); /* Do the actual shutdown only once */ if (!fd->shutdown) { fd->shutdown = true; + fd->shutdown_error = why; shutdown(fd->fd, SHUT_RDWR); /* Flush any pending read and write closures. Since fd->shutdown is 'true' at this point, the closures would be called with 'success = false' */ set_ready_locked(exec_ctx, fd, &fd->read_closure); set_ready_locked(exec_ctx, fd, &fd->write_closure); + } else { + GRPC_ERROR_UNREF(why); } gpr_mu_unlock(&fd->po.mu); } diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index 9477ac36884391270ce79bc0b26fc4712e4b3960..ca129322194fdd8c052e63a652bc16befa87ac59 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -82,6 +82,7 @@ struct grpc_fd { int shutdown; int closed; int released; + grpc_error *shutdown_error; /* The watcher list. @@ -306,6 +307,7 @@ static void unref_by(grpc_fd *fd, int n) { if (old == n) { gpr_mu_destroy(&fd->mu); grpc_iomgr_unregister_object(&fd->iomgr_object); + if (fd->shutdown) GRPC_ERROR_UNREF(fd->shutdown_error); gpr_free(fd); } else { GPR_ASSERT(old > n); @@ -444,11 +446,11 @@ static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); } static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); } #endif -static grpc_error *fd_shutdown_error(bool shutdown) { - if (!shutdown) { +static grpc_error *fd_shutdown_error(grpc_fd *fd) { + if (!fd->shutdown) { return GRPC_ERROR_NONE; } else { - return GRPC_ERROR_CREATE("FD shutdown"); + return GRPC_ERROR_CREATE_REFERENCING("FD shutdown", &fd->shutdown_error, 1); } } @@ -462,7 +464,7 @@ static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, } else if (*st == CLOSURE_READY) { /* already ready ==> queue the closure to run immediately */ *st = CLOSURE_NOT_READY; - grpc_closure_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown)); + grpc_closure_sched(exec_ctx, closure, fd_shutdown_error(fd)); maybe_wake_one_watcher_locked(fd); } else { /* upcallptr was set to a different closure. This is an error! */ @@ -485,7 +487,7 @@ static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, return 0; } else { /* waiting ==> queue closure */ - grpc_closure_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown)); + grpc_closure_sched(exec_ctx, *st, fd_shutdown_error(fd)); *st = CLOSURE_NOT_READY; return 1; } @@ -496,15 +498,18 @@ static void set_read_notifier_pollset_locked( fd->read_notifier_pollset = read_notifier_pollset; } -static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { +static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) { gpr_mu_lock(&fd->mu); /* only shutdown once */ if (!fd->shutdown) { fd->shutdown = 1; + fd->shutdown_error = why; /* signal read/write closed to OS so that future operations fail */ shutdown(fd->fd, SHUT_RDWR); set_ready_locked(exec_ctx, fd, &fd->read_closure); set_ready_locked(exec_ctx, fd, &fd->write_closure); + } else { + GRPC_ERROR_UNREF(why); } gpr_mu_unlock(&fd->mu); } diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index c106ba540065d764614c6bde5affe9b926072f86..5bb55631d69dae86aada76c681b043fbd1bd8289 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -162,8 +162,8 @@ void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done, g_event_engine->fd_orphan(exec_ctx, fd, on_done, release_fd, reason); } -void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { - g_event_engine->fd_shutdown(exec_ctx, fd); +void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) { + g_event_engine->fd_shutdown(exec_ctx, fd, why); } bool grpc_fd_is_shutdown(grpc_fd *fd) { diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index 1068a4bad519f81cff6e3a17ecab2711527cc948..a589efdeece264c814fa1d49b405e6978b6d75d4 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -51,7 +51,7 @@ typedef struct grpc_event_engine_vtable { int (*fd_wrapped_fd)(grpc_fd *fd); void (*fd_orphan)(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done, int *release_fd, const char *reason); - void (*fd_shutdown)(grpc_exec_ctx *exec_ctx, grpc_fd *fd); + void (*fd_shutdown)(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why); void (*fd_notify_on_read)(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure); void (*fd_notify_on_write)(grpc_exec_ctx *exec_ctx, grpc_fd *fd, @@ -140,7 +140,7 @@ void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done, bool grpc_fd_is_shutdown(grpc_fd *fd); /* Cause any current and future callbacks to fail. */ -void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd); +void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why); /* Register read interest, causing read_cb to be called once when fd becomes readable, on deadline specified by deadline, or on shutdown triggered by diff --git a/src/core/lib/iomgr/network_status_tracker.c b/src/core/lib/iomgr/network_status_tracker.c index a5ca9ed2c3431c97b71aa3322500e0a0b624272e..1601a390028f81bbff9ebe6211d849528059d09d 100644 --- a/src/core/lib/iomgr/network_status_tracker.c +++ b/src/core/lib/iomgr/network_status_tracker.c @@ -117,7 +117,8 @@ void grpc_network_status_shutdown_all_endpoints() { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; for (endpoint_ll_node *curr = head; curr != NULL; curr = curr->next) { - curr->ep->vtable->shutdown(&exec_ctx, curr->ep); + curr->ep->vtable->shutdown(&exec_ctx, curr->ep, + GRPC_ERROR_CREATE("Network unavailable")); } gpr_mu_unlock(&g_endpoint_mutex); grpc_exec_ctx_finish(&exec_ctx); diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c index 16b0f4e73c19072c4901b63929c7e6ea693cf260..0144192b71e0045f01c9013e0260f8ac7d3d5ef4 100644 --- a/src/core/lib/iomgr/tcp_client_posix.c +++ b/src/core/lib/iomgr/tcp_client_posix.c @@ -121,7 +121,8 @@ static void tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { } gpr_mu_lock(&ac->mu); if (ac->fd != NULL) { - grpc_fd_shutdown(exec_ctx, ac->fd); + grpc_fd_shutdown(exec_ctx, ac->fd, + GRPC_ERROR_CREATE("connect() timed out")); } done = (--ac->refs == 0); gpr_mu_unlock(&ac->mu); diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index a33e63e845c6fb75ea835b76535cf8172f23ee96..a4381f8fc969042316b3c1f158b60d71be53a38b 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -119,9 +119,10 @@ static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, grpc_error *error); -static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { +static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, + grpc_error *why) { grpc_tcp *tcp = (grpc_tcp *)ep; - grpc_fd_shutdown(exec_ctx, tcp->em_fd); + grpc_fd_shutdown(exec_ctx, tcp->em_fd, why); grpc_resource_user_shutdown(exec_ctx, tcp->resource_user); } diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c index 20efb678b230b4e904681bb70c567f9a05df579b..e9e7511c9cc40ca76136a1104c0fe3e419acab13 100644 --- a/src/core/lib/iomgr/tcp_server_posix.c +++ b/src/core/lib/iomgr/tcp_server_posix.c @@ -276,7 +276,8 @@ static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { if (s->active_ports) { grpc_tcp_listener *sp; for (sp = s->head; sp; sp = sp->next) { - grpc_fd_shutdown(exec_ctx, sp->emfd); + grpc_fd_shutdown(exec_ctx, sp->emfd, + GRPC_ERROR_CREATE("Server destroyed")); } gpr_mu_unlock(&s->mu); } else { @@ -773,7 +774,8 @@ void grpc_tcp_server_shutdown_listeners(grpc_exec_ctx *exec_ctx, if (s->active_ports) { grpc_tcp_listener *sp; for (sp = s->head; sp; sp = sp->next) { - grpc_fd_shutdown(exec_ctx, sp->emfd); + grpc_fd_shutdown(exec_ctx, sp->emfd, + GRPC_ERROR_CREATE("Server shutdown")); } } gpr_mu_unlock(&s->mu); diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c index 7f4ea49a1ceff2357620c75aa56ad47953ccd864..5fb398c50bd611dcace86c54d5501282018e1644 100644 --- a/src/core/lib/iomgr/tcp_uv.c +++ b/src/core/lib/iomgr/tcp_uv.c @@ -298,13 +298,15 @@ static void uv_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, static void shutdown_callback(uv_shutdown_t *req, int status) {} -static void uv_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { +static void uv_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, + grpc_error *why) { grpc_tcp *tcp = (grpc_tcp *)ep; if (!tcp->shutting_down) { tcp->shutting_down = true; uv_shutdown_t *req = &tcp->shutdown_req; uv_shutdown(req, (uv_stream_t *)tcp->handle, shutdown_callback); } + GRPC_ERROR_UNREF(why); } static void uv_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c index 84f791ba07af7abf0851d0515addf8624d7e2c7c..6c413971e33d26bd4ad56474a7be9591ac683cd5 100644 --- a/src/core/lib/iomgr/tcp_windows.c +++ b/src/core/lib/iomgr/tcp_windows.c @@ -116,6 +116,7 @@ typedef struct grpc_tcp { to protect ourselves when requesting a shutdown. */ gpr_mu mu; int shutting_down; + grpc_error *shutdown_error; char *peer_string; } grpc_tcp; @@ -125,6 +126,7 @@ static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { gpr_mu_destroy(&tcp->mu); gpr_free(tcp->peer_string); grpc_resource_user_unref(exec_ctx, tcp->resource_user); + if (tcp->shutting_down) GRPC_ERROR_UNREF(tcp->shutdown_error); gpr_free(tcp); } @@ -182,7 +184,10 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) { grpc_slice_buffer_add(tcp->read_slices, sub); } else { grpc_slice_unref_internal(exec_ctx, tcp->read_slice); - error = GRPC_ERROR_CREATE("End of TCP stream"); + error = tcp->shutting_down + ? GRPC_ERROR_CREATE_REFERENCING("TCP stream shutting down", + &tcp->shutdown_error, 1) + : GRPC_ERROR_CREATE("End of TCP stream"); } } } @@ -203,8 +208,9 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, WSABUF buffer; if (tcp->shutting_down) { - grpc_closure_sched(exec_ctx, cb, - GRPC_ERROR_CREATE("TCP socket is shutting down")); + grpc_closure_sched(exec_ctx, cb, GRPC_ERROR_CREATE_REFERENCING( + "TCP socket is shutting down", + &tcp->shutdown_error, 1)); return; } @@ -291,8 +297,9 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, size_t len; if (tcp->shutting_down) { - grpc_closure_sched(exec_ctx, cb, - GRPC_ERROR_CREATE("TCP socket is shutting down")); + grpc_closure_sched(exec_ctx, cb, GRPC_ERROR_CREATE_REFERENCING( + "TCP socket is shutting down", + &tcp->shutdown_error, 1)); return; } @@ -373,12 +380,18 @@ static void win_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, we're not going to protect against these. However the IO Completion Port callback will happen from another thread, so we need to protect against concurrent access of the data structure in that regard. */ -static void win_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { +static void win_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, + grpc_error *why) { grpc_tcp *tcp = (grpc_tcp *)ep; gpr_mu_lock(&tcp->mu); /* At that point, what may happen is that we're already inside the IOCP callback. See the comments in on_read and on_write. */ - tcp->shutting_down = 1; + if (!tcp->shutting_down) { + tcp->shutting_down = 1; + tcp->shutdown_error = why; + } else { + GRPC_ERROR_UNREF(why); + } grpc_winsocket_shutdown(tcp->socket); gpr_mu_unlock(&tcp->mu); grpc_resource_user_shutdown(exec_ctx, tcp->resource_user); diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c index dfbd295c914df62753be1474018edea0b7e9cf36..3b23b47d4f177d39288389a8921654f64910ad3c 100644 --- a/src/core/lib/iomgr/udp_server.c +++ b/src/core/lib/iomgr/udp_server.c @@ -203,7 +203,8 @@ void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, for (sp = s->head; sp; sp = sp->next) { GPR_ASSERT(sp->orphan_cb); sp->orphan_cb(sp->emfd); - grpc_fd_shutdown(exec_ctx, sp->emfd); + grpc_fd_shutdown(exec_ctx, sp->emfd, + GRPC_ERROR_CREATE("Server destroyed")); } gpr_mu_unlock(&s->mu); } else { diff --git a/src/core/lib/security/transport/secure_endpoint.c b/src/core/lib/security/transport/secure_endpoint.c index 18a7a6f7e7fefeb665c87eedbd924c8dd9cde02f..7d58843d69063311f5edb3ecc62cf662de2c8cfe 100644 --- a/src/core/lib/security/transport/secure_endpoint.c +++ b/src/core/lib/security/transport/secure_endpoint.c @@ -341,10 +341,10 @@ static void endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *secure_ep, GPR_TIMER_END("secure_endpoint.endpoint_write", 0); } -static void endpoint_shutdown(grpc_exec_ctx *exec_ctx, - grpc_endpoint *secure_ep) { +static void endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *secure_ep, + grpc_error *why) { secure_endpoint *ep = (secure_endpoint *)secure_ep; - grpc_endpoint_shutdown(exec_ctx, ep->wrapped_ep); + grpc_endpoint_shutdown(exec_ctx, ep->wrapped_ep, why); } static void endpoint_destroy(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/security/transport/security_handshaker.c b/src/core/lib/security/transport/security_handshaker.c index 37d57d759b333e9a137aad2c3b52416191aefaba..bb8a3bf6cdf5758c0518cbed98784ee390071e20 100644 --- a/src/core/lib/security/transport/security_handshaker.c +++ b/src/core/lib/security/transport/security_handshaker.c @@ -130,7 +130,7 @@ static void security_handshake_failed_locked(grpc_exec_ctx *exec_ctx, // before destroying them, even if we know that there are no // pending read/write callbacks. This should be fixed, at which // point this can be removed. - grpc_endpoint_shutdown(exec_ctx, h->args->endpoint); + grpc_endpoint_shutdown(exec_ctx, h->args->endpoint, GRPC_ERROR_REF(error)); // Not shutting down, so the write failed. Clean up before // invoking the callback. cleanup_args_for_failure_locked(exec_ctx, h); @@ -347,15 +347,17 @@ static void security_handshaker_destroy(grpc_exec_ctx *exec_ctx, } static void security_handshaker_shutdown(grpc_exec_ctx *exec_ctx, - grpc_handshaker *handshaker) { + grpc_handshaker *handshaker, + grpc_error *why) { security_handshaker *h = (security_handshaker *)handshaker; gpr_mu_lock(&h->mu); if (!h->shutdown) { h->shutdown = true; - grpc_endpoint_shutdown(exec_ctx, h->args->endpoint); + grpc_endpoint_shutdown(exec_ctx, h->args->endpoint, GRPC_ERROR_REF(why)); cleanup_args_for_failure_locked(exec_ctx, h); } gpr_mu_unlock(&h->mu); + GRPC_ERROR_UNREF(why); } static void security_handshaker_do_handshake(grpc_exec_ctx *exec_ctx, @@ -417,7 +419,10 @@ static void fail_handshaker_destroy(grpc_exec_ctx *exec_ctx, } static void fail_handshaker_shutdown(grpc_exec_ctx *exec_ctx, - grpc_handshaker *handshaker) {} + grpc_handshaker *handshaker, + grpc_error *why) { + GRPC_ERROR_UNREF(why); +} static void fail_handshaker_do_handshake(grpc_exec_ctx *exec_ctx, grpc_handshaker *handshaker, diff --git a/test/core/bad_client/bad_client.c b/test/core/bad_client/bad_client.c index a84086804cdb0e84f81032c50db1ccc2d9f616cf..b7e7606b184d35f3c93b96c8664856f5635898ee 100644 --- a/test/core/bad_client/bad_client.c +++ b/test/core/bad_client/bad_client.c @@ -163,7 +163,8 @@ void grpc_run_bad_client_test( gpr_event_wait(&a.done_write, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5))); if (flags & GRPC_BAD_CLIENT_DISCONNECT) { - grpc_endpoint_shutdown(&exec_ctx, sfd.client); + grpc_endpoint_shutdown(&exec_ctx, sfd.client, + GRPC_ERROR_CREATE("Forced Disconnect")); grpc_endpoint_destroy(&exec_ctx, sfd.client); grpc_exec_ctx_finish(&exec_ctx); sfd.client = NULL; @@ -189,7 +190,8 @@ void grpc_run_bad_client_test( grpc_slice_buffer_destroy_internal(&exec_ctx, &args.incoming); } // Shutdown. - grpc_endpoint_shutdown(&exec_ctx, sfd.client); + grpc_endpoint_shutdown(&exec_ctx, sfd.client, + GRPC_ERROR_CREATE("Test Shutdown")); grpc_endpoint_destroy(&exec_ctx, sfd.client); grpc_exec_ctx_finish(&exec_ctx); } diff --git a/test/core/client_channel/set_initial_connect_string_test.c b/test/core/client_channel/set_initial_connect_string_test.c index fc0aca0434f04433e74a1ea10d29a424e8fa9368..a0a33667cc879c4651fcd8183aaeae88c9931bb4 100644 --- a/test/core/client_channel/set_initial_connect_string_test.c +++ b/test/core/client_channel/set_initial_connect_string_test.c @@ -81,7 +81,9 @@ static void handle_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { state.incoming_buffer.length, strlen(magic_connect_string)); if (state.incoming_buffer.length > strlen(magic_connect_string)) { gpr_atm_rel_store(&state.done_atm, 1); - grpc_endpoint_shutdown(exec_ctx, state.tcp); + grpc_endpoint_shutdown( + exec_ctx, state.tcp, + GRPC_ERROR_CREATE("Incoming buffer longer than magic_connect_string")); grpc_endpoint_destroy(exec_ctx, state.tcp); } else { grpc_endpoint_read(exec_ctx, state.tcp, &state.temp_incoming_buffer, diff --git a/test/core/end2end/bad_server_response_test.c b/test/core/end2end/bad_server_response_test.c index 42bf4fa235080e2ea990f175105aeb49d1f336eb..d5f428eb8291084b794b6411ee51255878a3e0ec 100644 --- a/test/core/end2end/bad_server_response_test.c +++ b/test/core/end2end/bad_server_response_test.c @@ -298,7 +298,8 @@ static void run_test(const char *response_payload, gpr_event_wait(&ev, gpr_inf_future(GPR_CLOCK_REALTIME)); /* clean up */ - grpc_endpoint_shutdown(&exec_ctx, state.tcp); + grpc_endpoint_shutdown(&exec_ctx, state.tcp, + GRPC_ERROR_CREATE("Test Shutdown")); grpc_endpoint_destroy(&exec_ctx, state.tcp); cleanup_rpc(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx); diff --git a/test/core/end2end/fixtures/http_proxy.c b/test/core/end2end/fixtures/http_proxy.c index dac9baf3cec605b21e1e0fc9bbd90d029f30eab7..6fdc86fc12fa5ac58d9266cbd26c0e5d1c7e6d95 100644 --- a/test/core/end2end/fixtures/http_proxy.c +++ b/test/core/end2end/fixtures/http_proxy.c @@ -133,9 +133,12 @@ static void proxy_connection_failed(grpc_exec_ctx* exec_ctx, const char* msg = grpc_error_string(error); gpr_log(GPR_INFO, "%s: %s", prefix, msg); - grpc_endpoint_shutdown(exec_ctx, conn->client_endpoint); - if (conn->server_endpoint != NULL) - grpc_endpoint_shutdown(exec_ctx, conn->server_endpoint); + grpc_endpoint_shutdown(exec_ctx, conn->client_endpoint, + GRPC_ERROR_REF(error)); + if (conn->server_endpoint != NULL) { + grpc_endpoint_shutdown(exec_ctx, conn->server_endpoint, + GRPC_ERROR_REF(error)); + } proxy_connection_unref(exec_ctx, conn); } diff --git a/test/core/internal_api_canaries/iomgr.c b/test/core/internal_api_canaries/iomgr.c index 3225b9dc198f0bd79ca254c21a3a5776bb7e2603..d73d5c175c9ad2db438406e510c59b54bf0164bb 100644 --- a/test/core/internal_api_canaries/iomgr.c +++ b/test/core/internal_api_canaries/iomgr.c @@ -92,7 +92,7 @@ static void test_code(void) { grpc_endpoint_read(&exec_ctx, &endpoint, NULL, NULL); grpc_endpoint_get_peer(&endpoint); grpc_endpoint_write(&exec_ctx, &endpoint, NULL, NULL); - grpc_endpoint_shutdown(&exec_ctx, &endpoint); + grpc_endpoint_shutdown(&exec_ctx, &endpoint, GRPC_ERROR_CANCELLED); grpc_endpoint_destroy(&exec_ctx, &endpoint); grpc_endpoint_add_to_pollset(&exec_ctx, &endpoint, NULL); grpc_endpoint_add_to_pollset_set(&exec_ctx, &endpoint, NULL); diff --git a/test/core/iomgr/endpoint_tests.c b/test/core/iomgr/endpoint_tests.c index df5dd209031b98efa7e382a40a4405201766a712..bbc5f383f633006d1fb2616e6436cdeb29f1aab7 100644 --- a/test/core/iomgr/endpoint_tests.c +++ b/test/core/iomgr/endpoint_tests.c @@ -233,9 +233,11 @@ static void read_and_write_test(grpc_endpoint_test_config config, if (shutdown) { gpr_log(GPR_DEBUG, "shutdown read"); - grpc_endpoint_shutdown(&exec_ctx, state.read_ep); + grpc_endpoint_shutdown(&exec_ctx, state.read_ep, + GRPC_ERROR_CREATE("Test Shutdown")); gpr_log(GPR_DEBUG, "shutdown write"); - grpc_endpoint_shutdown(&exec_ctx, state.write_ep); + grpc_endpoint_shutdown(&exec_ctx, state.write_ep, + GRPC_ERROR_CREATE("Test Shutdown")); } grpc_exec_ctx_flush(&exec_ctx); @@ -296,7 +298,8 @@ static void multiple_shutdown_test(grpc_endpoint_test_config config) { grpc_closure_create(inc_on_failure, &fail_count, grpc_schedule_on_exec_ctx)); wait_for_fail_count(&exec_ctx, &fail_count, 0); - grpc_endpoint_shutdown(&exec_ctx, f.client_ep); + grpc_endpoint_shutdown(&exec_ctx, f.client_ep, + GRPC_ERROR_CREATE("Test Shutdown")); wait_for_fail_count(&exec_ctx, &fail_count, 1); grpc_endpoint_read(&exec_ctx, f.client_ep, &slice_buffer, grpc_closure_create(inc_on_failure, &fail_count, @@ -307,7 +310,8 @@ static void multiple_shutdown_test(grpc_endpoint_test_config config) { grpc_closure_create(inc_on_failure, &fail_count, grpc_schedule_on_exec_ctx)); wait_for_fail_count(&exec_ctx, &fail_count, 3); - grpc_endpoint_shutdown(&exec_ctx, f.client_ep); + grpc_endpoint_shutdown(&exec_ctx, f.client_ep, + GRPC_ERROR_CREATE("Test Shutdown")); wait_for_fail_count(&exec_ctx, &fail_count, 3); grpc_slice_buffer_destroy_internal(&exec_ctx, &slice_buffer); diff --git a/test/core/iomgr/ev_epoll_linux_test.c b/test/core/iomgr/ev_epoll_linux_test.c index a10be7f81bd7f47bd333c5f05c55c6374460e81b..5b05ea3338ab831b12e13b615c354e5bc5f06d05 100644 --- a/test/core/iomgr/ev_epoll_linux_test.c +++ b/test/core/iomgr/ev_epoll_linux_test.c @@ -89,7 +89,8 @@ static void test_fd_cleanup(grpc_exec_ctx *exec_ctx, test_fd *tfds, int i; for (i = 0; i < num_fds; i++) { - grpc_fd_shutdown(exec_ctx, tfds[i].fd); + grpc_fd_shutdown(exec_ctx, tfds[i].fd, + GRPC_ERROR_CREATE("test_fd_cleanup")); grpc_exec_ctx_flush(exec_ctx); grpc_fd_orphan(exec_ctx, tfds[i].fd, NULL, &release_fd, "test_fd_cleanup"); diff --git a/test/core/iomgr/fd_posix_test.c b/test/core/iomgr/fd_posix_test.c index a617bfc6469937a09e5fa60284eedceaecc22592..4726e935d893fe547639e424091858a8854c41ed 100644 --- a/test/core/iomgr/fd_posix_test.c +++ b/test/core/iomgr/fd_posix_test.c @@ -132,7 +132,8 @@ static void session_shutdown_cb(grpc_exec_ctx *exec_ctx, void *arg, /*session */ grpc_fd_orphan(exec_ctx, se->em_fd, NULL, NULL, "a"); gpr_free(se); /* Start to shutdown listen fd. */ - grpc_fd_shutdown(exec_ctx, sv->em_fd); + grpc_fd_shutdown(exec_ctx, sv->em_fd, + GRPC_ERROR_CREATE("session_shutdown_cb")); } /* Called when data become readable in a session. */ diff --git a/test/core/iomgr/tcp_client_posix_test.c b/test/core/iomgr/tcp_client_posix_test.c index 0ea7a000eb430f074eb8e8723a6f3bf986162db1..6bb00bc7872afb56ce05c8af8a7b103242489e17 100644 --- a/test/core/iomgr/tcp_client_posix_test.c +++ b/test/core/iomgr/tcp_client_posix_test.c @@ -72,7 +72,8 @@ static void must_succeed(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { GPR_ASSERT(g_connecting != NULL); GPR_ASSERT(error == GRPC_ERROR_NONE); - grpc_endpoint_shutdown(exec_ctx, g_connecting); + grpc_endpoint_shutdown(exec_ctx, g_connecting, + GRPC_ERROR_CREATE("must_succeed called")); grpc_endpoint_destroy(exec_ctx, g_connecting); g_connecting = NULL; finish_connection(); diff --git a/test/core/iomgr/tcp_server_posix_test.c b/test/core/iomgr/tcp_server_posix_test.c index 020f0059802e371e4d1fa7714e674609dcbd7740..417bc5111a08c5f82a21016b7380558d545d99b6 100644 --- a/test/core/iomgr/tcp_server_posix_test.c +++ b/test/core/iomgr/tcp_server_posix_test.c @@ -121,7 +121,7 @@ static void server_weak_ref_set(server_weak_ref *weak_ref, static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, grpc_pollset *pollset, grpc_tcp_server_acceptor *acceptor) { - grpc_endpoint_shutdown(exec_ctx, tcp); + grpc_endpoint_shutdown(exec_ctx, tcp, GRPC_ERROR_CREATE("Connected")); grpc_endpoint_destroy(exec_ctx, tcp); on_connect_result temp_result; diff --git a/test/core/security/secure_endpoint_test.c b/test/core/security/secure_endpoint_test.c index bcc50d0bb0e00ab7b52d4ab2d70cf0e78ab33cc0..97e9c3d6db504a45e36697f4140cccb47931bad1 100644 --- a/test/core/security/secure_endpoint_test.c +++ b/test/core/security/secure_endpoint_test.c @@ -166,8 +166,10 @@ static void test_leftover(grpc_endpoint_test_config config, size_t slice_size) { GPR_ASSERT(incoming.count == 1); GPR_ASSERT(grpc_slice_eq(s, incoming.slices[0])); - grpc_endpoint_shutdown(&exec_ctx, f.client_ep); - grpc_endpoint_shutdown(&exec_ctx, f.server_ep); + grpc_endpoint_shutdown(&exec_ctx, f.client_ep, + GRPC_ERROR_CREATE("test_leftover end")); + grpc_endpoint_shutdown(&exec_ctx, f.server_ep, + GRPC_ERROR_CREATE("test_leftover end")); grpc_endpoint_destroy(&exec_ctx, f.client_ep); grpc_endpoint_destroy(&exec_ctx, f.server_ep); grpc_exec_ctx_finish(&exec_ctx); diff --git a/test/core/security/ssl_server_fuzzer.c b/test/core/security/ssl_server_fuzzer.c index 55e8f5e78d23042b28cea890b008c66932ddba5c..f789278add8f8abd1a71298d411224c1aabed78d 100644 --- a/test/core/security/ssl_server_fuzzer.c +++ b/test/core/security/ssl_server_fuzzer.c @@ -121,7 +121,8 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) { // server will wait for more data. Explicitly fail the server by shutting down // the endpoint. if (!state.done_callback_called) { - grpc_endpoint_shutdown(&exec_ctx, mock_endpoint); + grpc_endpoint_shutdown(&exec_ctx, mock_endpoint, + GRPC_ERROR_CREATE("Explicit close")); grpc_exec_ctx_flush(&exec_ctx); } diff --git a/test/core/surface/concurrent_connectivity_test.c b/test/core/surface/concurrent_connectivity_test.c index 8ebe8d07e4c6a7c6568dd4d7b35dee0789c286d8..7071f93d8dbf1ed3ad47792b014350b44eb1e054 100644 --- a/test/core/surface/concurrent_connectivity_test.c +++ b/test/core/surface/concurrent_connectivity_test.c @@ -107,7 +107,7 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *vargs, grpc_endpoint *tcp, grpc_tcp_server_acceptor *acceptor) { gpr_free(acceptor); struct server_thread_args *args = (struct server_thread_args *)vargs; - grpc_endpoint_shutdown(exec_ctx, tcp); + grpc_endpoint_shutdown(exec_ctx, tcp, GRPC_ERROR_CREATE("Connected")); grpc_endpoint_destroy(exec_ctx, tcp); GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(args->pollset, NULL)); } diff --git a/test/core/util/mock_endpoint.c b/test/core/util/mock_endpoint.c index 04793bceabe28aa1882db5ca9fceff81b1648474..d531ec60310dc0a4eda077b33c24bf05d1e7a5a6 100644 --- a/test/core/util/mock_endpoint.c +++ b/test/core/util/mock_endpoint.c @@ -78,16 +78,18 @@ static void me_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, static void me_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_pollset_set *pollset) {} -static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { +static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, + grpc_error *why) { grpc_mock_endpoint *m = (grpc_mock_endpoint *)ep; gpr_mu_lock(&m->mu); if (m->on_read) { - grpc_closure_sched(exec_ctx, m->on_read, - GRPC_ERROR_CREATE("Endpoint Shutdown")); + grpc_closure_sched(exec_ctx, m->on_read, GRPC_ERROR_CREATE_REFERENCING( + "Endpoint Shutdown", &why, 1)); m->on_read = NULL; } gpr_mu_unlock(&m->mu); grpc_resource_user_shutdown(exec_ctx, m->resource_user); + GRPC_ERROR_UNREF(why); } static void me_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { diff --git a/test/core/util/passthru_endpoint.c b/test/core/util/passthru_endpoint.c index d42ec7f9e8cb3e46be8a2665ceee8e497747d71e..2ad019ddb0879a5139167367d5b1c849c64ed736 100644 --- a/test/core/util/passthru_endpoint.c +++ b/test/core/util/passthru_endpoint.c @@ -109,21 +109,25 @@ static void me_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, static void me_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_pollset_set *pollset) {} -static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { +static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, + grpc_error *why) { half *m = (half *)ep; gpr_mu_lock(&m->parent->mu); m->parent->shutdown = true; if (m->on_read) { - grpc_closure_sched(exec_ctx, m->on_read, GRPC_ERROR_CREATE("Shutdown")); + grpc_closure_sched(exec_ctx, m->on_read, + GRPC_ERROR_CREATE_REFERENCING("Shutdown", &why, 1)); m->on_read = NULL; } m = other_half(m); if (m->on_read) { - grpc_closure_sched(exec_ctx, m->on_read, GRPC_ERROR_CREATE("Shutdown")); + grpc_closure_sched(exec_ctx, m->on_read, + GRPC_ERROR_CREATE_REFERENCING("Shutdown", &why, 1)); m->on_read = NULL; } gpr_mu_unlock(&m->parent->mu); grpc_resource_user_shutdown(exec_ctx, m->resource_user); + GRPC_ERROR_UNREF(why); } static void me_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { diff --git a/test/core/util/reconnect_server.c b/test/core/util/reconnect_server.c index 7bf83a74a1fda08295947980a337365c26ddda49..7fbd0ca6aaa60553b61747c746f2c24a384f07da 100644 --- a/test/core/util/reconnect_server.c +++ b/test/core/util/reconnect_server.c @@ -80,7 +80,7 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME); timestamp_list *new_tail; peer = grpc_endpoint_get_peer(tcp); - grpc_endpoint_shutdown(exec_ctx, tcp); + grpc_endpoint_shutdown(exec_ctx, tcp, GRPC_ERROR_CREATE("Connected")); grpc_endpoint_destroy(exec_ctx, tcp); if (peer) { last_colon = strrchr(peer, ':');