diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index de12215bb26285ab9cb902f8abc6cbf43ec52a4e..4397890c93f76bc8fa37b0dd18dc342278fede46 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -176,6 +176,7 @@ static void tcp_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { grpc_network_status_unregister_endpoint(ep); grpc_tcp *tcp = (grpc_tcp *)ep; tcp_maybe_shutdown_buffer_user(exec_ctx, tcp); + gpr_slice_buffer_reset_and_unref(&tcp->last_read_buffer); TCP_UNREF(exec_ctx, tcp, "destroy"); } @@ -292,6 +293,7 @@ static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, if (error != GRPC_ERROR_NONE) { gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer); + gpr_slice_buffer_reset_and_unref(&tcp->last_read_buffer); call_read_cb(exec_ctx, tcp, GRPC_ERROR_REF(error)); TCP_UNREF(exec_ctx, tcp, "read"); } else { @@ -558,10 +560,13 @@ int grpc_tcp_fd(grpc_endpoint *ep) { void grpc_tcp_destroy_and_release_fd(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, int *fd, grpc_closure *done) { + grpc_network_status_unregister_endpoint(ep); grpc_tcp *tcp = (grpc_tcp *)ep; GPR_ASSERT(ep->vtable == &vtable); tcp->release_fd = fd; tcp->release_fd_cb = done; + tcp_maybe_shutdown_buffer_user(exec_ctx, tcp); + gpr_slice_buffer_reset_and_unref(&tcp->last_read_buffer); TCP_UNREF(exec_ctx, tcp, "destroy"); } diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c index c2f857020aa802b859dc998a7c07ceae84e9c987..1c56ed0507700ac86ce3816014124d9c8731de55 100644 --- a/test/core/iomgr/tcp_posix_test.c +++ b/test/core/iomgr/tcp_posix_test.c @@ -455,8 +455,10 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) { "pollset_work", grpc_pollset_work(&exec_ctx, g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), deadline))); + gpr_log(GPR_DEBUG, "wakeup: read=%" PRIdPTR " target=%" PRIdPTR, + state.read_bytes, state.target_read_bytes); gpr_mu_unlock(g_mu); - grpc_exec_ctx_finish(&exec_ctx); + grpc_exec_ctx_flush(&exec_ctx); gpr_mu_lock(g_mu); } GPR_ASSERT(state.read_bytes == state.target_read_bytes); @@ -464,6 +466,7 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) { gpr_slice_buffer_destroy(&state.incoming); grpc_tcp_destroy_and_release_fd(&exec_ctx, ep, &fd, &fd_released_cb); + grpc_exec_ctx_flush(&exec_ctx); gpr_mu_lock(g_mu); while (!fd_released_done) { grpc_pollset_worker *worker = NULL; @@ -471,6 +474,7 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) { "pollset_work", grpc_pollset_work(&exec_ctx, g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), deadline))); + gpr_log(GPR_DEBUG, "wakeup: fd_released_done=%d", fd_released_done); } gpr_mu_unlock(g_mu); GPR_ASSERT(fd_released_done == 1);