From 0b8e0d5071bc135380e7e855ba33c92b082d49ff Mon Sep 17 00:00:00 2001
From: Craig Tiller <ctiller@google.com>
Date: Mon, 29 Aug 2016 08:52:42 -0700
Subject: [PATCH] Write work

---
 .../chttp2/transport/chttp2_transport.c       |  29 +-
 .../ext/transport/chttp2/transport/internal.h |  36 +-
 .../ext/transport/chttp2/transport/writing.c  | 503 ++++++++++--------
 3 files changed, 314 insertions(+), 254 deletions(-)

diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index f2c68df068..d177ce4281 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -865,13 +865,34 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
   }
 
   if (op->send_message != NULL) {
-    GPR_ASSERT(s->send_message_finished == NULL);
-    GPR_ASSERT(s->send_message == NULL);
-    s->send_message_finished = add_closure_barrier(on_complete);
     if (s->write_closed) {
+      grpc_closure *temp_barrier = add_closure_barrier(op->send_message);
       grpc_chttp2_complete_closure_step(
-          exec_ctx, t, s, &s->send_message_finished,
+          exec_ctx, t, s, &temp_barrier,
           GRPC_ERROR_CREATE("Attempt to send message after stream was closed"));
+    } else {
+      uint8_t *frame_hdr =
+          gpr_slice_buffer_tiny_add(&s->flow_controlled_buffer, 5);
+      uint32_t flags = op->send_message->flags;
+      frame_hdr[0] = (flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0;
+      size_t len = op->send_message->length;
+      frame_hdr[1] = (uint8_t)(len >> 24);
+      frame_hdr[2] = (uint8_t)(len >> 16);
+      frame_hdr[3] = (uint8_t)(len >> 8);
+      frame_hdr[4] = (uint8_t)(len);
+      grpc_chttp2_write_cb *write_cb = t->write_cb_pool;
+      if (write_cb != NULL) {
+        t->write_cb_pool = write_cb->next;
+      } else {
+        write_cb = gpr_malloc(sizeof(*write_cb));
+      }
+      write_cb->next = &s->on_write_finished_cbs;
+      write_cb->call_at_byte =
+          add_send_completion(t, s, (ssize_t)() - backup, true);
+    }
+
+    s->send_message_finished = add_closure_barrier(on_complete);
+    if (s->write_closed) {
     } else {
       s->send_message = op->send_message;
       if (s->id != 0) {
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index ee905369a4..1fef2c8f72 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -150,6 +150,17 @@ typedef struct grpc_chttp2_outstanding_ping {
   struct grpc_chttp2_outstanding_ping *prev;
 } grpc_chttp2_outstanding_ping;
 
+typedef struct grpc_chttp2_write_cb {
+  size_t call_at_byte;
+  grpc_closure *closure;
+  struct grpc_chttp2_write_cb *next;
+} grpc_chttp2_write_cb;
+
+typedef struct grpc_chttp2_write_cb_list {
+  grpc_chttp2_write_cb *head;
+  grpc_chttp2_write_cb *tail;
+} grpc_chttp2_write_cb_list;
+
 /* forward declared in frame_data.h */
 struct grpc_chttp2_incoming_byte_stream {
   grpc_byte_stream base;
@@ -318,23 +329,9 @@ struct grpc_chttp2_transport {
   uint32_t goaway_last_stream_index;
   gpr_slice goaway_text;
 
-  /* closures to finish after writing */
-  grpc_closure **finish_after_writing;
-  size_t finish_after_writing_count;
-  size_t finish_after_writing_capacity;
+  grpc_chttp2_write_cb *write_cb_pool;
 };
 
-typedef enum {
-  GRPC_CHTTP2_CALL_WHEN_SCHEDULED,
-  GRPC_CHTTP2_CALL_WHEN_WRITTEN,
-} grpc_chttp2_call_write_cb_when;
-
-typedef struct grpc_chttp2_write_cb {
-  size_t call_at_byte;
-  grpc_closure *closure;
-  grpc_chttp2_call_write_cb_when when;
-} grpc_chttp2_write_cb;
-
 struct grpc_chttp2_stream {
   grpc_chttp2_transport *t;
   grpc_stream_refcount *refcount;
@@ -422,8 +419,7 @@ struct grpc_chttp2_stream {
   /** HTTP2 stream id for this stream, or zero if one has not been assigned */
   uint8_t fetching;
   bool sent_initial_metadata;
-  uint8_t sent_message;
-  uint8_t sent_trailing_metadata;
+  bool sent_trailing_metadata;
   /** how much window should we announce? */
   uint32_t announce_window;
   gpr_slice_buffer flow_controlled_buffer;
@@ -431,9 +427,9 @@ struct grpc_chttp2_stream {
   size_t stream_fetched;
   grpc_closure finished_fetch;
 
-  grpc_chttp2_write_cb *write_cbs;
-  size_t write_cb_count;
-  size_t write_cb_capacity;
+  grpc_chttp2_write_cb_list on_write_scheduled_cbs;
+  grpc_chttp2_write_cb_list on_write_finished_cbs;
+  grpc_chttp2_write_cb_list finish_after_write;
 };
 
 /** Transport writing call flow:
diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c
index def79cd2a5..b75f5f4392 100644
--- a/src/core/ext/transport/chttp2/transport/writing.c
+++ b/src/core/ext/transport/chttp2/transport/writing.c
@@ -40,18 +40,44 @@
 #include "src/core/ext/transport/chttp2/transport/http2_errors.h"
 #include "src/core/lib/profiling/timers.h"
 
-static void queue_write_callback(grpc_exec_ctx *exec_ctx,
-                                 grpc_chttp2_transport *t,
-                                 grpc_chttp2_stream *s, grpc_closure **c,
-                                 grpc_error *error,
-                                 grpc_chttp2_call_write_cb_when when) {
-  switch (when) {
-    case GRPC_CHTTP2_CALL_WHEN_SCHEDULED:
-      grpc_chttp2_complete_closure_step(exec_ctx, t, s, c, error);
-      break;
-    case GRPC_CHTTP2_CALL_WHEN_WRITTEN:
-
-      break;
+static void add_to_write_list(grpc_chttp2_write_cb_list *list,
+                              grpc_chttp2_write_cb *cb) {
+  if (list->head == NULL) {
+    list->head = list->tail = cb;
+  } else {
+    list->tail->next = cb;
+    list->tail = cb;
+  }
+  cb->next = NULL;
+}
+
+static void finish_write_cb(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+                            grpc_chttp2_stream *s, grpc_chttp2_write_cb *cb,
+                            grpc_error *error) {
+  grpc_chttp2_complete_closure_step(exec_ctx, t, s, &cb->closure, error);
+  cb->next = t->write_cb_pool;
+  t->write_cb_pool = cb;
+}
+
+static void update_list(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+                        grpc_chttp2_stream *s, uint32_t send_bytes,
+                        grpc_chttp2_write_cb_list *list,
+                        grpc_chttp2_write_cb_list *done_target_or_null,
+                        grpc_error *error) {
+  grpc_chttp2_write_cb *cb = list->head;
+  list->head = list->tail = NULL;
+  while (cb) {
+    grpc_chttp2_write_cb *next = cb->next;
+    if (cb->call_at_byte <= send_bytes) {
+      if (done_target_or_null != NULL) {
+        add_to_write_list(done_target_or_null, cb);
+      } else {
+        finish_write_cb(exec_ctx, t, s, cb, GRPC_ERROR_REF(error));
+      }
+    } else {
+      cb->call_at_byte -= send_bytes;
+      add_to_write_list(list, cb);
+    }
   }
 }
 
@@ -91,7 +117,6 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
      (according to available window sizes) and add to the output buffer */
   while (grpc_chttp2_list_pop_writable_stream(t, &s)) {
     bool sent_initial_metadata = s->sent_initial_metadata;
-    bool become_writable = false;
 
     GRPC_CHTTP2_FLOW_MOVE_STREAM("write", t, s, outgoing_window, s,
                                  outgoing_window);
@@ -101,10 +126,10 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
       grpc_chttp2_encode_header(&t->hpack_compressor, s->id,
                                 s->send_initial_metadata, 0, &s->stats.outgoing,
                                 &t->outbuf);
-
       s->send_initial_metadata = NULL;
-      become_writable = true;
+      s->sent_initial_metadata = true;
       sent_initial_metadata = true;
+      grpc_chttp2_list_add_writing_stream(t, s);
     }
     /* send any window updates */
     if (s->announce_window > 0 && s->send_initial_metadata == NULL) {
@@ -122,24 +147,47 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
         uint32_t max_outgoing =
             (uint32_t)GPR_MIN(GRPC_CHTTP2_MAX_PAYLOAD_LENGTH,
                               GPR_MIN(s->outgoing_window, t->outgoing_window));
-        uint32_t send_bytes =
-            (uint32_t)GPR_MIN(max_outgoing, s->flow_controlled_buffer.length);
-        bool is_last_data_frame =
-            s->fetching_send_message == NULL &&
-            send_bytes == s->flow_controlled_buffer.length;
-        bool is_last_frame =
-            is_last_data_frame && s->send_trailing_metadata != NULL &&
-            grpc_metadata_batch_is_empty(s->send_trailing_metadata);
-        grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, send_bytes,
-                                is_last_frame, &s->stats.outgoing, &t->outbuf);
-        GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", t, s, outgoing_window,
-                                      send_bytes);
-        GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", t, outgoing_window,
-                                         send_bytes);
-        if (is_last_frame) {
-          s->send_trailing_metadata = NULL;
-          s->sent_trailing_metadata = 1;
+        if (max_outgoing > 0) {
+          uint32_t send_bytes =
+              (uint32_t)GPR_MIN(max_outgoing, s->flow_controlled_buffer.length);
+          bool is_last_data_frame =
+              s->fetching_send_message == NULL &&
+              send_bytes == s->flow_controlled_buffer.length;
+          bool is_last_frame =
+              is_last_data_frame && s->send_trailing_metadata != NULL &&
+              grpc_metadata_batch_is_empty(s->send_trailing_metadata);
+          grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, send_bytes,
+                                  is_last_frame, &s->stats.outgoing,
+                                  &t->outbuf);
+          GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", t, s, outgoing_window,
+                                        send_bytes);
+          GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", t, outgoing_window,
+                                           send_bytes);
+          if (is_last_frame) {
+            s->send_trailing_metadata = NULL;
+            s->sent_trailing_metadata = 1;
+          }
+          update_list(exec_ctx, t, s, send_bytes, &s->on_write_finished_cbs,
+                      &s->finish_after_write, GRPC_ERROR_NONE);
+          update_list(exec_ctx, t, s, send_bytes, &s->on_write_scheduled_cbs,
+                      NULL, GRPC_ERROR_NONE);
+          grpc_chttp2_list_add_writing_stream(t, s);
+        } else if (transport->outgoing_window == 0) {
+          grpc_chttp2_list_add_writing_stalled_by_transport(t, s);
+          grpc_chttp2_list_add_writing_stream(t, s);
         }
+      }
+      if (s->send_trailing_metadata && s->fetching_send_message == NULL &&
+          s->flow_controlled_buffer.length == 0) {
+        grpc_chttp2_encode_header(&t->hpack_compressor, s->id,
+                                  s->send_trailing_metadata, 0,
+                                  &s->stats.outgoing, &t->outbuf);
+        s->send_trailing_metadata = NULL;
+        s->sent_trailing_metadata = true;
+        become_writable = true;
+        sent_initial_metadata = true;
+        grpc_chttp2_list_add_writing_stream(t, s);
+      }
 #if 0
       if (s->send_message != NULL) {
         gpr_slice hdr = gpr_slice_malloc(5);
@@ -169,231 +217,226 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
         }
       }
 #endif
-        if (stream_global->send_trailing_metadata) {
-          stream_writing->send_trailing_metadata =
-              stream_global->send_trailing_metadata;
-          stream_global->send_trailing_metadata = NULL;
-          become_writable = true;
-        }
-      }
-
-      if (!stream_global->read_closed &&
-          stream_global->unannounced_incoming_window_for_writing > 1024) {
-        GRPC_CHTTP2_FLOW_MOVE_STREAM("write", transport_global, stream_writing,
-                                     announce_window, stream_global,
-                                     unannounced_incoming_window_for_writing);
+      if (stream_global->send_trailing_metadata) {
+        stream_writing->send_trailing_metadata =
+            stream_global->send_trailing_metadata;
+        stream_global->send_trailing_metadata = NULL;
         become_writable = true;
       }
-
-      if (become_writable) {
-        grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing);
-      } else {
-        GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2_writing");
-      }
     }
 
-    /* if the grpc_chttp2_transport is ready to send a window update, do so here
-       also; 3/4 is a magic number that will likely get tuned soon */
-    if (transport_global->announce_incoming_window > 0) {
-      uint32_t announced = (uint32_t)GPR_MIN(
-          transport_global->announce_incoming_window, UINT32_MAX);
-      GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", transport_global,
-                                       announce_incoming_window, announced);
-      grpc_transport_one_way_stats throwaway_stats;
-      gpr_slice_buffer_add(
-          &transport_writing->outbuf,
-          grpc_chttp2_window_update_create(0, announced, &throwaway_stats));
+    if (!stream_global->read_closed &&
+        stream_global->unannounced_incoming_window_for_writing > 1024) {
+      GRPC_CHTTP2_FLOW_MOVE_STREAM("write", transport_global, stream_writing,
+                                   announce_window, stream_global,
+                                   unannounced_incoming_window_for_writing);
+      become_writable = true;
     }
 
-    GPR_TIMER_END("grpc_chttp2_unlocking_check_writes", 0);
+    if (become_writable) {
+      grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing);
+    } else {
+      GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2_writing");
+    }
+  }
 
-    return transport_writing->outbuf.count > 0 ||
-           grpc_chttp2_list_have_writing_streams(transport_writing);
+  /* if the grpc_chttp2_transport is ready to send a window update, do so here
+     also; 3/4 is a magic number that will likely get tuned soon */
+  if (transport_global->announce_incoming_window > 0) {
+    uint32_t announced = (uint32_t)GPR_MIN(
+        transport_global->announce_incoming_window, UINT32_MAX);
+    GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", transport_global,
+                                     announce_incoming_window, announced);
+    grpc_transport_one_way_stats throwaway_stats;
+    gpr_slice_buffer_add(
+        &transport_writing->outbuf,
+        grpc_chttp2_window_update_create(0, announced, &throwaway_stats));
   }
 
-  void grpc_chttp2_perform_writes(
-      grpc_exec_ctx * exec_ctx,
-      grpc_chttp2_transport_writing * transport_writing,
-      grpc_endpoint * endpoint) {
-    GPR_ASSERT(transport_writing->outbuf.count > 0 ||
-               grpc_chttp2_list_have_writing_streams(transport_writing));
+  GPR_TIMER_END("grpc_chttp2_unlocking_check_writes", 0);
 
-    finalize_outbuf(exec_ctx, transport_writing);
+  return transport_writing->outbuf.count > 0 ||
+         grpc_chttp2_list_have_writing_streams(transport_writing);
+}
 
-    GPR_ASSERT(endpoint);
+void grpc_chttp2_perform_writes(
+    grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing,
+    grpc_endpoint *endpoint) {
+  GPR_ASSERT(transport_writing->outbuf.count > 0 ||
+             grpc_chttp2_list_have_writing_streams(transport_writing));
 
-    if (transport_writing->outbuf.count > 0) {
-      grpc_endpoint_write(exec_ctx, endpoint, &transport_writing->outbuf,
-                          &transport_writing->done_cb);
-    } else {
-      grpc_exec_ctx_sched(exec_ctx, &transport_writing->done_cb,
-                          GRPC_ERROR_NONE, NULL);
-    }
+  finalize_outbuf(exec_ctx, transport_writing);
+
+  GPR_ASSERT(endpoint);
+
+  if (transport_writing->outbuf.count > 0) {
+    grpc_endpoint_write(exec_ctx, endpoint, &transport_writing->outbuf,
+                        &transport_writing->done_cb);
+  } else {
+    grpc_exec_ctx_sched(exec_ctx, &transport_writing->done_cb, GRPC_ERROR_NONE,
+                        NULL);
   }
+}
 
-  static void finalize_outbuf(
-      grpc_exec_ctx * exec_ctx,
-      grpc_chttp2_transport_writing * transport_writing) {
-    grpc_chttp2_stream_writing *stream_writing;
-
-    GPR_TIMER_BEGIN("finalize_outbuf", 0);
-
-    bool is_first_data_frame = true;
-    while (grpc_chttp2_list_pop_writing_stream(transport_writing,
-                                               &stream_writing)) {
-      uint32_t max_outgoing =
-          (uint32_t)GPR_MIN(GRPC_CHTTP2_MAX_PAYLOAD_LENGTH,
-                            GPR_MIN(stream_writing->outgoing_window,
-                                    transport_writing->outgoing_window));
-      /* fetch any body bytes */
-      while (!stream_writing->fetching && stream_writing->send_message &&
-             stream_writing->flow_controlled_buffer.length < max_outgoing &&
-             stream_writing->stream_fetched <
-                 stream_writing->send_message->length) {
-        if (grpc_byte_stream_next(exec_ctx, stream_writing->send_message,
-                                  &stream_writing->fetching_slice, max_outgoing,
-                                  &stream_writing->finished_fetch)) {
-          stream_writing->stream_fetched +=
-              GPR_SLICE_LENGTH(stream_writing->fetching_slice);
-          if (stream_writing->stream_fetched ==
-              stream_writing->send_message->length) {
-            stream_writing->send_message = NULL;
-          }
-          gpr_slice_buffer_add(&stream_writing->flow_controlled_buffer,
-                               stream_writing->fetching_slice);
-        } else {
-          stream_writing->fetching = 1;
+static void finalize_outbuf(grpc_exec_ctx *exec_ctx,
+                            grpc_chttp2_transport_writing *transport_writing) {
+  grpc_chttp2_stream_writing *stream_writing;
+
+  GPR_TIMER_BEGIN("finalize_outbuf", 0);
+
+  bool is_first_data_frame = true;
+  while (
+      grpc_chttp2_list_pop_writing_stream(transport_writing, &stream_writing)) {
+    uint32_t max_outgoing =
+        (uint32_t)GPR_MIN(GRPC_CHTTP2_MAX_PAYLOAD_LENGTH,
+                          GPR_MIN(stream_writing->outgoing_window,
+                                  transport_writing->outgoing_window));
+    /* fetch any body bytes */
+    while (!stream_writing->fetching && stream_writing->send_message &&
+           stream_writing->flow_controlled_buffer.length < max_outgoing &&
+           stream_writing->stream_fetched <
+               stream_writing->send_message->length) {
+      if (grpc_byte_stream_next(exec_ctx, stream_writing->send_message,
+                                &stream_writing->fetching_slice, max_outgoing,
+                                &stream_writing->finished_fetch)) {
+        stream_writing->stream_fetched +=
+            GPR_SLICE_LENGTH(stream_writing->fetching_slice);
+        if (stream_writing->stream_fetched ==
+            stream_writing->send_message->length) {
+          stream_writing->send_message = NULL;
         }
+        gpr_slice_buffer_add(&stream_writing->flow_controlled_buffer,
+                             stream_writing->fetching_slice);
+      } else {
+        stream_writing->fetching = 1;
       }
-      /* send any body bytes */
-      if (stream_writing->flow_controlled_buffer.length > 0) {
-        if (max_outgoing > 0) {
-          uint32_t send_bytes = (uint32_t)GPR_MIN(
-              max_outgoing, stream_writing->flow_controlled_buffer.length);
-          int is_last_data_frame =
-              stream_writing->send_message == NULL &&
-              send_bytes == stream_writing->flow_controlled_buffer.length;
-          int is_last_frame = is_last_data_frame &&
-                              stream_writing->send_trailing_metadata != NULL &&
-                              grpc_metadata_batch_is_empty(
-                                  stream_writing->send_trailing_metadata);
-          grpc_chttp2_encode_data(
-              stream_writing->id, &stream_writing->flow_controlled_buffer,
-              send_bytes, is_last_frame, &stream_writing->stats,
-              &transport_writing->outbuf);
-          if (is_first_data_frame) {
-            /* TODO(dgq): this is a hack. It'll be fix in a future refactoring
-             */
-            stream_writing->stats.data_bytes -= 5; /* discount grpc framing */
-            is_first_data_frame = false;
-          }
-          GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", transport_writing,
-                                        stream_writing, outgoing_window,
-                                        send_bytes);
-          GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", transport_writing,
-                                           outgoing_window, send_bytes);
-          if (is_last_frame) {
-            stream_writing->send_trailing_metadata = NULL;
-            stream_writing->sent_trailing_metadata = 1;
-          }
-          if (is_last_data_frame) {
-            GPR_ASSERT(stream_writing->send_message == NULL);
-            stream_writing->sent_message = 1;
-          }
-        } else if (transport_writing->outgoing_window == 0) {
-          grpc_chttp2_list_add_writing_stalled_by_transport(transport_writing,
-                                                            stream_writing);
-          grpc_chttp2_list_add_written_stream(transport_writing,
-                                              stream_writing);
+    }
+    /* send any body bytes */
+    if (stream_writing->flow_controlled_buffer.length > 0) {
+      if (max_outgoing > 0) {
+        uint32_t send_bytes = (uint32_t)GPR_MIN(
+            max_outgoing, stream_writing->flow_controlled_buffer.length);
+        int is_last_data_frame =
+            stream_writing->send_message == NULL &&
+            send_bytes == stream_writing->flow_controlled_buffer.length;
+        int is_last_frame = is_last_data_frame &&
+                            stream_writing->send_trailing_metadata != NULL &&
+                            grpc_metadata_batch_is_empty(
+                                stream_writing->send_trailing_metadata);
+        grpc_chttp2_encode_data(
+            stream_writing->id, &stream_writing->flow_controlled_buffer,
+            send_bytes, is_last_frame, &stream_writing->stats,
+            &transport_writing->outbuf);
+        if (is_first_data_frame) {
+          /* TODO(dgq): this is a hack. It'll be fix in a future refactoring
+           */
+          stream_writing->stats.data_bytes -= 5; /* discount grpc framing */
+          is_first_data_frame = false;
         }
-      }
-      /* send trailing metadata if it's available and we're ready for it */
-      if (stream_writing->send_message == NULL &&
-          stream_writing->flow_controlled_buffer.length == 0 &&
-          stream_writing->send_trailing_metadata != NULL) {
-        if (grpc_metadata_batch_is_empty(
-                stream_writing->send_trailing_metadata)) {
-          grpc_chttp2_encode_data(
-              stream_writing->id, &stream_writing->flow_controlled_buffer, 0, 1,
-              &stream_writing->stats, &transport_writing->outbuf);
-        } else {
-          grpc_chttp2_encode_header(
-              &transport_writing->hpack_compressor, stream_writing->id,
-              stream_writing->send_trailing_metadata, 1, &stream_writing->stats,
-              &transport_writing->outbuf);
+        GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", transport_writing,
+                                      stream_writing, outgoing_window,
+                                      send_bytes);
+        GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", transport_writing,
+                                         outgoing_window, send_bytes);
+        if (is_last_frame) {
+          stream_writing->send_trailing_metadata = NULL;
+          stream_writing->sent_trailing_metadata = 1;
         }
-        if (!transport_writing->is_client && !stream_writing->read_closed) {
-          gpr_slice_buffer_add(&transport_writing->outbuf,
-                               grpc_chttp2_rst_stream_create(
-                                   stream_writing->id, GRPC_CHTTP2_NO_ERROR,
-                                   &stream_writing->stats));
+        if (is_last_data_frame) {
+          GPR_ASSERT(stream_writing->send_message == NULL);
+          stream_writing->sent_message = 1;
         }
-        stream_writing->send_trailing_metadata = NULL;
-        stream_writing->sent_trailing_metadata = 1;
+      } else if (transport_writing->outgoing_window == 0) {
+        grpc_chttp2_list_add_writing_stalled_by_transport(transport_writing,
+                                                          stream_writing);
+        grpc_chttp2_list_add_written_stream(transport_writing, stream_writing);
       }
-      /* if there's more to write, then loop, otherwise prepare to finish the
-       * write */
-      if ((stream_writing->flow_controlled_buffer.length > 0 ||
-           (stream_writing->send_message && !stream_writing->fetching)) &&
-          stream_writing->outgoing_window > 0) {
-        if (transport_writing->outgoing_window > 0) {
-          grpc_chttp2_list_add_writing_stream(transport_writing,
-                                              stream_writing);
-        } else {
-          grpc_chttp2_list_add_writing_stalled_by_transport(transport_writing,
-                                                            stream_writing);
-          grpc_chttp2_list_add_written_stream(transport_writing,
-                                              stream_writing);
-        }
+    }
+    /* send trailing metadata if it's available and we're ready for it */
+    if (stream_writing->send_message == NULL &&
+        stream_writing->flow_controlled_buffer.length == 0 &&
+        stream_writing->send_trailing_metadata != NULL) {
+      if (grpc_metadata_batch_is_empty(
+              stream_writing->send_trailing_metadata)) {
+        grpc_chttp2_encode_data(
+            stream_writing->id, &stream_writing->flow_controlled_buffer, 0, 1,
+            &stream_writing->stats, &transport_writing->outbuf);
+      } else {
+        grpc_chttp2_encode_header(
+            &transport_writing->hpack_compressor, stream_writing->id,
+            stream_writing->send_trailing_metadata, 1, &stream_writing->stats,
+            &transport_writing->outbuf);
+      }
+      if (!transport_writing->is_client && !stream_writing->read_closed) {
+        gpr_slice_buffer_add(&transport_writing->outbuf,
+                             grpc_chttp2_rst_stream_create(
+                                 stream_writing->id, GRPC_CHTTP2_NO_ERROR,
+                                 &stream_writing->stats));
+      }
+      stream_writing->send_trailing_metadata = NULL;
+      stream_writing->sent_trailing_metadata = 1;
+    }
+    /* if there's more to write, then loop, otherwise prepare to finish the
+     * write */
+    if ((stream_writing->flow_controlled_buffer.length > 0 ||
+         (stream_writing->send_message && !stream_writing->fetching)) &&
+        stream_writing->outgoing_window > 0) {
+      if (transport_writing->outgoing_window > 0) {
+        grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing);
       } else {
+        grpc_chttp2_list_add_writing_stalled_by_transport(transport_writing,
+                                                          stream_writing);
         grpc_chttp2_list_add_written_stream(transport_writing, stream_writing);
       }
+    } else {
+      grpc_chttp2_list_add_written_stream(transport_writing, stream_writing);
     }
+  }
+
+  GPR_TIMER_END("finalize_outbuf", 0);
+}
 
-    GPR_TIMER_END("finalize_outbuf", 0);
+void grpc_chttp2_cleanup_writing(
+    grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
+    grpc_chttp2_transport_writing *transport_writing) {
+  GPR_TIMER_BEGIN("grpc_chttp2_cleanup_writing", 0);
+  grpc_chttp2_stream_writing *stream_writing;
+  grpc_chttp2_stream_global *stream_global;
+
+  if (grpc_chttp2_list_flush_writing_stalled_by_transport(exec_ctx,
+                                                          transport_writing)) {
+    grpc_chttp2_initiate_write(exec_ctx, transport_global, false,
+                               "resume_stalled_stream");
   }
 
-  void grpc_chttp2_cleanup_writing(
-      grpc_exec_ctx * exec_ctx, grpc_chttp2_transport_global * transport_global,
-      grpc_chttp2_transport_writing * transport_writing) {
-    GPR_TIMER_BEGIN("grpc_chttp2_cleanup_writing", 0);
-    grpc_chttp2_stream_writing *stream_writing;
-    grpc_chttp2_stream_global *stream_global;
-
-    if (grpc_chttp2_list_flush_writing_stalled_by_transport(
-            exec_ctx, transport_writing)) {
-      grpc_chttp2_initiate_write(exec_ctx, transport_global, false,
-                                 "resume_stalled_stream");
+  while (grpc_chttp2_list_pop_written_stream(
+      transport_global, transport_writing, &stream_global, &stream_writing)) {
+    if (stream_writing->sent_initial_metadata) {
+      grpc_chttp2_complete_closure_step(
+          exec_ctx, transport_global, stream_global,
+          &stream_global->send_initial_metadata_finished, GRPC_ERROR_NONE);
     }
-
-    while (grpc_chttp2_list_pop_written_stream(
-        transport_global, transport_writing, &stream_global, &stream_writing)) {
-      if (stream_writing->sent_initial_metadata) {
-        grpc_chttp2_complete_closure_step(
-            exec_ctx, transport_global, stream_global,
-            &stream_global->send_initial_metadata_finished, GRPC_ERROR_NONE);
-      }
-      grpc_transport_move_one_way_stats(&stream_writing->stats,
-                                        &stream_global->stats.outgoing);
-      if (stream_writing->sent_message) {
-        GPR_ASSERT(stream_writing->send_message == NULL);
-        grpc_chttp2_complete_closure_step(
-            exec_ctx, transport_global, stream_global,
-            &stream_global->send_message_finished, GRPC_ERROR_NONE);
-        stream_writing->sent_message = 0;
-      }
-      if (stream_writing->sent_trailing_metadata) {
-        grpc_chttp2_complete_closure_step(
-            exec_ctx, transport_global, stream_global,
-            &stream_global->send_trailing_metadata_finished, GRPC_ERROR_NONE);
-      }
-      if (stream_writing->sent_trailing_metadata) {
-        grpc_chttp2_mark_stream_closed(
-            exec_ctx, transport_global, stream_global,
-            !transport_global->is_client, 1, GRPC_ERROR_NONE);
-      }
-      GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2_writing");
+    grpc_transport_move_one_way_stats(&stream_writing->stats,
+                                      &stream_global->stats.outgoing);
+    if (stream_writing->sent_message) {
+      GPR_ASSERT(stream_writing->send_message == NULL);
+      grpc_chttp2_complete_closure_step(
+          exec_ctx, transport_global, stream_global,
+          &stream_global->send_message_finished, GRPC_ERROR_NONE);
+      stream_writing->sent_message = 0;
+    }
+    if (stream_writing->sent_trailing_metadata) {
+      grpc_chttp2_complete_closure_step(
+          exec_ctx, transport_global, stream_global,
+          &stream_global->send_trailing_metadata_finished, GRPC_ERROR_NONE);
     }
-    gpr_slice_buffer_reset_and_unref(&transport_writing->outbuf);
-    GPR_TIMER_END("grpc_chttp2_cleanup_writing", 0);
+    if (stream_writing->sent_trailing_metadata) {
+      grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global,
+                                     !transport_global->is_client, 1,
+                                     GRPC_ERROR_NONE);
+    }
+    GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2_writing");
   }
+  gpr_slice_buffer_reset_and_unref(&transport_writing->outbuf);
+  GPR_TIMER_END("grpc_chttp2_cleanup_writing", 0);
+}
-- 
GitLab