Skip to content
Snippets Groups Projects
Commit b9eb180d authored by Craig Tiller's avatar Craig Tiller
Browse files

Fix some shutdown races

parent f2350183
No related branches found
No related tags found
No related merge requests found
...@@ -519,12 +519,22 @@ static void destroy_transport(grpc_transport *gt) { ...@@ -519,12 +519,22 @@ static void destroy_transport(grpc_transport *gt) {
lock(t); lock(t);
t->destroying = 1; t->destroying = 1;
while (t->calling_back) { /* Wait for pending stuff to finish.
We need to be not calling back to ensure that closed() gets a chance to
trigger if needed during unlock() before we die.
We need to be not writing as cancellation finalization may produce some
callbacks that NEED to be made to close out some streams when t->writing
becomes 0. */
while (t->calling_back || t->writing) {
gpr_cv_wait(&t->cv, &t->mu, gpr_inf_future); gpr_cv_wait(&t->cv, &t->mu, gpr_inf_future);
} }
drop_connection(t); drop_connection(t);
unlock(t); unlock(t);
lock(t);
GPR_ASSERT(!t->cb);
unlock(t);
unref_transport(t); unref_transport(t);
} }
...@@ -681,6 +691,7 @@ static void stream_list_add_tail(transport *t, stream *s, stream_list_id id) { ...@@ -681,6 +691,7 @@ static void stream_list_add_tail(transport *t, stream *s, stream_list_id id) {
} }
static void stream_list_join(transport *t, stream *s, stream_list_id id) { static void stream_list_join(transport *t, stream *s, stream_list_id id) {
if (id == PENDING_CALLBACKS) GPR_ASSERT(t->cb != NULL || t->error_state == ERROR_STATE_NONE);
if (s->included[id]) { if (s->included[id]) {
return; return;
} }
...@@ -739,7 +750,7 @@ static void unlock(transport *t) { ...@@ -739,7 +750,7 @@ static void unlock(transport *t) {
if (perform_callbacks) { if (perform_callbacks) {
t->calling_back = 1; t->calling_back = 1;
} }
if (t->error_state == ERROR_STATE_SEEN) { if (t->error_state == ERROR_STATE_SEEN && !t->writing) {
call_closed = 1; call_closed = 1;
t->calling_back = 1; t->calling_back = 1;
t->cb = NULL; /* no more callbacks */ t->cb = NULL; /* no more callbacks */
...@@ -904,13 +915,16 @@ static void finish_write_common(transport *t, int success) { ...@@ -904,13 +915,16 @@ static void finish_write_common(transport *t, int success) {
} }
while ((s = stream_list_remove_head(t, WRITTEN_CLOSED))) { while ((s = stream_list_remove_head(t, WRITTEN_CLOSED))) {
s->sent_write_closed = 1; s->sent_write_closed = 1;
stream_list_join(t, s, PENDING_CALLBACKS); if (!s->cancelled) stream_list_join(t, s, PENDING_CALLBACKS);
} }
t->outbuf.count = 0; t->outbuf.count = 0;
t->outbuf.length = 0; t->outbuf.length = 0;
/* leave the writing flag up on shutdown to prevent further writes in unlock() /* leave the writing flag up on shutdown to prevent further writes in unlock()
from starting */ from starting */
t->writing = 0; t->writing = 0;
if (t->destroying) {
gpr_cv_signal(&t->cv);
}
if (!t->reading) { if (!t->reading) {
grpc_endpoint_destroy(t->ep); grpc_endpoint_destroy(t->ep);
t->ep = NULL; t->ep = NULL;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment