From cda759d658980346eb6a5673a36cda7f1906da3e Mon Sep 17 00:00:00 2001 From: Craig Tiller <ctiller@google.com> Date: Fri, 27 Jan 2017 11:37:37 -0800 Subject: [PATCH] Add an error to fd_shutdown (and recursively) Allows diagnosing WHY a file descriptor was shutdown prematurely. --- src/core/ext/client_channel/connector.c | 6 +++--- src/core/ext/client_channel/connector.h | 7 ++++--- .../client_channel/http_connect_handshaker.c | 10 +++++++--- src/core/ext/client_channel/subchannel.c | 3 ++- .../transport/chttp2/client/chttp2_connector.c | 14 +++++++++----- .../ext/transport/chttp2/server/chttp2_server.c | 12 ++++++++---- .../chttp2/transport/chttp2_transport.c | 2 +- src/core/lib/channel/handshaker.c | 14 +++++++++----- src/core/lib/channel/handshaker.h | 8 +++++--- src/core/lib/iomgr/endpoint.c | 5 +++-- src/core/lib/iomgr/endpoint.h | 5 +++-- src/core/lib/iomgr/ev_epoll_linux.c | 17 +++++++++++------ src/core/lib/iomgr/ev_poll_posix.c | 17 +++++++++++------ src/core/lib/iomgr/ev_posix.c | 4 ++-- src/core/lib/iomgr/ev_posix.h | 4 ++-- src/core/lib/iomgr/network_status_tracker.c | 3 ++- src/core/lib/iomgr/tcp_client_posix.c | 3 ++- src/core/lib/iomgr/tcp_posix.c | 5 +++-- src/core/lib/iomgr/tcp_server_posix.c | 6 ++++-- src/core/lib/iomgr/udp_server.c | 3 ++- .../lib/security/transport/secure_endpoint.c | 6 +++--- .../security/transport/security_handshaker.c | 13 +++++++++---- test/core/bad_client/bad_client.c | 6 ++++-- .../set_initial_connect_string_test.c | 4 +++- test/core/end2end/bad_server_response_test.c | 3 ++- test/core/end2end/fixtures/http_proxy.c | 9 ++++++--- test/core/internal_api_canaries/iomgr.c | 2 +- test/core/iomgr/endpoint_tests.c | 12 ++++++++---- test/core/iomgr/ev_epoll_linux_test.c | 3 ++- test/core/iomgr/fd_posix_test.c | 3 ++- test/core/iomgr/tcp_client_posix_test.c | 3 ++- test/core/iomgr/tcp_server_posix_test.c | 2 +- test/core/security/secure_endpoint_test.c | 6 ++++-- test/core/security/ssl_server_fuzzer.c | 3 ++- .../core/surface/concurrent_connectivity_test.c | 2 +- test/core/util/mock_endpoint.c | 8 +++++--- test/core/util/passthru_endpoint.c | 10 +++++++--- test/core/util/reconnect_server.c | 2 +- 38 files changed, 156 insertions(+), 89 deletions(-) diff --git a/src/core/ext/client_channel/connector.c b/src/core/ext/client_channel/connector.c index 0582e5b372..7a720fd1bd 100644 --- a/src/core/ext/client_channel/connector.c +++ b/src/core/ext/client_channel/connector.c @@ -49,7 +49,7 @@ void grpc_connector_connect(grpc_exec_ctx* exec_ctx, grpc_connector* connector, connector->vtable->connect(exec_ctx, connector, in_args, out_args, notify); } -void grpc_connector_shutdown(grpc_exec_ctx* exec_ctx, - grpc_connector* connector) { - connector->vtable->shutdown(exec_ctx, connector); +void grpc_connector_shutdown(grpc_exec_ctx* exec_ctx, grpc_connector* connector, + grpc_error* why) { + connector->vtable->shutdown(exec_ctx, connector, why); } diff --git a/src/core/ext/client_channel/connector.h b/src/core/ext/client_channel/connector.h index 395f89b3b2..9bff41f003 100644 --- a/src/core/ext/client_channel/connector.h +++ b/src/core/ext/client_channel/connector.h @@ -68,7 +68,8 @@ struct grpc_connector_vtable { void (*ref)(grpc_connector *connector); void (*unref)(grpc_exec_ctx *exec_ctx, grpc_connector *connector); /** Implementation of grpc_connector_shutdown */ - void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_connector *connector); + void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_connector *connector, + grpc_error *why); /** Implementation of grpc_connector_connect */ void (*connect)(grpc_exec_ctx *exec_ctx, grpc_connector *connector, const grpc_connect_in_args *in_args, @@ -83,7 +84,7 @@ void grpc_connector_connect(grpc_exec_ctx *exec_ctx, grpc_connector *connector, grpc_connect_out_args *out_args, grpc_closure *notify); /** Cancel any pending connection */ -void grpc_connector_shutdown(grpc_exec_ctx *exec_ctx, - grpc_connector *connector); +void grpc_connector_shutdown(grpc_exec_ctx *exec_ctx, grpc_connector *connector, + grpc_error *why); #endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_CONNECTOR_H */ diff --git a/src/core/ext/client_channel/http_connect_handshaker.c b/src/core/ext/client_channel/http_connect_handshaker.c index 622d236320..58ab233f1b 100644 --- a/src/core/ext/client_channel/http_connect_handshaker.c +++ b/src/core/ext/client_channel/http_connect_handshaker.c @@ -123,7 +123,8 @@ static void handshake_failed_locked(grpc_exec_ctx* exec_ctx, // before destroying them, even if we know that there are no // pending read/write callbacks. This should be fixed, at which // point this can be removed. - grpc_endpoint_shutdown(exec_ctx, handshaker->args->endpoint); + grpc_endpoint_shutdown(exec_ctx, handshaker->args->endpoint, + GRPC_ERROR_REF(error)); // Not shutting down, so the handshake failed. Clean up before // invoking the callback. cleanup_args_for_failure_locked(exec_ctx, handshaker); @@ -251,15 +252,18 @@ static void http_connect_handshaker_destroy(grpc_exec_ctx* exec_ctx, } static void http_connect_handshaker_shutdown(grpc_exec_ctx* exec_ctx, - grpc_handshaker* handshaker_in) { + grpc_handshaker* handshaker_in, + grpc_error* why) { http_connect_handshaker* handshaker = (http_connect_handshaker*)handshaker_in; gpr_mu_lock(&handshaker->mu); if (!handshaker->shutdown) { handshaker->shutdown = true; - grpc_endpoint_shutdown(exec_ctx, handshaker->args->endpoint); + grpc_endpoint_shutdown(exec_ctx, handshaker->args->endpoint, + GRPC_ERROR_REF(why)); cleanup_args_for_failure_locked(exec_ctx, handshaker); } gpr_mu_unlock(&handshaker->mu); + GRPC_ERROR_UNREF(why); } static void http_connect_handshaker_do_handshake( diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c index c5041f6924..f1e4e079e2 100644 --- a/src/core/ext/client_channel/subchannel.c +++ b/src/core/ext/client_channel/subchannel.c @@ -273,7 +273,8 @@ static void disconnect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { gpr_mu_lock(&c->mu); GPR_ASSERT(!c->disconnected); c->disconnected = true; - grpc_connector_shutdown(exec_ctx, c->connector); + grpc_connector_shutdown(exec_ctx, c->connector, + GRPC_ERROR_CREATE("Subchannel disconnected")); con = GET_CONNECTED_SUBCHANNEL(c, no_barrier); if (con != NULL) { GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, con, "connection"); diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.c b/src/core/ext/transport/chttp2/client/chttp2_connector.c index 013c96dc70..d0a762a280 100644 --- a/src/core/ext/transport/chttp2/client/chttp2_connector.c +++ b/src/core/ext/transport/chttp2/client/chttp2_connector.c @@ -92,19 +92,21 @@ static void chttp2_connector_unref(grpc_exec_ctx *exec_ctx, } static void chttp2_connector_shutdown(grpc_exec_ctx *exec_ctx, - grpc_connector *con) { + grpc_connector *con, grpc_error *why) { chttp2_connector *c = (chttp2_connector *)con; gpr_mu_lock(&c->mu); c->shutdown = true; if (c->handshake_mgr != NULL) { - grpc_handshake_manager_shutdown(exec_ctx, c->handshake_mgr); + grpc_handshake_manager_shutdown(exec_ctx, c->handshake_mgr, + GRPC_ERROR_REF(why)); } // If handshaking is not yet in progress, shutdown the endpoint. // Otherwise, the handshaker will do this for us. if (!c->connecting && c->endpoint != NULL) { - grpc_endpoint_shutdown(exec_ctx, c->endpoint); + grpc_endpoint_shutdown(exec_ctx, c->endpoint, GRPC_ERROR_REF(why)); } gpr_mu_unlock(&c->mu); + GRPC_ERROR_UNREF(why); } static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, @@ -121,7 +123,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, // before destroying them, even if we know that there are no // pending read/write callbacks. This should be fixed, at which // point this can be removed. - grpc_endpoint_shutdown(exec_ctx, args->endpoint); + grpc_endpoint_shutdown(exec_ctx, args->endpoint, GRPC_ERROR_REF(error)); grpc_endpoint_destroy(exec_ctx, args->endpoint); grpc_channel_args_destroy(exec_ctx, args->args); grpc_slice_buffer_destroy_internal(exec_ctx, args->read_buffer); @@ -195,7 +197,9 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_closure *notify = c->notify; c->notify = NULL; grpc_closure_sched(exec_ctx, notify, error); - if (c->endpoint != NULL) grpc_endpoint_shutdown(exec_ctx, c->endpoint); + if (c->endpoint != NULL) { + grpc_endpoint_shutdown(exec_ctx, c->endpoint, GRPC_ERROR_REF(error)); + } gpr_mu_unlock(&c->mu); chttp2_connector_unref(exec_ctx, arg); } else { diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.c b/src/core/ext/transport/chttp2/server/chttp2_server.c index 56a1a0de9b..ae2c3838ed 100644 --- a/src/core/ext/transport/chttp2/server/chttp2_server.c +++ b/src/core/ext/transport/chttp2/server/chttp2_server.c @@ -101,16 +101,19 @@ static void pending_handshake_manager_remove_locked( } static void pending_handshake_manager_shutdown_locked(grpc_exec_ctx *exec_ctx, - server_state *state) { + server_state *state, + grpc_error *why) { pending_handshake_manager_node *prev_node = NULL; for (pending_handshake_manager_node *node = state->pending_handshake_mgrs; node != NULL; node = node->next) { - grpc_handshake_manager_shutdown(exec_ctx, node->handshake_mgr); + grpc_handshake_manager_shutdown(exec_ctx, node->handshake_mgr, + GRPC_ERROR_REF(why)); gpr_free(prev_node); prev_node = node; } gpr_free(prev_node); state->pending_handshake_mgrs = NULL; + GRPC_ERROR_UNREF(why); } static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, @@ -129,7 +132,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, // before destroying them, even if we know that there are no // pending read/write callbacks. This should be fixed, at which // point this can be removed. - grpc_endpoint_shutdown(exec_ctx, args->endpoint); + grpc_endpoint_shutdown(exec_ctx, args->endpoint, GRPC_ERROR_NONE); grpc_endpoint_destroy(exec_ctx, args->endpoint); grpc_channel_args_destroy(exec_ctx, args->args); grpc_slice_buffer_destroy_internal(exec_ctx, args->read_buffer); @@ -210,7 +213,8 @@ static void tcp_server_shutdown_complete(grpc_exec_ctx *exec_ctx, void *arg, gpr_mu_lock(&state->mu); grpc_closure *destroy_done = state->server_destroy_listener_done; GPR_ASSERT(state->shutdown); - pending_handshake_manager_shutdown_locked(exec_ctx, state); + pending_handshake_manager_shutdown_locked(exec_ctx, state, + GRPC_ERROR_REF(error)); gpr_mu_unlock(&state->mu); // Flush queued work before destroying handshaker factory, since that // may do a synchronous unref. diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 2004bc6437..15f486d676 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -417,7 +417,7 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, t->closed = 1; connectivity_state_set(exec_ctx, t, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "close_transport"); - grpc_endpoint_shutdown(exec_ctx, t->ep); + grpc_endpoint_shutdown(exec_ctx, t->ep, GRPC_ERROR_REF(error)); /* flush writable stream list to avoid dangling references */ grpc_chttp2_stream *s; diff --git a/src/core/lib/channel/handshaker.c b/src/core/lib/channel/handshaker.c index c052ca5385..5bed2d041d 100644 --- a/src/core/lib/channel/handshaker.c +++ b/src/core/lib/channel/handshaker.c @@ -55,8 +55,8 @@ void grpc_handshaker_destroy(grpc_exec_ctx* exec_ctx, } void grpc_handshaker_shutdown(grpc_exec_ctx* exec_ctx, - grpc_handshaker* handshaker) { - handshaker->vtable->shutdown(exec_ctx, handshaker); + grpc_handshaker* handshaker, grpc_error* why) { + handshaker->vtable->shutdown(exec_ctx, handshaker, why); } void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx, @@ -141,14 +141,17 @@ void grpc_handshake_manager_destroy(grpc_exec_ctx* exec_ctx, } void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx, - grpc_handshake_manager* mgr) { + grpc_handshake_manager* mgr, + grpc_error* why) { gpr_mu_lock(&mgr->mu); // Shutdown the handshaker that's currently in progress, if any. if (!mgr->shutdown && mgr->index > 0) { mgr->shutdown = true; - grpc_handshaker_shutdown(exec_ctx, mgr->handshakers[mgr->index - 1]); + grpc_handshaker_shutdown(exec_ctx, mgr->handshakers[mgr->index - 1], + GRPC_ERROR_REF(why)); } gpr_mu_unlock(&mgr->mu); + GRPC_ERROR_UNREF(why); } // Helper function to call either the next handshaker or the @@ -197,7 +200,8 @@ static void call_next_handshaker(grpc_exec_ctx* exec_ctx, void* arg, static void on_timeout(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { grpc_handshake_manager* mgr = arg; if (error == GRPC_ERROR_NONE) { // Timer fired, rather than being cancelled. - grpc_handshake_manager_shutdown(exec_ctx, mgr); + grpc_handshake_manager_shutdown(exec_ctx, mgr, + GRPC_ERROR_CREATE("Handshake timed out")); } grpc_handshake_manager_unref(exec_ctx, mgr); } diff --git a/src/core/lib/channel/handshaker.h b/src/core/lib/channel/handshaker.h index 450b7adaee..a8e3692add 100644 --- a/src/core/lib/channel/handshaker.h +++ b/src/core/lib/channel/handshaker.h @@ -86,7 +86,8 @@ typedef struct { /// Shuts down the handshaker (e.g., to clean up when the operation is /// aborted in the middle). - void (*shutdown)(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker); + void (*shutdown)(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker, + grpc_error* why); /// Performs handshaking, modifying \a args as needed (e.g., to /// replace \a endpoint with a wrapped endpoint). @@ -111,7 +112,7 @@ void grpc_handshaker_init(const grpc_handshaker_vtable* vtable, void grpc_handshaker_destroy(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker); void grpc_handshaker_shutdown(grpc_exec_ctx* exec_ctx, - grpc_handshaker* handshaker); + grpc_handshaker* handshaker, grpc_error* why); void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker, grpc_tcp_server_acceptor* acceptor, @@ -141,7 +142,8 @@ void grpc_handshake_manager_destroy(grpc_exec_ctx* exec_ctx, /// The caller must still call grpc_handshake_manager_destroy() after /// calling this function. void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx, - grpc_handshake_manager* mgr); + grpc_handshake_manager* mgr, + grpc_error* why); /// Invokes handshakers in the order they were added. /// Takes ownership of \a endpoint, and then passes that ownership to diff --git a/src/core/lib/iomgr/endpoint.c b/src/core/lib/iomgr/endpoint.c index 2d300f4560..bf6e98146a 100644 --- a/src/core/lib/iomgr/endpoint.c +++ b/src/core/lib/iomgr/endpoint.c @@ -54,8 +54,9 @@ void grpc_endpoint_add_to_pollset_set(grpc_exec_ctx* exec_ctx, ep->vtable->add_to_pollset_set(exec_ctx, ep, pollset_set); } -void grpc_endpoint_shutdown(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep) { - ep->vtable->shutdown(exec_ctx, ep); +void grpc_endpoint_shutdown(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, + grpc_error* why) { + ep->vtable->shutdown(exec_ctx, ep, why); } void grpc_endpoint_destroy(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep) { diff --git a/src/core/lib/iomgr/endpoint.h b/src/core/lib/iomgr/endpoint.h index 1609b64f2b..740357ecc5 100644 --- a/src/core/lib/iomgr/endpoint.h +++ b/src/core/lib/iomgr/endpoint.h @@ -57,7 +57,7 @@ struct grpc_endpoint_vtable { grpc_pollset *pollset); void (*add_to_pollset_set)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_pollset_set *pollset); - void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep); + void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_error *why); void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep); grpc_resource_user *(*get_resource_user)(grpc_endpoint *ep); char *(*get_peer)(grpc_endpoint *ep); @@ -96,7 +96,8 @@ void grpc_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, /* Causes any pending and future read/write callbacks to run immediately with success==0 */ -void grpc_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep); +void grpc_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, + grpc_error *why); void grpc_endpoint_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep); /* Add an endpoint to a pollset, so that when the pollset is polled, events from diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c index 39b4d9cbd7..51842fc208 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.c +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -143,6 +143,7 @@ struct grpc_fd { /* Indicates that the fd is shutdown and that any pending read/write closures should fail */ bool shutdown; + grpc_error *shutdown_error; /* reason for shutdown: set iff shutdown==true */ /* The fd is either closed or we relinquished control of it. In either cases, this indicates that the 'fd' on this structure is no longer valid */ @@ -907,6 +908,7 @@ static void unref_by(grpc_fd *fd, int n) { fd->freelist_next = fd_freelist; fd_freelist = fd; grpc_iomgr_unregister_object(&fd->iomgr_object); + if (fd->shutdown) GRPC_ERROR_UNREF(fd->shutdown_error); gpr_mu_unlock(&fd_freelist_mu); } else { @@ -1058,11 +1060,11 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, GRPC_ERROR_UNREF(error); } -static grpc_error *fd_shutdown_error(bool shutdown) { - if (!shutdown) { +static grpc_error *fd_shutdown_error(grpc_fd *fd) { + if (!fd->shutdown) { return GRPC_ERROR_NONE; } else { - return GRPC_ERROR_CREATE("FD shutdown"); + return GRPC_ERROR_CREATE_REFERENCING("FD shutdown", &fd->shutdown_error, 1); } } @@ -1076,7 +1078,7 @@ static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, } else if (*st == CLOSURE_READY) { /* already ready ==> queue the closure to run immediately */ *st = CLOSURE_NOT_READY; - grpc_closure_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown)); + grpc_closure_sched(exec_ctx, closure, fd_shutdown_error(fd)); } else { /* upcallptr was set to a different closure. This is an error! */ gpr_log(GPR_ERROR, @@ -1098,7 +1100,7 @@ static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, return 0; } else { /* waiting ==> queue closure */ - grpc_closure_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown)); + grpc_closure_sched(exec_ctx, *st, fd_shutdown_error(fd)); *st = CLOSURE_NOT_READY; return 1; } @@ -1123,17 +1125,20 @@ static bool fd_is_shutdown(grpc_fd *fd) { } /* Might be called multiple times */ -static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { +static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) { gpr_mu_lock(&fd->po.mu); /* Do the actual shutdown only once */ if (!fd->shutdown) { fd->shutdown = true; + fd->shutdown_error = why; shutdown(fd->fd, SHUT_RDWR); /* Flush any pending read and write closures. Since fd->shutdown is 'true' at this point, the closures would be called with 'success = false' */ set_ready_locked(exec_ctx, fd, &fd->read_closure); set_ready_locked(exec_ctx, fd, &fd->write_closure); + } else { + GRPC_ERROR_UNREF(why); } gpr_mu_unlock(&fd->po.mu); } diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index 9477ac3688..ca12932219 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -82,6 +82,7 @@ struct grpc_fd { int shutdown; int closed; int released; + grpc_error *shutdown_error; /* The watcher list. @@ -306,6 +307,7 @@ static void unref_by(grpc_fd *fd, int n) { if (old == n) { gpr_mu_destroy(&fd->mu); grpc_iomgr_unregister_object(&fd->iomgr_object); + if (fd->shutdown) GRPC_ERROR_UNREF(fd->shutdown_error); gpr_free(fd); } else { GPR_ASSERT(old > n); @@ -444,11 +446,11 @@ static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); } static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); } #endif -static grpc_error *fd_shutdown_error(bool shutdown) { - if (!shutdown) { +static grpc_error *fd_shutdown_error(grpc_fd *fd) { + if (!fd->shutdown) { return GRPC_ERROR_NONE; } else { - return GRPC_ERROR_CREATE("FD shutdown"); + return GRPC_ERROR_CREATE_REFERENCING("FD shutdown", &fd->shutdown_error, 1); } } @@ -462,7 +464,7 @@ static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, } else if (*st == CLOSURE_READY) { /* already ready ==> queue the closure to run immediately */ *st = CLOSURE_NOT_READY; - grpc_closure_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown)); + grpc_closure_sched(exec_ctx, closure, fd_shutdown_error(fd)); maybe_wake_one_watcher_locked(fd); } else { /* upcallptr was set to a different closure. This is an error! */ @@ -485,7 +487,7 @@ static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, return 0; } else { /* waiting ==> queue closure */ - grpc_closure_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown)); + grpc_closure_sched(exec_ctx, *st, fd_shutdown_error(fd)); *st = CLOSURE_NOT_READY; return 1; } @@ -496,15 +498,18 @@ static void set_read_notifier_pollset_locked( fd->read_notifier_pollset = read_notifier_pollset; } -static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { +static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) { gpr_mu_lock(&fd->mu); /* only shutdown once */ if (!fd->shutdown) { fd->shutdown = 1; + fd->shutdown_error = why; /* signal read/write closed to OS so that future operations fail */ shutdown(fd->fd, SHUT_RDWR); set_ready_locked(exec_ctx, fd, &fd->read_closure); set_ready_locked(exec_ctx, fd, &fd->write_closure); + } else { + GRPC_ERROR_UNREF(why); } gpr_mu_unlock(&fd->mu); } diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index c106ba5400..5bb55631d6 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -162,8 +162,8 @@ void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done, g_event_engine->fd_orphan(exec_ctx, fd, on_done, release_fd, reason); } -void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { - g_event_engine->fd_shutdown(exec_ctx, fd); +void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) { + g_event_engine->fd_shutdown(exec_ctx, fd, why); } bool grpc_fd_is_shutdown(grpc_fd *fd) { diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index 1068a4bad5..a589efdeec 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -51,7 +51,7 @@ typedef struct grpc_event_engine_vtable { int (*fd_wrapped_fd)(grpc_fd *fd); void (*fd_orphan)(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done, int *release_fd, const char *reason); - void (*fd_shutdown)(grpc_exec_ctx *exec_ctx, grpc_fd *fd); + void (*fd_shutdown)(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why); void (*fd_notify_on_read)(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure); void (*fd_notify_on_write)(grpc_exec_ctx *exec_ctx, grpc_fd *fd, @@ -140,7 +140,7 @@ void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done, bool grpc_fd_is_shutdown(grpc_fd *fd); /* Cause any current and future callbacks to fail. */ -void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd); +void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why); /* Register read interest, causing read_cb to be called once when fd becomes readable, on deadline specified by deadline, or on shutdown triggered by diff --git a/src/core/lib/iomgr/network_status_tracker.c b/src/core/lib/iomgr/network_status_tracker.c index a5ca9ed2c3..1601a39002 100644 --- a/src/core/lib/iomgr/network_status_tracker.c +++ b/src/core/lib/iomgr/network_status_tracker.c @@ -117,7 +117,8 @@ void grpc_network_status_shutdown_all_endpoints() { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; for (endpoint_ll_node *curr = head; curr != NULL; curr = curr->next) { - curr->ep->vtable->shutdown(&exec_ctx, curr->ep); + curr->ep->vtable->shutdown(&exec_ctx, curr->ep, + GRPC_ERROR_CREATE("Network unavailable")); } gpr_mu_unlock(&g_endpoint_mutex); grpc_exec_ctx_finish(&exec_ctx); diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c index 16b0f4e73c..0144192b71 100644 --- a/src/core/lib/iomgr/tcp_client_posix.c +++ b/src/core/lib/iomgr/tcp_client_posix.c @@ -121,7 +121,8 @@ static void tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { } gpr_mu_lock(&ac->mu); if (ac->fd != NULL) { - grpc_fd_shutdown(exec_ctx, ac->fd); + grpc_fd_shutdown(exec_ctx, ac->fd, + GRPC_ERROR_CREATE("connect() timed out")); } done = (--ac->refs == 0); gpr_mu_unlock(&ac->mu); diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index a33e63e845..a4381f8fc9 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -119,9 +119,10 @@ static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, grpc_error *error); -static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { +static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, + grpc_error *why) { grpc_tcp *tcp = (grpc_tcp *)ep; - grpc_fd_shutdown(exec_ctx, tcp->em_fd); + grpc_fd_shutdown(exec_ctx, tcp->em_fd, why); grpc_resource_user_shutdown(exec_ctx, tcp->resource_user); } diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c index 20efb678b2..e9e7511c9c 100644 --- a/src/core/lib/iomgr/tcp_server_posix.c +++ b/src/core/lib/iomgr/tcp_server_posix.c @@ -276,7 +276,8 @@ static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { if (s->active_ports) { grpc_tcp_listener *sp; for (sp = s->head; sp; sp = sp->next) { - grpc_fd_shutdown(exec_ctx, sp->emfd); + grpc_fd_shutdown(exec_ctx, sp->emfd, + GRPC_ERROR_CREATE("Server destroyed")); } gpr_mu_unlock(&s->mu); } else { @@ -773,7 +774,8 @@ void grpc_tcp_server_shutdown_listeners(grpc_exec_ctx *exec_ctx, if (s->active_ports) { grpc_tcp_listener *sp; for (sp = s->head; sp; sp = sp->next) { - grpc_fd_shutdown(exec_ctx, sp->emfd); + grpc_fd_shutdown(exec_ctx, sp->emfd, + GRPC_ERROR_CREATE("Server shutdown")); } } gpr_mu_unlock(&s->mu); diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c index dfbd295c91..3b23b47d4f 100644 --- a/src/core/lib/iomgr/udp_server.c +++ b/src/core/lib/iomgr/udp_server.c @@ -203,7 +203,8 @@ void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, for (sp = s->head; sp; sp = sp->next) { GPR_ASSERT(sp->orphan_cb); sp->orphan_cb(sp->emfd); - grpc_fd_shutdown(exec_ctx, sp->emfd); + grpc_fd_shutdown(exec_ctx, sp->emfd, + GRPC_ERROR_CREATE("Server destroyed")); } gpr_mu_unlock(&s->mu); } else { diff --git a/src/core/lib/security/transport/secure_endpoint.c b/src/core/lib/security/transport/secure_endpoint.c index 18a7a6f7e7..7d58843d69 100644 --- a/src/core/lib/security/transport/secure_endpoint.c +++ b/src/core/lib/security/transport/secure_endpoint.c @@ -341,10 +341,10 @@ static void endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *secure_ep, GPR_TIMER_END("secure_endpoint.endpoint_write", 0); } -static void endpoint_shutdown(grpc_exec_ctx *exec_ctx, - grpc_endpoint *secure_ep) { +static void endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *secure_ep, + grpc_error *why) { secure_endpoint *ep = (secure_endpoint *)secure_ep; - grpc_endpoint_shutdown(exec_ctx, ep->wrapped_ep); + grpc_endpoint_shutdown(exec_ctx, ep->wrapped_ep, why); } static void endpoint_destroy(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/security/transport/security_handshaker.c b/src/core/lib/security/transport/security_handshaker.c index 37d57d759b..bb8a3bf6cd 100644 --- a/src/core/lib/security/transport/security_handshaker.c +++ b/src/core/lib/security/transport/security_handshaker.c @@ -130,7 +130,7 @@ static void security_handshake_failed_locked(grpc_exec_ctx *exec_ctx, // before destroying them, even if we know that there are no // pending read/write callbacks. This should be fixed, at which // point this can be removed. - grpc_endpoint_shutdown(exec_ctx, h->args->endpoint); + grpc_endpoint_shutdown(exec_ctx, h->args->endpoint, GRPC_ERROR_REF(error)); // Not shutting down, so the write failed. Clean up before // invoking the callback. cleanup_args_for_failure_locked(exec_ctx, h); @@ -347,15 +347,17 @@ static void security_handshaker_destroy(grpc_exec_ctx *exec_ctx, } static void security_handshaker_shutdown(grpc_exec_ctx *exec_ctx, - grpc_handshaker *handshaker) { + grpc_handshaker *handshaker, + grpc_error *why) { security_handshaker *h = (security_handshaker *)handshaker; gpr_mu_lock(&h->mu); if (!h->shutdown) { h->shutdown = true; - grpc_endpoint_shutdown(exec_ctx, h->args->endpoint); + grpc_endpoint_shutdown(exec_ctx, h->args->endpoint, GRPC_ERROR_REF(why)); cleanup_args_for_failure_locked(exec_ctx, h); } gpr_mu_unlock(&h->mu); + GRPC_ERROR_UNREF(why); } static void security_handshaker_do_handshake(grpc_exec_ctx *exec_ctx, @@ -417,7 +419,10 @@ static void fail_handshaker_destroy(grpc_exec_ctx *exec_ctx, } static void fail_handshaker_shutdown(grpc_exec_ctx *exec_ctx, - grpc_handshaker *handshaker) {} + grpc_handshaker *handshaker, + grpc_error *why) { + GRPC_ERROR_UNREF(why); +} static void fail_handshaker_do_handshake(grpc_exec_ctx *exec_ctx, grpc_handshaker *handshaker, diff --git a/test/core/bad_client/bad_client.c b/test/core/bad_client/bad_client.c index a84086804c..b7e7606b18 100644 --- a/test/core/bad_client/bad_client.c +++ b/test/core/bad_client/bad_client.c @@ -163,7 +163,8 @@ void grpc_run_bad_client_test( gpr_event_wait(&a.done_write, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5))); if (flags & GRPC_BAD_CLIENT_DISCONNECT) { - grpc_endpoint_shutdown(&exec_ctx, sfd.client); + grpc_endpoint_shutdown(&exec_ctx, sfd.client, + GRPC_ERROR_CREATE("Forced Disconnect")); grpc_endpoint_destroy(&exec_ctx, sfd.client); grpc_exec_ctx_finish(&exec_ctx); sfd.client = NULL; @@ -189,7 +190,8 @@ void grpc_run_bad_client_test( grpc_slice_buffer_destroy_internal(&exec_ctx, &args.incoming); } // Shutdown. - grpc_endpoint_shutdown(&exec_ctx, sfd.client); + grpc_endpoint_shutdown(&exec_ctx, sfd.client, + GRPC_ERROR_CREATE("Test Shutdown")); grpc_endpoint_destroy(&exec_ctx, sfd.client); grpc_exec_ctx_finish(&exec_ctx); } diff --git a/test/core/client_channel/set_initial_connect_string_test.c b/test/core/client_channel/set_initial_connect_string_test.c index fc0aca0434..a0a33667cc 100644 --- a/test/core/client_channel/set_initial_connect_string_test.c +++ b/test/core/client_channel/set_initial_connect_string_test.c @@ -81,7 +81,9 @@ static void handle_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { state.incoming_buffer.length, strlen(magic_connect_string)); if (state.incoming_buffer.length > strlen(magic_connect_string)) { gpr_atm_rel_store(&state.done_atm, 1); - grpc_endpoint_shutdown(exec_ctx, state.tcp); + grpc_endpoint_shutdown( + exec_ctx, state.tcp, + GRPC_ERROR_CREATE("Incoming buffer longer than magic_connect_string")); grpc_endpoint_destroy(exec_ctx, state.tcp); } else { grpc_endpoint_read(exec_ctx, state.tcp, &state.temp_incoming_buffer, diff --git a/test/core/end2end/bad_server_response_test.c b/test/core/end2end/bad_server_response_test.c index 42bf4fa235..d5f428eb82 100644 --- a/test/core/end2end/bad_server_response_test.c +++ b/test/core/end2end/bad_server_response_test.c @@ -298,7 +298,8 @@ static void run_test(const char *response_payload, gpr_event_wait(&ev, gpr_inf_future(GPR_CLOCK_REALTIME)); /* clean up */ - grpc_endpoint_shutdown(&exec_ctx, state.tcp); + grpc_endpoint_shutdown(&exec_ctx, state.tcp, + GRPC_ERROR_CREATE("Test Shutdown")); grpc_endpoint_destroy(&exec_ctx, state.tcp); cleanup_rpc(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx); diff --git a/test/core/end2end/fixtures/http_proxy.c b/test/core/end2end/fixtures/http_proxy.c index dac9baf3ce..6fdc86fc12 100644 --- a/test/core/end2end/fixtures/http_proxy.c +++ b/test/core/end2end/fixtures/http_proxy.c @@ -133,9 +133,12 @@ static void proxy_connection_failed(grpc_exec_ctx* exec_ctx, const char* msg = grpc_error_string(error); gpr_log(GPR_INFO, "%s: %s", prefix, msg); - grpc_endpoint_shutdown(exec_ctx, conn->client_endpoint); - if (conn->server_endpoint != NULL) - grpc_endpoint_shutdown(exec_ctx, conn->server_endpoint); + grpc_endpoint_shutdown(exec_ctx, conn->client_endpoint, + GRPC_ERROR_REF(error)); + if (conn->server_endpoint != NULL) { + grpc_endpoint_shutdown(exec_ctx, conn->server_endpoint, + GRPC_ERROR_REF(error)); + } proxy_connection_unref(exec_ctx, conn); } diff --git a/test/core/internal_api_canaries/iomgr.c b/test/core/internal_api_canaries/iomgr.c index 3225b9dc19..d73d5c175c 100644 --- a/test/core/internal_api_canaries/iomgr.c +++ b/test/core/internal_api_canaries/iomgr.c @@ -92,7 +92,7 @@ static void test_code(void) { grpc_endpoint_read(&exec_ctx, &endpoint, NULL, NULL); grpc_endpoint_get_peer(&endpoint); grpc_endpoint_write(&exec_ctx, &endpoint, NULL, NULL); - grpc_endpoint_shutdown(&exec_ctx, &endpoint); + grpc_endpoint_shutdown(&exec_ctx, &endpoint, GRPC_ERROR_CANCELLED); grpc_endpoint_destroy(&exec_ctx, &endpoint); grpc_endpoint_add_to_pollset(&exec_ctx, &endpoint, NULL); grpc_endpoint_add_to_pollset_set(&exec_ctx, &endpoint, NULL); diff --git a/test/core/iomgr/endpoint_tests.c b/test/core/iomgr/endpoint_tests.c index df5dd20903..bbc5f383f6 100644 --- a/test/core/iomgr/endpoint_tests.c +++ b/test/core/iomgr/endpoint_tests.c @@ -233,9 +233,11 @@ static void read_and_write_test(grpc_endpoint_test_config config, if (shutdown) { gpr_log(GPR_DEBUG, "shutdown read"); - grpc_endpoint_shutdown(&exec_ctx, state.read_ep); + grpc_endpoint_shutdown(&exec_ctx, state.read_ep, + GRPC_ERROR_CREATE("Test Shutdown")); gpr_log(GPR_DEBUG, "shutdown write"); - grpc_endpoint_shutdown(&exec_ctx, state.write_ep); + grpc_endpoint_shutdown(&exec_ctx, state.write_ep, + GRPC_ERROR_CREATE("Test Shutdown")); } grpc_exec_ctx_flush(&exec_ctx); @@ -296,7 +298,8 @@ static void multiple_shutdown_test(grpc_endpoint_test_config config) { grpc_closure_create(inc_on_failure, &fail_count, grpc_schedule_on_exec_ctx)); wait_for_fail_count(&exec_ctx, &fail_count, 0); - grpc_endpoint_shutdown(&exec_ctx, f.client_ep); + grpc_endpoint_shutdown(&exec_ctx, f.client_ep, + GRPC_ERROR_CREATE("Test Shutdown")); wait_for_fail_count(&exec_ctx, &fail_count, 1); grpc_endpoint_read(&exec_ctx, f.client_ep, &slice_buffer, grpc_closure_create(inc_on_failure, &fail_count, @@ -307,7 +310,8 @@ static void multiple_shutdown_test(grpc_endpoint_test_config config) { grpc_closure_create(inc_on_failure, &fail_count, grpc_schedule_on_exec_ctx)); wait_for_fail_count(&exec_ctx, &fail_count, 3); - grpc_endpoint_shutdown(&exec_ctx, f.client_ep); + grpc_endpoint_shutdown(&exec_ctx, f.client_ep, + GRPC_ERROR_CREATE("Test Shutdown")); wait_for_fail_count(&exec_ctx, &fail_count, 3); grpc_slice_buffer_destroy_internal(&exec_ctx, &slice_buffer); diff --git a/test/core/iomgr/ev_epoll_linux_test.c b/test/core/iomgr/ev_epoll_linux_test.c index a10be7f81b..5b05ea3338 100644 --- a/test/core/iomgr/ev_epoll_linux_test.c +++ b/test/core/iomgr/ev_epoll_linux_test.c @@ -89,7 +89,8 @@ static void test_fd_cleanup(grpc_exec_ctx *exec_ctx, test_fd *tfds, int i; for (i = 0; i < num_fds; i++) { - grpc_fd_shutdown(exec_ctx, tfds[i].fd); + grpc_fd_shutdown(exec_ctx, tfds[i].fd, + GRPC_ERROR_CREATE("test_fd_cleanup")); grpc_exec_ctx_flush(exec_ctx); grpc_fd_orphan(exec_ctx, tfds[i].fd, NULL, &release_fd, "test_fd_cleanup"); diff --git a/test/core/iomgr/fd_posix_test.c b/test/core/iomgr/fd_posix_test.c index a617bfc646..4726e935d8 100644 --- a/test/core/iomgr/fd_posix_test.c +++ b/test/core/iomgr/fd_posix_test.c @@ -132,7 +132,8 @@ static void session_shutdown_cb(grpc_exec_ctx *exec_ctx, void *arg, /*session */ grpc_fd_orphan(exec_ctx, se->em_fd, NULL, NULL, "a"); gpr_free(se); /* Start to shutdown listen fd. */ - grpc_fd_shutdown(exec_ctx, sv->em_fd); + grpc_fd_shutdown(exec_ctx, sv->em_fd, + GRPC_ERROR_CREATE("session_shutdown_cb")); } /* Called when data become readable in a session. */ diff --git a/test/core/iomgr/tcp_client_posix_test.c b/test/core/iomgr/tcp_client_posix_test.c index 0ea7a000eb..6bb00bc787 100644 --- a/test/core/iomgr/tcp_client_posix_test.c +++ b/test/core/iomgr/tcp_client_posix_test.c @@ -72,7 +72,8 @@ static void must_succeed(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { GPR_ASSERT(g_connecting != NULL); GPR_ASSERT(error == GRPC_ERROR_NONE); - grpc_endpoint_shutdown(exec_ctx, g_connecting); + grpc_endpoint_shutdown(exec_ctx, g_connecting, + GRPC_ERROR_CREATE("must_succeed called")); grpc_endpoint_destroy(exec_ctx, g_connecting); g_connecting = NULL; finish_connection(); diff --git a/test/core/iomgr/tcp_server_posix_test.c b/test/core/iomgr/tcp_server_posix_test.c index 020f005980..417bc5111a 100644 --- a/test/core/iomgr/tcp_server_posix_test.c +++ b/test/core/iomgr/tcp_server_posix_test.c @@ -121,7 +121,7 @@ static void server_weak_ref_set(server_weak_ref *weak_ref, static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, grpc_pollset *pollset, grpc_tcp_server_acceptor *acceptor) { - grpc_endpoint_shutdown(exec_ctx, tcp); + grpc_endpoint_shutdown(exec_ctx, tcp, GRPC_ERROR_CREATE("Connected")); grpc_endpoint_destroy(exec_ctx, tcp); on_connect_result temp_result; diff --git a/test/core/security/secure_endpoint_test.c b/test/core/security/secure_endpoint_test.c index bcc50d0bb0..97e9c3d6db 100644 --- a/test/core/security/secure_endpoint_test.c +++ b/test/core/security/secure_endpoint_test.c @@ -166,8 +166,10 @@ static void test_leftover(grpc_endpoint_test_config config, size_t slice_size) { GPR_ASSERT(incoming.count == 1); GPR_ASSERT(grpc_slice_eq(s, incoming.slices[0])); - grpc_endpoint_shutdown(&exec_ctx, f.client_ep); - grpc_endpoint_shutdown(&exec_ctx, f.server_ep); + grpc_endpoint_shutdown(&exec_ctx, f.client_ep, + GRPC_ERROR_CREATE("test_leftover end")); + grpc_endpoint_shutdown(&exec_ctx, f.server_ep, + GRPC_ERROR_CREATE("test_leftover end")); grpc_endpoint_destroy(&exec_ctx, f.client_ep); grpc_endpoint_destroy(&exec_ctx, f.server_ep); grpc_exec_ctx_finish(&exec_ctx); diff --git a/test/core/security/ssl_server_fuzzer.c b/test/core/security/ssl_server_fuzzer.c index 55e8f5e78d..f789278add 100644 --- a/test/core/security/ssl_server_fuzzer.c +++ b/test/core/security/ssl_server_fuzzer.c @@ -121,7 +121,8 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) { // server will wait for more data. Explicitly fail the server by shutting down // the endpoint. if (!state.done_callback_called) { - grpc_endpoint_shutdown(&exec_ctx, mock_endpoint); + grpc_endpoint_shutdown(&exec_ctx, mock_endpoint, + GRPC_ERROR_CREATE("Explicit close")); grpc_exec_ctx_flush(&exec_ctx); } diff --git a/test/core/surface/concurrent_connectivity_test.c b/test/core/surface/concurrent_connectivity_test.c index 8ebe8d07e4..7071f93d8d 100644 --- a/test/core/surface/concurrent_connectivity_test.c +++ b/test/core/surface/concurrent_connectivity_test.c @@ -107,7 +107,7 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *vargs, grpc_endpoint *tcp, grpc_tcp_server_acceptor *acceptor) { gpr_free(acceptor); struct server_thread_args *args = (struct server_thread_args *)vargs; - grpc_endpoint_shutdown(exec_ctx, tcp); + grpc_endpoint_shutdown(exec_ctx, tcp, GRPC_ERROR_CREATE("Connected")); grpc_endpoint_destroy(exec_ctx, tcp); GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(args->pollset, NULL)); } diff --git a/test/core/util/mock_endpoint.c b/test/core/util/mock_endpoint.c index 04793bceab..d531ec6031 100644 --- a/test/core/util/mock_endpoint.c +++ b/test/core/util/mock_endpoint.c @@ -78,16 +78,18 @@ static void me_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, static void me_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_pollset_set *pollset) {} -static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { +static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, + grpc_error *why) { grpc_mock_endpoint *m = (grpc_mock_endpoint *)ep; gpr_mu_lock(&m->mu); if (m->on_read) { - grpc_closure_sched(exec_ctx, m->on_read, - GRPC_ERROR_CREATE("Endpoint Shutdown")); + grpc_closure_sched(exec_ctx, m->on_read, GRPC_ERROR_CREATE_REFERENCING( + "Endpoint Shutdown", &why, 1)); m->on_read = NULL; } gpr_mu_unlock(&m->mu); grpc_resource_user_shutdown(exec_ctx, m->resource_user); + GRPC_ERROR_UNREF(why); } static void me_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { diff --git a/test/core/util/passthru_endpoint.c b/test/core/util/passthru_endpoint.c index d42ec7f9e8..2ad019ddb0 100644 --- a/test/core/util/passthru_endpoint.c +++ b/test/core/util/passthru_endpoint.c @@ -109,21 +109,25 @@ static void me_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, static void me_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_pollset_set *pollset) {} -static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { +static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, + grpc_error *why) { half *m = (half *)ep; gpr_mu_lock(&m->parent->mu); m->parent->shutdown = true; if (m->on_read) { - grpc_closure_sched(exec_ctx, m->on_read, GRPC_ERROR_CREATE("Shutdown")); + grpc_closure_sched(exec_ctx, m->on_read, + GRPC_ERROR_CREATE_REFERENCING("Shutdown", &why, 1)); m->on_read = NULL; } m = other_half(m); if (m->on_read) { - grpc_closure_sched(exec_ctx, m->on_read, GRPC_ERROR_CREATE("Shutdown")); + grpc_closure_sched(exec_ctx, m->on_read, + GRPC_ERROR_CREATE_REFERENCING("Shutdown", &why, 1)); m->on_read = NULL; } gpr_mu_unlock(&m->parent->mu); grpc_resource_user_shutdown(exec_ctx, m->resource_user); + GRPC_ERROR_UNREF(why); } static void me_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { diff --git a/test/core/util/reconnect_server.c b/test/core/util/reconnect_server.c index 7bf83a74a1..7fbd0ca6aa 100644 --- a/test/core/util/reconnect_server.c +++ b/test/core/util/reconnect_server.c @@ -80,7 +80,7 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME); timestamp_list *new_tail; peer = grpc_endpoint_get_peer(tcp); - grpc_endpoint_shutdown(exec_ctx, tcp); + grpc_endpoint_shutdown(exec_ctx, tcp, GRPC_ERROR_CREATE("Connected")); grpc_endpoint_destroy(exec_ctx, tcp); if (peer) { last_colon = strrchr(peer, ':'); -- GitLab