Skip to content
Snippets Groups Projects
Commit b76d05b8 authored by Craig Tiller's avatar Craig Tiller
Browse files

Cleanup server shutdown some

parent 553eae3d
No related branches found
No related tags found
No related merge requests found
......@@ -113,6 +113,7 @@ typedef struct channel_registered_method {
struct channel_data {
grpc_server *server;
size_t num_calls;
grpc_channel *channel;
grpc_mdstr *path_key;
grpc_mdstr *authority_key;
......@@ -390,7 +391,10 @@ static int num_listeners(grpc_server *server) {
static void maybe_finish_shutdown(grpc_server *server) {
size_t i;
if (server->shutdown && !server->shutdown_published && server->lists[ALL_CALLS] == NULL && server->listeners_destroyed == num_listeners(server)) {
if (server->shutdown && !server->shutdown_published &&
server->root_channel_data.next == &server->root_channel_data &&
server->lists[ALL_CALLS] == NULL &&
server->listeners_destroyed == num_listeners(server)) {
server->shutdown_published = 1;
for (i = 0; i < server->num_shutdown_tags; i++) {
grpc_cq_end_op(server->shutdown_tags[i].cq, server->shutdown_tags[i].tag,
......@@ -520,20 +524,44 @@ static void channel_op(grpc_channel_element *elem,
}
}
static void finish_shutdown_channel(void *cd, int success) {
channel_data *chand = cd;
typedef struct {
channel_data *chand;
int send_goaway;
int send_disconnect;
} shutdown_channel_args;
static void finish_shutdown_channel(void *p, int success) {
shutdown_channel_args *sca = p;
grpc_channel_op op;
op.type = GRPC_CHANNEL_DISCONNECT;
op.dir = GRPC_CALL_DOWN;
channel_op(grpc_channel_stack_element(
grpc_channel_get_channel_stack(chand->channel), 0),
NULL, &op);
grpc_channel_internal_unref(chand->channel);
if (sca->send_goaway) {
op.type = GRPC_CHANNEL_GOAWAY;
op.dir = GRPC_CALL_DOWN;
op.data.goaway.status = GRPC_STATUS_OK;
op.data.goaway.message = gpr_slice_from_copied_string("Server shutdown");
channel_op(grpc_channel_stack_element(
grpc_channel_get_channel_stack(sca->chand->channel), 0),
NULL, &op);
}
if (sca->send_disconnect) {
op.type = GRPC_CHANNEL_DISCONNECT;
op.dir = GRPC_CALL_DOWN;
channel_op(grpc_channel_stack_element(
grpc_channel_get_channel_stack(sca->chand->channel), 0),
NULL, &op);
}
grpc_channel_internal_unref(sca->chand->channel);
}
static void shutdown_channel(channel_data *chand) {
static void shutdown_channel(channel_data *chand, int send_goaway,
int send_disconnect) {
shutdown_channel_args *sca;
grpc_channel_internal_ref(chand->channel);
grpc_iomgr_add_callback(finish_shutdown_channel, chand);
sca = gpr_malloc(sizeof(shutdown_channel_args));
sca->chand = chand;
sca->send_goaway = send_goaway;
sca->send_disconnect = send_disconnect;
grpc_iomgr_add_callback(finish_shutdown_channel, sca);
}
static void init_call_elem(grpc_call_element *elem,
......@@ -547,6 +575,7 @@ static void init_call_elem(grpc_call_element *elem,
gpr_mu_lock(&chand->server->mu);
call_list_join(&chand->server->lists[ALL_CALLS], calld, ALL_CALLS);
chand->num_calls++;
gpr_mu_unlock(&chand->server->mu);
server_ref(chand->server);
......@@ -564,6 +593,10 @@ static void destroy_call_elem(grpc_call_element *elem) {
for (i = 0; i < CALL_LIST_COUNT; i++) {
removed[i] = call_list_remove(elem->call_data, i);
}
chand->num_calls--;
if (0 == chand->num_calls && chand->server->shutdown) {
shutdown_channel(chand, 0, 1);
}
if (removed[ALL_CALLS]) {
maybe_finish_shutdown(chand->server);
}
......@@ -587,6 +620,7 @@ static void init_channel_elem(grpc_channel_element *elem,
GPR_ASSERT(is_first);
GPR_ASSERT(!is_last);
chand->server = NULL;
chand->num_calls = 0;
chand->channel = NULL;
chand->path_key = grpc_mdstr_from_string(metadata_context, ":path");
chand->authority_key = grpc_mdstr_from_string(metadata_context, ":authority");
......@@ -613,6 +647,7 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
chand->next->prev = chand->prev;
chand->prev->next = chand->next;
chand->next = chand->prev = chand;
maybe_finish_shutdown(chand->server);
gpr_mu_unlock(&chand->server->mu);
grpc_mdstr_unref(chand->path_key);
grpc_mdstr_unref(chand->authority_key);
......@@ -621,9 +656,15 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
static const grpc_channel_filter server_surface_filter = {
server_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
destroy_call_elem, sizeof(channel_data), init_channel_elem,
destroy_channel_elem, "server",
server_start_transport_op,
channel_op,
sizeof(call_data),
init_call_elem,
destroy_call_elem,
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
"server",
};
void grpc_server_register_completion_queue(grpc_server *server,
......@@ -691,7 +732,8 @@ void *grpc_server_register_method(grpc_server *server, const char *method,
const char *host) {
registered_method *m;
if (!method) {
gpr_log(GPR_ERROR, "grpc_server_register_method method string cannot be NULL");
gpr_log(GPR_ERROR,
"grpc_server_register_method method string cannot be NULL");
return NULL;
}
for (m = server->registered_methods; m; m = m->next) {
......@@ -814,12 +856,8 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
grpc_completion_queue *cq, void *tag) {
listener *l;
requested_call_array requested_calls;
channel_data **channels;
channel_data *c;
size_t nchannels;
size_t i;
grpc_channel_op op;
grpc_channel_element *elem;
registered_method *rm;
shutdown_tag *sdt;
......@@ -837,18 +875,9 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
return;
}
nchannels = 0;
for (c = server->root_channel_data.next; c != &server->root_channel_data;
c = c->next) {
nchannels++;
}
channels = gpr_malloc(sizeof(channel_data *) * nchannels);
i = 0;
for (c = server->root_channel_data.next; c != &server->root_channel_data;
c = c->next) {
grpc_channel_internal_ref(c->channel);
channels[i] = c;
i++;
shutdown_channel(c, 1, c->num_calls == 0);
}
/* collect all unregistered then registered calls */
......@@ -875,21 +904,6 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
maybe_finish_shutdown(server);
gpr_mu_unlock(&server->mu);
for (i = 0; i < nchannels; i++) {
c = channels[i];
elem = grpc_channel_stack_element(
grpc_channel_get_channel_stack(c->channel), 0);
op.type = GRPC_CHANNEL_GOAWAY;
op.dir = GRPC_CALL_DOWN;
op.data.goaway.status = GRPC_STATUS_OK;
op.data.goaway.message = gpr_slice_from_copied_string("Server shutdown");
elem->filter->channel_op(elem, NULL, &op);
grpc_channel_internal_unref(c->channel);
}
gpr_free(channels);
/* terminate all the requested calls */
for (i = 0; i < requested_calls.count; i++) {
fail_call(server, &requested_calls.calls[i]);
......@@ -931,7 +945,9 @@ void grpc_server_cancel_all_calls(grpc_server *server) {
call_count = 0;
calls = gpr_malloc(sizeof(grpc_call *) * call_capacity);
for (calld = server->lists[ALL_CALLS]; calld != server->lists[ALL_CALLS] || is_first; calld = calld->links[ALL_CALLS].next) {
for (calld = server->lists[ALL_CALLS];
calld != server->lists[ALL_CALLS] || is_first;
calld = calld->links[ALL_CALLS].next) {
if (call_count == call_capacity) {
call_capacity *= 2;
calls = gpr_realloc(calls, sizeof(grpc_call *) * call_capacity);
......@@ -944,7 +960,8 @@ void grpc_server_cancel_all_calls(grpc_server *server) {
gpr_mu_unlock(&server->mu);
for (i = 0; i < call_count; i++) {
grpc_call_cancel_with_status(calls[i], GRPC_STATUS_UNAVAILABLE, "Unavailable");
grpc_call_cancel_with_status(calls[i], GRPC_STATUS_UNAVAILABLE,
"Unavailable");
GRPC_CALL_INTERNAL_UNREF(calls[i], "cancel_all", 1);
}
......@@ -952,9 +969,7 @@ void grpc_server_cancel_all_calls(grpc_server *server) {
}
void grpc_server_destroy(grpc_server *server) {
channel_data *c;
listener *l;
call_data *calld;
gpr_mu_lock(&server->mu);
GPR_ASSERT(server->shutdown);
......@@ -965,19 +980,6 @@ void grpc_server_destroy(grpc_server *server) {
server->listeners = l->next;
gpr_free(l);
}
while ((calld = call_list_remove_head(&server->lists[PENDING_START],
PENDING_START)) != NULL) {
calld->state = ZOMBIED;
grpc_iomgr_add_callback(
kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
}
for (c = server->root_channel_data.next; c != &server->root_channel_data;
c = c->next) {
shutdown_channel(c);
}
gpr_mu_unlock(&server->mu);
server_unref(server);
......@@ -1166,4 +1168,3 @@ int grpc_server_has_open_connections(grpc_server *server) {
gpr_mu_unlock(&server->mu);
return r;
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment