Skip to content
Snippets Groups Projects
Commit 52ff44f0 authored by Yang Gao's avatar Yang Gao Committed by GitHub
Browse files

Merge pull request #10440 from danzh2010/udpserver

Modify  udp_server to postpone shutdown_fd() during shutdown.
parents 940b2bd0 2d38b056
No related branches found
No related tags found
No related merge requests found
...@@ -79,10 +79,15 @@ struct grpc_udp_listener { ...@@ -79,10 +79,15 @@ struct grpc_udp_listener {
grpc_resolved_address addr; grpc_resolved_address addr;
grpc_closure read_closure; grpc_closure read_closure;
grpc_closure write_closure; grpc_closure write_closure;
// To be called when corresponding QuicGrpcServer closes all active
// connections.
grpc_closure orphan_fd_closure;
grpc_closure destroyed_closure; grpc_closure destroyed_closure;
grpc_udp_server_read_cb read_cb; grpc_udp_server_read_cb read_cb;
grpc_udp_server_write_cb write_cb; grpc_udp_server_write_cb write_cb;
grpc_udp_server_orphan_cb orphan_cb; grpc_udp_server_orphan_cb orphan_cb;
// True if orphan_cb is trigered.
bool orphan_notified;
struct grpc_udp_listener *next; struct grpc_udp_listener *next;
}; };
...@@ -146,6 +151,14 @@ grpc_udp_server *grpc_udp_server_create(const grpc_channel_args *args) { ...@@ -146,6 +151,14 @@ grpc_udp_server *grpc_udp_server_create(const grpc_channel_args *args) {
return s; return s;
} }
static void shutdown_fd(grpc_exec_ctx *exec_ctx, void *fd, grpc_error *error) {
grpc_fd_shutdown(exec_ctx, (grpc_fd *)fd, GRPC_ERROR_REF(error));
}
static void dummy_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
// No-op.
}
static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
if (s->shutdown_complete != NULL) { if (s->shutdown_complete != NULL) {
grpc_closure_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE); grpc_closure_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE);
...@@ -195,12 +208,16 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { ...@@ -195,12 +208,16 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
grpc_closure_init(&sp->destroyed_closure, destroyed_port, s, grpc_closure_init(&sp->destroyed_closure, destroyed_port, s,
grpc_schedule_on_exec_ctx); grpc_schedule_on_exec_ctx);
if (!sp->orphan_notified) {
/* Call the orphan_cb to signal that the FD is about to be closed and /* Call the orphan_cb to signal that the FD is about to be closed and
* should no longer be used. */ * should no longer be used. Because at this point, all listening ports
* have been shutdown already, no need to shutdown again.*/
grpc_closure_init(&sp->orphan_fd_closure, dummy_cb, sp->emfd,
grpc_schedule_on_exec_ctx);
GPR_ASSERT(sp->orphan_cb); GPR_ASSERT(sp->orphan_cb);
sp->orphan_cb(exec_ctx, sp->emfd, sp->server->user_data); sp->orphan_cb(exec_ctx, sp->emfd, &sp->orphan_fd_closure,
sp->server->user_data);
}
grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, NULL, grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, NULL,
"udp_listener_shutdown"); "udp_listener_shutdown");
} }
...@@ -225,9 +242,11 @@ void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, ...@@ -225,9 +242,11 @@ void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
if (s->active_ports) { if (s->active_ports) {
for (sp = s->head; sp; sp = sp->next) { for (sp = s->head; sp; sp = sp->next) {
GPR_ASSERT(sp->orphan_cb); GPR_ASSERT(sp->orphan_cb);
sp->orphan_cb(exec_ctx, sp->emfd, sp->server->user_data); grpc_closure_init(&sp->orphan_fd_closure, shutdown_fd, sp->emfd,
grpc_fd_shutdown(exec_ctx, sp->emfd, GRPC_ERROR_CREATE_FROM_STATIC_STRING( grpc_schedule_on_exec_ctx);
"Server destroyed")); sp->orphan_cb(exec_ctx, sp->emfd, &sp->orphan_fd_closure,
sp->server->user_data);
sp->orphan_notified = true;
} }
gpr_mu_unlock(&s->mu); gpr_mu_unlock(&s->mu);
} else { } else {
...@@ -391,6 +410,7 @@ static int add_socket_to_server(grpc_udp_server *s, int fd, ...@@ -391,6 +410,7 @@ static int add_socket_to_server(grpc_udp_server *s, int fd,
sp->read_cb = read_cb; sp->read_cb = read_cb;
sp->write_cb = write_cb; sp->write_cb = write_cb;
sp->orphan_cb = orphan_cb; sp->orphan_cb = orphan_cb;
sp->orphan_notified = false;
GPR_ASSERT(sp->emfd); GPR_ASSERT(sp->emfd);
gpr_mu_unlock(&s->mu); gpr_mu_unlock(&s->mu);
gpr_free(name); gpr_free(name);
......
...@@ -55,7 +55,9 @@ typedef void (*grpc_udp_server_write_cb)(grpc_exec_ctx *exec_ctx, grpc_fd *emfd, ...@@ -55,7 +55,9 @@ typedef void (*grpc_udp_server_write_cb)(grpc_exec_ctx *exec_ctx, grpc_fd *emfd,
/* Called when the grpc_fd is about to be orphaned (and the FD closed). */ /* Called when the grpc_fd is about to be orphaned (and the FD closed). */
typedef void (*grpc_udp_server_orphan_cb)(grpc_exec_ctx *exec_ctx, typedef void (*grpc_udp_server_orphan_cb)(grpc_exec_ctx *exec_ctx,
grpc_fd *emfd, void *user_data); grpc_fd *emfd,
grpc_closure *shutdown_fd_callback,
void *user_data);
/* Create a server, initially not bound to any ports */ /* Create a server, initially not bound to any ports */
grpc_udp_server *grpc_udp_server_create(const grpc_channel_args *args); grpc_udp_server *grpc_udp_server_create(const grpc_channel_args *args);
......
...@@ -91,7 +91,7 @@ static void on_write(grpc_exec_ctx *exec_ctx, grpc_fd *emfd, void *user_data) { ...@@ -91,7 +91,7 @@ static void on_write(grpc_exec_ctx *exec_ctx, grpc_fd *emfd, void *user_data) {
} }
static void on_fd_orphaned(grpc_exec_ctx *exec_ctx, grpc_fd *emfd, static void on_fd_orphaned(grpc_exec_ctx *exec_ctx, grpc_fd *emfd,
void *user_data) { grpc_closure *closure, void *user_data) {
gpr_log(GPR_INFO, "gRPC FD about to be orphaned: %d", gpr_log(GPR_INFO, "gRPC FD about to be orphaned: %d",
grpc_fd_wrapped_fd(emfd)); grpc_fd_wrapped_fd(emfd));
g_number_of_orphan_calls++; g_number_of_orphan_calls++;
...@@ -228,9 +228,9 @@ static void test_no_op_with_port_and_start(void) { ...@@ -228,9 +228,9 @@ static void test_no_op_with_port_and_start(void) {
grpc_udp_server_destroy(&exec_ctx, s, NULL); grpc_udp_server_destroy(&exec_ctx, s, NULL);
grpc_exec_ctx_finish(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx);
/* The server had a single FD, which is orphaned once in * /* The server had a single FD, which is orphaned exactly once in *
* deactivated_all_ports, and once in grpc_udp_server_destroy. */ * grpc_udp_server_destroy. */
GPR_ASSERT(g_number_of_orphan_calls == 2); GPR_ASSERT(g_number_of_orphan_calls == 1);
} }
static void test_receive(int number_of_clients) { static void test_receive(int number_of_clients) {
...@@ -297,9 +297,9 @@ static void test_receive(int number_of_clients) { ...@@ -297,9 +297,9 @@ static void test_receive(int number_of_clients) {
grpc_udp_server_destroy(&exec_ctx, s, NULL); grpc_udp_server_destroy(&exec_ctx, s, NULL);
grpc_exec_ctx_finish(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx);
/* The server had a single FD, which is orphaned once in * /* The server had a single FD, which is orphaned exactly once in *
* deactivated_all_ports, and once in grpc_udp_server_destroy. */ * grpc_udp_server_destroy. */
GPR_ASSERT(g_number_of_orphan_calls == 2); GPR_ASSERT(g_number_of_orphan_calls == 1);
/* The write callback should have fired a few times. */ /* The write callback should have fired a few times. */
GPR_ASSERT(g_number_of_writes > 0); GPR_ASSERT(g_number_of_writes > 0);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment