diff --git a/src/core/ext/census/grpc_filter.c b/src/core/ext/census/grpc_filter.c
index abfb3bb5f0e16699efbc3a0253d0dda4798d12ed..5e278ef127e5520565381f9bd8c4dac03d31bbe5 100644
--- a/src/core/ext/census/grpc_filter.c
+++ b/src/core/ext/census/grpc_filter.c
@@ -134,7 +134,7 @@ static void client_init_call_elem(grpc_exec_ctx *exec_ctx,
 }
 
 static void client_destroy_call_elem(grpc_exec_ctx *exec_ctx,
-                                     grpc_call_element *elem) {
+                                     grpc_call_element *elem, void *ignored) {
   call_data *d = elem->call_data;
   GPR_ASSERT(d != NULL);
   /* TODO(hongyu): record rpc client stats and census_rpc_end_op here */
@@ -152,7 +152,7 @@ static void server_init_call_elem(grpc_exec_ctx *exec_ctx,
 }
 
 static void server_destroy_call_elem(grpc_exec_ctx *exec_ctx,
-                                     grpc_call_element *elem) {
+                                     grpc_call_element *elem, void *ignored) {
   call_data *d = elem->call_data;
   GPR_ASSERT(d != NULL);
   /* TODO(hongyu): record rpc server stats and census_tracing_end_op here */
diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c
index 8a98a6bcbeec94f84284c06dc33064f108d9e32d..9b5a078aece23254929ab4db1d118ab146b13af1 100644
--- a/src/core/ext/client_config/client_channel.c
+++ b/src/core/ext/client_config/client_channel.c
@@ -415,9 +415,10 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
 }
 
 /* Destructor for call_data */
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
-                              grpc_call_element *elem) {
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+                              void *and_free_memory) {
   grpc_subchannel_call_holder_destroy(exec_ctx, elem->call_data);
+  gpr_free(and_free_memory);
 }
 
 /* Constructor for channel_data */
diff --git a/src/core/ext/client_config/subchannel.c b/src/core/ext/client_config/subchannel.c
index c925c28c67b8b602ae4e8e251b006ac6d446db1d..3a5af9f53d4b65f1e6ba6e53087eb86fccb9eeeb 100644
--- a/src/core/ext/client_config/subchannel.c
+++ b/src/core/ext/client_config/subchannel.c
@@ -644,9 +644,9 @@ static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call,
                                     bool success) {
   grpc_subchannel_call *c = call;
   GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0);
-  grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c));
-  GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, c->connection, "subchannel_call");
-  gpr_free(c);
+  grpc_connected_subchannel *connection = c->connection;
+  grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), c);
+  GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, connection, "subchannel_call");
   GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0);
 }
 
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 01507f5ca6f6c0e906fee1c27422bdd58df3d37a..fcf2abfe66ebaf30cc585ae28483e01bdefafa56 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -81,27 +81,25 @@ int grpc_flowctl_trace = 0;
 
 static const grpc_transport_vtable vtable;
 
-static void lock(grpc_chttp2_transport *t);
-static void unlock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t);
-
 /* forward declarations of various callbacks that we'll build closures around */
 static void writing_action(grpc_exec_ctx *exec_ctx, void *t,
                            bool iomgr_success_ignored);
+static void reading_action(grpc_exec_ctx *exec_ctx, void *t,
+                           bool iomgr_success_ignored);
+static void parsing_action(grpc_exec_ctx *exec_ctx, void *t,
+                           bool iomgr_success_ignored);
 
 /** Set a transport level setting, and push it to our peer */
 static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
                          uint32_t value);
 
-/** Endpoint callback to process incoming data */
-static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, bool success);
-
 /** Start disconnection chain */
 static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t);
 
 /** Perform a transport_op */
-static void perform_stream_op_locked(
-    grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
-    grpc_chttp2_stream_global *stream_global, grpc_transport_stream_op *op);
+static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
+                                     grpc_chttp2_transport *t,
+                                     grpc_chttp2_stream *s, void *transport_op);
 
 /** Cancel a stream: coming from the transport API */
 static void cancel_from_api(grpc_exec_ctx *exec_ctx,
@@ -118,15 +116,19 @@ static void close_from_api(grpc_exec_ctx *exec_ctx,
 /** Add endpoint from this transport to pollset */
 static void add_to_pollset_locked(grpc_exec_ctx *exec_ctx,
                                   grpc_chttp2_transport *t,
-                                  grpc_pollset *pollset);
+                                  grpc_chttp2_stream *s_ignored, void *pollset);
 static void add_to_pollset_set_locked(grpc_exec_ctx *exec_ctx,
                                       grpc_chttp2_transport *t,
-                                      grpc_pollset_set *pollset_set);
+                                      grpc_chttp2_stream *s_ignored,
+                                      void *pollset_set);
 
 /** Start new streams that have been created if we can */
 static void maybe_start_some_streams(
     grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global);
 
+static void finish_global_actions(grpc_exec_ctx *exec_ctx,
+                                  grpc_chttp2_transport *t);
+
 static void connectivity_state_set(
     grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
     grpc_connectivity_state state, const char *reason);
@@ -138,7 +140,10 @@ static void incoming_byte_stream_update_flow_control(
     grpc_chttp2_transport_global *transport_global,
     grpc_chttp2_stream_global *stream_global, size_t max_size_hint,
     size_t have_already);
-
+static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
+                                                grpc_chttp2_transport *t,
+                                                grpc_chttp2_stream *s,
+                                                void *byte_stream);
 static void fail_pending_writes(grpc_exec_ctx *exec_ctx,
                                 grpc_chttp2_stream_global *stream_global);
 
@@ -150,7 +155,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
                                grpc_chttp2_transport *t) {
   size_t i;
 
-  gpr_mu_lock(&t->mu);
+  gpr_mu_lock(&t->executor.mu);
 
   GPR_ASSERT(t->ep == NULL);
 
@@ -176,8 +181,8 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
   grpc_chttp2_stream_map_destroy(&t->new_stream_map);
   grpc_connectivity_state_destroy(exec_ctx, &t->channel_callback.state_tracker);
 
-  gpr_mu_unlock(&t->mu);
-  gpr_mu_destroy(&t->mu);
+  gpr_mu_unlock(&t->executor.mu);
+  gpr_mu_destroy(&t->executor.mu);
 
   /* callback remaining pings: they're not allowed to call into the transpot,
      and maybe they hold resources that need to be freed */
@@ -238,7 +243,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
   gpr_ref_init(&t->refs, 2);
   /* ref is dropped at transport close() */
   gpr_ref_init(&t->shutdown_ep_refs, 1);
-  gpr_mu_init(&t->mu);
+  gpr_mu_init(&t->executor.mu);
   t->peer_string = grpc_endpoint_get_peer(ep);
   t->endpoint_reading = 1;
   t->global.next_stream_id = is_client ? 1 : 2;
@@ -262,6 +267,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
   gpr_slice_buffer_init(&t->writing.outbuf);
   grpc_chttp2_hpack_compressor_init(&t->writing.hpack_compressor);
   grpc_closure_init(&t->writing_action, writing_action, t);
+  grpc_closure_init(&t->reading_action, reading_action, t);
+  grpc_closure_init(&t->parsing_action, parsing_action, t);
 
   gpr_slice_buffer_init(&t->parsing.qbuf);
   grpc_chttp2_goaway_parser_init(&t->parsing.goaway_parser);
@@ -269,7 +276,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
 
   grpc_closure_init(&t->writing.done_cb, grpc_chttp2_terminate_writing,
                     &t->writing);
-  grpc_closure_init(&t->recv_data, recv_data, t);
   gpr_slice_buffer_init(&t->read_buffer);
 
   if (is_client) {
@@ -377,14 +383,18 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
   }
 }
 
-static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
-  grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
-
-  lock(t);
+static void destroy_transport_locked(grpc_exec_ctx *exec_ctx,
+                                     grpc_chttp2_transport *t,
+                                     grpc_chttp2_stream *s_ignored,
+                                     void *arg_ignored) {
   t->destroying = 1;
   drop_connection(exec_ctx, t);
-  unlock(exec_ctx, t);
+}
 
+static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
+  grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
+  grpc_chttp2_run_with_global_lock(exec_ctx, t, NULL, destroy_transport_locked,
+                                   NULL, 0);
   UNREF_TRANSPORT(exec_ctx, t, "destroy");
 }
 
@@ -404,17 +414,6 @@ static void allow_endpoint_shutdown_locked(grpc_exec_ctx *exec_ctx,
   }
 }
 
-static void allow_endpoint_shutdown_unlocked(grpc_exec_ctx *exec_ctx,
-                                             grpc_chttp2_transport *t) {
-  if (gpr_unref(&t->shutdown_ep_refs)) {
-    gpr_mu_lock(&t->mu);
-    if (t->ep) {
-      grpc_endpoint_shutdown(exec_ctx, t->ep);
-    }
-    gpr_mu_unlock(&t->mu);
-  }
-}
-
 static void destroy_endpoint(grpc_exec_ctx *exec_ctx,
                              grpc_chttp2_transport *t) {
   grpc_endpoint_destroy(exec_ctx, t->ep);
@@ -424,7 +423,9 @@ static void destroy_endpoint(grpc_exec_ctx *exec_ctx,
 }
 
 static void close_transport_locked(grpc_exec_ctx *exec_ctx,
-                                   grpc_chttp2_transport *t) {
+                                   grpc_chttp2_transport *t,
+                                   grpc_chttp2_stream *s_ignored,
+                                   void *arg_ignored) {
   if (!t->closed) {
     t->closed = 1;
     connectivity_state_set(exec_ctx, &t->global, GRPC_CHANNEL_FATAL_FAILURE,
@@ -464,6 +465,13 @@ void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx,
 }
 #endif
 
+static void finish_init_stream_locked(grpc_exec_ctx *exec_ctx,
+                                      grpc_chttp2_transport *t,
+                                      grpc_chttp2_stream *s,
+                                      void *arg_ignored) {
+  grpc_chttp2_register_stream(t, s);
+}
+
 static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
                        grpc_stream *gs, grpc_stream_refcount *refcount,
                        const void *server_data) {
@@ -473,6 +481,10 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
   memset(s, 0, sizeof(*s));
 
   s->refcount = refcount;
+  /* We reserve one 'active stream' that's dropped when the stream is
+     read-closed. The others are for incoming_byte_streams that are actively
+     reading */
+  gpr_ref_init(&s->global.active_streams, 1);
   GRPC_CHTTP2_STREAM_REF(&s->global, "chttp2");
 
   grpc_chttp2_incoming_metadata_buffer_init(&s->parsing.metadata_buffer[0]);
@@ -486,10 +498,8 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
 
   REF_TRANSPORT(t, "stream");
 
-  lock(t);
-  grpc_chttp2_register_stream(t, s);
   if (server_data) {
-    GPR_ASSERT(t->parsing_active);
+    GPR_ASSERT(t->executor.parsing_active);
     s->global.id = (uint32_t)(uintptr_t)server_data;
     s->parsing.id = s->global.id;
     s->global.outgoing_window =
@@ -502,40 +512,42 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
     grpc_chttp2_stream_map_add(&t->parsing_stream_map, s->global.id, s);
     s->global.in_stream_map = 1;
   }
-  unlock(exec_ctx, t);
+
+  grpc_chttp2_run_with_global_lock(exec_ctx, t, s, finish_init_stream_locked,
+                                   NULL, 0);
 
   return 0;
 }
 
-static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
-                           grpc_stream *gs) {
-  grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
-  grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
-  int i;
+static void destroy_stream_locked(grpc_exec_ctx *exec_ctx,
+                                  grpc_chttp2_transport *t,
+                                  grpc_chttp2_stream *s, void *arg) {
   grpc_byte_stream *bs;
 
   GPR_TIMER_BEGIN("destroy_stream", 0);
 
-  gpr_mu_lock(&t->mu);
-
   GPR_ASSERT((s->global.write_closed && s->global.read_closed) ||
              s->global.id == 0);
   GPR_ASSERT(!s->global.in_stream_map);
   if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) {
-    close_transport_locked(exec_ctx, t);
+    close_transport_locked(exec_ctx, t, NULL, NULL);
   }
-  if (!t->parsing_active && s->global.id) {
+  if (!t->executor.parsing_active && s->global.id) {
     GPR_ASSERT(grpc_chttp2_stream_map_find(&t->parsing_stream_map,
                                            s->global.id) == NULL);
   }
 
+  while (
+      (bs = grpc_chttp2_incoming_frame_queue_pop(&s->global.incoming_frames))) {
+    incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs);
+  }
+
   grpc_chttp2_list_remove_unannounced_incoming_window_available(&t->global,
                                                                 &s->global);
   grpc_chttp2_list_remove_stalled_by_transport(&t->global, &s->global);
+  grpc_chttp2_list_remove_check_read_ops(&t->global, &s->global);
 
-  gpr_mu_unlock(&t->mu);
-
-  for (i = 0; i < STREAM_LIST_COUNT; i++) {
+  for (int i = 0; i < STREAM_LIST_COUNT; i++) {
     if (s->included[i]) {
       gpr_log(GPR_ERROR, "%s stream %d still included in list %d",
               t->global.is_client ? "client" : "server", s->global.id, i);
@@ -543,11 +555,6 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
     }
   }
 
-  while (
-      (bs = grpc_chttp2_incoming_frame_queue_pop(&s->global.incoming_frames))) {
-    grpc_byte_stream_destroy(exec_ctx, bs);
-  }
-
   GPR_ASSERT(s->global.send_initial_metadata_finished == NULL);
   GPR_ASSERT(s->global.send_message_finished == NULL);
   GPR_ASSERT(s->global.send_trailing_metadata_finished == NULL);
@@ -566,6 +573,17 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
   UNREF_TRANSPORT(exec_ctx, t, "stream");
 
   GPR_TIMER_END("destroy_stream", 0);
+
+  gpr_free(arg);
+}
+
+static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
+                           grpc_stream *gs, void *and_free_memory) {
+  grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
+  grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
+
+  grpc_chttp2_run_with_global_lock(exec_ctx, t, s, destroy_stream_locked,
+                                   and_free_memory, 0);
 }
 
 grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream(
@@ -594,28 +612,89 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream(
  * LOCK MANAGEMENT
  */
 
-/* We take a grpc_chttp2_transport-global lock in response to calls coming in
-   from above,
-   and in response to data being received from below. New data to be written
-   is always queued, as are callbacks to process data. During unlock() we
-   check our todo lists and initiate callbacks and flush writes. */
-
-static void lock(grpc_chttp2_transport *t) { gpr_mu_lock(&t->mu); }
-
-static void unlock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
-  GPR_TIMER_BEGIN("unlock", 0);
-  if (!t->writing_active && !t->closed &&
-      grpc_chttp2_unlocking_check_writes(exec_ctx, &t->global, &t->writing,
-                                         t->parsing_active)) {
-    t->writing_active = 1;
-    REF_TRANSPORT(t, "writing");
-    grpc_exec_ctx_enqueue(exec_ctx, &t->writing_action, true, NULL);
-    prevent_endpoint_shutdown(t);
+static void finish_global_actions(grpc_exec_ctx *exec_ctx,
+                                  grpc_chttp2_transport *t) {
+  grpc_chttp2_executor_action_header *hdr;
+  grpc_chttp2_executor_action_header *next;
+
+  for (;;) {
+    if (!t->executor.writing_active && !t->closed &&
+        grpc_chttp2_unlocking_check_writes(exec_ctx, &t->global, &t->writing,
+                                           t->executor.parsing_active)) {
+      t->executor.writing_active = 1;
+      REF_TRANSPORT(t, "writing");
+      prevent_endpoint_shutdown(t);
+      grpc_exec_ctx_enqueue(exec_ctx, &t->writing_action, true, NULL);
+    }
+    check_read_ops(exec_ctx, &t->global);
+
+    gpr_mu_lock(&t->executor.mu);
+    if (t->executor.pending_actions != NULL) {
+      hdr = t->executor.pending_actions;
+      t->executor.pending_actions = NULL;
+      gpr_mu_unlock(&t->executor.mu);
+      while (hdr != NULL) {
+        hdr->action(exec_ctx, t, hdr->stream, hdr->arg);
+        next = hdr->next;
+        gpr_free(hdr);
+        UNREF_TRANSPORT(exec_ctx, t, "pending_action");
+        hdr = next;
+      }
+      continue;
+    } else {
+      t->executor.global_active = false;
+    }
+    gpr_mu_unlock(&t->executor.mu);
+    break;
+  }
+}
+
+void grpc_chttp2_run_with_global_lock(grpc_exec_ctx *exec_ctx,
+                                      grpc_chttp2_transport *t,
+                                      grpc_chttp2_stream *optional_stream,
+                                      grpc_chttp2_locked_action action,
+                                      void *arg, size_t sizeof_arg) {
+  grpc_chttp2_executor_action_header *hdr;
+
+  REF_TRANSPORT(t, "run_global");
+  gpr_mu_lock(&t->executor.mu);
+
+  for (;;) {
+    if (!t->executor.global_active) {
+      t->executor.global_active = 1;
+      gpr_mu_unlock(&t->executor.mu);
+
+      action(exec_ctx, t, optional_stream, arg);
+
+      finish_global_actions(exec_ctx, t);
+    } else {
+      gpr_mu_unlock(&t->executor.mu);
+
+      hdr = gpr_malloc(sizeof(*hdr) + sizeof_arg);
+      hdr->stream = optional_stream;
+      hdr->action = action;
+      if (sizeof_arg == 0) {
+        hdr->arg = arg;
+      } else {
+        hdr->arg = hdr + 1;
+        memcpy(hdr->arg, arg, sizeof_arg);
+      }
+
+      gpr_mu_lock(&t->executor.mu);
+      if (!t->executor.global_active) {
+        /* global lock was released while allocating memory: release & retry */
+        gpr_free(hdr);
+        continue;
+      }
+      hdr->next = t->executor.pending_actions;
+      t->executor.pending_actions = hdr;
+      REF_TRANSPORT(t, "pending_action");
+      gpr_mu_unlock(&t->executor.mu);
+    }
+    break;
   }
-  check_read_ops(exec_ctx, &t->global);
 
-  gpr_mu_unlock(&t->mu);
-  GPR_TIMER_END("unlock", 0);
+  UNREF_TRANSPORT(exec_ctx, t, "run_global");
 }
 
 /*******************************************************************************
@@ -645,15 +724,11 @@ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
   }
 }
 
-void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx,
-                                   void *transport_writing_ptr, bool success) {
-  grpc_chttp2_transport_writing *transport_writing = transport_writing_ptr;
-  grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing);
-  grpc_chttp2_stream_global *stream_global;
-
-  GPR_TIMER_BEGIN("grpc_chttp2_terminate_writing", 0);
-
-  lock(t);
+static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx,
+                                        grpc_chttp2_transport *t,
+                                        grpc_chttp2_stream *s_ignored,
+                                        void *a) {
+  bool success = (bool)(uintptr_t)a;
 
   allow_endpoint_shutdown_locked(exec_ctx, t);
 
@@ -663,24 +738,30 @@ void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx,
 
   grpc_chttp2_cleanup_writing(exec_ctx, &t->global, &t->writing);
 
+  grpc_chttp2_stream_global *stream_global;
   while (grpc_chttp2_list_pop_closed_waiting_for_writing(&t->global,
                                                          &stream_global)) {
     fail_pending_writes(exec_ctx, stream_global);
     GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "finish_writes");
   }
 
-  /* leave the writing flag up on shutdown to prevent further writes in unlock()
+  /* leave the writing flag up on shutdown to prevent further writes in
+     unlock()
      from starting */
-  t->writing_active = 0;
+  t->executor.writing_active = 0;
   if (t->ep && !t->endpoint_reading) {
     destroy_endpoint(exec_ctx, t);
   }
 
-  unlock(exec_ctx, t);
-
   UNREF_TRANSPORT(exec_ctx, t, "writing");
+}
 
-  GPR_TIMER_END("grpc_chttp2_terminate_writing", 0);
+void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx,
+                                   void *transport_writing, bool success) {
+  grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing);
+  grpc_chttp2_run_with_global_lock(exec_ctx, t, NULL,
+                                   terminate_writing_with_lock,
+                                   (void *)(uintptr_t)success, 0);
 }
 
 static void writing_action(grpc_exec_ctx *exec_ctx, void *gt,
@@ -806,14 +887,16 @@ static int contains_non_ok_status(
 
 static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, bool success) {}
 
-static void perform_stream_op_locked(
-    grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
-    grpc_chttp2_stream_global *stream_global, grpc_transport_stream_op *op) {
-  grpc_closure *on_complete;
-
+static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
+                                     grpc_chttp2_transport *t,
+                                     grpc_chttp2_stream *s, void *stream_op) {
   GPR_TIMER_BEGIN("perform_stream_op_locked", 0);
 
-  on_complete = op->on_complete;
+  grpc_transport_stream_op *op = stream_op;
+  grpc_chttp2_transport_global *transport_global = &t->global;
+  grpc_chttp2_stream_global *stream_global = &s->global;
+
+  grpc_closure *on_complete = op->on_complete;
   if (on_complete == NULL) {
     on_complete = grpc_closure_create(do_nothing, NULL);
   }
@@ -938,10 +1021,8 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
                               grpc_stream *gs, grpc_transport_stream_op *op) {
   grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
   grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
-
-  lock(t);
-  perform_stream_op_locked(exec_ctx, &t->global, &s->global, op);
-  unlock(exec_ctx, t);
+  grpc_chttp2_run_with_global_lock(exec_ctx, t, s, perform_stream_op_locked, op,
+                                   sizeof(*op));
 }
 
 static void send_ping_locked(grpc_chttp2_transport *t, grpc_closure *on_recv) {
@@ -961,13 +1042,10 @@ static void send_ping_locked(grpc_chttp2_transport *t, grpc_closure *on_recv) {
   gpr_slice_buffer_add(&t->global.qbuf, grpc_chttp2_ping_create(0, p->id));
 }
 
-void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx,
-                          grpc_chttp2_transport_parsing *transport_parsing,
-                          const uint8_t *opaque_8bytes) {
+static void ack_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+                            grpc_chttp2_stream *s, void *opaque_8bytes) {
   grpc_chttp2_outstanding_ping *ping;
-  grpc_chttp2_transport *t = TRANSPORT_FROM_PARSING(transport_parsing);
   grpc_chttp2_transport_global *transport_global = &t->global;
-  lock(t);
   for (ping = transport_global->pings.next; ping != &transport_global->pings;
        ping = ping->next) {
     if (0 == memcmp(opaque_8bytes, ping->id, 8)) {
@@ -978,13 +1056,31 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx,
       break;
     }
   }
-  unlock(exec_ctx, t);
+}
+
+void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx,
+                          grpc_chttp2_transport_parsing *transport_parsing,
+                          const uint8_t *opaque_8bytes) {
+  grpc_chttp2_run_with_global_lock(
+      exec_ctx, TRANSPORT_FROM_PARSING(transport_parsing), NULL,
+      ack_ping_locked, (void *)opaque_8bytes, 8);
 }
 
 static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
                                         grpc_chttp2_transport *t,
-                                        grpc_transport_op *op) {
-  bool close_transport = false;
+                                        grpc_chttp2_stream *s_unused,
+                                        void *stream_op) {
+  grpc_transport_op *op = stream_op;
+  bool close_transport = op->disconnect;
+
+  /* If there's a set_accept_stream ensure that we're not parsing
+     to avoid changing things out from underneath */
+  if (t->executor.parsing_active && op->set_accept_stream) {
+    GPR_ASSERT(t->post_parsing_op == NULL);
+    t->post_parsing_op = gpr_malloc(sizeof(*op));
+    memcpy(t->post_parsing_op, op, sizeof(*op));
+    return;
+  }
 
   grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, true, NULL);
 
@@ -1010,47 +1106,31 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
   }
 
   if (op->bind_pollset) {
-    add_to_pollset_locked(exec_ctx, t, op->bind_pollset);
+    add_to_pollset_locked(exec_ctx, t, NULL, op->bind_pollset);
   }
 
   if (op->bind_pollset_set) {
-    add_to_pollset_set_locked(exec_ctx, t, op->bind_pollset_set);
+    add_to_pollset_set_locked(exec_ctx, t, NULL, op->bind_pollset_set);
   }
 
   if (op->send_ping) {
     send_ping_locked(t, op->send_ping);
   }
 
-  if (op->disconnect) {
-    close_transport_locked(exec_ctx, t);
-  }
-
   if (close_transport) {
-    close_transport_locked(exec_ctx, t);
+    close_transport_locked(exec_ctx, t, NULL, NULL);
   }
 }
 
 static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
                                  grpc_transport_op *op) {
   grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
-
-  lock(t);
-
-  /* If there's a set_accept_stream ensure that we're not parsing
-     to avoid changing things out from underneath */
-  if (t->parsing_active && op->set_accept_stream) {
-    GPR_ASSERT(t->post_parsing_op == NULL);
-    t->post_parsing_op = gpr_malloc(sizeof(*op));
-    memcpy(t->post_parsing_op, op, sizeof(*op));
-  } else {
-    perform_transport_op_locked(exec_ctx, t, op);
-  }
-
-  unlock(exec_ctx, t);
+  grpc_chttp2_run_with_global_lock(
+      exec_ctx, t, NULL, perform_transport_op_locked, op, sizeof(*op));
 }
 
 /*******************************************************************************
- * INPUT PROCESSING
+ * INPUT PROCESSING - GENERAL
  */
 
 static void check_read_ops(grpc_exec_ctx *exec_ctx,
@@ -1072,7 +1152,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
       while (stream_global->seen_error &&
              (bs = grpc_chttp2_incoming_frame_queue_pop(
                   &stream_global->incoming_frames)) != NULL) {
-        grpc_byte_stream_destroy(exec_ctx, bs);
+        incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs);
       }
       if (stream_global->incoming_frames.head != NULL) {
         *stream_global->recv_message = grpc_chttp2_incoming_frame_queue_pop(
@@ -1093,9 +1173,9 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
       while (stream_global->seen_error &&
              (bs = grpc_chttp2_incoming_frame_queue_pop(
                   &stream_global->incoming_frames)) != NULL) {
-        grpc_byte_stream_destroy(exec_ctx, bs);
+        incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs);
       }
-      if (stream_global->incoming_frames.head == NULL) {
+      if (stream_global->all_incoming_byte_streams_finished) {
         grpc_chttp2_incoming_metadata_buffer_publish(
             &stream_global->received_trailing_metadata,
             stream_global->recv_trailing_metadata);
@@ -1107,6 +1187,15 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
   }
 }
 
+static void decrement_active_streams_locked(
+    grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
+    grpc_chttp2_stream_global *stream_global) {
+  if ((stream_global->all_incoming_byte_streams_finished =
+           gpr_unref(&stream_global->active_streams))) {
+    grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
+  }
+}
+
 static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
                           uint32_t id) {
   size_t new_stream_count;
@@ -1128,7 +1217,7 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
   }
 
   if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) {
-    close_transport_locked(exec_ctx, t);
+    close_transport_locked(exec_ctx, t, NULL, NULL);
   }
   if (grpc_chttp2_list_remove_writable_stream(&t->global, &s->global)) {
     GRPC_CHTTP2_STREAM_UNREF(exec_ctx, &s->global, "chttp2_writing");
@@ -1229,10 +1318,11 @@ void grpc_chttp2_mark_stream_closed(
     stream_global->read_closed = 1;
     stream_global->published_initial_metadata = 1;
     stream_global->published_trailing_metadata = 1;
+    decrement_active_streams_locked(exec_ctx, transport_global, stream_global);
   }
   if (close_writes && !stream_global->write_closed) {
     stream_global->write_closed = 1;
-    if (TRANSPORT_FROM_GLOBAL(transport_global)->writing_active) {
+    if (TRANSPORT_FROM_GLOBAL(transport_global)->executor.writing_active) {
       GRPC_CHTTP2_STREAM_REF(stream_global, "finish_writes");
       grpc_chttp2_list_add_closed_waiting_for_writing(transport_global,
                                                       stream_global);
@@ -1242,7 +1332,7 @@ void grpc_chttp2_mark_stream_closed(
   }
   if (stream_global->read_closed && stream_global->write_closed) {
     if (stream_global->id != 0 &&
-        TRANSPORT_FROM_GLOBAL(transport_global)->parsing_active) {
+        TRANSPORT_FROM_GLOBAL(transport_global)->executor.parsing_active) {
       grpc_chttp2_list_add_closed_waiting_for_parsing(transport_global,
                                                       stream_global);
     } else {
@@ -1374,7 +1464,7 @@ static void end_all_the_calls(grpc_exec_ctx *exec_ctx,
 }
 
 static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
-  close_transport_locked(exec_ctx, t);
+  close_transport_locked(exec_ctx, t, NULL, NULL);
   end_all_the_calls(exec_ctx, t);
 }
 
@@ -1398,102 +1488,136 @@ static void update_global_window(void *args, uint32_t id, void *stream) {
   }
 }
 
-static void read_error_locked(grpc_exec_ctx *exec_ctx,
-                              grpc_chttp2_transport *t) {
-  t->endpoint_reading = 0;
-  if (!t->writing_active && t->ep) {
-    destroy_endpoint(exec_ctx, t);
-  }
-}
+/*******************************************************************************
+ * INPUT PROCESSING - PARSING
+ */
 
-/* tcp read callback */
-static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, bool success) {
-  size_t i;
-  int keep_reading = 0;
-  grpc_chttp2_transport *t = tp;
+static void reading_action_locked(grpc_exec_ctx *exec_ctx,
+                                  grpc_chttp2_transport *t,
+                                  grpc_chttp2_stream *s_unused, void *arg);
+static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg, bool success);
+static void post_reading_action_locked(grpc_exec_ctx *exec_ctx,
+                                       grpc_chttp2_transport *t,
+                                       grpc_chttp2_stream *s_unused, void *arg);
+static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+                              grpc_chttp2_stream *s_unused, void *arg);
+
+static void reading_action(grpc_exec_ctx *exec_ctx, void *tp, bool success) {
+  /* Control flow:
+     reading_action_locked ->
+       (parse_unlocked -> post_parse_locked)? ->
+       post_reading_action_locked */
+  grpc_chttp2_run_with_global_lock(exec_ctx, tp, NULL, reading_action_locked,
+                                   (void *)(uintptr_t)success, 0);
+}
+
+static void reading_action_locked(grpc_exec_ctx *exec_ctx,
+                                  grpc_chttp2_transport *t,
+                                  grpc_chttp2_stream *s_unused, void *arg) {
   grpc_chttp2_transport_global *transport_global = &t->global;
   grpc_chttp2_transport_parsing *transport_parsing = &t->parsing;
-  grpc_chttp2_stream_global *stream_global;
+  bool success = (bool)(uintptr_t)arg;
 
-  GPR_TIMER_BEGIN("recv_data", 0);
-
-  lock(t);
-  i = 0;
-  GPR_ASSERT(!t->parsing_active);
+  GPR_ASSERT(!t->executor.parsing_active);
   if (!t->closed) {
-    t->parsing_active = 1;
+    t->executor.parsing_active = 1;
     /* merge stream lists */
     grpc_chttp2_stream_map_move_into(&t->new_stream_map,
                                      &t->parsing_stream_map);
     grpc_chttp2_prepare_to_read(transport_global, transport_parsing);
-    gpr_mu_unlock(&t->mu);
-    GPR_TIMER_BEGIN("recv_data.parse", 0);
-    for (; i < t->read_buffer.count &&
-           grpc_chttp2_perform_read(exec_ctx, transport_parsing,
-                                    t->read_buffer.slices[i]);
-         i++)
-      ;
-    GPR_TIMER_END("recv_data.parse", 0);
-    gpr_mu_lock(&t->mu);
-    /* copy parsing qbuf to global qbuf */
-    gpr_slice_buffer_move_into(&t->parsing.qbuf, &t->global.qbuf);
-    if (i != t->read_buffer.count) {
-      unlock(exec_ctx, t);
-      lock(t);
-      drop_connection(exec_ctx, t);
-    }
-    /* merge stream lists */
-    grpc_chttp2_stream_map_move_into(&t->new_stream_map,
-                                     &t->parsing_stream_map);
-    transport_global->concurrent_stream_count =
-        (uint32_t)grpc_chttp2_stream_map_size(&t->parsing_stream_map);
-    if (transport_parsing->initial_window_update != 0) {
-      grpc_chttp2_stream_map_for_each(&t->parsing_stream_map,
-                                      update_global_window, t);
-      transport_parsing->initial_window_update = 0;
-    }
-    /* handle higher level things */
-    grpc_chttp2_publish_reads(exec_ctx, transport_global, transport_parsing);
-    t->parsing_active = 0;
-    /* handle delayed transport ops (if there is one) */
-    if (t->post_parsing_op) {
-      grpc_transport_op *op = t->post_parsing_op;
-      t->post_parsing_op = NULL;
-      perform_transport_op_locked(exec_ctx, t, op);
-      gpr_free(op);
-    }
-    /* if a stream is in the stream map, and gets cancelled, we need to ensure
-     * we are not parsing before continuing the cancellation to keep things in
-     * a sane state */
-    while (grpc_chttp2_list_pop_closed_waiting_for_parsing(transport_global,
-                                                           &stream_global)) {
-      GPR_ASSERT(stream_global->in_stream_map);
-      GPR_ASSERT(stream_global->write_closed);
-      GPR_ASSERT(stream_global->read_closed);
-      remove_stream(exec_ctx, t, stream_global->id);
-      GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2");
-    }
+    grpc_exec_ctx_enqueue(exec_ctx, &t->parsing_action, success, NULL);
+  } else {
+    post_reading_action_locked(exec_ctx, t, s_unused, arg);
+  }
+}
+
+static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
+  grpc_chttp2_transport *t = arg;
+  GPR_TIMER_BEGIN("reading_action.parse", 0);
+  size_t i = 0;
+  for (; i < t->read_buffer.count &&
+         grpc_chttp2_perform_read(exec_ctx, &t->parsing,
+                                  t->read_buffer.slices[i]);
+       i++)
+    ;
+  if (i != t->read_buffer.count) {
+    success = false;
   }
-  if (!success || i != t->read_buffer.count || t->closed) {
+  GPR_TIMER_END("reading_action.parse", 0);
+  grpc_chttp2_run_with_global_lock(exec_ctx, t, NULL, post_parse_locked,
+                                   (void *)(uintptr_t)success, 0);
+}
+
+static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+                              grpc_chttp2_stream *s_unused, void *arg) {
+  grpc_chttp2_transport_global *transport_global = &t->global;
+  grpc_chttp2_transport_parsing *transport_parsing = &t->parsing;
+  /* copy parsing qbuf to global qbuf */
+  gpr_slice_buffer_move_into(&t->parsing.qbuf, &t->global.qbuf);
+  /* merge stream lists */
+  grpc_chttp2_stream_map_move_into(&t->new_stream_map, &t->parsing_stream_map);
+  transport_global->concurrent_stream_count =
+      (uint32_t)grpc_chttp2_stream_map_size(&t->parsing_stream_map);
+  if (transport_parsing->initial_window_update != 0) {
+    grpc_chttp2_stream_map_for_each(&t->parsing_stream_map,
+                                    update_global_window, t);
+    transport_parsing->initial_window_update = 0;
+  }
+  /* handle higher level things */
+  grpc_chttp2_publish_reads(exec_ctx, transport_global, transport_parsing);
+  t->executor.parsing_active = 0;
+  /* handle delayed transport ops (if there is one) */
+  if (t->post_parsing_op) {
+    grpc_transport_op *op = t->post_parsing_op;
+    t->post_parsing_op = NULL;
+    perform_transport_op_locked(exec_ctx, t, NULL, op);
+    gpr_free(op);
+  }
+  /* if a stream is in the stream map, and gets cancelled, we need to
+   * ensure we are not parsing before continuing the cancellation to keep
+   * things in a sane state */
+  grpc_chttp2_stream_global *stream_global;
+  while (grpc_chttp2_list_pop_closed_waiting_for_parsing(transport_global,
+                                                         &stream_global)) {
+    GPR_ASSERT(stream_global->in_stream_map);
+    GPR_ASSERT(stream_global->write_closed);
+    GPR_ASSERT(stream_global->read_closed);
+    remove_stream(exec_ctx, t, stream_global->id);
+    GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2");
+  }
+
+  post_reading_action_locked(exec_ctx, t, s_unused, arg);
+}
+
+static void post_reading_action_locked(grpc_exec_ctx *exec_ctx,
+                                       grpc_chttp2_transport *t,
+                                       grpc_chttp2_stream *s_unused,
+                                       void *arg) {
+  bool success = (bool)(uintptr_t)arg;
+  bool keep_reading = false;
+  if (!success || t->closed) {
     drop_connection(exec_ctx, t);
-    read_error_locked(exec_ctx, t);
+    t->endpoint_reading = 0;
+    if (!t->executor.writing_active && t->ep) {
+      grpc_endpoint_destroy(exec_ctx, t->ep);
+      t->ep = NULL;
+      /* safe as we still have a ref for read */
+      UNREF_TRANSPORT(exec_ctx, t, "disconnect");
+    }
   } else if (!t->closed) {
-    keep_reading = 1;
+    keep_reading = true;
     REF_TRANSPORT(t, "keep_reading");
     prevent_endpoint_shutdown(t);
   }
   gpr_slice_buffer_reset_and_unref(&t->read_buffer);
-  unlock(exec_ctx, t);
 
   if (keep_reading) {
-    grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, &t->recv_data);
-    allow_endpoint_shutdown_unlocked(exec_ctx, t);
+    grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, &t->reading_action);
+    allow_endpoint_shutdown_locked(exec_ctx, t);
     UNREF_TRANSPORT(exec_ctx, t, "keep_reading");
   } else {
-    UNREF_TRANSPORT(exec_ctx, t, "recv_data");
+    UNREF_TRANSPORT(exec_ctx, t, "reading_action");
   }
-
-  GPR_TIMER_END("recv_data", 0);
 }
 
 /*******************************************************************************
@@ -1517,7 +1641,7 @@ static void connectivity_state_set(
 
 static void add_to_pollset_locked(grpc_exec_ctx *exec_ctx,
                                   grpc_chttp2_transport *t,
-                                  grpc_pollset *pollset) {
+                                  grpc_chttp2_stream *s_unused, void *pollset) {
   if (t->ep) {
     grpc_endpoint_add_to_pollset(exec_ctx, t->ep, pollset);
   }
@@ -1525,7 +1649,8 @@ static void add_to_pollset_locked(grpc_exec_ctx *exec_ctx,
 
 static void add_to_pollset_set_locked(grpc_exec_ctx *exec_ctx,
                                       grpc_chttp2_transport *t,
-                                      grpc_pollset_set *pollset_set) {
+                                      grpc_chttp2_stream *s_unused,
+                                      void *pollset_set) {
   if (t->ep) {
     grpc_endpoint_add_to_pollset_set(exec_ctx, t->ep, pollset_set);
   }
@@ -1533,16 +1658,24 @@ static void add_to_pollset_set_locked(grpc_exec_ctx *exec_ctx,
 
 static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
                         grpc_stream *gs, grpc_pollset *pollset) {
-  grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
-  lock(t);
-  add_to_pollset_locked(exec_ctx, t, pollset);
-  unlock(exec_ctx, t);
+  /* TODO(ctiller): keep pollset alive */
+  grpc_chttp2_run_with_global_lock(exec_ctx, (grpc_chttp2_transport *)gt,
+                                   (grpc_chttp2_stream *)gs,
+                                   add_to_pollset_locked, pollset, 0);
 }
 
 /*******************************************************************************
  * BYTE STREAM
  */
 
+static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx,
+                                       grpc_chttp2_incoming_byte_stream *bs) {
+  if (gpr_unref(&bs->refs)) {
+    gpr_slice_buffer_destroy(&bs->slices);
+    gpr_free(bs);
+  }
+}
+
 static void incoming_byte_stream_update_flow_control(
     grpc_chttp2_transport_global *transport_global,
     grpc_chttp2_stream_global *stream_global, size_t max_size_hint,
@@ -1583,87 +1716,146 @@ static void incoming_byte_stream_update_flow_control(
   }
 }
 
-static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
-                                     grpc_byte_stream *byte_stream,
-                                     gpr_slice *slice, size_t max_size_hint,
-                                     grpc_closure *on_complete) {
+typedef struct {
+  grpc_chttp2_incoming_byte_stream *byte_stream;
+  gpr_slice *slice;
+  size_t max_size_hint;
+  grpc_closure *on_complete;
+} incoming_byte_stream_next_arg;
+
+static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx,
+                                             grpc_chttp2_transport *t,
+                                             grpc_chttp2_stream *s,
+                                             void *argp) {
+  incoming_byte_stream_next_arg *arg = argp;
   grpc_chttp2_incoming_byte_stream *bs =
-      (grpc_chttp2_incoming_byte_stream *)byte_stream;
+      (grpc_chttp2_incoming_byte_stream *)arg->byte_stream;
   grpc_chttp2_transport_global *transport_global = &bs->transport->global;
   grpc_chttp2_stream_global *stream_global = &bs->stream->global;
 
-  lock(bs->transport);
   if (bs->is_tail) {
-    incoming_byte_stream_update_flow_control(transport_global, stream_global,
-                                             max_size_hint, bs->slices.length);
+    incoming_byte_stream_update_flow_control(
+        transport_global, stream_global, arg->max_size_hint, bs->slices.length);
   }
   if (bs->slices.count > 0) {
-    *slice = gpr_slice_buffer_take_first(&bs->slices);
-    unlock(exec_ctx, bs->transport);
-    return 1;
+    *arg->slice = gpr_slice_buffer_take_first(&bs->slices);
+    grpc_exec_ctx_enqueue(exec_ctx, arg->on_complete, true, NULL);
   } else if (bs->failed) {
-    grpc_exec_ctx_enqueue(exec_ctx, on_complete, false, NULL);
-    unlock(exec_ctx, bs->transport);
-    return 0;
+    grpc_exec_ctx_enqueue(exec_ctx, arg->on_complete, false, NULL);
   } else {
-    bs->on_next = on_complete;
-    bs->next = slice;
-    unlock(exec_ctx, bs->transport);
-    return 0;
+    bs->on_next = arg->on_complete;
+    bs->next = arg->slice;
   }
+  incoming_byte_stream_unref(exec_ctx, bs);
 }
 
-static void incoming_byte_stream_unref(grpc_chttp2_incoming_byte_stream *bs) {
-  if (gpr_unref(&bs->refs)) {
-    gpr_slice_buffer_destroy(&bs->slices);
-    gpr_free(bs);
-  }
+static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
+                                     grpc_byte_stream *byte_stream,
+                                     gpr_slice *slice, size_t max_size_hint,
+                                     grpc_closure *on_complete) {
+  grpc_chttp2_incoming_byte_stream *bs =
+      (grpc_chttp2_incoming_byte_stream *)byte_stream;
+  incoming_byte_stream_next_arg arg = {bs, slice, max_size_hint, on_complete};
+  gpr_ref(&bs->refs);
+  grpc_chttp2_run_with_global_lock(exec_ctx, bs->transport, bs->stream,
+                                   incoming_byte_stream_next_locked, &arg,
+                                   sizeof(arg));
+  return 0;
+}
+
+static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
+                                         grpc_byte_stream *byte_stream);
+
+static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
+                                                grpc_chttp2_transport *t,
+                                                grpc_chttp2_stream *s,
+                                                void *byte_stream) {
+  grpc_chttp2_incoming_byte_stream *bs = byte_stream;
+  GPR_ASSERT(bs->base.destroy == incoming_byte_stream_destroy);
+  decrement_active_streams_locked(exec_ctx, &bs->transport->global,
+                                  &bs->stream->global);
+  incoming_byte_stream_unref(exec_ctx, bs);
 }
 
 static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
                                          grpc_byte_stream *byte_stream) {
-  incoming_byte_stream_unref((grpc_chttp2_incoming_byte_stream *)byte_stream);
+  grpc_chttp2_incoming_byte_stream *bs =
+      (grpc_chttp2_incoming_byte_stream *)byte_stream;
+  grpc_chttp2_run_with_global_lock(exec_ctx, bs->transport, bs->stream,
+                                   incoming_byte_stream_destroy_locked, bs, 0);
 }
 
-void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
-                                           grpc_chttp2_incoming_byte_stream *bs,
-                                           gpr_slice slice) {
-  gpr_mu_lock(&bs->transport->mu);
+typedef struct {
+  grpc_chttp2_incoming_byte_stream *byte_stream;
+  gpr_slice slice;
+} incoming_byte_stream_push_arg;
+
+static void incoming_byte_stream_push_locked(grpc_exec_ctx *exec_ctx,
+                                             grpc_chttp2_transport *t,
+                                             grpc_chttp2_stream *s,
+                                             void *argp) {
+  incoming_byte_stream_push_arg *arg = argp;
+  grpc_chttp2_incoming_byte_stream *bs = arg->byte_stream;
   if (bs->on_next != NULL) {
-    *bs->next = slice;
+    *bs->next = arg->slice;
     grpc_exec_ctx_enqueue(exec_ctx, bs->on_next, true, NULL);
     bs->on_next = NULL;
   } else {
-    gpr_slice_buffer_add(&bs->slices, slice);
+    gpr_slice_buffer_add(&bs->slices, arg->slice);
   }
-  gpr_mu_unlock(&bs->transport->mu);
+  incoming_byte_stream_unref(exec_ctx, bs);
+}
+
+void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
+                                           grpc_chttp2_incoming_byte_stream *bs,
+                                           gpr_slice slice) {
+  incoming_byte_stream_push_arg arg = {bs, slice};
+  gpr_ref(&bs->refs);
+  grpc_chttp2_run_with_global_lock(exec_ctx, bs->transport, bs->stream,
+                                   incoming_byte_stream_push_locked, &arg,
+                                   sizeof(arg));
+}
+
+static void incoming_byte_stream_finished_failed_locked(
+    grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s,
+    void *argp) {
+  grpc_chttp2_incoming_byte_stream *bs = argp;
+  grpc_exec_ctx_enqueue(exec_ctx, bs->on_next, false, NULL);
+  bs->on_next = NULL;
+  bs->failed = 1;
+  incoming_byte_stream_unref(exec_ctx, bs);
+}
+
+static void incoming_byte_stream_finished_ok_locked(grpc_exec_ctx *exec_ctx,
+                                                    grpc_chttp2_transport *t,
+                                                    grpc_chttp2_stream *s,
+                                                    void *argp) {
+  grpc_chttp2_incoming_byte_stream *bs = argp;
+  incoming_byte_stream_unref(exec_ctx, bs);
 }
 
 void grpc_chttp2_incoming_byte_stream_finished(
     grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, int success,
     int from_parsing_thread) {
-  if (!success) {
-    if (from_parsing_thread) {
-      gpr_mu_lock(&bs->transport->mu);
-    }
-    grpc_exec_ctx_enqueue(exec_ctx, bs->on_next, false, NULL);
-    bs->on_next = NULL;
-    bs->failed = 1;
-    if (from_parsing_thread) {
-      gpr_mu_unlock(&bs->transport->mu);
+  if (from_parsing_thread) {
+    if (success) {
+      grpc_chttp2_run_with_global_lock(exec_ctx, bs->transport, bs->stream,
+                                       incoming_byte_stream_finished_ok_locked,
+                                       bs, 0);
+    } else {
+      incoming_byte_stream_finished_ok_locked(exec_ctx, bs->transport,
+                                              bs->stream, bs);
     }
   } else {
-#ifndef NDEBUG
-    if (from_parsing_thread) {
-      gpr_mu_lock(&bs->transport->mu);
-    }
-    GPR_ASSERT(bs->on_next == NULL);
-    if (from_parsing_thread) {
-      gpr_mu_unlock(&bs->transport->mu);
+    if (success) {
+      grpc_chttp2_run_with_global_lock(
+          exec_ctx, bs->transport, bs->stream,
+          incoming_byte_stream_finished_failed_locked, bs, 0);
+    } else {
+      incoming_byte_stream_finished_failed_locked(exec_ctx, bs->transport,
+                                                  bs->stream, bs);
     }
-#endif
   }
-  incoming_byte_stream_unref(bs);
 }
 
 grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
@@ -1680,6 +1872,7 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
   incoming_byte_stream->next_message = NULL;
   incoming_byte_stream->transport = TRANSPORT_FROM_PARSING(transport_parsing);
   incoming_byte_stream->stream = STREAM_FROM_PARSING(stream_parsing);
+  gpr_ref(&incoming_byte_stream->stream->global.active_streams);
   gpr_slice_buffer_init(&incoming_byte_stream->slices);
   incoming_byte_stream->on_next = NULL;
   incoming_byte_stream->is_tail = 1;
@@ -1810,7 +2003,7 @@ void grpc_chttp2_transport_start_reading(grpc_exec_ctx *exec_ctx,
                                          grpc_transport *transport,
                                          gpr_slice *slices, size_t nslices) {
   grpc_chttp2_transport *t = (grpc_chttp2_transport *)transport;
-  REF_TRANSPORT(t, "recv_data"); /* matches unref inside recv_data */
+  REF_TRANSPORT(t, "reading_action"); /* matches unref inside reading_action */
   gpr_slice_buffer_addn(&t->read_buffer, slices, nslices);
-  recv_data(exec_ctx, t, 1);
+  reading_action(exec_ctx, t, 1);
 }
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 98cd38abd43c2c2b681c2cf803e28af595100938..7a8084641d7a2006790afcecc3eb3908eac0f78c 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -291,27 +291,44 @@ struct grpc_chttp2_transport_parsing {
   int64_t outgoing_window;
 };
 
+typedef void (*grpc_chttp2_locked_action)(grpc_exec_ctx *ctx,
+                                          grpc_chttp2_transport *t,
+                                          grpc_chttp2_stream *s, void *arg);
+
+typedef struct grpc_chttp2_executor_action_header {
+  grpc_chttp2_stream *stream;
+  grpc_chttp2_locked_action action;
+  struct grpc_chttp2_executor_action_header *next;
+  void *arg;
+} grpc_chttp2_executor_action_header;
+
 struct grpc_chttp2_transport {
   grpc_transport base; /* must be first */
-  grpc_endpoint *ep;
   gpr_refcount refs;
+  grpc_endpoint *ep;
   char *peer_string;
 
   /** when this drops to zero it's safe to shutdown the endpoint */
   gpr_refcount shutdown_ep_refs;
 
-  gpr_mu mu;
+  struct {
+    gpr_mu mu;
+
+    /** is a thread currently in the global lock */
+    bool global_active;
+    /** is a thread currently writing */
+    bool writing_active;
+    /** is a thread currently parsing */
+    bool parsing_active;
+
+    grpc_chttp2_executor_action_header *pending_actions;
+  } executor;
 
   /** is the transport destroying itself? */
   uint8_t destroying;
   /** has the upper layer closed the transport? */
   uint8_t closed;
 
-  /** is a thread currently writing */
-  uint8_t writing_active;
-  /** is a thread currently parsing */
-  uint8_t parsing_active;
-
   /** is there a read request to the endpoint outstanding? */
   uint8_t endpoint_reading;
 
@@ -338,8 +355,10 @@ struct grpc_chttp2_transport {
 
   /** closure to execute writing */
   grpc_closure writing_action;
-  /** closure to finish reading from the endpoint */
-  grpc_closure recv_data;
+  /** closure to start reading from the endpoint */
+  grpc_closure reading_action;
+  /** closure to actually do parsing */
+  grpc_closure parsing_action;
 
   /** incoming read bytes */
   gpr_slice_buffer read_buffer;
@@ -397,21 +416,26 @@ typedef struct {
   grpc_transport_stream_stats *collecting_stats;
   grpc_transport_stream_stats stats;
 
+  /** number of streams that are currently being read */
+  gpr_refcount active_streams;
+
   /** when the application requests writes be closed, the write_closed is
       'queued'; when the close is flow controlled into the send path, we are
       'sending' it; when the write has been performed it is 'sent' */
-  uint8_t write_closed;
+  bool write_closed;
   /** is this stream reading half-closed (boolean) */
-  uint8_t read_closed;
+  bool read_closed;
+  /** are all published incoming byte streams closed */
+  bool all_incoming_byte_streams_finished;
   /** is this stream in the stream map? (boolean) */
-  uint8_t in_stream_map;
+  bool in_stream_map;
   /** has this stream seen an error? if 1, then pending incoming frames
       can be thrown away */
-  uint8_t seen_error;
+  bool seen_error;
 
-  uint8_t published_initial_metadata;
-  uint8_t published_trailing_metadata;
-  uint8_t faked_trailing_metadata;
+  bool published_initial_metadata;
+  bool published_trailing_metadata;
+  bool faked_trailing_metadata;
 
   grpc_chttp2_incoming_metadata_buffer received_initial_metadata;
   grpc_chttp2_incoming_metadata_buffer received_trailing_metadata;
@@ -570,6 +594,9 @@ int grpc_chttp2_list_pop_waiting_for_concurrency(
 void grpc_chttp2_list_add_check_read_ops(
     grpc_chttp2_transport_global *transport_global,
     grpc_chttp2_stream_global *stream_global);
+bool grpc_chttp2_list_remove_check_read_ops(
+    grpc_chttp2_transport_global *transport_global,
+    grpc_chttp2_stream_global *stream_global);
 int grpc_chttp2_list_pop_check_read_ops(
     grpc_chttp2_transport_global *transport_global,
     grpc_chttp2_stream_global **stream_global);
@@ -645,6 +672,12 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx,
                                        grpc_chttp2_stream_global *stream_global,
                                        grpc_closure **pclosure, int success);
 
+void grpc_chttp2_run_with_global_lock(grpc_exec_ctx *exec_ctx,
+                                      grpc_chttp2_transport *transport,
+                                      grpc_chttp2_stream *optional_stream,
+                                      grpc_chttp2_locked_action action,
+                                      void *arg, size_t sizeof_arg);
+
 #define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
 #define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN \
   (sizeof(GRPC_CHTTP2_CLIENT_CONNECT_STRING) - 1)
diff --git a/src/core/ext/transport/chttp2/transport/stream_lists.c b/src/core/ext/transport/chttp2/transport/stream_lists.c
index e5b35aadca032d766f37d75257931a74ed7de326..8f3ab00e6df8bfe7f61ac58d9d35136f7a29731a 100644
--- a/src/core/ext/transport/chttp2/transport/stream_lists.c
+++ b/src/core/ext/transport/chttp2/transport/stream_lists.c
@@ -305,6 +305,14 @@ void grpc_chttp2_list_add_check_read_ops(
                   GRPC_CHTTP2_LIST_CHECK_READ_OPS);
 }
 
+bool grpc_chttp2_list_remove_check_read_ops(
+    grpc_chttp2_transport_global *transport_global,
+    grpc_chttp2_stream_global *stream_global) {
+  return stream_list_maybe_remove(TRANSPORT_FROM_GLOBAL(transport_global),
+                                  STREAM_FROM_GLOBAL(stream_global),
+                                  GRPC_CHTTP2_LIST_CHECK_READ_OPS);
+}
+
 int grpc_chttp2_list_pop_check_read_ops(
     grpc_chttp2_transport_global *transport_global,
     grpc_chttp2_stream_global **stream_global) {
diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c
index e36066d8639d653a79705f5b16d80915109d9f56..ad182d1f69f9f1bc59404c77d2884db77ada52d3 100644
--- a/src/core/lib/channel/channel_stack.c
+++ b/src/core/lib/channel/channel_stack.c
@@ -213,14 +213,16 @@ void grpc_call_stack_ignore_set_pollset(grpc_exec_ctx *exec_ctx,
                                         grpc_call_element *elem,
                                         grpc_pollset *pollset) {}
 
-void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack) {
+void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack,
+                             void *and_free_memory) {
   grpc_call_element *elems = CALL_ELEMS_FROM_STACK(stack);
   size_t count = stack->count;
   size_t i;
 
   /* destroy per-filter data */
   for (i = 0; i < count; i++) {
-    elems[i].filter->destroy_call_elem(exec_ctx, &elems[i]);
+    elems[i].filter->destroy_call_elem(exec_ctx, &elems[i],
+                                       i == count - 1 ? and_free_memory : NULL);
   }
 }
 
diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h
index 9e3a25a152d6408bb759a69ff7a41ba5ea986384..36c17cb467fcdf0a1aae09791494c66c5bb2c556 100644
--- a/src/core/lib/channel/channel_stack.h
+++ b/src/core/lib/channel/channel_stack.h
@@ -104,8 +104,12 @@ typedef struct {
   void (*set_pollset)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
                       grpc_pollset *pollset);
   /* Destroy per call data.
-     The filter does not need to do any chaining */
-  void (*destroy_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem);
+     The filter does not need to do any chaining.
+     The bottom filter of a stack will be passed a non-NULL pointer to
+     \a and_free_memory that should be passed to gpr_free when destruction
+     is complete. */
+  void (*destroy_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+                            void *and_free_memory);
 
   /* sizeof(per channel data) */
   size_t sizeof_channel_data;
@@ -223,7 +227,8 @@ void grpc_call_stack_set_pollset(grpc_exec_ctx *exec_ctx,
 #endif
 
 /* Destroy a call stack */
-void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack);
+void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack,
+                             void *and_free_memory);
 
 /* Ignore set pollset - used by filters to implement the set_pollset method
    if they don't care about pollsets at all. Does nothing. */
diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c
index 3d42d0e616187b0d5ca9475a0301f4327b75d0cb..bac0811fd34b8c8554474c9811a111e43d983efd 100644
--- a/src/core/lib/channel/compress_filter.c
+++ b/src/core/lib/channel/compress_filter.c
@@ -246,8 +246,8 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
 }
 
 /* Destructor for call_data */
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
-                              grpc_call_element *elem) {
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+                              void *ignored) {
   /* grab pointers to our data from the call element */
   call_data *calld = elem->call_data;
   gpr_slice_buffer_destroy(&calld->slices);
diff --git a/src/core/lib/channel/connected_channel.c b/src/core/lib/channel/connected_channel.c
index c1debab4c620f370c0770f1a912bf97cd96f7d44..68a3a7d6fdfc610ab472e88e6520b05afe0c27b5 100644
--- a/src/core/lib/channel/connected_channel.c
+++ b/src/core/lib/channel/connected_channel.c
@@ -102,12 +102,13 @@ static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
 }
 
 /* Destructor for call_data */
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
-                              grpc_call_element *elem) {
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+                              void *and_free_memory) {
   call_data *calld = elem->call_data;
   channel_data *chand = elem->channel_data;
   grpc_transport_destroy_stream(exec_ctx, chand->transport,
-                                TRANSPORT_STREAM_FROM_CALL_DATA(calld));
+                                TRANSPORT_STREAM_FROM_CALL_DATA(calld),
+                                and_free_memory);
 }
 
 /* Constructor for channel_data */
diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c
index 211f537c6919b6e69da003c2dfb341d149079167..516e708d1f039509ffe5d26acbbb9c154c7e61d4 100644
--- a/src/core/lib/channel/http_client_filter.c
+++ b/src/core/lib/channel/http_client_filter.c
@@ -155,8 +155,8 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
 }
 
 /* Destructor for call_data */
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
-                              grpc_call_element *elem) {}
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+                              void *ignored) {}
 
 static grpc_mdelem *scheme_from_args(const grpc_channel_args *args) {
   unsigned i;
diff --git a/src/core/lib/channel/http_server_filter.c b/src/core/lib/channel/http_server_filter.c
index 59dded6342802ce0c6b7a9d93392659025e88bfc..ba865416decdce1cc8fa864819568f22b9e9ef64 100644
--- a/src/core/lib/channel/http_server_filter.c
+++ b/src/core/lib/channel/http_server_filter.c
@@ -225,8 +225,8 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
 }
 
 /* Destructor for call_data */
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
-                              grpc_call_element *elem) {}
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+                              void *ignored) {}
 
 /* Constructor for channel_data */
 static void init_channel_elem(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/lib/iomgr/iomgr.c b/src/core/lib/iomgr/iomgr.c
index 146663984d624e738cfb32fba6df5d400d82c1ad..60cef8ba77703f3de1e4a3a29842377f63b0c43c 100644
--- a/src/core/lib/iomgr/iomgr.c
+++ b/src/core/lib/iomgr/iomgr.c
@@ -166,8 +166,10 @@ bool grpc_iomgr_abort_on_leaks(void) {
   if (env == NULL) return false;
   static const char *truthy[] = {"yes",  "Yes",  "YES", "true",
                                  "True", "TRUE", "1"};
+  bool should_we = false;
   for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) {
-    if (0 == strcmp(env, truthy[i])) return true;
+    if (0 == strcmp(env, truthy[i])) should_we = true;
   }
-  return false;
+  gpr_free(env);
+  return should_we;
 }
diff --git a/src/core/lib/security/client_auth_filter.c b/src/core/lib/security/client_auth_filter.c
index 943b1da85c60dc0c22714d5b2122ddbe59452435..8b58cb86bf92f8e0cb436a0f65303b3457da8a63 100644
--- a/src/core/lib/security/client_auth_filter.c
+++ b/src/core/lib/security/client_auth_filter.c
@@ -277,8 +277,8 @@ static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
 }
 
 /* Destructor for call_data */
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
-                              grpc_call_element *elem) {
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+                              void *ignored) {
   call_data *calld = elem->call_data;
   grpc_call_credentials_unref(calld->creds);
   if (calld->host != NULL) {
diff --git a/src/core/lib/security/server_auth_filter.c b/src/core/lib/security/server_auth_filter.c
index 7844dc87cb58853fd2c2c42be3d4505329d65dc9..3320497d21484bb5382dc0a8f80fdcf04d5a5caf 100644
--- a/src/core/lib/security/server_auth_filter.c
+++ b/src/core/lib/security/server_auth_filter.c
@@ -224,8 +224,8 @@ static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
                         grpc_pollset *pollset) {}
 
 /* Destructor for call_data */
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
-                              grpc_call_element *elem) {}
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+                              void *ignored) {}
 
 /* Constructor for channel_data */
 static void init_channel_elem(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index c4f310e2fb21cefdfde2f5380207c8e6b8d42a78..9b2b94eedf56fa513641983bfeffefab08f39bf3 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -373,8 +373,6 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, bool success) {
   if (c->receiving_stream != NULL) {
     grpc_byte_stream_destroy(exec_ctx, c->receiving_stream);
   }
-  grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c));
-  GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->channel, "call");
   gpr_mu_destroy(&c->mu);
   for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
     if (c->status[i].details) {
@@ -392,7 +390,9 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, bool success) {
   if (c->cq) {
     GRPC_CQ_INTERNAL_UNREF(c->cq, "bind");
   }
-  gpr_free(c);
+  grpc_channel *channel = c->channel;
+  grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), c);
+  GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "call");
   GPR_TIMER_END("destroy_call", 0);
 }
 
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index 5ec8808b508525a81ead9ca0c1072d1f098d3e71..1f82c3bad2e679165927c1ab59728231c737937a 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -227,6 +227,10 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
 #endif
 
   GPR_TIMER_BEGIN("grpc_cq_end_op", 0);
+  GRPC_API_TRACE(
+      "grpc_cq_end_op(exec_ctx=%p, cc=%p, tag=%p, success=%d, done=%p, "
+      "done_arg=%p, storage=%p)",
+      7, (exec_ctx, cc, tag, success, done, done_arg, storage));
 
   storage->tag = tag;
   storage->done = done;
diff --git a/src/core/lib/surface/lame_client.c b/src/core/lib/surface/lame_client.c
index 80bd95df68cd437fa1bae9a9b2e6b209d32001bf..f50ec54cea9d863f889a49b77675ed12915e58a3 100644
--- a/src/core/lib/surface/lame_client.c
+++ b/src/core/lib/surface/lame_client.c
@@ -107,8 +107,10 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx,
 static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
                            grpc_call_element_args *args) {}
 
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
-                              grpc_call_element *elem) {}
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+                              void *and_free_memory) {
+  gpr_free(and_free_memory);
+}
 
 static void init_channel_elem(grpc_exec_ctx *exec_ctx,
                               grpc_channel_element *elem,
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index ad8ee8c7a99ef3dc90fad5866e0c55a18c8296d8..2db95b9cdfc1fd1ebbb852bd217edf92ad6d9a1c 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -820,8 +820,8 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
   server_ref(chand->server);
 }
 
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
-                              grpc_call_element *elem) {
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+                              void *ignored) {
   channel_data *chand = elem->channel_data;
   call_data *calld = elem->call_data;
 
diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c
index 53c634adca679e7bdb48e5b0a966d4ae8bc9088e..e6d524abe68bb96a8b5e4e51d948135350af94d0 100644
--- a/src/core/lib/transport/transport.c
+++ b/src/core/lib/transport/transport.c
@@ -133,8 +133,9 @@ void grpc_transport_set_pollset(grpc_exec_ctx *exec_ctx,
 
 void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx,
                                    grpc_transport *transport,
-                                   grpc_stream *stream) {
-  transport->vtable->destroy_stream(exec_ctx, transport, stream);
+                                   grpc_stream *stream, void *and_free_memory) {
+  transport->vtable->destroy_stream(exec_ctx, transport, stream,
+                                    and_free_memory);
 }
 
 char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index 1eb446312bfd8ccea2f03bdc8653a181f6b39dfb..482a9d1791b7534b1aecc23a369bdde30f361fb0 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -98,6 +98,11 @@ void grpc_transport_move_stats(grpc_transport_stream_stats *from,
 /* Transport stream op: a set of operations to perform on a transport
    against a single stream */
 typedef struct grpc_transport_stream_op {
+  /** Should be enqueued when all requested operations (excluding recv_message
+      and recv_initial_metadata which have their own closures) in a given batch
+      have been completed. */
+  grpc_closure *on_complete;
+
   /** Send initial metadata to the peer, from the provided metadata batch.
       idempotent_request MUST be set if this is non-null */
   grpc_metadata_batch *send_initial_metadata;
@@ -129,11 +134,6 @@ typedef struct grpc_transport_stream_op {
   /** Collect any stats into provided buffer, zero internal stat counters */
   grpc_transport_stream_stats *collect_stats;
 
-  /** Should be enqueued when all requested operations (excluding recv_message
-      and recv_initial_metadata which have their own closures) in a given batch
-      have been completed. */
-  grpc_closure *on_complete;
-
   /** If != GRPC_STATUS_OK, cancel this stream */
   grpc_status_code cancel_with_status;
 
@@ -213,7 +213,7 @@ void grpc_transport_set_pollset(grpc_exec_ctx *exec_ctx,
                  caller, but any child memory must be cleaned up) */
 void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx,
                                    grpc_transport *transport,
-                                   grpc_stream *stream);
+                                   grpc_stream *stream, void *and_free_memory);
 
 void grpc_transport_stream_op_finish_with_failure(grpc_exec_ctx *exec_ctx,
                                                   grpc_transport_stream_op *op);
diff --git a/src/core/lib/transport/transport_impl.h b/src/core/lib/transport/transport_impl.h
index 2ff67073aff1edefeb564323e58a4619c648348a..956155eec828367aed782244788b5eb4a7ce1460 100644
--- a/src/core/lib/transport/transport_impl.h
+++ b/src/core/lib/transport/transport_impl.h
@@ -63,7 +63,7 @@ typedef struct grpc_transport_vtable {
 
   /* implementation of grpc_transport_destroy_stream */
   void (*destroy_stream)(grpc_exec_ctx *exec_ctx, grpc_transport *self,
-                         grpc_stream *stream);
+                         grpc_stream *stream, void *and_free_memory);
 
   /* implementation of grpc_transport_destroy */
   void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_transport *self);
diff --git a/test/core/channel/channel_stack_test.c b/test/core/channel/channel_stack_test.c
index 81e3927a00646866e9a3acbb0f4d35b922e8ca60..1a5594bde86a87934f6295a0f6f14b3b32267ad4 100644
--- a/test/core/channel/channel_stack_test.c
+++ b/test/core/channel/channel_stack_test.c
@@ -62,8 +62,8 @@ static void call_init_func(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
 static void channel_destroy_func(grpc_exec_ctx *exec_ctx,
                                  grpc_channel_element *elem) {}
 
-static void call_destroy_func(grpc_exec_ctx *exec_ctx,
-                              grpc_call_element *elem) {
+static void call_destroy_func(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+                              void *ignored) {
   ++*(int *)(elem->channel_data);
 }
 
@@ -87,7 +87,7 @@ static void free_channel(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
 }
 
 static void free_call(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
-  grpc_call_stack_destroy(exec_ctx, arg);
+  grpc_call_stack_destroy(exec_ctx, arg, NULL);
   gpr_free(arg);
 }
 
diff --git a/test/core/end2end/tests/filter_causes_close.c b/test/core/end2end/tests/filter_causes_close.c
index 9f9ee856485c455f82f286e5905d50ac8278d2de..99049aa6bd5b7580aab702a1baabe219c261d159 100644
--- a/test/core/end2end/tests/filter_causes_close.c
+++ b/test/core/end2end/tests/filter_causes_close.c
@@ -232,8 +232,8 @@ static void start_transport_stream_op(grpc_exec_ctx *exec_ctx,
 static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
                            grpc_call_element_args *args) {}
 
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
-                              grpc_call_element *elem) {}
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+                              void *and_free_memory) {}
 
 static void init_channel_elem(grpc_exec_ctx *exec_ctx,
                               grpc_channel_element *elem,