Skip to content
Snippets Groups Projects
Commit 8851269f authored by Craig Tiller's avatar Craig Tiller
Browse files

Fix head-of-line blocking in server

parent 2839edd2
No related branches found
No related tags found
No related merge requests found
......@@ -95,7 +95,6 @@ typedef struct requested_call {
grpc_byte_buffer **optional_payload;
} registered;
} data;
grpc_closure publish;
} requested_call;
typedef struct channel_registered_method {
......@@ -156,15 +155,21 @@ struct call_data {
bool recv_idempotent_request;
grpc_metadata_array initial_metadata;
request_matcher *request_matcher;
grpc_byte_buffer *payload;
grpc_closure got_initial_metadata;
grpc_closure server_on_recv_initial_metadata;
grpc_closure kill_zombie_closure;
grpc_closure *on_done_recv_initial_metadata;
grpc_closure publish;
call_data *pending_next;
};
struct request_matcher {
grpc_server *server;
call_data *pending_head;
call_data *pending_tail;
gpr_stack_lockfree *requests;
......@@ -227,8 +232,7 @@ struct grpc_server {
#define SERVER_FROM_CALL_ELEM(elem) \
(((channel_data *)(elem)->channel_data)->server)
static void begin_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
call_data *calld, requested_call *rc);
static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *calld, bool success);
static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
requested_call *rc);
/* Before calling maybe_finish_shutdown, we must hold mu_global and not
......@@ -304,8 +308,10 @@ static void channel_broadcaster_shutdown(grpc_exec_ctx *exec_ctx,
* request_matcher
*/
static void request_matcher_init(request_matcher *rm, size_t entries) {
static void request_matcher_init(request_matcher *rm, size_t entries,
grpc_server *server) {
memset(rm, 0, sizeof(*rm));
rm->server = server;
rm->requests = gpr_stack_lockfree_create(entries);
}
......@@ -418,9 +424,6 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand) {
&op);
}
static void publish_registered_or_batch(grpc_exec_ctx *exec_ctx,
void *user_data, bool success);
static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
gpr_slice slice = value->slice;
size_t len = GPR_SLICE_LENGTH(slice);
......@@ -432,22 +435,28 @@ static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
memcpy(*dest, grpc_mdstr_as_c_string(value), len + 1);
}
static void begin_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
call_data *calld, requested_call *rc) {
grpc_op ops[1];
grpc_op *op = ops;
static void done_request_event(grpc_exec_ctx *exec_ctx, void *req,
grpc_cq_completion *c) {
requested_call *rc = req;
grpc_server *server = rc->server;
memset(ops, 0, sizeof(ops));
if (rc >= server->requested_calls &&
rc < server->requested_calls + server->max_requested_calls) {
GPR_ASSERT(rc - server->requested_calls <= INT_MAX);
gpr_stack_lockfree_push(server->request_freelist,
(int)(rc - server->requested_calls));
} else {
gpr_free(req);
}
/* called once initial metadata has been read by the call, but BEFORE
the ioreq to fetch it out of the call has been executed.
This means metadata related fields can be relied on in calld, but to
fill in the metadata array passed by the client, we need to perform
an ioreq op, that should complete immediately. */
server_unref(exec_ctx, server);
}
static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
call_data *calld, requested_call *rc) {
grpc_call_set_completion_queue(exec_ctx, calld->call, rc->cq_bound_to_call);
grpc_closure_init(&rc->publish, publish_registered_or_batch, rc);
*rc->call = calld->call;
grpc_call *call = calld->call;
*rc->call = call;
calld->cq_new = rc->cq_for_notification;
GPR_SWAP(grpc_metadata_array, *rc->initial_metadata, calld->initial_metadata);
switch (rc->type) {
......@@ -467,35 +476,38 @@ static void begin_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
case REGISTERED_CALL:
*rc->data.registered.deadline = calld->deadline;
if (rc->data.registered.optional_payload) {
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message = rc->data.registered.optional_payload;
op++;
*rc->data.registered.optional_payload = calld->payload;
}
break;
default:
GPR_UNREACHABLE_CODE(return );
}
GRPC_CALL_INTERNAL_REF(calld->call, "server");
grpc_call_start_batch_and_execute(exec_ctx, calld->call, ops,
(size_t)(op - ops), &rc->publish);
grpc_call_element *elem =
grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
channel_data *chand = elem->channel_data;
server_ref(chand->server);
grpc_cq_end_op(exec_ctx, calld->cq_new, rc->tag, true, done_request_event, rc,
&rc->completion);
}
static void finish_start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_server *server,
grpc_call_element *elem, request_matcher *rm) {
call_data *calld = elem->call_data;
int request_id;
static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
call_data *calld = arg;
request_matcher *rm = calld->request_matcher;
grpc_server *server = rm->server;
if (gpr_atm_acq_load(&server->shutdown_flag)) {
if (!success || gpr_atm_acq_load(&server->shutdown_flag)) {
gpr_mu_lock(&calld->mu_state);
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
grpc_closure_init(
&calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, true, NULL);
return;
}
request_id = gpr_stack_lockfree_pop(rm->requests);
int request_id = gpr_stack_lockfree_pop(rm->requests);
if (request_id == -1) {
gpr_mu_lock(&server->mu_call);
gpr_mu_lock(&calld->mu_state);
......@@ -513,7 +525,41 @@ static void finish_start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_server *server,
gpr_mu_lock(&calld->mu_state);
calld->state = ACTIVATED;
gpr_mu_unlock(&calld->mu_state);
begin_call(exec_ctx, server, calld, &server->requested_calls[request_id]);
publish_call(exec_ctx, server, calld, &server->requested_calls[request_id]);
}
}
static void finish_start_new_rpc(
grpc_exec_ctx *exec_ctx, grpc_server *server, grpc_call_element *elem,
request_matcher *rm,
grpc_server_register_method_payload_handling payload_handling) {
call_data *calld = elem->call_data;
if (gpr_atm_acq_load(&server->shutdown_flag)) {
gpr_mu_lock(&calld->mu_state);
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, true, NULL);
return;
}
calld->request_matcher = rm;
switch (payload_handling) {
case GRPC_SRM_PAYLOAD_NONE:
publish_new_rpc(exec_ctx, calld, true);
break;
case GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER: {
grpc_op op;
memset(&op, 0, sizeof(op));
op.op = GRPC_OP_RECV_MESSAGE;
op.data.recv_message = &calld->payload;
grpc_closure_init(&calld->publish, publish_new_rpc, calld);
grpc_call_start_batch_and_execute(exec_ctx, calld->call, &op, 1,
&calld->publish);
break;
}
}
}
......@@ -539,7 +585,8 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
!calld->recv_idempotent_request)
continue;
finish_start_new_rpc(exec_ctx, server, elem,
&rm->server_registered_method->request_matcher);
&rm->server_registered_method->request_matcher,
rm->server_registered_method->payload_handling);
return;
}
/* check for a wildcard method definition (no host set) */
......@@ -554,12 +601,14 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
!calld->recv_idempotent_request)
continue;
finish_start_new_rpc(exec_ctx, server, elem,
&rm->server_registered_method->request_matcher);
&rm->server_registered_method->request_matcher,
rm->server_registered_method->payload_handling);
return;
}
}
finish_start_new_rpc(exec_ctx, server, elem,
&server->unregistered_request_matcher);
&server->unregistered_request_matcher,
GRPC_SRM_PAYLOAD_NONE);
}
static int num_listeners(grpc_server *server) {
......@@ -888,7 +937,7 @@ grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {
gpr_stack_lockfree_push(server->request_freelist, (int)i);
}
request_matcher_init(&server->unregistered_request_matcher,
server->max_requested_calls);
server->max_requested_calls, server);
server->requested_calls = gpr_malloc(server->max_requested_calls *
sizeof(*server->requested_calls));
......@@ -932,7 +981,8 @@ void *grpc_server_register_method(
}
m = gpr_malloc(sizeof(registered_method));
memset(m, 0, sizeof(*m));
request_matcher_init(&m->request_matcher, server->max_requested_calls);
request_matcher_init(&m->request_matcher, server->max_requested_calls,
server);
m->method = gpr_strdup(method);
m->host = gpr_strdup(host);
m->next = server->registered_methods;
......@@ -1210,8 +1260,8 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(calld->state == PENDING);
calld->state = ACTIVATED;
gpr_mu_unlock(&calld->mu_state);
begin_call(exec_ctx, server, calld,
&server->requested_calls[request_id]);
publish_call(exec_ctx, server, calld,
&server->requested_calls[request_id]);
}
gpr_mu_lock(&server->mu_call);
}
......@@ -1299,23 +1349,6 @@ done:
return error;
}
static void done_request_event(grpc_exec_ctx *exec_ctx, void *req,
grpc_cq_completion *c) {
requested_call *rc = req;
grpc_server *server = rc->server;
if (rc >= server->requested_calls &&
rc < server->requested_calls + server->max_requested_calls) {
GPR_ASSERT(rc - server->requested_calls <= INT_MAX);
gpr_stack_lockfree_push(server->request_freelist,
(int)(rc - server->requested_calls));
} else {
gpr_free(req);
}
server_unref(exec_ctx, server);
}
static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
requested_call *rc) {
*rc->call = NULL;
......@@ -1326,20 +1359,6 @@ static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
done_request_event, rc, &rc->completion);
}
static void publish_registered_or_batch(grpc_exec_ctx *exec_ctx, void *prc,
bool success) {
requested_call *rc = prc;
grpc_call *call = *rc->call;
grpc_call_element *elem =
grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
server_ref(chand->server);
grpc_cq_end_op(exec_ctx, calld->cq_new, rc->tag, success, done_request_event,
rc, &rc->completion);
GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "server");
}
const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {
return server->channel_args;
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment