Skip to content
Snippets Groups Projects
Commit bce999f5 authored by Craig Tiller's avatar Craig Tiller
Browse files

Refine shutdown API

parent afa2d634
No related branches found
No related tags found
No related merge requests found
Showing
with 42 additions and 26 deletions
......@@ -77,6 +77,7 @@ class Server GRPC_FINAL : public GrpcLibrary,
class SyncRequest;
class AsyncRequest;
class ShutdownRequest;
// ServerBuilder use only
Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
......
......@@ -491,7 +491,8 @@ void grpc_server_start(grpc_server *server);
Send a GRPC_OP_COMPLETE event when there are no more calls being serviced.
Shutdown is idempotent, and all tags will be notified at once if multiple
grpc_server_shutdown_and_notify calls are made. */
void grpc_server_shutdown_and_notify(grpc_server *server, void *tag);
void grpc_server_shutdown_and_notify(grpc_server *server,
grpc_completion_queue *cq, void *tag);
/* Cancel all in-progress calls.
Only usable after shutdown. */
......
......@@ -150,7 +150,6 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call,
int success) {
event *ev;
int shutdown = 0;
gpr_log(GPR_DEBUG, "end_op:%p", tag);
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
ev = add_locked(cc, GRPC_OP_COMPLETE, tag, call);
ev->base.success = success;
......
......@@ -124,6 +124,11 @@ struct channel_data {
gpr_uint32 registered_method_max_probes;
};
typedef struct shutdown_tag {
void *tag;
grpc_completion_queue *cq;
} shutdown_tag;
struct grpc_server {
size_t channel_filter_count;
const grpc_channel_filter **channel_filters;
......@@ -140,7 +145,7 @@ struct grpc_server {
gpr_uint8 shutdown;
size_t num_shutdown_tags;
void **shutdown_tags;
shutdown_tag *shutdown_tags;
call_data *lists[CALL_LIST_COUNT];
channel_data root_channel_data;
......@@ -383,13 +388,11 @@ static int num_listeners(grpc_server *server) {
}
static void maybe_finish_shutdown(grpc_server *server) {
size_t i, j;
size_t i;
if (server->shutdown && server->lists[ALL_CALLS] == NULL && server->listeners_destroyed == num_listeners(server)) {
for (i = 0; i < server->num_shutdown_tags; i++) {
for (j = 0; j < server->cq_count; j++) {
grpc_cq_end_op(server->cqs[j], server->shutdown_tags[i],
NULL, 1);
}
grpc_cq_end_op(server->shutdown_tags[i].cq, server->shutdown_tags[i].tag,
NULL, 1);
}
}
}
......@@ -804,7 +807,8 @@ grpc_transport_setup_result grpc_server_setup_transport(
return result;
}
void grpc_server_shutdown_and_notify(grpc_server *server, void *shutdown_tag) {
void grpc_server_shutdown_and_notify(grpc_server *server,
grpc_completion_queue *cq, void *tag) {
listener *l;
requested_call_array requested_calls;
channel_data **channels;
......@@ -814,6 +818,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server, void *shutdown_tag) {
grpc_channel_op op;
grpc_channel_element *elem;
registered_method *rm;
shutdown_tag *sdt;
/* lock, and gather up some stuff to do */
gpr_mu_lock(&server->mu);
......@@ -823,7 +828,9 @@ void grpc_server_shutdown_and_notify(grpc_server *server, void *shutdown_tag) {
server->shutdown_tags =
gpr_realloc(server->shutdown_tags,
sizeof(void *) * (server->num_shutdown_tags + 1));
server->shutdown_tags[server->num_shutdown_tags++] = shutdown_tag;
sdt = &server->shutdown_tags[server->num_shutdown_tags++];
sdt->tag = tag;
sdt->cq = cq;
if (server->shutdown) {
gpr_mu_unlock(&server->mu);
return;
......
......@@ -52,6 +52,14 @@
namespace grpc {
class Server::ShutdownRequest GRPC_FINAL : public CompletionQueueTag {
public:
bool FinalizeResult(void** tag, bool* status) {
delete this;
return false;
}
};
class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
public:
SyncRequest(RpcServiceMethod* method, void* tag)
......@@ -286,7 +294,7 @@ void Server::Shutdown() {
grpc::unique_lock<grpc::mutex> lock(mu_);
if (started_ && !shutdown_) {
shutdown_ = true;
grpc_server_shutdown(server_);
grpc_server_shutdown_and_notify(server_, cq_.cq(), new ShutdownRequest());
cq_.Shutdown();
// Wait for running callbacks to finish.
......
......@@ -206,7 +206,7 @@ void test_connect(const char *server_host, const char *client_host, int port,
grpc_completion_queue_destroy(client_cq);
/* Destroy server. */
grpc_server_shutdown_and_notify(server, tag(1000));
grpc_server_shutdown_and_notify(server, server_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(server);
grpc_completion_queue_shutdown(server_cq);
......
......@@ -76,7 +76,7 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, tag(1000));
grpc_server_shutdown_and_notify(f->server, f->server_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = NULL;
......
......@@ -75,7 +75,7 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, tag(1000));
grpc_server_shutdown_and_notify(f->server, f->server_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = NULL;
......
......@@ -75,7 +75,7 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, tag(1000));
grpc_server_shutdown_and_notify(f->server, f->server_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = NULL;
......
......@@ -76,7 +76,7 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, tag(1000));
grpc_server_shutdown_and_notify(f->server, f->server_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = NULL;
......
......@@ -74,7 +74,7 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, tag(1000));
grpc_server_shutdown_and_notify(f->server, f->server_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = NULL;
......
......@@ -75,7 +75,7 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, tag(1000));
grpc_server_shutdown_and_notify(f->server, f->server_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = NULL;
......
......@@ -64,7 +64,7 @@ static void *tag(gpr_intptr t) { return (void *)t; }
static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, tag(1000));
grpc_server_shutdown_and_notify(f->server, f->server_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = NULL;
......
......@@ -136,7 +136,7 @@ static void do_request_and_shutdown_server(grpc_end2end_test_fixture *f,
/* should be able to shut down the server early
- and still complete the request */
grpc_server_shutdown_and_notify(f->server, tag(1000));
grpc_server_shutdown_and_notify(f->server, f->server_cq, tag(1000));
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
......
......@@ -149,7 +149,7 @@ static void test_early_server_shutdown_finishes_inflight_calls(
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102)));
/* shutdown and destroy the server */
grpc_server_shutdown_and_notify(f.server, tag(1000));
grpc_server_shutdown_and_notify(f.server, f.server_cq, tag(1000));
grpc_server_cancel_all_calls(f.server);
cq_expect_completion(v_server, tag(102), 1);
......
......@@ -106,7 +106,7 @@ static void test_early_server_shutdown_finishes_tags(
grpc_server_request_call(f.server, &s, &call_details,
&request_metadata_recv, f.server_cq,
f.server_cq, tag(101)));
grpc_server_shutdown_and_notify(f.server, tag(1000));
grpc_server_shutdown_and_notify(f.server, f.server_cq, tag(1000));
cq_expect_completion(v_server, tag(101), 0);
cq_expect_completion(v_server, tag(1000), 1);
cq_verify(v_server);
......
......@@ -76,7 +76,7 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, tag(1000));
grpc_server_shutdown_and_notify(f->server, f->server_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = NULL;
......
......@@ -150,7 +150,7 @@ static void test_early_server_shutdown_finishes_inflight_calls(
cq_verify(v_server);
/* shutdown and destroy the server */
grpc_server_shutdown_and_notify(f.server, tag(0xdead));
grpc_server_shutdown_and_notify(f.server, f.server_cq, tag(0xdead));
cq_verify_empty(v_server);
op = ops;
......
......@@ -72,7 +72,7 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, tag(1000));
grpc_server_shutdown_and_notify(f->server, f->server_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = NULL;
......
......@@ -74,7 +74,7 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, tag(1000));
grpc_server_shutdown_and_notify(f->server, f->server_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)).type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = NULL;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment