From ef6b97659edb575a002d574db89d90f7ebf4b979 Mon Sep 17 00:00:00 2001 From: Craig Tiller <ctiller@google.com> Date: Wed, 28 Sep 2016 08:37:51 -0700 Subject: [PATCH] Add tracing, fix some transport bugs wrt buffer_pools --- include/grpc/grpc.h | 2 +- .../chttp2/transport/chttp2_transport.c | 24 +- src/core/lib/iomgr/buffer_pool.c | 66 ++++- src/core/lib/iomgr/buffer_pool.h | 6 +- src/core/lib/iomgr/tcp_client_posix.c | 2 +- src/core/lib/iomgr/tcp_posix.c | 2 +- src/core/lib/iomgr/tcp_server_posix.c | 2 +- .../google_default_credentials.c | 3 +- .../security/credentials/jwt/jwt_verifier.c | 4 +- .../credentials/oauth2/oauth2_credentials.c | 5 +- src/core/lib/surface/init.c | 2 + src/ruby/ext/grpc/rb_grpc_imports.generated.h | 2 +- test/core/end2end/tests/buffer_pool_server.c | 228 +++++++++++++++++- test/core/util/mock_endpoint.c | 12 +- test/core/util/passthru_endpoint.c | 16 +- test/core/util/port_server_client.c | 11 +- 16 files changed, 354 insertions(+), 33 deletions(-) diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index a6e02cd072..4bdf744d91 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -402,7 +402,7 @@ GRPCAPI int grpc_is_binary_header(const char *key, size_t length); GRPCAPI const char *grpc_call_error_to_string(grpc_call_error error); /** Create a buffer pool */ -GRPCAPI grpc_buffer_pool *grpc_buffer_pool_create(void); +GRPCAPI grpc_buffer_pool *grpc_buffer_pool_create(const char *trace_name); /** Add a reference to a buffer pool */ GRPCAPI void grpc_buffer_pool_ref(grpc_buffer_pool *buffer_pool); diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 3ebb467332..13241f6abe 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -257,7 +257,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_closure_init(&t->benign_reclaimer, benign_reclaimer, t); grpc_closure_init(&t->destructive_reclaimer, destructive_reclaimer, t); grpc_closure_init(&t->benign_reclaimer_locked, benign_reclaimer_locked, t); - grpc_closure_init(&t->destructive_reclaimer_locked, destructive_reclaimer_locked, t); + grpc_closure_init(&t->destructive_reclaimer_locked, + destructive_reclaimer_locked, t); grpc_chttp2_goaway_parser_init(&t->goaway_parser); grpc_chttp2_hpack_parser_init(&t->hpack_parser); @@ -2124,10 +2125,21 @@ static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_chttp2_transport *t = arg; if (error == GRPC_ERROR_NONE && grpc_chttp2_stream_map_size(&t->stream_map) == 0) { + if (grpc_buffer_pool_trace) { + gpr_log(GPR_DEBUG, "HTTP2: %s - send goaway to free memory", + t->peer_string); + } send_goaway(exec_ctx, t, GRPC_CHTTP2_ENHANCE_YOUR_CALM, gpr_slice_from_static_string("Buffers full")); + } else if (grpc_buffer_pool_trace) { + gpr_log(GPR_DEBUG, + "HTTP2: %s - skip benign reclaimation, there are still %" PRIdPTR + " streams", + t->peer_string, grpc_chttp2_stream_map_size(&t->stream_map)); } t->benign_reclaimer_registered = false; + grpc_buffer_user_finish_reclaimation(exec_ctx, + grpc_endpoint_get_buffer_user(t->ep)); GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "benign_reclaimer"); } @@ -2138,18 +2150,20 @@ static void destructive_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg, t->destructive_reclaimer_registered = false; if (error == GRPC_ERROR_NONE && n > 0) { grpc_chttp2_stream *s = grpc_chttp2_stream_map_rand(&t->stream_map); + if (grpc_buffer_pool_trace) { + gpr_log(GPR_DEBUG, "HTTP2: %s - abandon stream id %d", t->peer_string, + s->id); + } grpc_chttp2_cancel_stream( exec_ctx, t, s, grpc_error_set_int(GRPC_ERROR_CREATE("Buffers full"), GRPC_ERROR_INT_HTTP2_ERROR, GRPC_CHTTP2_ENHANCE_YOUR_CALM)); if (n > 1) { post_destructive_reclaimer(exec_ctx, t); - t->destructive_reclaimer_registered = true; - grpc_buffer_user_post_reclaimer(exec_ctx, - grpc_endpoint_get_buffer_user(t->ep), - true, &t->destructive_reclaimer); } } + grpc_buffer_user_finish_reclaimation(exec_ctx, + grpc_endpoint_get_buffer_user(t->ep)); GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "destructive_reclaimer"); } diff --git a/src/core/lib/iomgr/buffer_pool.c b/src/core/lib/iomgr/buffer_pool.c index 88f37c9fc1..cfa171684b 100644 --- a/src/core/lib/iomgr/buffer_pool.c +++ b/src/core/lib/iomgr/buffer_pool.c @@ -37,6 +37,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> #include <grpc/support/useful.h> #include "src/core/lib/iomgr/combiner.h" @@ -62,6 +63,8 @@ struct grpc_buffer_pool { grpc_closure bpreclaimation_done_closure; grpc_buffer_user *roots[GRPC_BULIST_COUNT]; + + char *name; }; /******************************************************************************* @@ -175,8 +178,18 @@ static bool bpalloc(grpc_exec_ctx *exec_ctx, grpc_buffer_pool *buffer_pool) { gpr_mu_lock(&buffer_user->mu); if (buffer_user->free_pool < 0 && -buffer_user->free_pool <= buffer_pool->free_pool) { - buffer_pool->free_pool += buffer_user->free_pool; + int64_t amt = -buffer_user->free_pool; buffer_user->free_pool = 0; + buffer_pool->free_pool -= amt; + if (grpc_buffer_pool_trace) { + gpr_log(GPR_DEBUG, "BP %s %s: grant alloc %" PRId64 + " bytes; bp_free_pool -> %" PRId64, + buffer_pool->name, buffer_user->name, amt, + buffer_pool->free_pool); + } + } else if (grpc_buffer_pool_trace && buffer_user->free_pool >= 0) { + gpr_log(GPR_DEBUG, "BP %s %s: discard already satisfied alloc request", + buffer_pool->name, buffer_user->name); } if (buffer_user->free_pool >= 0) { buffer_user->allocating = false; @@ -198,8 +211,15 @@ static bool bpscavenge(grpc_exec_ctx *exec_ctx, grpc_buffer_pool *buffer_pool) { bulist_pop(buffer_pool, GRPC_BULIST_NON_EMPTY_FREE_POOL))) { gpr_mu_lock(&buffer_user->mu); if (buffer_user->free_pool > 0) { - buffer_pool->free_pool += buffer_user->free_pool; + int64_t amt = buffer_user->free_pool; buffer_user->free_pool = 0; + buffer_pool->free_pool += amt; + if (grpc_buffer_pool_trace) { + gpr_log(GPR_DEBUG, "BP %s %s: scavenge %" PRId64 + " bytes; bp_free_pool -> %" PRId64, + buffer_pool->name, buffer_user->name, amt, + buffer_pool->free_pool); + } gpr_mu_unlock(&buffer_user->mu); return true; } else { @@ -217,6 +237,10 @@ static bool bpreclaim(grpc_exec_ctx *exec_ctx, grpc_buffer_pool *buffer_pool, : GRPC_BULIST_RECLAIMER_BENIGN; grpc_buffer_user *buffer_user = bulist_pop(buffer_pool, list); if (buffer_user == NULL) return false; + if (grpc_buffer_pool_trace) { + gpr_log(GPR_DEBUG, "BP %s %s: initiate %s reclaimation", buffer_pool->name, + buffer_user->name, destructive ? "destructive" : "benign"); + } buffer_pool->reclaiming = true; grpc_closure *c = buffer_user->reclaimers[destructive]; buffer_user->reclaimers[destructive] = NULL; @@ -384,7 +408,7 @@ static void bp_reclaimation_done(grpc_exec_ctx *exec_ctx, void *bp, * grpc_buffer_pool api */ -grpc_buffer_pool *grpc_buffer_pool_create(void) { +grpc_buffer_pool *grpc_buffer_pool_create(const char *name) { grpc_buffer_pool *buffer_pool = gpr_malloc(sizeof(*buffer_pool)); gpr_ref_init(&buffer_pool->refs, 1); buffer_pool->combiner = grpc_combiner_create(NULL); @@ -392,6 +416,12 @@ grpc_buffer_pool *grpc_buffer_pool_create(void) { buffer_pool->size = INT64_MAX; buffer_pool->step_scheduled = false; buffer_pool->reclaiming = false; + if (name != NULL) { + buffer_pool->name = gpr_strdup(name); + } else { + gpr_asprintf(&buffer_pool->name, "anonymous_pool_%" PRIxPTR, + (intptr_t)buffer_pool); + } grpc_closure_init(&buffer_pool->bpstep_closure, bpstep, buffer_pool); grpc_closure_init(&buffer_pool->bpreclaimation_done_closure, bp_reclaimation_done, buffer_pool); @@ -451,7 +481,7 @@ grpc_buffer_pool *grpc_buffer_pool_from_channel_args( } } } - return grpc_buffer_pool_create(); + return grpc_buffer_pool_create(NULL); } static void *bp_copy(void *bp) { @@ -473,7 +503,7 @@ const grpc_arg_pointer_vtable *grpc_buffer_pool_arg_vtable(void) { */ void grpc_buffer_user_init(grpc_buffer_user *buffer_user, - grpc_buffer_pool *buffer_pool) { + grpc_buffer_pool *buffer_pool, const char *name) { buffer_user->buffer_pool = grpc_buffer_pool_internal_ref(buffer_pool); grpc_closure_init(&buffer_user->allocate_closure, &bu_allocate, buffer_user); grpc_closure_init(&buffer_user->add_to_free_pool_closure, @@ -498,6 +528,12 @@ void grpc_buffer_user_init(grpc_buffer_user *buffer_user, #ifndef NDEBUG buffer_user->asan_canary = gpr_malloc(1); #endif + if (name != NULL) { + buffer_user->name = gpr_strdup(name); + } else { + gpr_asprintf(&buffer_user->name, "anonymous_buffer_user_%" PRIxPTR, + (intptr_t)buffer_user); + } } void grpc_buffer_user_shutdown(grpc_exec_ctx *exec_ctx, @@ -533,6 +569,10 @@ void grpc_buffer_user_alloc(grpc_exec_ctx *exec_ctx, &buffer_user->on_done_destroy_closure); if (on_done_destroy != NULL) { /* already shutdown */ + if (grpc_buffer_pool_trace) { + gpr_log(GPR_DEBUG, "BP %s %s: alloc %" PRIdPTR " after shutdown", + buffer_user->buffer_pool->name, buffer_user->name, size); + } grpc_exec_ctx_sched( exec_ctx, optional_on_done, GRPC_ERROR_CREATE("Buffer pool user is already shutdown"), NULL); @@ -541,6 +581,12 @@ void grpc_buffer_user_alloc(grpc_exec_ctx *exec_ctx, } buffer_user->allocated += (int64_t)size; buffer_user->free_pool -= (int64_t)size; + if (grpc_buffer_pool_trace) { + gpr_log(GPR_DEBUG, "BP %s %s: alloc %" PRIdPTR "; allocated -> %" PRId64 + ", free_pool -> %" PRId64, + buffer_user->buffer_pool->name, buffer_user->name, size, + buffer_user->allocated, buffer_user->free_pool); + } if (buffer_user->free_pool < 0) { grpc_closure_list_append(&buffer_user->on_allocated, optional_on_done, GRPC_ERROR_NONE); @@ -563,6 +609,12 @@ void grpc_buffer_user_free(grpc_exec_ctx *exec_ctx, bool was_zero_or_negative = buffer_user->free_pool <= 0; buffer_user->free_pool += (int64_t)size; buffer_user->allocated -= (int64_t)size; + if (grpc_buffer_pool_trace) { + gpr_log(GPR_DEBUG, "BP %s %s: free %" PRIdPTR "; allocated -> %" PRId64 + ", free_pool -> %" PRId64, + buffer_user->buffer_pool->name, buffer_user->name, size, + buffer_user->allocated, buffer_user->free_pool); + } bool is_bigger_than_zero = buffer_user->free_pool > 0; if (is_bigger_than_zero && was_zero_or_negative && !buffer_user->added_to_free_pool) { @@ -597,6 +649,10 @@ void grpc_buffer_user_post_reclaimer(grpc_exec_ctx *exec_ctx, void grpc_buffer_user_finish_reclaimation(grpc_exec_ctx *exec_ctx, grpc_buffer_user *buffer_user) { + if (grpc_buffer_pool_trace) { + gpr_log(GPR_DEBUG, "BP %s %s: reclaimation complete", + buffer_user->buffer_pool->name, buffer_user->name); + } grpc_combiner_execute(exec_ctx, buffer_user->buffer_pool->combiner, &buffer_user->buffer_pool->bpreclaimation_done_closure, GRPC_ERROR_NONE, false); diff --git a/src/core/lib/iomgr/buffer_pool.h b/src/core/lib/iomgr/buffer_pool.h index 301e059c18..1564872b5d 100644 --- a/src/core/lib/iomgr/buffer_pool.h +++ b/src/core/lib/iomgr/buffer_pool.h @@ -38,6 +38,8 @@ #include "src/core/lib/iomgr/exec_ctx.h" +extern int grpc_buffer_pool_trace; + grpc_buffer_pool *grpc_buffer_pool_internal_ref(grpc_buffer_pool *buffer_pool); void grpc_buffer_pool_internal_unref(grpc_exec_ctx *exec_ctx, grpc_buffer_pool *buffer_pool); @@ -83,10 +85,12 @@ struct grpc_buffer_user { gpr_atm on_done_destroy_closure; grpc_buffer_user_link links[GRPC_BULIST_COUNT]; + + char *name; }; void grpc_buffer_user_init(grpc_buffer_user *buffer_user, - grpc_buffer_pool *buffer_pool); + grpc_buffer_pool *buffer_pool, const char *name); void grpc_buffer_user_shutdown(grpc_exec_ctx *exec_ctx, grpc_buffer_user *buffer_user, grpc_closure *on_done); diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c index cf01623f09..4e8f9e53bb 100644 --- a/src/core/lib/iomgr/tcp_client_posix.c +++ b/src/core/lib/iomgr/tcp_client_posix.c @@ -125,7 +125,7 @@ grpc_endpoint *grpc_tcp_client_create_from_fd( grpc_exec_ctx *exec_ctx, grpc_fd *fd, const grpc_channel_args *channel_args, const char *addr_str) { size_t tcp_read_chunk_size = GRPC_TCP_DEFAULT_READ_SLICE_SIZE; - grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create(); + grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create(NULL); if (channel_args != NULL) { for (size_t i = 0; i < channel_args->num_args; i++) { if (0 == diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index 812c39235b..120622e817 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -544,7 +544,7 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, grpc_buffer_pool *buffer_pool, tcp->write_closure.cb = tcp_handle_write; tcp->write_closure.cb_arg = tcp; gpr_slice_buffer_init(&tcp->last_read_buffer); - grpc_buffer_user_init(&tcp->buffer_user, buffer_pool); + grpc_buffer_user_init(&tcp->buffer_user, buffer_pool, peer_string); grpc_buffer_user_slice_allocator_init( &tcp->slice_allocator, &tcp->buffer_user, tcp_read_allocation_done, tcp); /* Tell network status tracker about new endpoint */ diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c index 3304152385..4d36fd4caf 100644 --- a/src/core/lib/iomgr/tcp_server_posix.c +++ b/src/core/lib/iomgr/tcp_server_posix.c @@ -163,7 +163,7 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server)); s->so_reuseport = has_so_reuseport; - s->buffer_pool = grpc_buffer_pool_create(); + s->buffer_pool = grpc_buffer_pool_create(NULL); for (size_t i = 0; i < (args == NULL ? 0 : args->num_args); i++) { if (0 == strcmp(GRPC_ARG_ALLOW_REUSEPORT, args->args[i].key)) { if (args->args[i].type == GRPC_ARG_INTEGER) { diff --git a/src/core/lib/security/credentials/google_default/google_default_credentials.c b/src/core/lib/security/credentials/google_default/google_default_credentials.c index 3bcde3da8b..4e703aa9f4 100644 --- a/src/core/lib/security/credentials/google_default/google_default_credentials.c +++ b/src/core/lib/security/credentials/google_default/google_default_credentials.c @@ -124,7 +124,8 @@ static int is_stack_running_on_compute_engine(void) { grpc_httpcli_context_init(&context); - grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create(); + grpc_buffer_pool *buffer_pool = + grpc_buffer_pool_create("google_default_credentials"); grpc_httpcli_get( &exec_ctx, &context, &detector.pollent, buffer_pool, &request, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), max_detection_delay), diff --git a/src/core/lib/security/credentials/jwt/jwt_verifier.c b/src/core/lib/security/credentials/jwt/jwt_verifier.c index c1a3eb7eab..ffcd0b3910 100644 --- a/src/core/lib/security/credentials/jwt/jwt_verifier.c +++ b/src/core/lib/security/credentials/jwt/jwt_verifier.c @@ -660,7 +660,7 @@ static void on_openid_config_retrieved(grpc_exec_ctx *exec_ctx, void *user_data, /* TODO(ctiller): Carry the buffer_pool in ctx and share it with the host channel. This would allow us to cancel an authentication query when under extreme memory pressure. */ - grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create(); + grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create("jwt_verifier"); grpc_httpcli_get( exec_ctx, &ctx->verifier->http_ctx, &ctx->pollent, buffer_pool, &req, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), grpc_jwt_verifier_max_delay), @@ -772,7 +772,7 @@ static void retrieve_key_and_verify(grpc_exec_ctx *exec_ctx, /* TODO(ctiller): Carry the buffer_pool in ctx and share it with the host channel. This would allow us to cancel an authentication query when under extreme memory pressure. */ - grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create(); + grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create("jwt_verifier"); grpc_httpcli_get( exec_ctx, &ctx->verifier->http_ctx, &ctx->pollent, buffer_pool, &req, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), grpc_jwt_verifier_max_delay), diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.c b/src/core/lib/security/credentials/oauth2/oauth2_credentials.c index e9e83d1468..61c0815b2a 100644 --- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.c +++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.c @@ -310,7 +310,7 @@ static void compute_engine_fetch_oauth2( /* TODO(ctiller): Carry the buffer_pool in ctx and share it with the host channel. This would allow us to cancel an authentication query when under extreme memory pressure. */ - grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create(); + grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create("oauth2_credentials"); grpc_httpcli_get(exec_ctx, httpcli_context, pollent, buffer_pool, &request, deadline, grpc_closure_create(response_cb, metadata_req), &metadata_req->response); @@ -365,7 +365,8 @@ static void refresh_token_fetch_oauth2( /* TODO(ctiller): Carry the buffer_pool in ctx and share it with the host channel. This would allow us to cancel an authentication query when under extreme memory pressure. */ - grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create(); + grpc_buffer_pool *buffer_pool = + grpc_buffer_pool_create("oauth2_credentials_refresh"); grpc_httpcli_post(exec_ctx, httpcli_context, pollent, buffer_pool, &request, body, strlen(body), deadline, grpc_closure_create(response_cb, metadata_req), diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c index 3cbbaa7b0c..de913af4ee 100644 --- a/src/core/lib/surface/init.c +++ b/src/core/lib/surface/init.c @@ -48,6 +48,7 @@ #include "src/core/lib/channel/message_size_filter.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/http/parser.h" +#include "src/core/lib/iomgr/buffer_pool.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr.h" @@ -184,6 +185,7 @@ void grpc_init(void) { // Default timeout trace to 1 grpc_cq_event_timeout_trace = 1; grpc_register_tracer("op_failure", &grpc_trace_operation_failures); + grpc_register_tracer("buffer_pool", &grpc_buffer_pool_trace); #ifndef NDEBUG grpc_register_tracer("pending_tags", &grpc_trace_pending_tags); #endif diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h index 4926275fa2..703ea59e6e 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h @@ -347,7 +347,7 @@ extern grpc_is_binary_header_type grpc_is_binary_header_import; typedef const char *(*grpc_call_error_to_string_type)(grpc_call_error error); extern grpc_call_error_to_string_type grpc_call_error_to_string_import; #define grpc_call_error_to_string grpc_call_error_to_string_import -typedef grpc_buffer_pool *(*grpc_buffer_pool_create_type)(void); +typedef grpc_buffer_pool *(*grpc_buffer_pool_create_type)(const char *trace_name); extern grpc_buffer_pool_create_type grpc_buffer_pool_create_import; #define grpc_buffer_pool_create grpc_buffer_pool_create_import typedef void(*grpc_buffer_pool_ref_type)(grpc_buffer_pool *buffer_pool); diff --git a/test/core/end2end/tests/buffer_pool_server.c b/test/core/end2end/tests/buffer_pool_server.c index 0b06efd02e..7f07ec79d5 100644 --- a/test/core/end2end/tests/buffer_pool_server.c +++ b/test/core/end2end/tests/buffer_pool_server.c @@ -95,9 +95,235 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_destroy(f->cq); } +/* Creates and returns a gpr_slice containing random alphanumeric characters. */ +static gpr_slice generate_random_slice() { + size_t i; + static const char chars[] = "abcdefghijklmnopqrstuvwxyz1234567890"; + char output[1024 * 1024]; + for (i = 0; i < GPR_ARRAY_SIZE(output) - 1; ++i) { + output[i] = chars[rand() % (int)(sizeof(chars) - 1)]; + } + output[GPR_ARRAY_SIZE(output) - 1] = '\0'; + return gpr_slice_from_copied_string(output); +} + void buffer_pool_server(grpc_end2end_test_config config) { + grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create("test_server"); + grpc_buffer_pool_resize(buffer_pool, 5 * 1024 * 1024); + +#define NUM_CALLS 100 +#define CLIENT_BASE_TAG 1000 +#define SERVER_START_BASE_TAG 2000 +#define SERVER_RECV_BASE_TAG 3000 +#define SERVER_END_BASE_TAG 4000 + + grpc_arg arg; + arg.key = GRPC_ARG_BUFFER_POOL; + arg.type = GRPC_ARG_POINTER; + arg.value.pointer.p = buffer_pool; + arg.value.pointer.vtable = grpc_buffer_pool_arg_vtable(); + grpc_channel_args args = {1, &arg}; + grpc_end2end_test_fixture f = - begin_test(config, "buffer_pool_server", NULL, NULL); + begin_test(config, "buffer_pool_server", NULL, &args); + + /* Create large request and response bodies. These are big enough to require + * multiple round trips to deliver to the peer, and their exact contents of + * will be verified on completion. */ + gpr_slice request_payload_slice = generate_random_slice(); + + grpc_call *client_calls[NUM_CALLS]; + grpc_call *server_calls[NUM_CALLS]; + grpc_metadata_array initial_metadata_recv[NUM_CALLS]; + grpc_metadata_array trailing_metadata_recv[NUM_CALLS]; + grpc_metadata_array request_metadata_recv[NUM_CALLS]; + grpc_call_details call_details[NUM_CALLS]; + grpc_status_code status[NUM_CALLS]; + char *details[NUM_CALLS]; + size_t details_capacity[NUM_CALLS]; + grpc_byte_buffer *request_payload_recv[NUM_CALLS]; + int was_cancelled[NUM_CALLS]; + grpc_call_error error; + int pending_client_calls = 0; + int pending_server_start_calls = 0; + int pending_server_recv_calls = 0; + int pending_server_end_calls = 0; + int cancelled_calls_on_client = 0; + int cancelled_calls_on_server = 0; + + grpc_byte_buffer *request_payload = + grpc_raw_byte_buffer_create(&request_payload_slice, 1); + + grpc_op ops[6]; + grpc_op *op; + + for (int i = 0; i < NUM_CALLS; i++) { + grpc_metadata_array_init(&initial_metadata_recv[i]); + grpc_metadata_array_init(&trailing_metadata_recv[i]); + grpc_metadata_array_init(&request_metadata_recv[i]); + grpc_call_details_init(&call_details[i]); + details[i] = NULL; + details_capacity[i] = 0; + request_payload_recv[i] = NULL; + was_cancelled[i] = 0; + } + + for (int i = 0; i < NUM_CALLS; i++) { + error = grpc_server_request_call( + f.server, &server_calls[i], &call_details[i], &request_metadata_recv[i], + f.cq, f.cq, tag(SERVER_START_BASE_TAG + i)); + GPR_ASSERT(GRPC_CALL_OK == error); + + pending_server_start_calls++; + } + + for (int i = 0; i < NUM_CALLS; i++) { + client_calls[i] = grpc_channel_create_call( + f.client, NULL, GRPC_PROPAGATE_DEFAULTS, f.cq, "/foo", + "foo.test.google.fr", n_seconds_time(60), NULL); + + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message = request_payload; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_RECV_INITIAL_METADATA; + op->data.recv_initial_metadata = &initial_metadata_recv[i]; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; + op->data.recv_status_on_client.trailing_metadata = + &trailing_metadata_recv[i]; + op->data.recv_status_on_client.status = &status[i]; + op->data.recv_status_on_client.status_details = &details[i]; + op->data.recv_status_on_client.status_details_capacity = + &details_capacity[i]; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(client_calls[i], ops, (size_t)(op - ops), + tag(CLIENT_BASE_TAG + i), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + + pending_client_calls++; + } + + while (pending_client_calls + pending_server_recv_calls + + pending_server_end_calls > + 0) { + gpr_log(GPR_DEBUG, + "pending: client_calls=%d server_start_calls=%d " + "server_recv_calls=%d server_end_calls=%d", + pending_client_calls, pending_server_start_calls, + pending_server_recv_calls, pending_server_end_calls); + + grpc_event ev = grpc_completion_queue_next(f.cq, n_seconds_time(10), NULL); + GPR_ASSERT(ev.type == GRPC_OP_COMPLETE); + + int ev_tag = (int)(intptr_t)ev.tag; + if (ev_tag < CLIENT_BASE_TAG) { + abort(); /* illegal tag */ + } else if (ev_tag < SERVER_START_BASE_TAG) { + /* client call finished */ + int call_id = ev_tag - CLIENT_BASE_TAG; + GPR_ASSERT(call_id >= 0); + GPR_ASSERT(call_id < NUM_CALLS); + switch (status[call_id]) { + case GRPC_STATUS_RESOURCE_EXHAUSTED: + cancelled_calls_on_client++; + break; + case GRPC_STATUS_OK: + break; + default: + gpr_log(GPR_ERROR, "Unexpected status code: %d", status[call_id]); + abort(); + } + GPR_ASSERT(pending_client_calls > 0); + pending_client_calls--; + } else if (ev_tag < SERVER_RECV_BASE_TAG) { + /* new incoming call to the server */ + int call_id = ev_tag - SERVER_START_BASE_TAG; + GPR_ASSERT(call_id >= 0); + GPR_ASSERT(call_id < NUM_CALLS); + + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message = &request_payload_recv[call_id]; + op->flags = 0; + op->reserved = NULL; + op++; + error = + grpc_call_start_batch(server_calls[call_id], ops, (size_t)(op - ops), + tag(SERVER_RECV_BASE_TAG + call_id), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + + GPR_ASSERT(pending_server_start_calls > 0); + pending_server_start_calls--; + pending_server_recv_calls++; + } else if (ev_tag < SERVER_END_BASE_TAG) { + /* finished read on the server */ + int call_id = ev_tag - SERVER_RECV_BASE_TAG; + GPR_ASSERT(call_id >= 0); + GPR_ASSERT(call_id < NUM_CALLS); + + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; + op->data.recv_close_on_server.cancelled = &was_cancelled[call_id]; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; + op->data.send_status_from_server.trailing_metadata_count = 0; + op->data.send_status_from_server.status = GRPC_STATUS_OK; + op->data.send_status_from_server.status_details = "xyz"; + op->flags = 0; + op->reserved = NULL; + op++; + error = + grpc_call_start_batch(server_calls[call_id], ops, (size_t)(op - ops), + tag(SERVER_END_BASE_TAG + call_id), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + + GPR_ASSERT(pending_server_recv_calls > 0); + pending_server_recv_calls--; + pending_server_end_calls++; + } else { + int call_id = ev_tag - SERVER_END_BASE_TAG; + GPR_ASSERT(call_id >= 0); + GPR_ASSERT(call_id < NUM_CALLS); + + if (was_cancelled[call_id]) { + cancelled_calls_on_server++; + } + GPR_ASSERT(pending_server_end_calls > 0); + pending_server_end_calls--; + } + } + + gpr_log( + GPR_INFO, + "Done. %d total calls: %d cancelled at server, %d cancelled at client.", + NUM_CALLS, cancelled_calls_on_server, cancelled_calls_on_client); + end_test(&f); config.tear_down_data(&f); } diff --git a/test/core/util/mock_endpoint.c b/test/core/util/mock_endpoint.c index dcb4e5896f..a70de7678c 100644 --- a/test/core/util/mock_endpoint.c +++ b/test/core/util/mock_endpoint.c @@ -33,6 +33,8 @@ #include "test/core/util/mock_endpoint.h" +#include <inttypes.h> + #include <grpc/support/alloc.h> #include <grpc/support/string_util.h> @@ -88,7 +90,8 @@ static void unref(grpc_exec_ctx *exec_ctx, grpc_mock_endpoint *m) { } } -static void me_finish_shutdown(grpc_exec_ctx *exec_ctx, void *me, grpc_error *error) { +static void me_finish_shutdown(grpc_exec_ctx *exec_ctx, void *me, + grpc_error *error) { grpc_mock_endpoint *m = me; unref(exec_ctx, m); } @@ -108,7 +111,7 @@ static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { static void me_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { grpc_mock_endpoint *m = (grpc_mock_endpoint *)ep; - unref(exec_ctx,m); + unref(exec_ctx, m); } static char *me_get_peer(grpc_endpoint *ep) { @@ -139,7 +142,10 @@ grpc_endpoint *grpc_mock_endpoint_create(void (*on_write)(gpr_slice slice), grpc_mock_endpoint *m = gpr_malloc(sizeof(*m)); m->base.vtable = &vtable; m->refs = 2; - grpc_buffer_user_init(&m->buffer_user, buffer_pool); + char *name; + gpr_asprintf(&name, "mock_endpoint_%" PRIxPTR, (intptr_t)m); + grpc_buffer_user_init(&m->buffer_user, buffer_pool, name); + gpr_free(name); gpr_slice_buffer_init(&m->read_buffer); gpr_mu_init(&m->mu); m->on_write = on_write; diff --git a/test/core/util/passthru_endpoint.c b/test/core/util/passthru_endpoint.c index bdf75ce587..a1aaeda916 100644 --- a/test/core/util/passthru_endpoint.c +++ b/test/core/util/passthru_endpoint.c @@ -33,6 +33,8 @@ #include "test/core/util/passthru_endpoint.h" +#include <inttypes.h> + #include <grpc/support/alloc.h> #include <grpc/support/string_util.h> @@ -141,7 +143,7 @@ static void me_really_destroy(grpc_exec_ctx *exec_ctx, void *ep, static void me_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { half *m = (half *)ep; grpc_buffer_user_shutdown(exec_ctx, &m->buffer_user, - grpc_closure_create(me_really_destroy, m)); + grpc_closure_create(me_really_destroy, m)); } static char *me_get_peer(grpc_endpoint *ep) { @@ -168,12 +170,16 @@ static const grpc_endpoint_vtable vtable = { }; static void half_init(half *m, passthru_endpoint *parent, - grpc_buffer_pool *buffer_pool) { + grpc_buffer_pool *buffer_pool, const char *half_name) { m->base.vtable = &vtable; m->parent = parent; gpr_slice_buffer_init(&m->read_buffer); m->on_read = NULL; - grpc_buffer_user_init(&m->buffer_user, buffer_pool); + char *name; + gpr_asprintf(&name, "passthru_endpoint_%s_%" PRIxPTR, half_name, + (intptr_t)parent); + grpc_buffer_user_init(&m->buffer_user, buffer_pool, name); + gpr_free(name); } void grpc_passthru_endpoint_create(grpc_endpoint **client, @@ -182,8 +188,8 @@ void grpc_passthru_endpoint_create(grpc_endpoint **client, passthru_endpoint *m = gpr_malloc(sizeof(*m)); m->halves = 2; m->shutdown = 0; - half_init(&m->client, m, buffer_pool); - half_init(&m->server, m, buffer_pool); + half_init(&m->client, m, buffer_pool, "client"); + half_init(&m->server, m, buffer_pool, "server"); gpr_mu_init(&m->mu); *client = &m->client.base; *server = &m->server.base; diff --git a/test/core/util/port_server_client.c b/test/core/util/port_server_client.c index dd444236e9..9bd34677cb 100644 --- a/test/core/util/port_server_client.c +++ b/test/core/util/port_server_client.c @@ -49,6 +49,8 @@ #include "src/core/lib/http/httpcli.h" +int grpc_buffer_pool_trace = 0; + typedef struct freereq { gpr_mu *mu; grpc_polling_entity pops; @@ -99,7 +101,8 @@ void grpc_free_port_using_server(char *server, int port) { req.http.path = path; grpc_httpcli_context_init(&context); - grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create(); + grpc_buffer_pool *buffer_pool = + grpc_buffer_pool_create("port_server_client/free"); grpc_httpcli_get(&exec_ctx, &context, &pr.pops, buffer_pool, &req, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10), grpc_closure_create(freed_port_from_server, &pr), &rsp); @@ -169,7 +172,8 @@ static void got_port_from_server(grpc_exec_ctx *exec_ctx, void *arg, req.http.path = "/get"; grpc_http_response_destroy(&pr->response); memset(&pr->response, 0, sizeof(pr->response)); - grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create(); + grpc_buffer_pool *buffer_pool = + grpc_buffer_pool_create("port_server_client/pick_retry"); grpc_httpcli_get(exec_ctx, pr->ctx, &pr->pops, buffer_pool, &req, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10), grpc_closure_create(got_port_from_server, pr), @@ -215,7 +219,8 @@ int grpc_pick_port_using_server(char *server) { req.http.path = "/get"; grpc_httpcli_context_init(&context); - grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create(); + grpc_buffer_pool *buffer_pool = + grpc_buffer_pool_create("port_server_client/pick"); grpc_httpcli_get(&exec_ctx, &context, &pr.pops, buffer_pool, &req, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10), grpc_closure_create(got_port_from_server, &pr), -- GitLab