Skip to content
Snippets Groups Projects
Commit cb818ba7 authored by Craig Tiller's avatar Craig Tiller
Browse files

Free stream ops outside transport lock

Resolves a deadlock caused by flow control callbacks being made under the lock.
parent 4433e6e6
No related branches found
No related tags found
No related merge requests found
...@@ -237,6 +237,9 @@ struct transport { ...@@ -237,6 +237,9 @@ struct transport {
/* state for a stream that's not yet been created */ /* state for a stream that's not yet been created */
grpc_stream_op_buffer new_stream_sopb; grpc_stream_op_buffer new_stream_sopb;
/* stream ops that need to be destroyed, but outside of the lock */
grpc_stream_op_buffer nuke_later_sopb;
/* active parser */ /* active parser */
void *parser_data; void *parser_data;
stream *incoming_stream; stream *incoming_stream;
...@@ -343,6 +346,7 @@ static void unref_transport(transport *t) { ...@@ -343,6 +346,7 @@ static void unref_transport(transport *t) {
grpc_chttp2_hpack_parser_destroy(&t->hpack_parser); grpc_chttp2_hpack_parser_destroy(&t->hpack_parser);
grpc_chttp2_hpack_compressor_destroy(&t->hpack_compressor); grpc_chttp2_hpack_compressor_destroy(&t->hpack_compressor);
grpc_chttp2_goaway_parser_destroy(&t->goaway_parser); grpc_chttp2_goaway_parser_destroy(&t->goaway_parser);
grpc_sopb_destroy(&t->nuke_later_sopb);
grpc_mdstr_unref(t->str_grpc_timeout); grpc_mdstr_unref(t->str_grpc_timeout);
...@@ -416,6 +420,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup, ...@@ -416,6 +420,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
t->cap_pending_goaways = 0; t->cap_pending_goaways = 0;
gpr_slice_buffer_init(&t->outbuf); gpr_slice_buffer_init(&t->outbuf);
gpr_slice_buffer_init(&t->qbuf); gpr_slice_buffer_init(&t->qbuf);
grpc_sopb_init(&t->nuke_later_sopb);
if (is_client) { if (is_client) {
gpr_slice_buffer_add(&t->qbuf, gpr_slice_buffer_add(&t->qbuf,
gpr_slice_from_copied_string(CLIENT_CONNECT_STRING)); gpr_slice_from_copied_string(CLIENT_CONNECT_STRING));
...@@ -525,7 +530,7 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs, ...@@ -525,7 +530,7 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
lock(t); lock(t);
s->id = 0; s->id = 0;
} else { } else {
s->id = (gpr_uint32)(gpr_uintptr)server_data; s->id = (gpr_uint32)(gpr_uintptr) server_data;
t->incoming_stream = s; t->incoming_stream = s;
grpc_chttp2_stream_map_add(&t->stream_map, s->id, s); grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
} }
...@@ -555,6 +560,11 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs, ...@@ -555,6 +560,11 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
return 0; return 0;
} }
static void schedule_nuke_sopb(transport *t, grpc_stream_op_buffer *sopb) {
grpc_sopb_append(&t->nuke_later_sopb, sopb->ops, sopb->nops);
sopb->nops = 0;
}
static void destroy_stream(grpc_transport *gt, grpc_stream *gs) { static void destroy_stream(grpc_transport *gt, grpc_stream *gs) {
transport *t = (transport *)gt; transport *t = (transport *)gt;
stream *s = (stream *)gs; stream *s = (stream *)gs;
...@@ -681,6 +691,11 @@ static void unlock(transport *t) { ...@@ -681,6 +691,11 @@ static void unlock(transport *t) {
int i; int i;
pending_goaway *goaways = NULL; pending_goaway *goaways = NULL;
grpc_endpoint *ep = t->ep; grpc_endpoint *ep = t->ep;
grpc_stream_op_buffer nuke_now = t->nuke_later_sopb;
if (nuke_now.nops) {
memset(&t->nuke_later_sopb, 0, sizeof(t->nuke_later_sopb));
}
/* see if we need to trigger a write - and if so, get the data ready */ /* see if we need to trigger a write - and if so, get the data ready */
if (ep && !t->writing) { if (ep && !t->writing) {
...@@ -750,6 +765,10 @@ static void unlock(transport *t) { ...@@ -750,6 +765,10 @@ static void unlock(transport *t) {
unref_transport(t); unref_transport(t);
} }
if (nuke_now.nops) {
grpc_sopb_reset(&nuke_now);
}
gpr_free(goaways); gpr_free(goaways);
} }
...@@ -1006,9 +1025,9 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id, ...@@ -1006,9 +1025,9 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
if (s) { if (s) {
/* clear out any unreported input & output: nobody cares anymore */ /* clear out any unreported input & output: nobody cares anymore */
grpc_sopb_reset(&s->parser.incoming_sopb);
had_outgoing = s->outgoing_sopb.nops != 0; had_outgoing = s->outgoing_sopb.nops != 0;
grpc_sopb_reset(&s->outgoing_sopb); schedule_nuke_sopb(t, &s->parser.incoming_sopb);
schedule_nuke_sopb(t, &s->outgoing_sopb);
if (s->cancelled) { if (s->cancelled) {
send_rst = 0; send_rst = 0;
} else if (!s->read_closed || !s->sent_write_closed || had_outgoing) { } else if (!s->read_closed || !s->sent_write_closed || had_outgoing) {
...@@ -1238,7 +1257,7 @@ static int init_header_frame_parser(transport *t, int is_continuation) { ...@@ -1238,7 +1257,7 @@ static int init_header_frame_parser(transport *t, int is_continuation) {
t->incoming_stream = NULL; t->incoming_stream = NULL;
/* if stream is accepted, we set incoming_stream in init_stream */ /* if stream is accepted, we set incoming_stream in init_stream */
t->cb->accept_stream(t->cb_user_data, &t->base, t->cb->accept_stream(t->cb_user_data, &t->base,
(void *)(gpr_uintptr)t->incoming_stream_id); (void *)(gpr_uintptr) t->incoming_stream_id);
s = t->incoming_stream; s = t->incoming_stream;
if (!s) { if (!s) {
gpr_log(GPR_ERROR, "stream not accepted"); gpr_log(GPR_ERROR, "stream not accepted");
...@@ -1503,8 +1522,8 @@ static int process_read(transport *t, gpr_slice slice) { ...@@ -1503,8 +1522,8 @@ static int process_read(transport *t, gpr_slice slice) {
"Connect string mismatch: expected '%c' (%d) got '%c' (%d) " "Connect string mismatch: expected '%c' (%d) got '%c' (%d) "
"at byte %d", "at byte %d",
CLIENT_CONNECT_STRING[t->deframe_state], CLIENT_CONNECT_STRING[t->deframe_state],
(int)(gpr_uint8)CLIENT_CONNECT_STRING[t->deframe_state], *cur, (int)(gpr_uint8) CLIENT_CONNECT_STRING[t->deframe_state],
(int)*cur, t->deframe_state); *cur, (int)*cur, t->deframe_state);
drop_connection(t); drop_connection(t);
return 0; return 0;
} }
...@@ -1518,7 +1537,7 @@ static int process_read(transport *t, gpr_slice slice) { ...@@ -1518,7 +1537,7 @@ static int process_read(transport *t, gpr_slice slice) {
dts_fh_0: dts_fh_0:
case DTS_FH_0: case DTS_FH_0:
GPR_ASSERT(cur < end); GPR_ASSERT(cur < end);
t->incoming_frame_size = ((gpr_uint32) * cur) << 16; t->incoming_frame_size = ((gpr_uint32)*cur) << 16;
if (++cur == end) { if (++cur == end) {
t->deframe_state = DTS_FH_1; t->deframe_state = DTS_FH_1;
return 1; return 1;
...@@ -1526,7 +1545,7 @@ static int process_read(transport *t, gpr_slice slice) { ...@@ -1526,7 +1545,7 @@ static int process_read(transport *t, gpr_slice slice) {
/* fallthrough */ /* fallthrough */
case DTS_FH_1: case DTS_FH_1:
GPR_ASSERT(cur < end); GPR_ASSERT(cur < end);
t->incoming_frame_size |= ((gpr_uint32) * cur) << 8; t->incoming_frame_size |= ((gpr_uint32)*cur) << 8;
if (++cur == end) { if (++cur == end) {
t->deframe_state = DTS_FH_2; t->deframe_state = DTS_FH_2;
return 1; return 1;
...@@ -1558,7 +1577,7 @@ static int process_read(transport *t, gpr_slice slice) { ...@@ -1558,7 +1577,7 @@ static int process_read(transport *t, gpr_slice slice) {
/* fallthrough */ /* fallthrough */
case DTS_FH_5: case DTS_FH_5:
GPR_ASSERT(cur < end); GPR_ASSERT(cur < end);
t->incoming_stream_id = (((gpr_uint32) * cur) << 24) & 0x7f; t->incoming_stream_id = (((gpr_uint32)*cur) << 24) & 0x7f;
if (++cur == end) { if (++cur == end) {
t->deframe_state = DTS_FH_6; t->deframe_state = DTS_FH_6;
return 1; return 1;
...@@ -1566,7 +1585,7 @@ static int process_read(transport *t, gpr_slice slice) { ...@@ -1566,7 +1585,7 @@ static int process_read(transport *t, gpr_slice slice) {
/* fallthrough */ /* fallthrough */
case DTS_FH_6: case DTS_FH_6:
GPR_ASSERT(cur < end); GPR_ASSERT(cur < end);
t->incoming_stream_id |= ((gpr_uint32) * cur) << 16; t->incoming_stream_id |= ((gpr_uint32)*cur) << 16;
if (++cur == end) { if (++cur == end) {
t->deframe_state = DTS_FH_7; t->deframe_state = DTS_FH_7;
return 1; return 1;
...@@ -1574,7 +1593,7 @@ static int process_read(transport *t, gpr_slice slice) { ...@@ -1574,7 +1593,7 @@ static int process_read(transport *t, gpr_slice slice) {
/* fallthrough */ /* fallthrough */
case DTS_FH_7: case DTS_FH_7:
GPR_ASSERT(cur < end); GPR_ASSERT(cur < end);
t->incoming_stream_id |= ((gpr_uint32) * cur) << 8; t->incoming_stream_id |= ((gpr_uint32)*cur) << 8;
if (++cur == end) { if (++cur == end) {
t->deframe_state = DTS_FH_8; t->deframe_state = DTS_FH_8;
return 1; return 1;
...@@ -1582,7 +1601,7 @@ static int process_read(transport *t, gpr_slice slice) { ...@@ -1582,7 +1601,7 @@ static int process_read(transport *t, gpr_slice slice) {
/* fallthrough */ /* fallthrough */
case DTS_FH_8: case DTS_FH_8:
GPR_ASSERT(cur < end); GPR_ASSERT(cur < end);
t->incoming_stream_id |= ((gpr_uint32) * cur); t->incoming_stream_id |= ((gpr_uint32)*cur);
t->deframe_state = DTS_FRAME; t->deframe_state = DTS_FRAME;
if (!init_frame_parser(t)) { if (!init_frame_parser(t)) {
return 0; return 0;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment