Skip to content
Snippets Groups Projects
Commit 6c656704 authored by Craig Tiller's avatar Craig Tiller
Browse files

Free stream ops outside transport lock

Resolves a deadlock caused by flow control callbacks being made under the lock.
parent 7448d3d6
No related branches found
No related tags found
No related merge requests found
...@@ -237,6 +237,9 @@ struct transport { ...@@ -237,6 +237,9 @@ struct transport {
/* state for a stream that's not yet been created */ /* state for a stream that's not yet been created */
grpc_stream_op_buffer new_stream_sopb; grpc_stream_op_buffer new_stream_sopb;
/* stream ops that need to be destroyed, but outside of the lock */
grpc_stream_op_buffer nuke_later_sopb;
/* active parser */ /* active parser */
void *parser_data; void *parser_data;
stream *incoming_stream; stream *incoming_stream;
...@@ -343,6 +346,7 @@ static void unref_transport(transport *t) { ...@@ -343,6 +346,7 @@ static void unref_transport(transport *t) {
grpc_chttp2_hpack_parser_destroy(&t->hpack_parser); grpc_chttp2_hpack_parser_destroy(&t->hpack_parser);
grpc_chttp2_hpack_compressor_destroy(&t->hpack_compressor); grpc_chttp2_hpack_compressor_destroy(&t->hpack_compressor);
grpc_chttp2_goaway_parser_destroy(&t->goaway_parser); grpc_chttp2_goaway_parser_destroy(&t->goaway_parser);
grpc_sopb_destroy(&t->nuke_later_sopb);
grpc_mdstr_unref(t->str_grpc_timeout); grpc_mdstr_unref(t->str_grpc_timeout);
...@@ -416,6 +420,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup, ...@@ -416,6 +420,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
t->cap_pending_goaways = 0; t->cap_pending_goaways = 0;
gpr_slice_buffer_init(&t->outbuf); gpr_slice_buffer_init(&t->outbuf);
gpr_slice_buffer_init(&t->qbuf); gpr_slice_buffer_init(&t->qbuf);
grpc_sopb_init(&t->nuke_later_sopb);
if (is_client) { if (is_client) {
gpr_slice_buffer_add(&t->qbuf, gpr_slice_buffer_add(&t->qbuf,
gpr_slice_from_copied_string(CLIENT_CONNECT_STRING)); gpr_slice_from_copied_string(CLIENT_CONNECT_STRING));
...@@ -555,6 +560,11 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs, ...@@ -555,6 +560,11 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
return 0; return 0;
} }
static void schedule_nuke_sopb(transport *t, grpc_stream_op_buffer *sopb) {
grpc_sopb_append(&t->nuke_later_sopb, sopb->ops, sopb->nops);
sopb->nops = 0;
}
static void destroy_stream(grpc_transport *gt, grpc_stream *gs) { static void destroy_stream(grpc_transport *gt, grpc_stream *gs) {
transport *t = (transport *)gt; transport *t = (transport *)gt;
stream *s = (stream *)gs; stream *s = (stream *)gs;
...@@ -681,6 +691,11 @@ static void unlock(transport *t) { ...@@ -681,6 +691,11 @@ static void unlock(transport *t) {
int i; int i;
pending_goaway *goaways = NULL; pending_goaway *goaways = NULL;
grpc_endpoint *ep = t->ep; grpc_endpoint *ep = t->ep;
grpc_stream_op_buffer nuke_now = t->nuke_later_sopb;
if (nuke_now.nops) {
memset(&t->nuke_later_sopb, 0, sizeof(t->nuke_later_sopb));
}
/* see if we need to trigger a write - and if so, get the data ready */ /* see if we need to trigger a write - and if so, get the data ready */
if (ep && !t->writing) { if (ep && !t->writing) {
...@@ -750,6 +765,10 @@ static void unlock(transport *t) { ...@@ -750,6 +765,10 @@ static void unlock(transport *t) {
unref_transport(t); unref_transport(t);
} }
if (nuke_now.nops) {
grpc_sopb_reset(&nuke_now);
}
gpr_free(goaways); gpr_free(goaways);
} }
...@@ -1006,9 +1025,9 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id, ...@@ -1006,9 +1025,9 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
if (s) { if (s) {
/* clear out any unreported input & output: nobody cares anymore */ /* clear out any unreported input & output: nobody cares anymore */
grpc_sopb_reset(&s->parser.incoming_sopb);
had_outgoing = s->outgoing_sopb.nops != 0; had_outgoing = s->outgoing_sopb.nops != 0;
grpc_sopb_reset(&s->outgoing_sopb); schedule_nuke_sopb(t, &s->parser.incoming_sopb);
schedule_nuke_sopb(t, &s->outgoing_sopb);
if (s->cancelled) { if (s->cancelled) {
send_rst = 0; send_rst = 0;
} else if (!s->read_closed || !s->sent_write_closed || had_outgoing) { } else if (!s->read_closed || !s->sent_write_closed || had_outgoing) {
...@@ -1503,8 +1522,8 @@ static int process_read(transport *t, gpr_slice slice) { ...@@ -1503,8 +1522,8 @@ static int process_read(transport *t, gpr_slice slice) {
"Connect string mismatch: expected '%c' (%d) got '%c' (%d) " "Connect string mismatch: expected '%c' (%d) got '%c' (%d) "
"at byte %d", "at byte %d",
CLIENT_CONNECT_STRING[t->deframe_state], CLIENT_CONNECT_STRING[t->deframe_state],
(int)(gpr_uint8)CLIENT_CONNECT_STRING[t->deframe_state], *cur, (int)(gpr_uint8) CLIENT_CONNECT_STRING[t->deframe_state],
(int)*cur, t->deframe_state); *cur, (int)*cur, t->deframe_state);
drop_connection(t); drop_connection(t);
return 0; return 0;
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment