diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index f2c68df068bd35a11d9cd4c37cdc6315644045e8..d177ce4281e1168fd423f9699eb0c597e27c5e63 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -865,13 +865,34 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } if (op->send_message != NULL) { - GPR_ASSERT(s->send_message_finished == NULL); - GPR_ASSERT(s->send_message == NULL); - s->send_message_finished = add_closure_barrier(on_complete); if (s->write_closed) { + grpc_closure *temp_barrier = add_closure_barrier(op->send_message); grpc_chttp2_complete_closure_step( - exec_ctx, t, s, &s->send_message_finished, + exec_ctx, t, s, &temp_barrier, GRPC_ERROR_CREATE("Attempt to send message after stream was closed")); + } else { + uint8_t *frame_hdr = + gpr_slice_buffer_tiny_add(&s->flow_controlled_buffer, 5); + uint32_t flags = op->send_message->flags; + frame_hdr[0] = (flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0; + size_t len = op->send_message->length; + frame_hdr[1] = (uint8_t)(len >> 24); + frame_hdr[2] = (uint8_t)(len >> 16); + frame_hdr[3] = (uint8_t)(len >> 8); + frame_hdr[4] = (uint8_t)(len); + grpc_chttp2_write_cb *write_cb = t->write_cb_pool; + if (write_cb != NULL) { + t->write_cb_pool = write_cb->next; + } else { + write_cb = gpr_malloc(sizeof(*write_cb)); + } + write_cb->next = &s->on_write_finished_cbs; + write_cb->call_at_byte = + add_send_completion(t, s, (ssize_t)() - backup, true); + } + + s->send_message_finished = add_closure_barrier(on_complete); + if (s->write_closed) { } else { s->send_message = op->send_message; if (s->id != 0) { diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index ee905369a4aff527b12a8403894a1ef613b859ff..1fef2c8f72956faec948f0529f47ce8a08585df2 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -150,6 +150,17 @@ typedef struct grpc_chttp2_outstanding_ping { struct grpc_chttp2_outstanding_ping *prev; } grpc_chttp2_outstanding_ping; +typedef struct grpc_chttp2_write_cb { + size_t call_at_byte; + grpc_closure *closure; + struct grpc_chttp2_write_cb *next; +} grpc_chttp2_write_cb; + +typedef struct grpc_chttp2_write_cb_list { + grpc_chttp2_write_cb *head; + grpc_chttp2_write_cb *tail; +} grpc_chttp2_write_cb_list; + /* forward declared in frame_data.h */ struct grpc_chttp2_incoming_byte_stream { grpc_byte_stream base; @@ -318,23 +329,9 @@ struct grpc_chttp2_transport { uint32_t goaway_last_stream_index; gpr_slice goaway_text; - /* closures to finish after writing */ - grpc_closure **finish_after_writing; - size_t finish_after_writing_count; - size_t finish_after_writing_capacity; + grpc_chttp2_write_cb *write_cb_pool; }; -typedef enum { - GRPC_CHTTP2_CALL_WHEN_SCHEDULED, - GRPC_CHTTP2_CALL_WHEN_WRITTEN, -} grpc_chttp2_call_write_cb_when; - -typedef struct grpc_chttp2_write_cb { - size_t call_at_byte; - grpc_closure *closure; - grpc_chttp2_call_write_cb_when when; -} grpc_chttp2_write_cb; - struct grpc_chttp2_stream { grpc_chttp2_transport *t; grpc_stream_refcount *refcount; @@ -422,8 +419,7 @@ struct grpc_chttp2_stream { /** HTTP2 stream id for this stream, or zero if one has not been assigned */ uint8_t fetching; bool sent_initial_metadata; - uint8_t sent_message; - uint8_t sent_trailing_metadata; + bool sent_trailing_metadata; /** how much window should we announce? */ uint32_t announce_window; gpr_slice_buffer flow_controlled_buffer; @@ -431,9 +427,9 @@ struct grpc_chttp2_stream { size_t stream_fetched; grpc_closure finished_fetch; - grpc_chttp2_write_cb *write_cbs; - size_t write_cb_count; - size_t write_cb_capacity; + grpc_chttp2_write_cb_list on_write_scheduled_cbs; + grpc_chttp2_write_cb_list on_write_finished_cbs; + grpc_chttp2_write_cb_list finish_after_write; }; /** Transport writing call flow: diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index def79cd2a5b35ad335cd71c2c83e673855b9034c..b75f5f4392bc17f0d286328a14c8a2b53e3613ff 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -40,18 +40,44 @@ #include "src/core/ext/transport/chttp2/transport/http2_errors.h" #include "src/core/lib/profiling/timers.h" -static void queue_write_callback(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s, grpc_closure **c, - grpc_error *error, - grpc_chttp2_call_write_cb_when when) { - switch (when) { - case GRPC_CHTTP2_CALL_WHEN_SCHEDULED: - grpc_chttp2_complete_closure_step(exec_ctx, t, s, c, error); - break; - case GRPC_CHTTP2_CALL_WHEN_WRITTEN: - - break; +static void add_to_write_list(grpc_chttp2_write_cb_list *list, + grpc_chttp2_write_cb *cb) { + if (list->head == NULL) { + list->head = list->tail = cb; + } else { + list->tail->next = cb; + list->tail = cb; + } + cb->next = NULL; +} + +static void finish_write_cb(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_chttp2_stream *s, grpc_chttp2_write_cb *cb, + grpc_error *error) { + grpc_chttp2_complete_closure_step(exec_ctx, t, s, &cb->closure, error); + cb->next = t->write_cb_pool; + t->write_cb_pool = cb; +} + +static void update_list(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_chttp2_stream *s, uint32_t send_bytes, + grpc_chttp2_write_cb_list *list, + grpc_chttp2_write_cb_list *done_target_or_null, + grpc_error *error) { + grpc_chttp2_write_cb *cb = list->head; + list->head = list->tail = NULL; + while (cb) { + grpc_chttp2_write_cb *next = cb->next; + if (cb->call_at_byte <= send_bytes) { + if (done_target_or_null != NULL) { + add_to_write_list(done_target_or_null, cb); + } else { + finish_write_cb(exec_ctx, t, s, cb, GRPC_ERROR_REF(error)); + } + } else { + cb->call_at_byte -= send_bytes; + add_to_write_list(list, cb); + } } } @@ -91,7 +117,6 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx, (according to available window sizes) and add to the output buffer */ while (grpc_chttp2_list_pop_writable_stream(t, &s)) { bool sent_initial_metadata = s->sent_initial_metadata; - bool become_writable = false; GRPC_CHTTP2_FLOW_MOVE_STREAM("write", t, s, outgoing_window, s, outgoing_window); @@ -101,10 +126,10 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx, grpc_chttp2_encode_header(&t->hpack_compressor, s->id, s->send_initial_metadata, 0, &s->stats.outgoing, &t->outbuf); - s->send_initial_metadata = NULL; - become_writable = true; + s->sent_initial_metadata = true; sent_initial_metadata = true; + grpc_chttp2_list_add_writing_stream(t, s); } /* send any window updates */ if (s->announce_window > 0 && s->send_initial_metadata == NULL) { @@ -122,24 +147,47 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx, uint32_t max_outgoing = (uint32_t)GPR_MIN(GRPC_CHTTP2_MAX_PAYLOAD_LENGTH, GPR_MIN(s->outgoing_window, t->outgoing_window)); - uint32_t send_bytes = - (uint32_t)GPR_MIN(max_outgoing, s->flow_controlled_buffer.length); - bool is_last_data_frame = - s->fetching_send_message == NULL && - send_bytes == s->flow_controlled_buffer.length; - bool is_last_frame = - is_last_data_frame && s->send_trailing_metadata != NULL && - grpc_metadata_batch_is_empty(s->send_trailing_metadata); - grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, send_bytes, - is_last_frame, &s->stats.outgoing, &t->outbuf); - GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", t, s, outgoing_window, - send_bytes); - GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", t, outgoing_window, - send_bytes); - if (is_last_frame) { - s->send_trailing_metadata = NULL; - s->sent_trailing_metadata = 1; + if (max_outgoing > 0) { + uint32_t send_bytes = + (uint32_t)GPR_MIN(max_outgoing, s->flow_controlled_buffer.length); + bool is_last_data_frame = + s->fetching_send_message == NULL && + send_bytes == s->flow_controlled_buffer.length; + bool is_last_frame = + is_last_data_frame && s->send_trailing_metadata != NULL && + grpc_metadata_batch_is_empty(s->send_trailing_metadata); + grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, send_bytes, + is_last_frame, &s->stats.outgoing, + &t->outbuf); + GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", t, s, outgoing_window, + send_bytes); + GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", t, outgoing_window, + send_bytes); + if (is_last_frame) { + s->send_trailing_metadata = NULL; + s->sent_trailing_metadata = 1; + } + update_list(exec_ctx, t, s, send_bytes, &s->on_write_finished_cbs, + &s->finish_after_write, GRPC_ERROR_NONE); + update_list(exec_ctx, t, s, send_bytes, &s->on_write_scheduled_cbs, + NULL, GRPC_ERROR_NONE); + grpc_chttp2_list_add_writing_stream(t, s); + } else if (transport->outgoing_window == 0) { + grpc_chttp2_list_add_writing_stalled_by_transport(t, s); + grpc_chttp2_list_add_writing_stream(t, s); } + } + if (s->send_trailing_metadata && s->fetching_send_message == NULL && + s->flow_controlled_buffer.length == 0) { + grpc_chttp2_encode_header(&t->hpack_compressor, s->id, + s->send_trailing_metadata, 0, + &s->stats.outgoing, &t->outbuf); + s->send_trailing_metadata = NULL; + s->sent_trailing_metadata = true; + become_writable = true; + sent_initial_metadata = true; + grpc_chttp2_list_add_writing_stream(t, s); + } #if 0 if (s->send_message != NULL) { gpr_slice hdr = gpr_slice_malloc(5); @@ -169,231 +217,226 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx, } } #endif - if (stream_global->send_trailing_metadata) { - stream_writing->send_trailing_metadata = - stream_global->send_trailing_metadata; - stream_global->send_trailing_metadata = NULL; - become_writable = true; - } - } - - if (!stream_global->read_closed && - stream_global->unannounced_incoming_window_for_writing > 1024) { - GRPC_CHTTP2_FLOW_MOVE_STREAM("write", transport_global, stream_writing, - announce_window, stream_global, - unannounced_incoming_window_for_writing); + if (stream_global->send_trailing_metadata) { + stream_writing->send_trailing_metadata = + stream_global->send_trailing_metadata; + stream_global->send_trailing_metadata = NULL; become_writable = true; } - - if (become_writable) { - grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing); - } else { - GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2_writing"); - } } - /* if the grpc_chttp2_transport is ready to send a window update, do so here - also; 3/4 is a magic number that will likely get tuned soon */ - if (transport_global->announce_incoming_window > 0) { - uint32_t announced = (uint32_t)GPR_MIN( - transport_global->announce_incoming_window, UINT32_MAX); - GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", transport_global, - announce_incoming_window, announced); - grpc_transport_one_way_stats throwaway_stats; - gpr_slice_buffer_add( - &transport_writing->outbuf, - grpc_chttp2_window_update_create(0, announced, &throwaway_stats)); + if (!stream_global->read_closed && + stream_global->unannounced_incoming_window_for_writing > 1024) { + GRPC_CHTTP2_FLOW_MOVE_STREAM("write", transport_global, stream_writing, + announce_window, stream_global, + unannounced_incoming_window_for_writing); + become_writable = true; } - GPR_TIMER_END("grpc_chttp2_unlocking_check_writes", 0); + if (become_writable) { + grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing); + } else { + GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2_writing"); + } + } - return transport_writing->outbuf.count > 0 || - grpc_chttp2_list_have_writing_streams(transport_writing); + /* if the grpc_chttp2_transport is ready to send a window update, do so here + also; 3/4 is a magic number that will likely get tuned soon */ + if (transport_global->announce_incoming_window > 0) { + uint32_t announced = (uint32_t)GPR_MIN( + transport_global->announce_incoming_window, UINT32_MAX); + GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", transport_global, + announce_incoming_window, announced); + grpc_transport_one_way_stats throwaway_stats; + gpr_slice_buffer_add( + &transport_writing->outbuf, + grpc_chttp2_window_update_create(0, announced, &throwaway_stats)); } - void grpc_chttp2_perform_writes( - grpc_exec_ctx * exec_ctx, - grpc_chttp2_transport_writing * transport_writing, - grpc_endpoint * endpoint) { - GPR_ASSERT(transport_writing->outbuf.count > 0 || - grpc_chttp2_list_have_writing_streams(transport_writing)); + GPR_TIMER_END("grpc_chttp2_unlocking_check_writes", 0); - finalize_outbuf(exec_ctx, transport_writing); + return transport_writing->outbuf.count > 0 || + grpc_chttp2_list_have_writing_streams(transport_writing); +} - GPR_ASSERT(endpoint); +void grpc_chttp2_perform_writes( + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing, + grpc_endpoint *endpoint) { + GPR_ASSERT(transport_writing->outbuf.count > 0 || + grpc_chttp2_list_have_writing_streams(transport_writing)); - if (transport_writing->outbuf.count > 0) { - grpc_endpoint_write(exec_ctx, endpoint, &transport_writing->outbuf, - &transport_writing->done_cb); - } else { - grpc_exec_ctx_sched(exec_ctx, &transport_writing->done_cb, - GRPC_ERROR_NONE, NULL); - } + finalize_outbuf(exec_ctx, transport_writing); + + GPR_ASSERT(endpoint); + + if (transport_writing->outbuf.count > 0) { + grpc_endpoint_write(exec_ctx, endpoint, &transport_writing->outbuf, + &transport_writing->done_cb); + } else { + grpc_exec_ctx_sched(exec_ctx, &transport_writing->done_cb, GRPC_ERROR_NONE, + NULL); } +} - static void finalize_outbuf( - grpc_exec_ctx * exec_ctx, - grpc_chttp2_transport_writing * transport_writing) { - grpc_chttp2_stream_writing *stream_writing; - - GPR_TIMER_BEGIN("finalize_outbuf", 0); - - bool is_first_data_frame = true; - while (grpc_chttp2_list_pop_writing_stream(transport_writing, - &stream_writing)) { - uint32_t max_outgoing = - (uint32_t)GPR_MIN(GRPC_CHTTP2_MAX_PAYLOAD_LENGTH, - GPR_MIN(stream_writing->outgoing_window, - transport_writing->outgoing_window)); - /* fetch any body bytes */ - while (!stream_writing->fetching && stream_writing->send_message && - stream_writing->flow_controlled_buffer.length < max_outgoing && - stream_writing->stream_fetched < - stream_writing->send_message->length) { - if (grpc_byte_stream_next(exec_ctx, stream_writing->send_message, - &stream_writing->fetching_slice, max_outgoing, - &stream_writing->finished_fetch)) { - stream_writing->stream_fetched += - GPR_SLICE_LENGTH(stream_writing->fetching_slice); - if (stream_writing->stream_fetched == - stream_writing->send_message->length) { - stream_writing->send_message = NULL; - } - gpr_slice_buffer_add(&stream_writing->flow_controlled_buffer, - stream_writing->fetching_slice); - } else { - stream_writing->fetching = 1; +static void finalize_outbuf(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport_writing *transport_writing) { + grpc_chttp2_stream_writing *stream_writing; + + GPR_TIMER_BEGIN("finalize_outbuf", 0); + + bool is_first_data_frame = true; + while ( + grpc_chttp2_list_pop_writing_stream(transport_writing, &stream_writing)) { + uint32_t max_outgoing = + (uint32_t)GPR_MIN(GRPC_CHTTP2_MAX_PAYLOAD_LENGTH, + GPR_MIN(stream_writing->outgoing_window, + transport_writing->outgoing_window)); + /* fetch any body bytes */ + while (!stream_writing->fetching && stream_writing->send_message && + stream_writing->flow_controlled_buffer.length < max_outgoing && + stream_writing->stream_fetched < + stream_writing->send_message->length) { + if (grpc_byte_stream_next(exec_ctx, stream_writing->send_message, + &stream_writing->fetching_slice, max_outgoing, + &stream_writing->finished_fetch)) { + stream_writing->stream_fetched += + GPR_SLICE_LENGTH(stream_writing->fetching_slice); + if (stream_writing->stream_fetched == + stream_writing->send_message->length) { + stream_writing->send_message = NULL; } + gpr_slice_buffer_add(&stream_writing->flow_controlled_buffer, + stream_writing->fetching_slice); + } else { + stream_writing->fetching = 1; } - /* send any body bytes */ - if (stream_writing->flow_controlled_buffer.length > 0) { - if (max_outgoing > 0) { - uint32_t send_bytes = (uint32_t)GPR_MIN( - max_outgoing, stream_writing->flow_controlled_buffer.length); - int is_last_data_frame = - stream_writing->send_message == NULL && - send_bytes == stream_writing->flow_controlled_buffer.length; - int is_last_frame = is_last_data_frame && - stream_writing->send_trailing_metadata != NULL && - grpc_metadata_batch_is_empty( - stream_writing->send_trailing_metadata); - grpc_chttp2_encode_data( - stream_writing->id, &stream_writing->flow_controlled_buffer, - send_bytes, is_last_frame, &stream_writing->stats, - &transport_writing->outbuf); - if (is_first_data_frame) { - /* TODO(dgq): this is a hack. It'll be fix in a future refactoring - */ - stream_writing->stats.data_bytes -= 5; /* discount grpc framing */ - is_first_data_frame = false; - } - GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", transport_writing, - stream_writing, outgoing_window, - send_bytes); - GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", transport_writing, - outgoing_window, send_bytes); - if (is_last_frame) { - stream_writing->send_trailing_metadata = NULL; - stream_writing->sent_trailing_metadata = 1; - } - if (is_last_data_frame) { - GPR_ASSERT(stream_writing->send_message == NULL); - stream_writing->sent_message = 1; - } - } else if (transport_writing->outgoing_window == 0) { - grpc_chttp2_list_add_writing_stalled_by_transport(transport_writing, - stream_writing); - grpc_chttp2_list_add_written_stream(transport_writing, - stream_writing); + } + /* send any body bytes */ + if (stream_writing->flow_controlled_buffer.length > 0) { + if (max_outgoing > 0) { + uint32_t send_bytes = (uint32_t)GPR_MIN( + max_outgoing, stream_writing->flow_controlled_buffer.length); + int is_last_data_frame = + stream_writing->send_message == NULL && + send_bytes == stream_writing->flow_controlled_buffer.length; + int is_last_frame = is_last_data_frame && + stream_writing->send_trailing_metadata != NULL && + grpc_metadata_batch_is_empty( + stream_writing->send_trailing_metadata); + grpc_chttp2_encode_data( + stream_writing->id, &stream_writing->flow_controlled_buffer, + send_bytes, is_last_frame, &stream_writing->stats, + &transport_writing->outbuf); + if (is_first_data_frame) { + /* TODO(dgq): this is a hack. It'll be fix in a future refactoring + */ + stream_writing->stats.data_bytes -= 5; /* discount grpc framing */ + is_first_data_frame = false; } - } - /* send trailing metadata if it's available and we're ready for it */ - if (stream_writing->send_message == NULL && - stream_writing->flow_controlled_buffer.length == 0 && - stream_writing->send_trailing_metadata != NULL) { - if (grpc_metadata_batch_is_empty( - stream_writing->send_trailing_metadata)) { - grpc_chttp2_encode_data( - stream_writing->id, &stream_writing->flow_controlled_buffer, 0, 1, - &stream_writing->stats, &transport_writing->outbuf); - } else { - grpc_chttp2_encode_header( - &transport_writing->hpack_compressor, stream_writing->id, - stream_writing->send_trailing_metadata, 1, &stream_writing->stats, - &transport_writing->outbuf); + GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", transport_writing, + stream_writing, outgoing_window, + send_bytes); + GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", transport_writing, + outgoing_window, send_bytes); + if (is_last_frame) { + stream_writing->send_trailing_metadata = NULL; + stream_writing->sent_trailing_metadata = 1; } - if (!transport_writing->is_client && !stream_writing->read_closed) { - gpr_slice_buffer_add(&transport_writing->outbuf, - grpc_chttp2_rst_stream_create( - stream_writing->id, GRPC_CHTTP2_NO_ERROR, - &stream_writing->stats)); + if (is_last_data_frame) { + GPR_ASSERT(stream_writing->send_message == NULL); + stream_writing->sent_message = 1; } - stream_writing->send_trailing_metadata = NULL; - stream_writing->sent_trailing_metadata = 1; + } else if (transport_writing->outgoing_window == 0) { + grpc_chttp2_list_add_writing_stalled_by_transport(transport_writing, + stream_writing); + grpc_chttp2_list_add_written_stream(transport_writing, stream_writing); } - /* if there's more to write, then loop, otherwise prepare to finish the - * write */ - if ((stream_writing->flow_controlled_buffer.length > 0 || - (stream_writing->send_message && !stream_writing->fetching)) && - stream_writing->outgoing_window > 0) { - if (transport_writing->outgoing_window > 0) { - grpc_chttp2_list_add_writing_stream(transport_writing, - stream_writing); - } else { - grpc_chttp2_list_add_writing_stalled_by_transport(transport_writing, - stream_writing); - grpc_chttp2_list_add_written_stream(transport_writing, - stream_writing); - } + } + /* send trailing metadata if it's available and we're ready for it */ + if (stream_writing->send_message == NULL && + stream_writing->flow_controlled_buffer.length == 0 && + stream_writing->send_trailing_metadata != NULL) { + if (grpc_metadata_batch_is_empty( + stream_writing->send_trailing_metadata)) { + grpc_chttp2_encode_data( + stream_writing->id, &stream_writing->flow_controlled_buffer, 0, 1, + &stream_writing->stats, &transport_writing->outbuf); + } else { + grpc_chttp2_encode_header( + &transport_writing->hpack_compressor, stream_writing->id, + stream_writing->send_trailing_metadata, 1, &stream_writing->stats, + &transport_writing->outbuf); + } + if (!transport_writing->is_client && !stream_writing->read_closed) { + gpr_slice_buffer_add(&transport_writing->outbuf, + grpc_chttp2_rst_stream_create( + stream_writing->id, GRPC_CHTTP2_NO_ERROR, + &stream_writing->stats)); + } + stream_writing->send_trailing_metadata = NULL; + stream_writing->sent_trailing_metadata = 1; + } + /* if there's more to write, then loop, otherwise prepare to finish the + * write */ + if ((stream_writing->flow_controlled_buffer.length > 0 || + (stream_writing->send_message && !stream_writing->fetching)) && + stream_writing->outgoing_window > 0) { + if (transport_writing->outgoing_window > 0) { + grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing); } else { + grpc_chttp2_list_add_writing_stalled_by_transport(transport_writing, + stream_writing); grpc_chttp2_list_add_written_stream(transport_writing, stream_writing); } + } else { + grpc_chttp2_list_add_written_stream(transport_writing, stream_writing); } + } + + GPR_TIMER_END("finalize_outbuf", 0); +} - GPR_TIMER_END("finalize_outbuf", 0); +void grpc_chttp2_cleanup_writing( + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, + grpc_chttp2_transport_writing *transport_writing) { + GPR_TIMER_BEGIN("grpc_chttp2_cleanup_writing", 0); + grpc_chttp2_stream_writing *stream_writing; + grpc_chttp2_stream_global *stream_global; + + if (grpc_chttp2_list_flush_writing_stalled_by_transport(exec_ctx, + transport_writing)) { + grpc_chttp2_initiate_write(exec_ctx, transport_global, false, + "resume_stalled_stream"); } - void grpc_chttp2_cleanup_writing( - grpc_exec_ctx * exec_ctx, grpc_chttp2_transport_global * transport_global, - grpc_chttp2_transport_writing * transport_writing) { - GPR_TIMER_BEGIN("grpc_chttp2_cleanup_writing", 0); - grpc_chttp2_stream_writing *stream_writing; - grpc_chttp2_stream_global *stream_global; - - if (grpc_chttp2_list_flush_writing_stalled_by_transport( - exec_ctx, transport_writing)) { - grpc_chttp2_initiate_write(exec_ctx, transport_global, false, - "resume_stalled_stream"); + while (grpc_chttp2_list_pop_written_stream( + transport_global, transport_writing, &stream_global, &stream_writing)) { + if (stream_writing->sent_initial_metadata) { + grpc_chttp2_complete_closure_step( + exec_ctx, transport_global, stream_global, + &stream_global->send_initial_metadata_finished, GRPC_ERROR_NONE); } - - while (grpc_chttp2_list_pop_written_stream( - transport_global, transport_writing, &stream_global, &stream_writing)) { - if (stream_writing->sent_initial_metadata) { - grpc_chttp2_complete_closure_step( - exec_ctx, transport_global, stream_global, - &stream_global->send_initial_metadata_finished, GRPC_ERROR_NONE); - } - grpc_transport_move_one_way_stats(&stream_writing->stats, - &stream_global->stats.outgoing); - if (stream_writing->sent_message) { - GPR_ASSERT(stream_writing->send_message == NULL); - grpc_chttp2_complete_closure_step( - exec_ctx, transport_global, stream_global, - &stream_global->send_message_finished, GRPC_ERROR_NONE); - stream_writing->sent_message = 0; - } - if (stream_writing->sent_trailing_metadata) { - grpc_chttp2_complete_closure_step( - exec_ctx, transport_global, stream_global, - &stream_global->send_trailing_metadata_finished, GRPC_ERROR_NONE); - } - if (stream_writing->sent_trailing_metadata) { - grpc_chttp2_mark_stream_closed( - exec_ctx, transport_global, stream_global, - !transport_global->is_client, 1, GRPC_ERROR_NONE); - } - GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2_writing"); + grpc_transport_move_one_way_stats(&stream_writing->stats, + &stream_global->stats.outgoing); + if (stream_writing->sent_message) { + GPR_ASSERT(stream_writing->send_message == NULL); + grpc_chttp2_complete_closure_step( + exec_ctx, transport_global, stream_global, + &stream_global->send_message_finished, GRPC_ERROR_NONE); + stream_writing->sent_message = 0; + } + if (stream_writing->sent_trailing_metadata) { + grpc_chttp2_complete_closure_step( + exec_ctx, transport_global, stream_global, + &stream_global->send_trailing_metadata_finished, GRPC_ERROR_NONE); } - gpr_slice_buffer_reset_and_unref(&transport_writing->outbuf); - GPR_TIMER_END("grpc_chttp2_cleanup_writing", 0); + if (stream_writing->sent_trailing_metadata) { + grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, + !transport_global->is_client, 1, + GRPC_ERROR_NONE); + } + GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2_writing"); } + gpr_slice_buffer_reset_and_unref(&transport_writing->outbuf); + GPR_TIMER_END("grpc_chttp2_cleanup_writing", 0); +}