diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 6f63f999bfd1f77d336002a0f26262e32656a418..9eb501decbadc32c6f6729fef87a16dffbbe11b3 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -36,20 +36,22 @@ #include <stdlib.h> #include <string.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> +#include <grpc/support/useful.h> + #include "src/core/channel/census_filter.h" #include "src/core/channel/channel_args.h" #include "src/core/channel/connected_channel.h" #include "src/core/iomgr/iomgr.h" +#include "src/core/support/stack_lockfree.h" #include "src/core/support/string.h" #include "src/core/surface/call.h" #include "src/core/surface/channel.h" #include "src/core/surface/completion_queue.h" #include "src/core/surface/init.h" #include "src/core/transport/metadata.h" -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <grpc/support/string_util.h> -#include <grpc/support/useful.h> typedef struct listener { void *arg; @@ -72,8 +74,8 @@ typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type; typedef struct requested_call { requested_call_type type; - struct requested_call *next; void *tag; + grpc_server *server; grpc_completion_queue *cq_bound_to_call; grpc_completion_queue *cq_for_notification; grpc_call **call; @@ -161,7 +163,7 @@ struct call_data { struct request_matcher { call_data *pending_head; call_data *pending_tail; - requested_call *requests; + gpr_stack_lockfree *requests; }; struct registered_method { @@ -197,8 +199,13 @@ struct grpc_server { registered_method *registered_methods; request_matcher unregistered_request_matcher; + /** free list of available requested_calls indices */ + gpr_stack_lockfree *request_freelist; + /** requested call backing data */ + requested_call *requested_calls; + int max_requested_calls; - gpr_uint8 shutdown; + gpr_atm shutdown_flag; gpr_uint8 shutdown_published; size_t num_shutdown_tags; shutdown_tag *shutdown_tags; @@ -290,8 +297,15 @@ static void channel_broadcaster_shutdown(channel_broadcaster *cb, * request_matcher */ -static void request_matcher_init(request_matcher *request_matcher) { +static void request_matcher_init(request_matcher *request_matcher, + int entries) { memset(request_matcher, 0, sizeof(*request_matcher)); + request_matcher->requests = gpr_stack_lockfree_create(entries); +} + +static void request_matcher_destroy(request_matcher *request_matcher) { + GPR_ASSERT(gpr_stack_lockfree_pop(request_matcher->requests) == -1); + gpr_stack_lockfree_destroy(request_matcher->requests); } static void kill_zombie(void *elem, int success) { @@ -328,6 +342,7 @@ static void server_delete(grpc_server *server) { gpr_free(server->channel_filters); while ((rm = server->registered_methods) != NULL) { server->registered_methods = rm->next; + request_matcher_destroy(&rm->request_matcher); gpr_free(rm->method); gpr_free(rm->host); gpr_free(rm); @@ -335,9 +350,12 @@ static void server_delete(grpc_server *server) { for (i = 0; i < server->cq_count; i++) { GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server"); } + request_matcher_destroy(&server->unregistered_request_matcher); + gpr_stack_lockfree_destroy(server->request_freelist); gpr_free(server->cqs); gpr_free(server->pollsets); gpr_free(server->shutdown_tags); + gpr_free(server->requested_calls); gpr_free(server); } @@ -377,11 +395,12 @@ static void destroy_channel(channel_data *chand) { static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem, request_matcher *request_matcher) { - requested_call *rc; call_data *calld = elem->call_data; - gpr_mu_lock(&server->mu_call); - rc = request_matcher->requests; - if (rc == NULL) { + int request_id; + + request_id = gpr_stack_lockfree_pop(request_matcher->requests); + if (request_id == -1) { + gpr_mu_lock(&server->mu_call); gpr_mu_lock(&calld->mu_state); calld->state = PENDING; gpr_mu_unlock(&calld->mu_state); @@ -394,12 +413,10 @@ static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem, calld->pending_next = NULL; gpr_mu_unlock(&server->mu_call); } else { - request_matcher->requests = rc->next; gpr_mu_lock(&calld->mu_state); calld->state = ACTIVATED; gpr_mu_unlock(&calld->mu_state); - gpr_mu_unlock(&server->mu_call); - begin_call(server, calld, rc); + begin_call(server, calld, &server->requested_calls[request_id]); } } @@ -466,7 +483,7 @@ static int num_channels(grpc_server *server) { static void maybe_finish_shutdown(grpc_server *server) { size_t i; - if (!server->shutdown || server->shutdown_published) { + if (!gpr_atm_acq_load(&server->shutdown_flag) || server->shutdown_published) { return; } @@ -741,7 +758,17 @@ grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters, server->root_channel_data.next = server->root_channel_data.prev = &server->root_channel_data; - request_matcher_init(&server->unregistered_request_matcher); + /* TODO(ctiller): expose a channel_arg for this */ + server->max_requested_calls = 32768; + server->request_freelist = + gpr_stack_lockfree_create(server->max_requested_calls); + for (i = 0; i < (size_t)server->max_requested_calls; i++) { + gpr_stack_lockfree_push(server->request_freelist, i); + } + request_matcher_init(&server->unregistered_request_matcher, + server->max_requested_calls); + server->requested_calls = gpr_malloc(server->max_requested_calls * + sizeof(*server->requested_calls)); /* Server filter stack is: @@ -790,7 +817,7 @@ void *grpc_server_register_method(grpc_server *server, const char *method, } m = gpr_malloc(sizeof(registered_method)); memset(m, 0, sizeof(*m)); - request_matcher_init(&m->request_matcher); + request_matcher_init(&m->request_matcher, server->max_requested_calls); m->method = gpr_strdup(method); m->host = gpr_strdup(host); m->next = server->registered_methods; @@ -906,13 +933,49 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport, grpc_transport_perform_op(transport, &op); } +typedef struct { + requested_call **requests; + size_t count; + size_t capacity; +} request_killer; + +static void request_killer_init(request_killer *rk) { + memset(rk, 0, sizeof(*rk)); +} + +static void request_killer_add(request_killer *rk, requested_call *rc) { + if (rk->capacity == rk->count) { + rk->capacity = GPR_MAX(8, rk->capacity * 2); + rk->requests = + gpr_realloc(rk->requests, rk->capacity * sizeof(*rk->requests)); + } + rk->requests[rk->count++] = rc; +} + +static void request_killer_add_request_matcher(request_killer *rk, + grpc_server *server, + request_matcher *rm) { + int request_id; + while ((request_id = gpr_stack_lockfree_pop(rm->requests)) != -1) { + request_killer_add(rk, &server->requested_calls[request_id]); + } +} + +static void request_killer_run(request_killer *rk, grpc_server *server) { + size_t i; + for (i = 0; i < rk->count; i++) { + fail_call(server, rk->requests[i]); + } + gpr_free(rk->requests); +} + void grpc_server_shutdown_and_notify(grpc_server *server, grpc_completion_queue *cq, void *tag) { listener *l; - requested_call *requests = NULL; registered_method *rm; shutdown_tag *sdt; channel_broadcaster broadcaster; + request_killer reqkill; /* lock, and gather up some stuff to do */ gpr_mu_lock(&server->mu_global); @@ -923,7 +986,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server, sdt = &server->shutdown_tags[server->num_shutdown_tags++]; sdt->tag = tag; sdt->cq = cq; - if (server->shutdown) { + if (gpr_atm_acq_load(&server->shutdown_flag)) { gpr_mu_unlock(&server->mu_global); return; } @@ -931,34 +994,26 @@ void grpc_server_shutdown_and_notify(grpc_server *server, server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME); channel_broadcaster_init(server, &broadcaster); + request_killer_init(&reqkill); /* collect all unregistered then registered calls */ gpr_mu_lock(&server->mu_call); - requests = server->unregistered_request_matcher.requests; - server->unregistered_request_matcher.requests = NULL; + request_killer_add_request_matcher(&reqkill, server, + &server->unregistered_request_matcher); request_matcher_zombify_all_pending_calls( &server->unregistered_request_matcher); for (rm = server->registered_methods; rm; rm = rm->next) { - while (rm->request_matcher.requests != NULL) { - requested_call *c = rm->request_matcher.requests; - rm->request_matcher.requests = c->next; - c->next = requests; - requests = c; - } + request_killer_add_request_matcher(&reqkill, server, &rm->request_matcher); request_matcher_zombify_all_pending_calls(&rm->request_matcher); } gpr_mu_unlock(&server->mu_call); - server->shutdown = 1; + gpr_atm_rel_store(&server->shutdown_flag, 1); maybe_finish_shutdown(server); gpr_mu_unlock(&server->mu_global); /* terminate all the requested calls */ - while (requests != NULL) { - requested_call *next = requests->next; - fail_call(server, requests); - requests = next; - } + request_killer_run(&reqkill, server); /* Shutdown listeners */ for (l = server->listeners; l; l = l->next) { @@ -990,7 +1045,7 @@ void grpc_server_destroy(grpc_server *server) { listener *l; gpr_mu_lock(&server->mu_global); - GPR_ASSERT(server->shutdown || !server->listeners); + GPR_ASSERT(gpr_atm_acq_load(&server->shutdown_flag) || !server->listeners); GPR_ASSERT(server->listeners_destroyed == num_listeners(server)); while (server->listeners) { @@ -1021,9 +1076,14 @@ static grpc_call_error queue_call_request(grpc_server *server, requested_call *rc) { call_data *calld = NULL; request_matcher *request_matcher = NULL; - gpr_mu_lock(&server->mu_call); - if (server->shutdown) { - gpr_mu_unlock(&server->mu_call); + int request_id; + if (gpr_atm_acq_load(&server->shutdown_flag)) { + fail_call(server, rc); + return GRPC_CALL_OK; + } + request_id = gpr_stack_lockfree_pop(server->request_freelist); + if (request_id == -1) { + /* out of request ids: just fail this one */ fail_call(server, rc); return GRPC_CALL_OK; } @@ -1035,32 +1095,35 @@ static grpc_call_error queue_call_request(grpc_server *server, request_matcher = &rc->data.registered.registered_method->request_matcher; break; } - if (request_matcher->pending_head != NULL) { - calld = request_matcher->pending_head; - request_matcher->pending_head = calld->pending_next; - } - if (calld != NULL) { - gpr_mu_unlock(&server->mu_call); - gpr_mu_lock(&calld->mu_state); - if (calld->state == ZOMBIED) { - gpr_mu_unlock(&calld->mu_state); - grpc_iomgr_closure_init( - &calld->kill_zombie_closure, kill_zombie, - grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); - grpc_iomgr_add_callback(&calld->kill_zombie_closure); - return queue_call_request(server, rc); /* retry */ + server->requested_calls[request_id] = *rc; + gpr_free(rc); + if (gpr_stack_lockfree_push(request_matcher->requests, request_id)) { + /* this was the first queued request: we need to lock and start + matching calls */ + gpr_mu_lock(&server->mu_call); + while ((calld = request_matcher->pending_head) != NULL) { + request_id = gpr_stack_lockfree_pop(request_matcher->requests); + if (request_id == -1) break; + request_matcher->pending_head = calld->pending_next; + gpr_mu_unlock(&server->mu_call); + gpr_mu_lock(&calld->mu_state); + if (calld->state == ZOMBIED) { + gpr_mu_unlock(&calld->mu_state); + grpc_iomgr_closure_init( + &calld->kill_zombie_closure, kill_zombie, + grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); + grpc_iomgr_add_callback(&calld->kill_zombie_closure); + } else { + GPR_ASSERT(calld->state == PENDING); + calld->state = ACTIVATED; + gpr_mu_unlock(&calld->mu_state); + begin_call(server, calld, rc); + } + gpr_mu_lock(&server->mu_call); } - GPR_ASSERT(calld->state == PENDING); - calld->state = ACTIVATED; - gpr_mu_unlock(&calld->mu_state); - begin_call(server, calld, rc); - return GRPC_CALL_OK; - } else { - rc->next = request_matcher->requests; - request_matcher->requests = rc; gpr_mu_unlock(&server->mu_call); - return GRPC_CALL_OK; } + return GRPC_CALL_OK; } grpc_call_error grpc_server_request_call( @@ -1078,6 +1141,7 @@ grpc_call_error grpc_server_request_call( } grpc_cq_begin_op(cq_for_notification); rc->type = BATCH_CALL; + rc->server = server; rc->tag = tag; rc->cq_bound_to_call = cq_bound_to_call; rc->cq_for_notification = cq_for_notification; @@ -1100,6 +1164,7 @@ grpc_call_error grpc_server_request_registered_call( } grpc_cq_begin_op(cq_for_notification); rc->type = REGISTERED_CALL; + rc->server = server; rc->tag = tag; rc->cq_bound_to_call = cq_bound_to_call; rc->cq_for_notification = cq_for_notification; @@ -1179,7 +1244,16 @@ static void begin_call(grpc_server *server, call_data *calld, } static void done_request_event(void *req, grpc_cq_completion *c) { - gpr_free(req); + requested_call *rc = req; + grpc_server *server = rc->server; + + if (rc >= server->requested_calls && + rc < server->requested_calls + server->max_requested_calls) { + gpr_stack_lockfree_push(server->request_freelist, + rc - server->requested_calls); + } else { + gpr_free(req); + } } static void fail_call(grpc_server *server, requested_call *rc) {