diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 5465d3306a5c932f9144bdfdd40385976fde5e07..478d4a6944b4af346a34f0f6968047deb3bfb87e 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -237,6 +237,9 @@ struct transport { /* state for a stream that's not yet been created */ grpc_stream_op_buffer new_stream_sopb; + /* stream ops that need to be destroyed, but outside of the lock */ + grpc_stream_op_buffer nuke_later_sopb; + /* active parser */ void *parser_data; stream *incoming_stream; @@ -343,6 +346,7 @@ static void unref_transport(transport *t) { grpc_chttp2_hpack_parser_destroy(&t->hpack_parser); grpc_chttp2_hpack_compressor_destroy(&t->hpack_compressor); grpc_chttp2_goaway_parser_destroy(&t->goaway_parser); + grpc_sopb_destroy(&t->nuke_later_sopb); grpc_mdstr_unref(t->str_grpc_timeout); @@ -416,6 +420,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup, t->cap_pending_goaways = 0; gpr_slice_buffer_init(&t->outbuf); gpr_slice_buffer_init(&t->qbuf); + grpc_sopb_init(&t->nuke_later_sopb); if (is_client) { gpr_slice_buffer_add(&t->qbuf, gpr_slice_from_copied_string(CLIENT_CONNECT_STRING)); @@ -525,7 +530,7 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs, lock(t); s->id = 0; } else { - s->id = (gpr_uint32)(gpr_uintptr)server_data; + s->id = (gpr_uint32)(gpr_uintptr) server_data; t->incoming_stream = s; grpc_chttp2_stream_map_add(&t->stream_map, s->id, s); } @@ -555,6 +560,11 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs, return 0; } +static void schedule_nuke_sopb(transport *t, grpc_stream_op_buffer *sopb) { + grpc_sopb_append(&t->nuke_later_sopb, sopb->ops, sopb->nops); + sopb->nops = 0; +} + static void destroy_stream(grpc_transport *gt, grpc_stream *gs) { transport *t = (transport *)gt; stream *s = (stream *)gs; @@ -681,6 +691,11 @@ static void unlock(transport *t) { int i; pending_goaway *goaways = NULL; grpc_endpoint *ep = t->ep; + grpc_stream_op_buffer nuke_now = t->nuke_later_sopb; + + if (nuke_now.nops) { + memset(&t->nuke_later_sopb, 0, sizeof(t->nuke_later_sopb)); + } /* see if we need to trigger a write - and if so, get the data ready */ if (ep && !t->writing) { @@ -750,6 +765,10 @@ static void unlock(transport *t) { unref_transport(t); } + if (nuke_now.nops) { + grpc_sopb_reset(&nuke_now); + } + gpr_free(goaways); } @@ -1006,9 +1025,9 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id, if (s) { /* clear out any unreported input & output: nobody cares anymore */ - grpc_sopb_reset(&s->parser.incoming_sopb); had_outgoing = s->outgoing_sopb.nops != 0; - grpc_sopb_reset(&s->outgoing_sopb); + schedule_nuke_sopb(t, &s->parser.incoming_sopb); + schedule_nuke_sopb(t, &s->outgoing_sopb); if (s->cancelled) { send_rst = 0; } else if (!s->read_closed || !s->sent_write_closed || had_outgoing) { @@ -1238,7 +1257,7 @@ static int init_header_frame_parser(transport *t, int is_continuation) { t->incoming_stream = NULL; /* if stream is accepted, we set incoming_stream in init_stream */ t->cb->accept_stream(t->cb_user_data, &t->base, - (void *)(gpr_uintptr)t->incoming_stream_id); + (void *)(gpr_uintptr) t->incoming_stream_id); s = t->incoming_stream; if (!s) { gpr_log(GPR_ERROR, "stream not accepted"); @@ -1503,8 +1522,8 @@ static int process_read(transport *t, gpr_slice slice) { "Connect string mismatch: expected '%c' (%d) got '%c' (%d) " "at byte %d", CLIENT_CONNECT_STRING[t->deframe_state], - (int)(gpr_uint8)CLIENT_CONNECT_STRING[t->deframe_state], *cur, - (int)*cur, t->deframe_state); + (int)(gpr_uint8) CLIENT_CONNECT_STRING[t->deframe_state], + *cur, (int)*cur, t->deframe_state); drop_connection(t); return 0; } @@ -1518,7 +1537,7 @@ static int process_read(transport *t, gpr_slice slice) { dts_fh_0: case DTS_FH_0: GPR_ASSERT(cur < end); - t->incoming_frame_size = ((gpr_uint32) * cur) << 16; + t->incoming_frame_size = ((gpr_uint32)*cur) << 16; if (++cur == end) { t->deframe_state = DTS_FH_1; return 1; @@ -1526,7 +1545,7 @@ static int process_read(transport *t, gpr_slice slice) { /* fallthrough */ case DTS_FH_1: GPR_ASSERT(cur < end); - t->incoming_frame_size |= ((gpr_uint32) * cur) << 8; + t->incoming_frame_size |= ((gpr_uint32)*cur) << 8; if (++cur == end) { t->deframe_state = DTS_FH_2; return 1; @@ -1558,7 +1577,7 @@ static int process_read(transport *t, gpr_slice slice) { /* fallthrough */ case DTS_FH_5: GPR_ASSERT(cur < end); - t->incoming_stream_id = (((gpr_uint32) * cur) << 24) & 0x7f; + t->incoming_stream_id = (((gpr_uint32)*cur) << 24) & 0x7f; if (++cur == end) { t->deframe_state = DTS_FH_6; return 1; @@ -1566,7 +1585,7 @@ static int process_read(transport *t, gpr_slice slice) { /* fallthrough */ case DTS_FH_6: GPR_ASSERT(cur < end); - t->incoming_stream_id |= ((gpr_uint32) * cur) << 16; + t->incoming_stream_id |= ((gpr_uint32)*cur) << 16; if (++cur == end) { t->deframe_state = DTS_FH_7; return 1; @@ -1574,7 +1593,7 @@ static int process_read(transport *t, gpr_slice slice) { /* fallthrough */ case DTS_FH_7: GPR_ASSERT(cur < end); - t->incoming_stream_id |= ((gpr_uint32) * cur) << 8; + t->incoming_stream_id |= ((gpr_uint32)*cur) << 8; if (++cur == end) { t->deframe_state = DTS_FH_8; return 1; @@ -1582,7 +1601,7 @@ static int process_read(transport *t, gpr_slice slice) { /* fallthrough */ case DTS_FH_8: GPR_ASSERT(cur < end); - t->incoming_stream_id |= ((gpr_uint32) * cur); + t->incoming_stream_id |= ((gpr_uint32)*cur); t->deframe_state = DTS_FRAME; if (!init_frame_parser(t)) { return 0;