From 61ecb9259b0a9e09d1b4f29ea8ee9ba57af0e7c7 Mon Sep 17 00:00:00 2001 From: Craig Tiller <ctiller@google.com> Date: Mon, 26 Sep 2016 13:22:12 -0700 Subject: [PATCH] TCP buffer pool integration done --- src/core/lib/iomgr/buffer_pool.c | 30 +++++++++++++++++++++-------- src/core/lib/iomgr/buffer_pool.h | 6 +++--- src/core/lib/iomgr/tcp_posix.c | 4 ++-- test/core/iomgr/buffer_pool_test.c | 31 +++++++++++++++++++++++++++++- test/core/util/mock_endpoint.c | 2 +- test/core/util/passthru_endpoint.c | 2 +- 6 files changed, 59 insertions(+), 16 deletions(-) diff --git a/src/core/lib/iomgr/buffer_pool.c b/src/core/lib/iomgr/buffer_pool.c index 837642f3bc..16445a4265 100644 --- a/src/core/lib/iomgr/buffer_pool.c +++ b/src/core/lib/iomgr/buffer_pool.c @@ -115,6 +115,7 @@ static grpc_buffer_user *bulist_pop(grpc_buffer_pool *buffer_pool, buffer_user->links[list].prev; buffer_user->links[list].prev->links[list].next = buffer_user->links[list].next; + *root = buffer_user->links[list].next; } buffer_user->links[list].next = buffer_user->links[list].prev = NULL; return buffer_user; @@ -365,8 +366,8 @@ static void bp_resize(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) { gpr_free(a); } -static void bpreclaimation_done_closure(grpc_exec_ctx *exec_ctx, void *bp, - grpc_error *error) { +static void bp_reclaimation_done(grpc_exec_ctx *exec_ctx, void *bp, + grpc_error *error) { grpc_buffer_pool *buffer_pool = bp; buffer_pool->reclaiming = false; bpstep_sched(exec_ctx, buffer_pool); @@ -386,7 +387,7 @@ grpc_buffer_pool *grpc_buffer_pool_create(void) { buffer_pool->reclaiming = false; grpc_closure_init(&buffer_pool->bpstep_closure, bpstep, buffer_pool); grpc_closure_init(&buffer_pool->bpreclaimation_done_closure, - bpreclaimation_done_closure, buffer_pool); + bp_reclaimation_done, buffer_pool); for (int i = 0; i < GRPC_BULIST_COUNT; i++) { buffer_pool->roots[i] = NULL; } @@ -481,6 +482,7 @@ void grpc_buffer_user_init(grpc_buffer_user *buffer_user, grpc_closure_list_init(&buffer_user->on_allocated); buffer_user->allocating = false; buffer_user->added_to_free_pool = false; + buffer_user->on_done_destroy = NULL; buffer_user->reclaimers[0] = NULL; buffer_user->reclaimers[1] = NULL; for (int i = 0; i < GRPC_BULIST_COUNT; i++) { @@ -488,18 +490,25 @@ void grpc_buffer_user_init(grpc_buffer_user *buffer_user, } } -void grpc_buffer_user_destroy(grpc_exec_ctx *exec_ctx, - grpc_buffer_user *buffer_user, - grpc_closure *on_done) { +void grpc_buffer_user_shutdown(grpc_exec_ctx *exec_ctx, + grpc_buffer_user *buffer_user, + grpc_closure *on_done) { + gpr_mu_lock(&buffer_user->mu); + GPR_ASSERT(buffer_user->on_done_destroy == NULL); buffer_user->on_done_destroy = on_done; - grpc_combiner_execute(exec_ctx, buffer_user->buffer_pool->combiner, - &buffer_user->destroy_closure, GRPC_ERROR_NONE, false); + if (buffer_user->allocated == 0) { + grpc_combiner_execute(exec_ctx, buffer_user->buffer_pool->combiner, + &buffer_user->destroy_closure, GRPC_ERROR_NONE, + false); + } + gpr_mu_unlock(&buffer_user->mu); } void grpc_buffer_user_alloc(grpc_exec_ctx *exec_ctx, grpc_buffer_user *buffer_user, size_t size, grpc_closure *optional_on_done) { gpr_mu_lock(&buffer_user->mu); + GPR_ASSERT(buffer_user->on_done_destroy == NULL); buffer_user->allocated += (int64_t)size; buffer_user->free_pool -= (int64_t)size; if (buffer_user->free_pool < 0) { @@ -532,6 +541,11 @@ void grpc_buffer_user_free(grpc_exec_ctx *exec_ctx, &buffer_user->add_to_free_pool_closure, GRPC_ERROR_NONE, false); } + if (buffer_user->on_done_destroy != NULL && buffer_user->allocated == 0) { + grpc_combiner_execute(exec_ctx, buffer_user->buffer_pool->combiner, + &buffer_user->destroy_closure, GRPC_ERROR_NONE, + false); + } gpr_mu_unlock(&buffer_user->mu); } diff --git a/src/core/lib/iomgr/buffer_pool.h b/src/core/lib/iomgr/buffer_pool.h index 968454ec94..317eb6b458 100644 --- a/src/core/lib/iomgr/buffer_pool.h +++ b/src/core/lib/iomgr/buffer_pool.h @@ -83,9 +83,9 @@ struct grpc_buffer_user { void grpc_buffer_user_init(grpc_buffer_user *buffer_user, grpc_buffer_pool *buffer_pool); -void grpc_buffer_user_destroy(grpc_exec_ctx *exec_ctx, - grpc_buffer_user *buffer_user, - grpc_closure *on_done); +void grpc_buffer_user_shutdown(grpc_exec_ctx *exec_ctx, + grpc_buffer_user *buffer_user, + grpc_closure *on_done); void grpc_buffer_user_alloc(grpc_exec_ctx *exec_ctx, grpc_buffer_user *buffer_user, size_t size, diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index 27a7f83b4d..8b0841cacc 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -125,8 +125,8 @@ static void tcp_begin_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { "tcp_unref_orphan"); gpr_slice_buffer_destroy(&tcp->last_read_buffer); gpr_free(tcp->peer_string); - grpc_buffer_user_destroy(exec_ctx, &tcp->buffer_user, - grpc_closure_create(tcp_end_free, tcp)); + grpc_buffer_user_shutdown(exec_ctx, &tcp->buffer_user, + grpc_closure_create(tcp_end_free, tcp)); } /*#define GRPC_TCP_REFCOUNT_DEBUG*/ diff --git a/test/core/iomgr/buffer_pool_test.c b/test/core/iomgr/buffer_pool_test.c index 7438d088ac..265ece72ac 100644 --- a/test/core/iomgr/buffer_pool_test.c +++ b/test/core/iomgr/buffer_pool_test.c @@ -78,7 +78,7 @@ grpc_closure *make_unused_reclaimer(grpc_closure *then) { static void destroy_user(grpc_buffer_user *usr) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; bool done = false; - grpc_buffer_user_destroy(&exec_ctx, usr, set_bool(&done)); + grpc_buffer_user_shutdown(&exec_ctx, usr, set_bool(&done)); grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(done); } @@ -498,6 +498,34 @@ static void test_multiple_reclaims_can_be_triggered(void) { GPR_ASSERT(destructive_done); } +static void test_buffer_user_stays_allocated_until_memory_released(void) { + gpr_log(GPR_INFO, + "** test_buffer_user_stays_allocated_until_memory_released **"); + grpc_buffer_pool *p = grpc_buffer_pool_create(); + grpc_buffer_pool_resize(p, 1024 * 1024); + grpc_buffer_user usr; + grpc_buffer_user_init(&usr, p); + bool done = false; + { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_buffer_user_alloc(&exec_ctx, &usr, 1024, NULL); + grpc_exec_ctx_finish(&exec_ctx); + } + { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_buffer_pool_unref(p); + grpc_buffer_user_shutdown(&exec_ctx, &usr, set_bool(&done)); + grpc_exec_ctx_finish(&exec_ctx); + GPR_ASSERT(!done); + } + { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_buffer_user_free(&exec_ctx, &usr, 1024); + grpc_exec_ctx_finish(&exec_ctx); + GPR_ASSERT(done); + } +} + int main(int argc, char **argv) { grpc_test_init(argc, argv); grpc_init(); @@ -516,6 +544,7 @@ int main(int argc, char **argv) { test_unused_reclaim_is_cancelled(); test_benign_reclaim_is_preferred(); test_multiple_reclaims_can_be_triggered(); + test_buffer_user_stays_allocated_until_memory_released(); grpc_shutdown(); return 0; } diff --git a/test/core/util/mock_endpoint.c b/test/core/util/mock_endpoint.c index 173a2d8963..e4c478281f 100644 --- a/test/core/util/mock_endpoint.c +++ b/test/core/util/mock_endpoint.c @@ -95,7 +95,7 @@ static void me_really_destroy(grpc_exec_ctx *exec_ctx, void *mp, static void me_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { grpc_mock_endpoint *m = (grpc_mock_endpoint *)ep; - grpc_buffer_user_destroy(exec_ctx, &m->buffer_user, + grpc_buffer_user_shutdown(exec_ctx, &m->buffer_user, grpc_closure_create(me_really_destroy, m)); } diff --git a/test/core/util/passthru_endpoint.c b/test/core/util/passthru_endpoint.c index 239cd3e275..bdf75ce587 100644 --- a/test/core/util/passthru_endpoint.c +++ b/test/core/util/passthru_endpoint.c @@ -140,7 +140,7 @@ static void me_really_destroy(grpc_exec_ctx *exec_ctx, void *ep, static void me_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { half *m = (half *)ep; - grpc_buffer_user_destroy(exec_ctx, &m->buffer_user, + grpc_buffer_user_shutdown(exec_ctx, &m->buffer_user, grpc_closure_create(me_really_destroy, m)); } -- GitLab