Skip to content
Snippets Groups Projects
Commit 6a006cee authored by Craig Tiller's avatar Craig Tiller
Browse files

First pass of lockless delivery of new calls

parent eef4c257
No related branches found
No related tags found
No related merge requests found
......@@ -36,20 +36,22 @@
#include <stdlib.h>
#include <string.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
#include "src/core/channel/census_filter.h"
#include "src/core/channel/channel_args.h"
#include "src/core/channel/connected_channel.h"
#include "src/core/iomgr/iomgr.h"
#include "src/core/support/stack_lockfree.h"
#include "src/core/support/string.h"
#include "src/core/surface/call.h"
#include "src/core/surface/channel.h"
#include "src/core/surface/completion_queue.h"
#include "src/core/surface/init.h"
#include "src/core/transport/metadata.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
typedef struct listener {
void *arg;
......@@ -72,8 +74,8 @@ typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type;
typedef struct requested_call {
requested_call_type type;
struct requested_call *next;
void *tag;
grpc_server *server;
grpc_completion_queue *cq_bound_to_call;
grpc_completion_queue *cq_for_notification;
grpc_call **call;
......@@ -161,7 +163,7 @@ struct call_data {
struct request_matcher {
call_data *pending_head;
call_data *pending_tail;
requested_call *requests;
gpr_stack_lockfree *requests;
};
struct registered_method {
......@@ -197,8 +199,13 @@ struct grpc_server {
registered_method *registered_methods;
request_matcher unregistered_request_matcher;
/** free list of available requested_calls indices */
gpr_stack_lockfree *request_freelist;
/** requested call backing data */
requested_call *requested_calls;
int max_requested_calls;
gpr_uint8 shutdown;
gpr_atm shutdown_flag;
gpr_uint8 shutdown_published;
size_t num_shutdown_tags;
shutdown_tag *shutdown_tags;
......@@ -290,8 +297,15 @@ static void channel_broadcaster_shutdown(channel_broadcaster *cb,
* request_matcher
*/
static void request_matcher_init(request_matcher *request_matcher) {
static void request_matcher_init(request_matcher *request_matcher,
int entries) {
memset(request_matcher, 0, sizeof(*request_matcher));
request_matcher->requests = gpr_stack_lockfree_create(entries);
}
static void request_matcher_destroy(request_matcher *request_matcher) {
GPR_ASSERT(gpr_stack_lockfree_pop(request_matcher->requests) == -1);
gpr_stack_lockfree_destroy(request_matcher->requests);
}
static void kill_zombie(void *elem, int success) {
......@@ -328,6 +342,7 @@ static void server_delete(grpc_server *server) {
gpr_free(server->channel_filters);
while ((rm = server->registered_methods) != NULL) {
server->registered_methods = rm->next;
request_matcher_destroy(&rm->request_matcher);
gpr_free(rm->method);
gpr_free(rm->host);
gpr_free(rm);
......@@ -335,9 +350,12 @@ static void server_delete(grpc_server *server) {
for (i = 0; i < server->cq_count; i++) {
GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server");
}
request_matcher_destroy(&server->unregistered_request_matcher);
gpr_stack_lockfree_destroy(server->request_freelist);
gpr_free(server->cqs);
gpr_free(server->pollsets);
gpr_free(server->shutdown_tags);
gpr_free(server->requested_calls);
gpr_free(server);
}
......@@ -377,11 +395,12 @@ static void destroy_channel(channel_data *chand) {
static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem,
request_matcher *request_matcher) {
requested_call *rc;
call_data *calld = elem->call_data;
gpr_mu_lock(&server->mu_call);
rc = request_matcher->requests;
if (rc == NULL) {
int request_id;
request_id = gpr_stack_lockfree_pop(request_matcher->requests);
if (request_id == -1) {
gpr_mu_lock(&server->mu_call);
gpr_mu_lock(&calld->mu_state);
calld->state = PENDING;
gpr_mu_unlock(&calld->mu_state);
......@@ -394,12 +413,10 @@ static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem,
calld->pending_next = NULL;
gpr_mu_unlock(&server->mu_call);
} else {
request_matcher->requests = rc->next;
gpr_mu_lock(&calld->mu_state);
calld->state = ACTIVATED;
gpr_mu_unlock(&calld->mu_state);
gpr_mu_unlock(&server->mu_call);
begin_call(server, calld, rc);
begin_call(server, calld, &server->requested_calls[request_id]);
}
}
......@@ -466,7 +483,7 @@ static int num_channels(grpc_server *server) {
static void maybe_finish_shutdown(grpc_server *server) {
size_t i;
if (!server->shutdown || server->shutdown_published) {
if (!gpr_atm_acq_load(&server->shutdown_flag) || server->shutdown_published) {
return;
}
......@@ -741,7 +758,17 @@ grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters,
server->root_channel_data.next = server->root_channel_data.prev =
&server->root_channel_data;
request_matcher_init(&server->unregistered_request_matcher);
/* TODO(ctiller): expose a channel_arg for this */
server->max_requested_calls = 32768;
server->request_freelist =
gpr_stack_lockfree_create(server->max_requested_calls);
for (i = 0; i < (size_t)server->max_requested_calls; i++) {
gpr_stack_lockfree_push(server->request_freelist, i);
}
request_matcher_init(&server->unregistered_request_matcher,
server->max_requested_calls);
server->requested_calls = gpr_malloc(server->max_requested_calls *
sizeof(*server->requested_calls));
/* Server filter stack is:
......@@ -790,7 +817,7 @@ void *grpc_server_register_method(grpc_server *server, const char *method,
}
m = gpr_malloc(sizeof(registered_method));
memset(m, 0, sizeof(*m));
request_matcher_init(&m->request_matcher);
request_matcher_init(&m->request_matcher, server->max_requested_calls);
m->method = gpr_strdup(method);
m->host = gpr_strdup(host);
m->next = server->registered_methods;
......@@ -906,13 +933,49 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport,
grpc_transport_perform_op(transport, &op);
}
typedef struct {
requested_call **requests;
size_t count;
size_t capacity;
} request_killer;
static void request_killer_init(request_killer *rk) {
memset(rk, 0, sizeof(*rk));
}
static void request_killer_add(request_killer *rk, requested_call *rc) {
if (rk->capacity == rk->count) {
rk->capacity = GPR_MAX(8, rk->capacity * 2);
rk->requests =
gpr_realloc(rk->requests, rk->capacity * sizeof(*rk->requests));
}
rk->requests[rk->count++] = rc;
}
static void request_killer_add_request_matcher(request_killer *rk,
grpc_server *server,
request_matcher *rm) {
int request_id;
while ((request_id = gpr_stack_lockfree_pop(rm->requests)) != -1) {
request_killer_add(rk, &server->requested_calls[request_id]);
}
}
static void request_killer_run(request_killer *rk, grpc_server *server) {
size_t i;
for (i = 0; i < rk->count; i++) {
fail_call(server, rk->requests[i]);
}
gpr_free(rk->requests);
}
void grpc_server_shutdown_and_notify(grpc_server *server,
grpc_completion_queue *cq, void *tag) {
listener *l;
requested_call *requests = NULL;
registered_method *rm;
shutdown_tag *sdt;
channel_broadcaster broadcaster;
request_killer reqkill;
/* lock, and gather up some stuff to do */
gpr_mu_lock(&server->mu_global);
......@@ -923,7 +986,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
sdt = &server->shutdown_tags[server->num_shutdown_tags++];
sdt->tag = tag;
sdt->cq = cq;
if (server->shutdown) {
if (gpr_atm_acq_load(&server->shutdown_flag)) {
gpr_mu_unlock(&server->mu_global);
return;
}
......@@ -931,34 +994,26 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);
channel_broadcaster_init(server, &broadcaster);
request_killer_init(&reqkill);
/* collect all unregistered then registered calls */
gpr_mu_lock(&server->mu_call);
requests = server->unregistered_request_matcher.requests;
server->unregistered_request_matcher.requests = NULL;
request_killer_add_request_matcher(&reqkill, server,
&server->unregistered_request_matcher);
request_matcher_zombify_all_pending_calls(
&server->unregistered_request_matcher);
for (rm = server->registered_methods; rm; rm = rm->next) {
while (rm->request_matcher.requests != NULL) {
requested_call *c = rm->request_matcher.requests;
rm->request_matcher.requests = c->next;
c->next = requests;
requests = c;
}
request_killer_add_request_matcher(&reqkill, server, &rm->request_matcher);
request_matcher_zombify_all_pending_calls(&rm->request_matcher);
}
gpr_mu_unlock(&server->mu_call);
server->shutdown = 1;
gpr_atm_rel_store(&server->shutdown_flag, 1);
maybe_finish_shutdown(server);
gpr_mu_unlock(&server->mu_global);
/* terminate all the requested calls */
while (requests != NULL) {
requested_call *next = requests->next;
fail_call(server, requests);
requests = next;
}
request_killer_run(&reqkill, server);
/* Shutdown listeners */
for (l = server->listeners; l; l = l->next) {
......@@ -990,7 +1045,7 @@ void grpc_server_destroy(grpc_server *server) {
listener *l;
gpr_mu_lock(&server->mu_global);
GPR_ASSERT(server->shutdown || !server->listeners);
GPR_ASSERT(gpr_atm_acq_load(&server->shutdown_flag) || !server->listeners);
GPR_ASSERT(server->listeners_destroyed == num_listeners(server));
while (server->listeners) {
......@@ -1021,9 +1076,14 @@ static grpc_call_error queue_call_request(grpc_server *server,
requested_call *rc) {
call_data *calld = NULL;
request_matcher *request_matcher = NULL;
gpr_mu_lock(&server->mu_call);
if (server->shutdown) {
gpr_mu_unlock(&server->mu_call);
int request_id;
if (gpr_atm_acq_load(&server->shutdown_flag)) {
fail_call(server, rc);
return GRPC_CALL_OK;
}
request_id = gpr_stack_lockfree_pop(server->request_freelist);
if (request_id == -1) {
/* out of request ids: just fail this one */
fail_call(server, rc);
return GRPC_CALL_OK;
}
......@@ -1035,32 +1095,35 @@ static grpc_call_error queue_call_request(grpc_server *server,
request_matcher = &rc->data.registered.registered_method->request_matcher;
break;
}
if (request_matcher->pending_head != NULL) {
calld = request_matcher->pending_head;
request_matcher->pending_head = calld->pending_next;
}
if (calld != NULL) {
gpr_mu_unlock(&server->mu_call);
gpr_mu_lock(&calld->mu_state);
if (calld->state == ZOMBIED) {
gpr_mu_unlock(&calld->mu_state);
grpc_iomgr_closure_init(
&calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
grpc_iomgr_add_callback(&calld->kill_zombie_closure);
return queue_call_request(server, rc); /* retry */
server->requested_calls[request_id] = *rc;
gpr_free(rc);
if (gpr_stack_lockfree_push(request_matcher->requests, request_id)) {
/* this was the first queued request: we need to lock and start
matching calls */
gpr_mu_lock(&server->mu_call);
while ((calld = request_matcher->pending_head) != NULL) {
request_id = gpr_stack_lockfree_pop(request_matcher->requests);
if (request_id == -1) break;
request_matcher->pending_head = calld->pending_next;
gpr_mu_unlock(&server->mu_call);
gpr_mu_lock(&calld->mu_state);
if (calld->state == ZOMBIED) {
gpr_mu_unlock(&calld->mu_state);
grpc_iomgr_closure_init(
&calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
grpc_iomgr_add_callback(&calld->kill_zombie_closure);
} else {
GPR_ASSERT(calld->state == PENDING);
calld->state = ACTIVATED;
gpr_mu_unlock(&calld->mu_state);
begin_call(server, calld, rc);
}
gpr_mu_lock(&server->mu_call);
}
GPR_ASSERT(calld->state == PENDING);
calld->state = ACTIVATED;
gpr_mu_unlock(&calld->mu_state);
begin_call(server, calld, rc);
return GRPC_CALL_OK;
} else {
rc->next = request_matcher->requests;
request_matcher->requests = rc;
gpr_mu_unlock(&server->mu_call);
return GRPC_CALL_OK;
}
return GRPC_CALL_OK;
}
grpc_call_error grpc_server_request_call(
......@@ -1078,6 +1141,7 @@ grpc_call_error grpc_server_request_call(
}
grpc_cq_begin_op(cq_for_notification);
rc->type = BATCH_CALL;
rc->server = server;
rc->tag = tag;
rc->cq_bound_to_call = cq_bound_to_call;
rc->cq_for_notification = cq_for_notification;
......@@ -1100,6 +1164,7 @@ grpc_call_error grpc_server_request_registered_call(
}
grpc_cq_begin_op(cq_for_notification);
rc->type = REGISTERED_CALL;
rc->server = server;
rc->tag = tag;
rc->cq_bound_to_call = cq_bound_to_call;
rc->cq_for_notification = cq_for_notification;
......@@ -1179,7 +1244,16 @@ static void begin_call(grpc_server *server, call_data *calld,
}
static void done_request_event(void *req, grpc_cq_completion *c) {
gpr_free(req);
requested_call *rc = req;
grpc_server *server = rc->server;
if (rc >= server->requested_calls &&
rc < server->requested_calls + server->max_requested_calls) {
gpr_stack_lockfree_push(server->request_freelist,
rc - server->requested_calls);
} else {
gpr_free(req);
}
}
static void fail_call(grpc_server *server, requested_call *rc) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment