diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 97780d90f25416b8d784478e5d96aa3deb4c135f..b1473e06df0f4bad0990dcff2f206d21c20e0e1f 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -851,55 +851,52 @@ static bool contains_non_ok_status(grpc_metadata_batch *batch) { return false; } -static void add_fetched_slice_locked(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s); +typedef enum { CONTINUE_FETCHING, FINISHED_SLICE } continue_fetching_phase; static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_chttp2_stream *s) { - if (s->fetching_send_message == NULL) { - /* Stream was cancelled before message fetch completed */ - abort(); /* TODO(ctiller): what cleanup here? */ - return; - } - if (s->fetched_send_message_length == s->fetching_send_message->length) { - int64_t notify_offset = s->next_message_end_offset; - if (notify_offset <= s->flow_controlled_bytes_written) { - grpc_chttp2_complete_closure_step( - exec_ctx, t, s, &s->fetching_send_message_finished, GRPC_ERROR_NONE, - "fetching_send_message_finished"); - } else { - grpc_chttp2_write_cb *cb = t->write_cb_pool; - if (cb == NULL) { - cb = gpr_malloc(sizeof(*cb)); + grpc_chttp2_stream *s, + continue_fetching_phase phase) { + if (phase == FINISHED_SLICE) goto finished_slice; + for (;;) { + if (s->fetching_send_message == NULL) { + /* Stream was cancelled before message fetch completed */ + abort(); /* TODO(ctiller): what cleanup here? */ + return; /* early out */ + } + if (s->fetched_send_message_length == s->fetching_send_message->length) { + int64_t notify_offset = s->next_message_end_offset; + if (notify_offset <= s->flow_controlled_bytes_written) { + grpc_chttp2_complete_closure_step( + exec_ctx, t, s, &s->fetching_send_message_finished, GRPC_ERROR_NONE, + "fetching_send_message_finished"); } else { - t->write_cb_pool = cb->next; + grpc_chttp2_write_cb *cb = t->write_cb_pool; + if (cb == NULL) { + cb = gpr_malloc(sizeof(*cb)); + } else { + t->write_cb_pool = cb->next; + } + cb->call_at_byte = notify_offset; + cb->closure = s->fetching_send_message_finished; + s->fetching_send_message_finished = NULL; + cb->next = s->on_write_finished_cbs; + s->on_write_finished_cbs = cb; + } + s->fetching_send_message = NULL; + return; /* early out */ + } else if (grpc_byte_stream_next(exec_ctx, s->fetching_send_message, + &s->fetching_slice, UINT32_MAX, + &s->complete_fetch)) { + finished_slice: + s->fetched_send_message_length += + (uint32_t)GPR_SLICE_LENGTH(s->fetching_slice); + gpr_slice_buffer_add(&s->flow_controlled_buffer, s->fetching_slice); + if (s->id != 0) { + grpc_chttp2_become_writable(exec_ctx, t, s, true, "op.send_message"); } - cb->call_at_byte = notify_offset; - cb->closure = s->fetching_send_message_finished; - s->fetching_send_message_finished = NULL; - cb->next = s->on_write_finished_cbs; - s->on_write_finished_cbs = cb; } - s->fetching_send_message = NULL; - } else if (grpc_byte_stream_next(exec_ctx, s->fetching_send_message, - &s->fetching_slice, UINT32_MAX, - &s->complete_fetch)) { - add_fetched_slice_locked(exec_ctx, t, s); - } -} - -static void add_fetched_slice_locked(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s) { - s->fetched_send_message_length += - (uint32_t)GPR_SLICE_LENGTH(s->fetching_slice); - gpr_slice_buffer_add(&s->flow_controlled_buffer, s->fetching_slice); - if (s->id != 0) { - grpc_chttp2_become_writable(exec_ctx, t, s, true, "op.send_message"); } - continue_fetching_send_locked(exec_ctx, t, s); } static void complete_fetch_locked(grpc_exec_ctx *exec_ctx, void *gs, @@ -907,7 +904,7 @@ static void complete_fetch_locked(grpc_exec_ctx *exec_ctx, void *gs, grpc_chttp2_stream *s = gs; grpc_chttp2_transport *t = s->t; if (error == GRPC_ERROR_NONE) { - add_fetched_slice_locked(exec_ctx, t, s); + continue_fetching_send_locked(exec_ctx, t, s, FINISHED_SLICE); } else { /* TODO(ctiller): what to do here */ abort(); @@ -1042,7 +1039,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, /* TODO(ctiller): make this configurable */ s->next_message_end_offset -= 65536; } - continue_fetching_send_locked(exec_ctx, t, s); + continue_fetching_send_locked(exec_ctx, t, s, CONTINUE_FETCHING); if (s->id != 0) { grpc_chttp2_become_writable(exec_ctx, t, s, true, "op.send_message"); }