diff --git a/src/core/lib/iomgr/buffer_pool.c b/src/core/lib/iomgr/buffer_pool.c index 8ba6f822f8a0b3ee2b2ee2258c2bce4824d7c080..31ca08cc948e1f3727b9eb549827d00131351fa6 100644 --- a/src/core/lib/iomgr/buffer_pool.c +++ b/src/core/lib/iomgr/buffer_pool.c @@ -41,7 +41,14 @@ typedef bool (*bpstate_func)(grpc_exec_ctx *exec_ctx, grpc_buffer_pool *buffer_pool); +typedef struct { + grpc_buffer_user *head; + grpc_buffer_user *tail; +} grpc_buffer_user_list; + struct grpc_buffer_pool { + gpr_refcount refs; + grpc_combiner *combiner; int64_t size; int64_t free_pool; @@ -49,17 +56,57 @@ struct grpc_buffer_pool { bool step_scheduled; bool reclaiming; grpc_closure bpstep_closure; + + grpc_buffer_user_list lists[GRPC_BULIST_COUNT]; }; /******************************************************************************* * list management */ -void bulist_add(grpc_buffer_user *buffer_user, grpc_bulist list); -bool bulist_empty(grpc_buffer_pool *buffer_pool, grpc_bulist list); -grpc_buffer_user *bulist_head(grpc_buffer_pool *buffer_pool, grpc_bulist list); -grpc_buffer_user *bulist_pop(grpc_buffer_pool *buffer_pool, grpc_bulist list); -void bulist_remove(grpc_buffer_user *buffer_pool, grpc_bulist list); +static void bulist_add_tail(grpc_buffer_user *buffer_user, grpc_bulist list) { + grpc_buffer_pool *buffer_pool = buffer_user->buffer_pool; + grpc_buffer_user_list *lst = &buffer_pool->lists[list]; + if (lst->head == NULL) { + lst->head = lst->tail = buffer_user; + } else { + lst->tail->next[list] = buffer_user; + lst->tail = buffer_user; + } + buffer_user->next[list] = NULL; +} + +static void bulist_add_head(grpc_buffer_user *buffer_user, grpc_bulist list) { + grpc_buffer_pool *buffer_pool = buffer_user->buffer_pool; + grpc_buffer_user_list *lst = &buffer_pool->lists[list]; + if (lst->head == NULL) { + lst->head = lst->tail = buffer_user; + buffer_user->next[list] = NULL; + } else { + buffer_user->next[list] = lst->head; + lst->head = buffer_user; + } +} + +static bool bulist_empty(grpc_buffer_pool *buffer_pool, grpc_bulist list) { + return buffer_pool->lists[list].head == NULL; +} + +static grpc_buffer_user *bulist_pop(grpc_buffer_pool *buffer_pool, + grpc_bulist list) { + grpc_buffer_user_list *lst = &buffer_pool->lists[list]; + grpc_buffer_user *buffer_user = lst->head; + if (buffer_user == NULL) { + return NULL; + } + if (buffer_user == lst->tail) { + lst->head = lst->tail = NULL; + } else { + lst->head = buffer_user->next[list]; + } + buffer_user->next[list] = NULL; + return buffer_user; +} /******************************************************************************* * buffer pool state machine @@ -93,7 +140,7 @@ static void bpstep_sched(grpc_exec_ctx *exec_ctx, static bool bpalloc(grpc_exec_ctx *exec_ctx, grpc_buffer_pool *buffer_pool) { grpc_buffer_user *buffer_user; while ((buffer_user = - bulist_head(buffer_pool, GRPC_BULIST_AWAITING_ALLOCATION))) { + bulist_pop(buffer_pool, GRPC_BULIST_AWAITING_ALLOCATION))) { gpr_mu_lock(&buffer_user->mu); if (buffer_user->free_pool < 0 && -buffer_user->free_pool < buffer_pool->free_pool) { @@ -103,9 +150,9 @@ static bool bpalloc(grpc_exec_ctx *exec_ctx, grpc_buffer_pool *buffer_pool) { if (buffer_user->free_pool >= 0) { buffer_user->allocating = false; grpc_exec_ctx_enqueue_list(exec_ctx, &buffer_user->on_allocated, NULL); - bulist_remove(buffer_user, GRPC_BULIST_AWAITING_ALLOCATION); gpr_mu_unlock(&buffer_user->mu); } else { + bulist_add_head(buffer_user, GRPC_BULIST_AWAITING_ALLOCATION); gpr_mu_unlock(&buffer_user->mu); return false; } @@ -155,7 +202,7 @@ static void bu_allocate(grpc_exec_ctx *exec_ctx, void *bu, grpc_error *error) { if (bulist_empty(buffer_user->buffer_pool, GRPC_BULIST_AWAITING_ALLOCATION)) { bpstep_sched(exec_ctx, buffer_user->buffer_pool); } - bulist_add(buffer_user, GRPC_BULIST_AWAITING_ALLOCATION); + bulist_add_tail(buffer_user, GRPC_BULIST_AWAITING_ALLOCATION); } static void bu_add_to_free_pool(grpc_exec_ctx *exec_ctx, void *bu, @@ -166,7 +213,7 @@ static void bu_add_to_free_pool(grpc_exec_ctx *exec_ctx, void *bu, bulist_empty(buffer_user->buffer_pool, GRPC_BULIST_NON_EMPTY_FREE_POOL)) { bpstep_sched(exec_ctx, buffer_user->buffer_pool); } - bulist_add(buffer_user, GRPC_BULIST_AWAITING_ALLOCATION); + bulist_add_tail(buffer_user, GRPC_BULIST_AWAITING_ALLOCATION); } static void bu_post_benign_reclaimer(grpc_exec_ctx *exec_ctx, void *bu, @@ -178,7 +225,7 @@ static void bu_post_benign_reclaimer(grpc_exec_ctx *exec_ctx, void *bu, bulist_empty(buffer_user->buffer_pool, GRPC_BULIST_RECLAIMER_BENIGN)) { bpstep_sched(exec_ctx, buffer_user->buffer_pool); } - bulist_add(buffer_user, GRPC_BULIST_RECLAIMER_BENIGN); + bulist_add_tail(buffer_user, GRPC_BULIST_RECLAIMER_BENIGN); } static void bu_post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *bu, @@ -192,7 +239,7 @@ static void bu_post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *bu, GRPC_BULIST_RECLAIMER_DESTRUCTIVE)) { bpstep_sched(exec_ctx, buffer_user->buffer_pool); } - bulist_add(buffer_user, GRPC_BULIST_RECLAIMER_DESTRUCTIVE); + bulist_add_tail(buffer_user, GRPC_BULIST_RECLAIMER_DESTRUCTIVE); } /******************************************************************************* @@ -201,6 +248,7 @@ static void bu_post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *bu, grpc_buffer_pool *grpc_buffer_pool_create(void) { grpc_buffer_pool *buffer_pool = gpr_malloc(sizeof(*buffer_pool)); + gpr_ref_init(&buffer_pool->refs, 1); buffer_pool->combiner = grpc_combiner_create(NULL); buffer_pool->free_pool = INT64_MAX; buffer_pool->size = INT64_MAX; @@ -208,13 +256,36 @@ grpc_buffer_pool *grpc_buffer_pool_create(void) { return buffer_pool; } +void grpc_buffer_pool_internal_unref(grpc_exec_ctx *exec_ctx, + grpc_buffer_pool *buffer_pool) { + if (gpr_unref(&buffer_pool->refs)) { + grpc_combiner_destroy(exec_ctx, buffer_pool->combiner); + gpr_free(buffer_pool); + } +} + +void grpc_buffer_pool_unref(grpc_buffer_pool *buffer_pool) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_buffer_pool_internal_unref(&exec_ctx, buffer_pool); + grpc_exec_ctx_finish(&exec_ctx); +} + +grpc_buffer_pool *grpc_buffer_pool_internal_ref(grpc_buffer_pool *buffer_pool) { + gpr_ref(&buffer_pool->refs); + return buffer_pool; +} + +void grpc_buffer_pool_ref(grpc_buffer_pool *buffer_pool) { + grpc_buffer_pool_internal_ref(buffer_pool); +} + /******************************************************************************* * grpc_buffer_user api */ void grpc_buffer_user_init(grpc_buffer_user *buffer_user, grpc_buffer_pool *buffer_pool) { - buffer_user->buffer_pool = buffer_pool; + buffer_user->buffer_pool = grpc_buffer_pool_internal_ref(buffer_pool); grpc_closure_init(&buffer_user->allocate_closure, &bu_allocate, buffer_user); grpc_closure_init(&buffer_user->add_to_free_pool_closure, &bu_add_to_free_pool, buffer_user); diff --git a/src/core/lib/iomgr/buffer_pool.h b/src/core/lib/iomgr/buffer_pool.h index eec3d4c1e047f7f6752dc3a34f241b75b869da0f..087d9a167f3f1352063a9c584f5cef15a2144f2d 100644 --- a/src/core/lib/iomgr/buffer_pool.h +++ b/src/core/lib/iomgr/buffer_pool.h @@ -43,9 +43,12 @@ typedef enum { GRPC_BULIST_NON_EMPTY_FREE_POOL, GRPC_BULIST_RECLAIMER_BENIGN, GRPC_BULIST_RECLAIMER_DESTRUCTIVE, + GRPC_BULIST_COUNT } grpc_bulist; -typedef struct grpc_buffer_user { +typedef struct grpc_buffer_user grpc_buffer_user; + +struct grpc_buffer_user { grpc_buffer_pool *buffer_pool; grpc_closure allocate_closure; @@ -60,7 +63,9 @@ typedef struct grpc_buffer_user { grpc_closure *reclaimers[2]; grpc_closure post_reclaimer_closure[2]; -} grpc_buffer_user; + + grpc_buffer_user *next[GRPC_BULIST_COUNT]; +}; void grpc_buffer_user_init(grpc_buffer_user *buffer_user, grpc_buffer_pool *buffer_pool); diff --git a/test/core/iomgr/buffer_pool_test.c b/test/core/iomgr/buffer_pool_test.c index 306612f775d75dd9d56d619c716571c984a8e832..670ca0acfac261881b222155975c52633b392000 100644 --- a/test/core/iomgr/buffer_pool_test.c +++ b/test/core/iomgr/buffer_pool_test.c @@ -33,11 +33,19 @@ #include "src/core/lib/iomgr/buffer_pool.h" +#include <grpc/support/log.h> + #include "test/core/util/test_config.h" +static void test_no_op(void) { + gpr_log(GPR_DEBUG, "** test_no_op **"); + grpc_buffer_pool_unref(grpc_buffer_pool_create()); +} + int main(int argc, char **argv) { grpc_test_init(argc, argv); grpc_init(); + test_no_op(); grpc_shutdown(); return 0; }