Skip to content
Snippets Groups Projects
Commit eab5c335 authored by Jan Tattermusch's avatar Jan Tattermusch
Browse files

fix race in server shutdown

parent 357f0bdc
No related branches found
No related tags found
No related merge requests found
...@@ -79,6 +79,8 @@ struct grpc_tcp_server { ...@@ -79,6 +79,8 @@ struct grpc_tcp_server {
/* active port count: how many ports are actually still listening */ /* active port count: how many ports are actually still listening */
int active_ports; int active_ports;
/* number of iomgr callbacks that have been explicitly scheduled during shutdown */
int iomgr_callbacks_pending;
/* all listening ports */ /* all listening ports */
server_port *ports; server_port *ports;
...@@ -93,6 +95,7 @@ grpc_tcp_server *grpc_tcp_server_create(void) { ...@@ -93,6 +95,7 @@ grpc_tcp_server *grpc_tcp_server_create(void) {
gpr_mu_init(&s->mu); gpr_mu_init(&s->mu);
gpr_cv_init(&s->cv); gpr_cv_init(&s->cv);
s->active_ports = 0; s->active_ports = 0;
s->iomgr_callbacks_pending = 0;
s->cb = NULL; s->cb = NULL;
s->cb_arg = NULL; s->cb_arg = NULL;
s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP); s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP);
...@@ -112,10 +115,10 @@ void grpc_tcp_server_destroy(grpc_tcp_server *s, ...@@ -112,10 +115,10 @@ void grpc_tcp_server_destroy(grpc_tcp_server *s,
for (i = 0; i < s->nports; i++) { for (i = 0; i < s->nports; i++) {
server_port *sp = &s->ports[i]; server_port *sp = &s->ports[i];
sp->shutting_down = 1; sp->shutting_down = 1;
grpc_winsocket_shutdown(sp->socket); s->iomgr_callbacks_pending += grpc_winsocket_shutdown(sp->socket);
} }
/* This happens asynchronously. Wait while that happens. */ /* This happens asynchronously. Wait while that happens. */
while (s->active_ports) { while (s->active_ports || s->iomgr_callbacks_pending) {
gpr_cv_wait(&s->cv, &s->mu, gpr_inf_future(GPR_CLOCK_REALTIME)); gpr_cv_wait(&s->cv, &s->mu, gpr_inf_future(GPR_CLOCK_REALTIME));
} }
gpr_mu_unlock(&s->mu); gpr_mu_unlock(&s->mu);
...@@ -254,8 +257,16 @@ static void on_accept(void *arg, int from_iocp) { ...@@ -254,8 +257,16 @@ static void on_accept(void *arg, int from_iocp) {
/* The general mechanism for shutting down is to queue abortion calls. While /* The general mechanism for shutting down is to queue abortion calls. While
this is necessary in the read/write case, it's useless for the accept this is necessary in the read/write case, it's useless for the accept
case. Let's do nothing. */ case. We only need to adjust the pending callback count */
if (!from_iocp) return; if (!from_iocp) {
gpr_mu_lock(&sp->server->mu);
GPR_ASSERT(sp->server->iomgr_callbacks_pending > 0);
if (0 == --sp->server->iomgr_callbacks_pending) {
gpr_cv_broadcast(&sp->server->cv);
}
gpr_mu_unlock(&sp->server->mu);
return;
}
/* The IOCP notified us of a completed operation. Let's grab the results, /* The IOCP notified us of a completed operation. Let's grab the results,
and act accordingly. */ and act accordingly. */
...@@ -264,11 +275,12 @@ static void on_accept(void *arg, int from_iocp) { ...@@ -264,11 +275,12 @@ static void on_accept(void *arg, int from_iocp) {
&transfered_bytes, FALSE, &flags); &transfered_bytes, FALSE, &flags);
if (!wsa_success) { if (!wsa_success) {
if (sp->shutting_down) { if (sp->shutting_down) {
/* During the shutdown case, we ARE expecting an error. So that's swell, /* During the shutdown case, we ARE expecting an error. So that's well,
and we can wake up the shutdown thread. */ and we can wake up the shutdown thread. */
sp->shutting_down = 0; sp->shutting_down = 0;
sp->socket->read_info.outstanding = 0; sp->socket->read_info.outstanding = 0;
gpr_mu_lock(&sp->server->mu); gpr_mu_lock(&sp->server->mu);
GPR_ASSERT(sp->server->active_ports > 0);
if (0 == --sp->server->active_ports) { if (0 == --sp->server->active_ports) {
gpr_cv_broadcast(&sp->server->cv); gpr_cv_broadcast(&sp->server->cv);
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment