diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c
index 40897fb8f86494a5f12b2776989637d1a10cedee..6b21bcf6a9fa7034dbfe94e2c6dec435f38536cb 100644
--- a/src/core/iomgr/tcp_posix.c
+++ b/src/core/iomgr/tcp_posix.c
@@ -258,6 +258,8 @@ typedef struct {
   grpc_endpoint base;
   grpc_fd *em_fd;
   int fd;
+  int iov_size;            /* Number of slices to allocate per read attempt */
+  int finished_edge;
   size_t slice_size;
   gpr_refcount refcount;
 
@@ -315,9 +317,7 @@ static void call_read_cb(grpc_tcp *tcp, gpr_slice *slices, size_t nslices,
 
 #define INLINE_SLICE_BUFFER_SIZE 8
 #define MAX_READ_IOVEC 4
-static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) {
-  grpc_tcp *tcp = (grpc_tcp *)arg;
-  int iov_size = 1;
+static void grpc_tcp_continue_read(grpc_tcp *tcp) {
   gpr_slice static_read_slices[INLINE_SLICE_BUFFER_SIZE];
   struct msghdr msg;
   struct iovec iov[MAX_READ_IOVEC];
@@ -327,88 +327,103 @@ static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) {
   gpr_slice *final_slices;
   size_t final_nslices;
 
+  GPR_ASSERT(!tcp->finished_edge);
   GRPC_TIMER_BEGIN(GRPC_PTAG_HANDLE_READ, 0);
   slice_state_init(&read_state, static_read_slices, INLINE_SLICE_BUFFER_SIZE,
                    0);
 
-  if (!success) {
-    call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN);
-    grpc_tcp_unref(tcp);
-    return;
+  allocated_bytes = slice_state_append_blocks_into_iovec(
+      &read_state, iov, tcp->iov_size, tcp->slice_size);
+
+  msg.msg_name = NULL;
+  msg.msg_namelen = 0;
+  msg.msg_iov = iov;
+  msg.msg_iovlen = tcp->iov_size;
+  msg.msg_control = NULL;
+  msg.msg_controllen = 0;
+  msg.msg_flags = 0;
+
+  GRPC_TIMER_MARK(RECVMSG_BEGIN, 0);
+  do {
+    read_bytes = recvmsg(tcp->fd, &msg, 0);
+  } while (read_bytes < 0 && errno == EINTR);
+  GRPC_TIMER_MARK(RECVMSG_END, 0);
+
+  if (read_bytes < allocated_bytes) {
+    /* TODO(klempner): Consider a second read first, in hopes of getting a
+     * quick EAGAIN and saving a bunch of allocations. */
+    slice_state_remove_last(&read_state, read_bytes < 0
+                                             ? allocated_bytes
+                                             : allocated_bytes - read_bytes);
   }
 
-  /* TODO(klempner): Limit the amount we read at once. */
-  for (;;) {
-    allocated_bytes = slice_state_append_blocks_into_iovec(
-        &read_state, iov, iov_size, tcp->slice_size);
-
-    msg.msg_name = NULL;
-    msg.msg_namelen = 0;
-    msg.msg_iov = iov;
-    msg.msg_iovlen = iov_size;
-    msg.msg_control = NULL;
-    msg.msg_controllen = 0;
-    msg.msg_flags = 0;
-
-    GRPC_TIMER_BEGIN(GRPC_PTAG_RECVMSG, 0);
-    do {
-      read_bytes = recvmsg(tcp->fd, &msg, 0);
-    } while (read_bytes < 0 && errno == EINTR);
-    GRPC_TIMER_END(GRPC_PTAG_RECVMSG, 0);
-
-    if (read_bytes < allocated_bytes) {
-      /* TODO(klempner): Consider a second read first, in hopes of getting a
-       * quick EAGAIN and saving a bunch of allocations. */
-      slice_state_remove_last(&read_state, read_bytes < 0
-                                               ? allocated_bytes
-                                               : allocated_bytes - read_bytes);
-    }
-
-    if (read_bytes < 0) {
-      /* NB: After calling the user_cb a parallel call of the read handler may
-       * be running. */
-      if (errno == EAGAIN) {
-        if (slice_state_has_available(&read_state)) {
-          /* TODO(klempner): We should probably do the call into the application
-             without all this junk on the stack */
-          /* FIXME(klempner): Refcount properly */
-          slice_state_transfer_ownership(&read_state, &final_slices,
-                                         &final_nslices);
-          call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_OK);
-          slice_state_destroy(&read_state);
-          grpc_tcp_unref(tcp);
-        } else {
-          /* Spurious read event, consume it here */
-          slice_state_destroy(&read_state);
-          grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
-        }
-      } else {
-        /* TODO(klempner): Log interesting errors */
-        call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_ERROR);
-        slice_state_destroy(&read_state);
-        grpc_tcp_unref(tcp);
+  if (read_bytes < 0) {
+    /* NB: After calling the user_cb a parallel call of the read handler may
+     * be running. */
+    if (errno == EAGAIN) {
+      if (tcp->iov_size > 1) {
+        tcp->iov_size /= 2;
       }
-      return;
-    } else if (read_bytes == 0) {
-      /* 0 read size ==> end of stream */
       if (slice_state_has_available(&read_state)) {
-        /* there were bytes already read: pass them up to the application */
+        /* TODO(klempner): We should probably do the call into the application
+           without all this junk on the stack */
+        /* FIXME(klempner): Refcount properly */
         slice_state_transfer_ownership(&read_state, &final_slices,
                                        &final_nslices);
-        call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_EOF);
+        tcp->finished_edge = 1;
+        call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_OK);
+        slice_state_destroy(&read_state);
+        grpc_tcp_unref(tcp);
       } else {
-        call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_EOF);
+        /* We've consumed the edge, request a new one */
+        slice_state_destroy(&read_state);
+        grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
       }
+    } else {
+      /* TODO(klempner): Log interesting errors */
+      call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_ERROR);
       slice_state_destroy(&read_state);
       grpc_tcp_unref(tcp);
-      return;
-    } else if (iov_size < MAX_READ_IOVEC) {
-      ++iov_size;
     }
+  } else if (read_bytes == 0) {
+    /* 0 read size ==> end of stream */
+    if (slice_state_has_available(&read_state)) {
+      /* there were bytes already read: pass them up to the application */
+      slice_state_transfer_ownership(&read_state, &final_slices,
+                                     &final_nslices);
+      call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_EOF);
+    } else {
+      call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_EOF);
+    }
+    slice_state_destroy(&read_state);
+    grpc_tcp_unref(tcp);
+  } else {
+    if (tcp->iov_size < MAX_READ_IOVEC) {
+      ++tcp->iov_size;
+    }
+    GPR_ASSERT(slice_state_has_available(&read_state));
+    slice_state_transfer_ownership(&read_state, &final_slices,
+                                   &final_nslices);
+    call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_OK);
+    slice_state_destroy(&read_state);
+    grpc_tcp_unref(tcp);
   }
+
   GRPC_TIMER_END(GRPC_PTAG_HANDLE_READ, 0);
 }
 
+static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) {
+  grpc_tcp *tcp = (grpc_tcp *)arg;
+  GPR_ASSERT(!tcp->finished_edge);
+
+  if (!success) {
+    call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN);
+    grpc_tcp_unref(tcp);
+  } else {
+    grpc_tcp_continue_read(tcp);
+  }
+}
+
 static void grpc_tcp_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
                                     void *user_data) {
   grpc_tcp *tcp = (grpc_tcp *)ep;
@@ -416,7 +431,12 @@ static void grpc_tcp_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
   tcp->read_cb = cb;
   tcp->read_user_data = user_data;
   gpr_ref(&tcp->refcount);
-  grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
+  if (tcp->finished_edge) {
+    tcp->finished_edge = 0;
+    grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
+  } else {
+    grpc_iomgr_add_callback(grpc_tcp_handle_read, tcp);
+  }
 }
 
 #define MAX_WRITE_IOVEC 16
@@ -554,6 +574,8 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size) {
   tcp->read_user_data = NULL;
   tcp->write_user_data = NULL;
   tcp->slice_size = slice_size;
+  tcp->iov_size = 1;
+  tcp->finished_edge = 1;
   slice_state_init(&tcp->write_state, NULL, 0, 0);
   /* paired with unref in grpc_tcp_destroy */
   gpr_ref_init(&tcp->refcount, 1);
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 136921656fce1e46182c69d797622f72f567a230..070be1b25a88ac0099b2ca73dfd9357e870a3bcf 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -34,6 +34,7 @@
 #include "src/core/surface/call.h"
 #include "src/core/channel/channel_stack.h"
 #include "src/core/iomgr/alarm.h"
+#include "src/core/profiling/timers.h"
 #include "src/core/support/string.h"
 #include "src/core/surface/byte_buffer_queue.h"
 #include "src/core/surface/channel.h"
@@ -405,14 +406,14 @@ static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); }
 
 static int need_more_data(grpc_call *call) {
   return is_op_live(call, GRPC_IOREQ_RECV_INITIAL_METADATA) ||
-         is_op_live(call, GRPC_IOREQ_RECV_MESSAGE) ||
+         (is_op_live(call, GRPC_IOREQ_RECV_MESSAGE) && grpc_bbq_empty(&call->incoming_queue)) ||
          is_op_live(call, GRPC_IOREQ_RECV_TRAILING_METADATA) ||
          is_op_live(call, GRPC_IOREQ_RECV_STATUS) ||
          is_op_live(call, GRPC_IOREQ_RECV_STATUS_DETAILS) ||
          (is_op_live(call, GRPC_IOREQ_RECV_CLOSE) &&
           grpc_bbq_empty(&call->incoming_queue)) ||
          (call->write_state == WRITE_STATE_INITIAL && !call->is_client &&
-          call->read_state != READ_STATE_STREAM_CLOSED);
+          call->read_state < READ_STATE_GOT_INITIAL_METADATA);
 }
 
 static void unlock(grpc_call *call) {
@@ -685,6 +686,7 @@ static int add_slice_to_message(grpc_call *call, gpr_slice slice) {
 static void call_on_done_recv(void *pc, int success) {
   grpc_call *call = pc;
   size_t i;
+  GRPC_TIMER_MARK(CALL_ON_DONE_RECV_BEGIN, 0);
   lock(call);
   call->receiving = 0;
   if (success) {
@@ -729,6 +731,7 @@ static void call_on_done_recv(void *pc, int success) {
   unlock(call);
 
   GRPC_CALL_INTERNAL_UNREF(call, "receiving", 0);
+  GRPC_TIMER_MARK(CALL_ON_DONE_RECV_END, 0);
 }
 
 static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count,
diff --git a/src/core/surface/init.c b/src/core/surface/init.c
index bfee28e5fc1bc5d2b7d0f0d106d2f1ee98a13384..d6eb9b2c24b13defdf230bc92f81c368ddd0e7ad 100644
--- a/src/core/surface/init.c
+++ b/src/core/surface/init.c
@@ -59,6 +59,7 @@ void grpc_init(void) {
     grpc_register_tracer("channel", &grpc_trace_channel);
     grpc_register_tracer("surface", &grpc_surface_trace);
     grpc_register_tracer("http", &grpc_http_trace);
+    grpc_register_tracer("flowctl", &grpc_flowctl_trace);
     grpc_register_tracer("batch", &grpc_trace_batch);
     grpc_security_pre_init();
     grpc_iomgr_init();
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 0bee37c6142268b8687c0cbf75f64c55c5d668ee..97e6bae4bb90c178595357c409930ed17dfc2c90 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -37,6 +37,7 @@
 #include <stdio.h>
 #include <string.h>
 
+#include "src/core/profiling/timers.h"
 #include "src/core/support/string.h"
 #include "src/core/transport/chttp2/frame_data.h"
 #include "src/core/transport/chttp2/frame_goaway.h"
@@ -64,6 +65,7 @@
 #define CLIENT_CONNECT_STRLEN 24
 
 int grpc_http_trace = 0;
+int grpc_flowctl_trace = 0;
 
 typedef struct transport transport;
 typedef struct stream stream;
@@ -74,6 +76,12 @@ typedef struct stream stream;
   else                    \
   stmt
 
+#define FLOWCTL_TRACE(t, obj, dir, id, delta) \
+  if (!grpc_flowctl_trace)                    \
+    ;                                         \
+  else                                        \
+  flowctl_trace(t, #dir, obj->dir##_window, id, delta)
+
 /* streams are kept in various linked lists depending on what things need to
    happen to them... this enum labels each list */
 typedef enum {
@@ -382,6 +390,12 @@ static void add_to_pollset_locked(transport *t, grpc_pollset *pollset);
 static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op);
 static void add_metadata_batch(transport *t, stream *s);
 
+static void flowctl_trace(transport *t, const char *flow, gpr_int32 window,
+                          gpr_uint32 id, gpr_int32 delta) {
+  gpr_log(GPR_DEBUG, "HTTP:FLOW:%p:%d:%s: %d + %d = %d", t, id, flow, window,
+          delta, window + delta);
+}
+
 /*
  * CONSTRUCTION/DESTRUCTION/REFCOUNTING
  */
@@ -772,6 +786,8 @@ static void unlock(transport *t) {
   grpc_stream_op_buffer nuke_now;
   const grpc_transport_callbacks *cb = t->cb;
 
+  GRPC_TIMER_MARK(HTTP2_UNLOCK_BEGIN, 0);
+
   grpc_sopb_init(&nuke_now);
   if (t->nuke_later_sopb.nops) {
     grpc_sopb_swap(&nuke_now, &t->nuke_later_sopb);
@@ -820,6 +836,8 @@ static void unlock(transport *t) {
   /* finally unlock */
   gpr_mu_unlock(&t->mu);
 
+  GRPC_TIMER_MARK(HTTP2_UNLOCK_CLEANUP, 0);
+
   /* perform some callbacks if necessary */
   for (i = 0; i < num_goaways; i++) {
     cb->goaway(t->cb_user_data, &t->base, goaways[i].status, goaways[i].debug);
@@ -850,6 +868,8 @@ static void unlock(transport *t) {
   grpc_sopb_destroy(&nuke_now);
 
   gpr_free(goaways);
+
+  GRPC_TIMER_MARK(HTTP2_UNLOCK_END, 0);
 }
 
 /*
@@ -896,6 +916,8 @@ static int prepare_write(transport *t) {
     window_delta = grpc_chttp2_preencode(
         s->outgoing_sopb->ops, &s->outgoing_sopb->nops,
         GPR_MIN(t->outgoing_window, s->outgoing_window), &s->writing_sopb);
+    FLOWCTL_TRACE(t, t, outgoing, 0, -(gpr_int64)window_delta);
+    FLOWCTL_TRACE(t, s, outgoing, s->id, -(gpr_int64)window_delta);
     t->outgoing_window -= window_delta;
     s->outgoing_window -= window_delta;
 
@@ -924,6 +946,7 @@ static int prepare_write(transport *t) {
     if (!s->read_closed && window_delta) {
       gpr_slice_buffer_add(
           &t->outbuf, grpc_chttp2_window_update_create(s->id, window_delta));
+      FLOWCTL_TRACE(t, s, incoming, s->id, window_delta);
       s->incoming_window += window_delta;
     }
   }
@@ -933,6 +956,7 @@ static int prepare_write(transport *t) {
     window_delta = t->connection_window_target - t->incoming_window;
     gpr_slice_buffer_add(&t->outbuf,
                          grpc_chttp2_window_update_create(0, window_delta));
+    FLOWCTL_TRACE(t, t, incoming, 0, window_delta);
     t->incoming_window += window_delta;
   }
 
@@ -1259,6 +1283,8 @@ static grpc_chttp2_parse_error update_incoming_window(transport *t, stream *s) {
     return GRPC_CHTTP2_CONNECTION_ERROR;
   }
 
+  FLOWCTL_TRACE(t, t, incoming, 0, -(gpr_int64)t->incoming_frame_size);
+  FLOWCTL_TRACE(t, s, incoming, s->id, -(gpr_int64)t->incoming_frame_size);
   t->incoming_window -= t->incoming_frame_size;
   s->incoming_window -= t->incoming_frame_size;
 
@@ -1608,6 +1634,7 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
         for (i = 0; i < t->stream_map.count; i++) {
           stream *s = (stream *)(t->stream_map.values[i]);
           int was_window_empty = s->outgoing_window <= 0;
+          FLOWCTL_TRACE(t, s, outgoing, s->id, st.initial_window_update);
           s->outgoing_window += st.initial_window_update;
           if (was_window_empty && s->outgoing_window > 0 && s->outgoing_sopb &&
               s->outgoing_sopb->nops > 0) {
@@ -1626,6 +1653,7 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
                                       GRPC_CHTTP2_FLOW_CONTROL_ERROR),
                             GRPC_CHTTP2_FLOW_CONTROL_ERROR, NULL, 1);
             } else {
+              FLOWCTL_TRACE(t, s, outgoing, s->id, st.window_update);
               s->outgoing_window += st.window_update;
               /* if this window update makes outgoing ops writable again,
                  flag that */
@@ -1640,6 +1668,7 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
           if (!is_window_update_legal(st.window_update, t->outgoing_window)) {
             drop_connection(t);
           } else {
+            FLOWCTL_TRACE(t, t, outgoing, 0, st.window_update);
             t->outgoing_window += st.window_update;
           }
         }
diff --git a/src/core/transport/chttp2_transport.h b/src/core/transport/chttp2_transport.h
index a7f1b9a8646872e6e8a4d9f64507cf455844d987..fad714fabf5d9d4edceeaa8f95c862f590f8cec7 100644
--- a/src/core/transport/chttp2_transport.h
+++ b/src/core/transport/chttp2_transport.h
@@ -38,6 +38,7 @@
 #include "src/core/transport/transport.h"
 
 extern int grpc_http_trace;
+extern int grpc_flowctl_trace;
 
 void grpc_create_chttp2_transport(grpc_transport_setup_callback setup,
                                   void *arg,