Skip to content
Snippets Groups Projects
Commit 9aa6f40d authored by Robbie Shade's avatar Robbie Shade
Browse files

Add callback when gRPC FD is about to be orphaned.

parent 26dd2b8d
No related branches found
No related tags found
No related merge requests found
...@@ -81,6 +81,7 @@ typedef struct { ...@@ -81,6 +81,7 @@ typedef struct {
grpc_closure read_closure; grpc_closure read_closure;
grpc_closure destroyed_closure; grpc_closure destroyed_closure;
grpc_udp_server_read_cb read_cb; grpc_udp_server_read_cb read_cb;
grpc_udp_server_orphan_cb orphan_cb;
} server_port; } server_port;
/* the overall server */ /* the overall server */
...@@ -168,6 +169,10 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { ...@@ -168,6 +169,10 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
server_port *sp = &s->ports[i]; server_port *sp = &s->ports[i];
sp->destroyed_closure.cb = destroyed_port; sp->destroyed_closure.cb = destroyed_port;
sp->destroyed_closure.cb_arg = s; sp->destroyed_closure.cb_arg = s;
GPR_ASSERT(sp->orphan_cb);
sp->orphan_cb(sp->emfd);
grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, NULL, grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, NULL,
"udp_listener_shutdown"); "udp_listener_shutdown");
} }
...@@ -268,7 +273,8 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, bool success) { ...@@ -268,7 +273,8 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
static int add_socket_to_server(grpc_udp_server *s, int fd, static int add_socket_to_server(grpc_udp_server *s, int fd,
const struct sockaddr *addr, size_t addr_len, const struct sockaddr *addr, size_t addr_len,
grpc_udp_server_read_cb read_cb) { grpc_udp_server_read_cb read_cb,
grpc_udp_server_orphan_cb orphan_cb) {
server_port *sp; server_port *sp;
int port; int port;
char *addr_str; char *addr_str;
...@@ -292,6 +298,7 @@ static int add_socket_to_server(grpc_udp_server *s, int fd, ...@@ -292,6 +298,7 @@ static int add_socket_to_server(grpc_udp_server *s, int fd,
memcpy(sp->addr.untyped, addr, addr_len); memcpy(sp->addr.untyped, addr, addr_len);
sp->addr_len = addr_len; sp->addr_len = addr_len;
sp->read_cb = read_cb; sp->read_cb = read_cb;
sp->orphan_cb = orphan_cb;
GPR_ASSERT(sp->emfd); GPR_ASSERT(sp->emfd);
gpr_mu_unlock(&s->mu); gpr_mu_unlock(&s->mu);
gpr_free(name); gpr_free(name);
...@@ -301,7 +308,8 @@ static int add_socket_to_server(grpc_udp_server *s, int fd, ...@@ -301,7 +308,8 @@ static int add_socket_to_server(grpc_udp_server *s, int fd,
} }
int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr, int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr,
size_t addr_len, grpc_udp_server_read_cb read_cb) { size_t addr_len, grpc_udp_server_read_cb read_cb,
grpc_udp_server_orphan_cb orphan_cb) {
int allocated_port1 = -1; int allocated_port1 = -1;
int allocated_port2 = -1; int allocated_port2 = -1;
unsigned i; unsigned i;
...@@ -348,7 +356,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr, ...@@ -348,7 +356,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr,
addr = (struct sockaddr *)&wild6; addr = (struct sockaddr *)&wild6;
addr_len = sizeof(wild6); addr_len = sizeof(wild6);
fd = grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode); fd = grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode);
allocated_port1 = add_socket_to_server(s, fd, addr, addr_len, read_cb); allocated_port1 =
add_socket_to_server(s, fd, addr, addr_len, read_cb, orphan_cb);
if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) { if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) {
goto done; goto done;
} }
...@@ -370,7 +379,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr, ...@@ -370,7 +379,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr,
addr = (struct sockaddr *)&addr4_copy; addr = (struct sockaddr *)&addr4_copy;
addr_len = sizeof(addr4_copy); addr_len = sizeof(addr4_copy);
} }
allocated_port2 = add_socket_to_server(s, fd, addr, addr_len, read_cb); allocated_port2 =
add_socket_to_server(s, fd, addr, addr_len, read_cb, orphan_cb);
done: done:
gpr_free(allocated_addr); gpr_free(allocated_addr);
......
...@@ -48,6 +48,9 @@ typedef struct grpc_udp_server grpc_udp_server; ...@@ -48,6 +48,9 @@ typedef struct grpc_udp_server grpc_udp_server;
typedef void (*grpc_udp_server_read_cb)(grpc_exec_ctx *exec_ctx, grpc_fd *emfd, typedef void (*grpc_udp_server_read_cb)(grpc_exec_ctx *exec_ctx, grpc_fd *emfd,
struct grpc_server *server); struct grpc_server *server);
/* Called when the grpc_fd is about to be orphaned (and the FD closed). */
typedef void (*grpc_udp_server_orphan_cb)(grpc_fd *emfd);
/* Create a server, initially not bound to any ports */ /* Create a server, initially not bound to any ports */
grpc_udp_server *grpc_udp_server_create(void); grpc_udp_server *grpc_udp_server_create(void);
...@@ -69,7 +72,8 @@ int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index); ...@@ -69,7 +72,8 @@ int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index);
/* TODO(ctiller): deprecate this, and make grpc_udp_server_add_ports to handle /* TODO(ctiller): deprecate this, and make grpc_udp_server_add_ports to handle
all of the multiple socket port matching logic in one place */ all of the multiple socket port matching logic in one place */
int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr, int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr,
size_t addr_len, grpc_udp_server_read_cb read_cb); size_t addr_len, grpc_udp_server_read_cb read_cb,
grpc_udp_server_orphan_cb orphan_cb);
void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *server, void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *server,
grpc_closure *on_done); grpc_closure *on_done);
......
...@@ -54,6 +54,7 @@ static grpc_pollset *g_pollset; ...@@ -54,6 +54,7 @@ static grpc_pollset *g_pollset;
static gpr_mu *g_mu; static gpr_mu *g_mu;
static int g_number_of_reads = 0; static int g_number_of_reads = 0;
static int g_number_of_bytes_read = 0; static int g_number_of_bytes_read = 0;
static int g_number_of_orphan_calls = 0;
static void on_read(grpc_exec_ctx *exec_ctx, grpc_fd *emfd, static void on_read(grpc_exec_ctx *exec_ctx, grpc_fd *emfd,
grpc_server *server) { grpc_server *server) {
...@@ -71,6 +72,12 @@ static void on_read(grpc_exec_ctx *exec_ctx, grpc_fd *emfd, ...@@ -71,6 +72,12 @@ static void on_read(grpc_exec_ctx *exec_ctx, grpc_fd *emfd,
gpr_mu_unlock(g_mu); gpr_mu_unlock(g_mu);
} }
static void on_fd_orphaned(grpc_fd *emfd) {
gpr_log(GPR_INFO, "gRPC FD about to be orphaned: %d",
grpc_fd_wrapped_fd(emfd));
g_number_of_orphan_calls++;
}
static void test_no_op(void) { static void test_no_op(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_udp_server *s = grpc_udp_server_create(); grpc_udp_server *s = grpc_udp_server_create();
...@@ -88,6 +95,7 @@ static void test_no_op_with_start(void) { ...@@ -88,6 +95,7 @@ static void test_no_op_with_start(void) {
} }
static void test_no_op_with_port(void) { static void test_no_op_with_port(void) {
g_number_of_orphan_calls = 0;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
struct sockaddr_in addr; struct sockaddr_in addr;
grpc_udp_server *s = grpc_udp_server_create(); grpc_udp_server *s = grpc_udp_server_create();
...@@ -96,13 +104,17 @@ static void test_no_op_with_port(void) { ...@@ -96,13 +104,17 @@ static void test_no_op_with_port(void) {
memset(&addr, 0, sizeof(addr)); memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET; addr.sin_family = AF_INET;
GPR_ASSERT(grpc_udp_server_add_port(s, (struct sockaddr *)&addr, sizeof(addr), GPR_ASSERT(grpc_udp_server_add_port(s, (struct sockaddr *)&addr, sizeof(addr),
on_read)); on_read, on_fd_orphaned));
grpc_udp_server_destroy(&exec_ctx, s, NULL); grpc_udp_server_destroy(&exec_ctx, s, NULL);
grpc_exec_ctx_finish(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx);
/* The server had a single FD, which should be orphaned. */
GPR_ASSERT(g_number_of_orphan_calls == 1);
} }
static void test_no_op_with_port_and_start(void) { static void test_no_op_with_port_and_start(void) {
g_number_of_orphan_calls = 0;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
struct sockaddr_in addr; struct sockaddr_in addr;
grpc_udp_server *s = grpc_udp_server_create(); grpc_udp_server *s = grpc_udp_server_create();
...@@ -111,12 +123,15 @@ static void test_no_op_with_port_and_start(void) { ...@@ -111,12 +123,15 @@ static void test_no_op_with_port_and_start(void) {
memset(&addr, 0, sizeof(addr)); memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET; addr.sin_family = AF_INET;
GPR_ASSERT(grpc_udp_server_add_port(s, (struct sockaddr *)&addr, sizeof(addr), GPR_ASSERT(grpc_udp_server_add_port(s, (struct sockaddr *)&addr, sizeof(addr),
on_read)); on_read, on_fd_orphaned));
grpc_udp_server_start(&exec_ctx, s, NULL, 0, NULL); grpc_udp_server_start(&exec_ctx, s, NULL, 0, NULL);
grpc_udp_server_destroy(&exec_ctx, s, NULL); grpc_udp_server_destroy(&exec_ctx, s, NULL);
grpc_exec_ctx_finish(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx);
/* The server had a single FD which should be orphaned. */
GPR_ASSERT(g_number_of_orphan_calls == 1);
} }
static void test_receive(int number_of_clients) { static void test_receive(int number_of_clients) {
...@@ -133,11 +148,12 @@ static void test_receive(int number_of_clients) { ...@@ -133,11 +148,12 @@ static void test_receive(int number_of_clients) {
gpr_log(GPR_INFO, "clients=%d", number_of_clients); gpr_log(GPR_INFO, "clients=%d", number_of_clients);
g_number_of_bytes_read = 0; g_number_of_bytes_read = 0;
g_number_of_orphan_calls = 0;
memset(&addr, 0, sizeof(addr)); memset(&addr, 0, sizeof(addr));
addr.ss_family = AF_INET; addr.ss_family = AF_INET;
GPR_ASSERT( GPR_ASSERT(grpc_udp_server_add_port(s, (struct sockaddr *)&addr, addr_len,
grpc_udp_server_add_port(s, (struct sockaddr *)&addr, addr_len, on_read)); on_read, on_fd_orphaned));
svrfd = grpc_udp_server_get_fd(s, 0); svrfd = grpc_udp_server_get_fd(s, 0);
GPR_ASSERT(svrfd >= 0); GPR_ASSERT(svrfd >= 0);
...@@ -176,6 +192,8 @@ static void test_receive(int number_of_clients) { ...@@ -176,6 +192,8 @@ static void test_receive(int number_of_clients) {
grpc_udp_server_destroy(&exec_ctx, s, NULL); grpc_udp_server_destroy(&exec_ctx, s, NULL);
grpc_exec_ctx_finish(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(g_number_of_orphan_calls == 5);
} }
static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, bool success) { static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, bool success) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment