diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 667e601c46b37be6ae936e2cb0efc72855544575..ce295ea71caf00827992fcb8fff270cc54c69c16 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -113,6 +113,7 @@ typedef struct channel_registered_method { struct channel_data { grpc_server *server; + size_t num_calls; grpc_channel *channel; grpc_mdstr *path_key; grpc_mdstr *authority_key; @@ -390,7 +391,10 @@ static int num_listeners(grpc_server *server) { static void maybe_finish_shutdown(grpc_server *server) { size_t i; - if (server->shutdown && !server->shutdown_published && server->lists[ALL_CALLS] == NULL && server->listeners_destroyed == num_listeners(server)) { + if (server->shutdown && !server->shutdown_published && + server->root_channel_data.next == &server->root_channel_data && + server->lists[ALL_CALLS] == NULL && + server->listeners_destroyed == num_listeners(server)) { server->shutdown_published = 1; for (i = 0; i < server->num_shutdown_tags; i++) { grpc_cq_end_op(server->shutdown_tags[i].cq, server->shutdown_tags[i].tag, @@ -520,20 +524,44 @@ static void channel_op(grpc_channel_element *elem, } } -static void finish_shutdown_channel(void *cd, int success) { - channel_data *chand = cd; +typedef struct { + channel_data *chand; + int send_goaway; + int send_disconnect; +} shutdown_channel_args; + +static void finish_shutdown_channel(void *p, int success) { + shutdown_channel_args *sca = p; grpc_channel_op op; - op.type = GRPC_CHANNEL_DISCONNECT; - op.dir = GRPC_CALL_DOWN; - channel_op(grpc_channel_stack_element( - grpc_channel_get_channel_stack(chand->channel), 0), - NULL, &op); - grpc_channel_internal_unref(chand->channel); + + if (sca->send_goaway) { + op.type = GRPC_CHANNEL_GOAWAY; + op.dir = GRPC_CALL_DOWN; + op.data.goaway.status = GRPC_STATUS_OK; + op.data.goaway.message = gpr_slice_from_copied_string("Server shutdown"); + channel_op(grpc_channel_stack_element( + grpc_channel_get_channel_stack(sca->chand->channel), 0), + NULL, &op); + } + if (sca->send_disconnect) { + op.type = GRPC_CHANNEL_DISCONNECT; + op.dir = GRPC_CALL_DOWN; + channel_op(grpc_channel_stack_element( + grpc_channel_get_channel_stack(sca->chand->channel), 0), + NULL, &op); + } + grpc_channel_internal_unref(sca->chand->channel); } -static void shutdown_channel(channel_data *chand) { +static void shutdown_channel(channel_data *chand, int send_goaway, + int send_disconnect) { + shutdown_channel_args *sca; grpc_channel_internal_ref(chand->channel); - grpc_iomgr_add_callback(finish_shutdown_channel, chand); + sca = gpr_malloc(sizeof(shutdown_channel_args)); + sca->chand = chand; + sca->send_goaway = send_goaway; + sca->send_disconnect = send_disconnect; + grpc_iomgr_add_callback(finish_shutdown_channel, sca); } static void init_call_elem(grpc_call_element *elem, @@ -547,6 +575,7 @@ static void init_call_elem(grpc_call_element *elem, gpr_mu_lock(&chand->server->mu); call_list_join(&chand->server->lists[ALL_CALLS], calld, ALL_CALLS); + chand->num_calls++; gpr_mu_unlock(&chand->server->mu); server_ref(chand->server); @@ -564,6 +593,10 @@ static void destroy_call_elem(grpc_call_element *elem) { for (i = 0; i < CALL_LIST_COUNT; i++) { removed[i] = call_list_remove(elem->call_data, i); } + chand->num_calls--; + if (0 == chand->num_calls && chand->server->shutdown) { + shutdown_channel(chand, 0, 1); + } if (removed[ALL_CALLS]) { maybe_finish_shutdown(chand->server); } @@ -587,6 +620,7 @@ static void init_channel_elem(grpc_channel_element *elem, GPR_ASSERT(is_first); GPR_ASSERT(!is_last); chand->server = NULL; + chand->num_calls = 0; chand->channel = NULL; chand->path_key = grpc_mdstr_from_string(metadata_context, ":path"); chand->authority_key = grpc_mdstr_from_string(metadata_context, ":authority"); @@ -613,6 +647,7 @@ static void destroy_channel_elem(grpc_channel_element *elem) { chand->next->prev = chand->prev; chand->prev->next = chand->next; chand->next = chand->prev = chand; + maybe_finish_shutdown(chand->server); gpr_mu_unlock(&chand->server->mu); grpc_mdstr_unref(chand->path_key); grpc_mdstr_unref(chand->authority_key); @@ -621,9 +656,15 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } static const grpc_channel_filter server_surface_filter = { - server_start_transport_op, channel_op, sizeof(call_data), init_call_elem, - destroy_call_elem, sizeof(channel_data), init_channel_elem, - destroy_channel_elem, "server", + server_start_transport_op, + channel_op, + sizeof(call_data), + init_call_elem, + destroy_call_elem, + sizeof(channel_data), + init_channel_elem, + destroy_channel_elem, + "server", }; void grpc_server_register_completion_queue(grpc_server *server, @@ -691,7 +732,8 @@ void *grpc_server_register_method(grpc_server *server, const char *method, const char *host) { registered_method *m; if (!method) { - gpr_log(GPR_ERROR, "grpc_server_register_method method string cannot be NULL"); + gpr_log(GPR_ERROR, + "grpc_server_register_method method string cannot be NULL"); return NULL; } for (m = server->registered_methods; m; m = m->next) { @@ -814,12 +856,8 @@ void grpc_server_shutdown_and_notify(grpc_server *server, grpc_completion_queue *cq, void *tag) { listener *l; requested_call_array requested_calls; - channel_data **channels; channel_data *c; - size_t nchannels; size_t i; - grpc_channel_op op; - grpc_channel_element *elem; registered_method *rm; shutdown_tag *sdt; @@ -837,18 +875,9 @@ void grpc_server_shutdown_and_notify(grpc_server *server, return; } - nchannels = 0; - for (c = server->root_channel_data.next; c != &server->root_channel_data; - c = c->next) { - nchannels++; - } - channels = gpr_malloc(sizeof(channel_data *) * nchannels); - i = 0; for (c = server->root_channel_data.next; c != &server->root_channel_data; c = c->next) { - grpc_channel_internal_ref(c->channel); - channels[i] = c; - i++; + shutdown_channel(c, 1, c->num_calls == 0); } /* collect all unregistered then registered calls */ @@ -875,21 +904,6 @@ void grpc_server_shutdown_and_notify(grpc_server *server, maybe_finish_shutdown(server); gpr_mu_unlock(&server->mu); - for (i = 0; i < nchannels; i++) { - c = channels[i]; - elem = grpc_channel_stack_element( - grpc_channel_get_channel_stack(c->channel), 0); - - op.type = GRPC_CHANNEL_GOAWAY; - op.dir = GRPC_CALL_DOWN; - op.data.goaway.status = GRPC_STATUS_OK; - op.data.goaway.message = gpr_slice_from_copied_string("Server shutdown"); - elem->filter->channel_op(elem, NULL, &op); - - grpc_channel_internal_unref(c->channel); - } - gpr_free(channels); - /* terminate all the requested calls */ for (i = 0; i < requested_calls.count; i++) { fail_call(server, &requested_calls.calls[i]); @@ -931,7 +945,9 @@ void grpc_server_cancel_all_calls(grpc_server *server) { call_count = 0; calls = gpr_malloc(sizeof(grpc_call *) * call_capacity); - for (calld = server->lists[ALL_CALLS]; calld != server->lists[ALL_CALLS] || is_first; calld = calld->links[ALL_CALLS].next) { + for (calld = server->lists[ALL_CALLS]; + calld != server->lists[ALL_CALLS] || is_first; + calld = calld->links[ALL_CALLS].next) { if (call_count == call_capacity) { call_capacity *= 2; calls = gpr_realloc(calls, sizeof(grpc_call *) * call_capacity); @@ -944,7 +960,8 @@ void grpc_server_cancel_all_calls(grpc_server *server) { gpr_mu_unlock(&server->mu); for (i = 0; i < call_count; i++) { - grpc_call_cancel_with_status(calls[i], GRPC_STATUS_UNAVAILABLE, "Unavailable"); + grpc_call_cancel_with_status(calls[i], GRPC_STATUS_UNAVAILABLE, + "Unavailable"); GRPC_CALL_INTERNAL_UNREF(calls[i], "cancel_all", 1); } @@ -952,9 +969,7 @@ void grpc_server_cancel_all_calls(grpc_server *server) { } void grpc_server_destroy(grpc_server *server) { - channel_data *c; listener *l; - call_data *calld; gpr_mu_lock(&server->mu); GPR_ASSERT(server->shutdown); @@ -965,19 +980,6 @@ void grpc_server_destroy(grpc_server *server) { server->listeners = l->next; gpr_free(l); } - - while ((calld = call_list_remove_head(&server->lists[PENDING_START], - PENDING_START)) != NULL) { - calld->state = ZOMBIED; - grpc_iomgr_add_callback( - kill_zombie, - grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); - } - - for (c = server->root_channel_data.next; c != &server->root_channel_data; - c = c->next) { - shutdown_channel(c); - } gpr_mu_unlock(&server->mu); server_unref(server); @@ -1166,4 +1168,3 @@ int grpc_server_has_open_connections(grpc_server *server) { gpr_mu_unlock(&server->mu); return r; } -