From 629b0ed8041f96968e8e61c2b4992d08a38cf28a Mon Sep 17 00:00:00 2001
From: Craig Tiller <ctiller@google.com>
Date: Wed, 22 Apr 2015 11:14:26 -0700
Subject: [PATCH] Call compiles

---
 src/core/channel/connected_channel.c |  39 ----
 src/core/surface/call.c              | 263 ++++++++++++++++++++-------
 src/core/surface/call.h              |  14 --
 src/core/surface/channel.c           |  23 +++
 src/core/surface/channel.h           |   1 +
 src/core/transport/stream_op.h       |  28 ++-
 src/core/transport/transport.h       |  15 --
 7 files changed, 246 insertions(+), 137 deletions(-)

diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c
index d0b834a10a..9e2d92ffbc 100644
--- a/src/core/channel/connected_channel.c
+++ b/src/core/channel/connected_channel.c
@@ -45,8 +45,6 @@
 #include <grpc/support/slice_buffer.h>
 
 #define MAX_BUFFER_LENGTH 8192
-/* the protobuf library will (by default) start warning at 100megs */
-#define DEFAULT_MAX_MESSAGE_LENGTH (100 * 1024 * 1024)
 
 typedef struct connected_channel_channel_data {
   grpc_transport *transport;
@@ -63,24 +61,6 @@ typedef struct connected_channel_call_data {
 #define CALL_DATA_FROM_TRANSPORT_STREAM(transport_stream) \
   (((call_data *)(transport_stream)) - 1)
 
-#if 0
-/* Copy the contents of a byte buffer into stream ops */
-static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer,
-                                           grpc_stream_op_buffer *sopb) {
-  size_t i;
-
-  switch (byte_buffer->type) {
-    case GRPC_BB_SLICE_BUFFER:
-      for (i = 0; i < byte_buffer->data.slice_buffer.count; i++) {
-        gpr_slice slice = byte_buffer->data.slice_buffer.slices[i];
-        gpr_slice_ref(slice);
-        grpc_sopb_add_slice(sopb, slice);
-      }
-      break;
-  }
-}
-#endif
-
 /* Intercept a call operation and either push it directly up or translate it
    into transport stream operations */
 static void con_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) {
@@ -145,25 +125,6 @@ static void init_channel_elem(grpc_channel_element *elem,
   GPR_ASSERT(is_last);
   GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
   cd->transport = NULL;
-
-#if 0
-  cd->max_message_length = DEFAULT_MAX_MESSAGE_LENGTH;
-  if (args) {
-    for (i = 0; i < args->num_args; i++) {
-      if (0 == strcmp(args->args[i].key, GRPC_ARG_MAX_MESSAGE_LENGTH)) {
-        if (args->args[i].type != GRPC_ARG_INTEGER) {
-          gpr_log(GPR_ERROR, "%s ignored: it must be an integer",
-                  GRPC_ARG_MAX_MESSAGE_LENGTH);
-        } else if (args->args[i].value.integer < 0) {
-          gpr_log(GPR_ERROR, "%s ignored: it must be >= 0",
-                  GRPC_ARG_MAX_MESSAGE_LENGTH);
-        } else {
-          cd->max_message_length = args->args[i].value.integer;
-        }
-      }
-    }
-  }
-#endif  
 }
 
 /* Destructor for channel_data */
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index fb2efac74e..7fcf6e2b04 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -144,12 +144,14 @@ struct grpc_call {
   gpr_uint8 have_alarm;
   /* are we currently performing a send operation */
   gpr_uint8 sending;
+  /* are we currently performing a recv operation */
+  gpr_uint8 receiving;
   /* are we currently completing requests */
   gpr_uint8 completing;
   /* pairs with completed_requests */
   gpr_uint8 num_completed_requests;
-  /* flag that we need to request more data */
-  gpr_uint8 need_more_data;
+  /* are we currently reading a message? */
+  gpr_uint8 reading_message;
   /* flags with bits corresponding to write states allowing us to determine
      what was sent */
   gpr_uint16 last_send_contains;
@@ -221,6 +223,9 @@ struct grpc_call {
   grpc_stream_op_buffer recv_ops;
   grpc_stream_state recv_state;
 
+  gpr_slice_buffer incoming_message;
+  gpr_uint32 incoming_message_length;
+
   /* Data that the legacy api needs to track. To be deleted at some point
      soon */
   legacy_state *legacy_state;
@@ -246,6 +251,8 @@ static void call_on_done_recv(void *call, int success);
 static void call_on_done_send(void *call, int success);
 static int fill_send_ops(grpc_call *call, grpc_transport_op *op);
 static void execute_op(grpc_call *call, grpc_transport_op *op);
+static void recv_metadata(grpc_call *call, grpc_metadata_batch *metadata);
+static void finish_read_ops(grpc_call *call);
 
 grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
                             const void *server_transport_data,
@@ -378,6 +385,15 @@ static int is_op_live(grpc_call *call, grpc_ioreq_op op) {
 
 static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); }
 
+static int need_more_data(grpc_call *call) {
+  return is_op_live(call, GRPC_IOREQ_RECV_INITIAL_METADATA) ||
+         is_op_live(call, GRPC_IOREQ_RECV_MESSAGE) ||
+         is_op_live(call, GRPC_IOREQ_RECV_TRAILING_METADATA) ||
+         is_op_live(call, GRPC_IOREQ_RECV_STATUS) ||
+         is_op_live(call, GRPC_IOREQ_RECV_STATUS_DETAILS) ||
+         is_op_live(call, GRPC_IOREQ_RECV_CLOSE);
+}
+
 static void unlock(grpc_call *call) {
   grpc_transport_op op;
   completed_request completed_requests[GRPC_IOREQ_OP_COUNT];
@@ -387,13 +403,14 @@ static void unlock(grpc_call *call) {
 
   memset(&op, 0, sizeof(op));
 
-  if (call->need_more_data &&
-      (call->write_state >= WRITE_STATE_STARTED || !call->is_client)) {
+  if (!call->receiving &&
+      (call->write_state >= WRITE_STATE_STARTED || !call->is_client) &&
+      need_more_data(call)) {
     op.recv_ops = &call->recv_ops;
     op.recv_state = &call->recv_state;
     op.on_done_recv = call_on_done_recv;
     op.recv_user_data = call;
-    call->need_more_data = 0;
+    call->receiving = 1;
     grpc_call_internal_ref(call);
     start_op = 1;
   }
@@ -570,6 +587,121 @@ static void call_on_done_send(void *pc, int success) {
   grpc_call_internal_unref(call, 0);
 }
 
+static void finish_message(grpc_call *call) {
+  /* TODO(ctiller): this could be a lot faster if coded directly */
+  grpc_byte_buffer *byte_buffer = grpc_byte_buffer_create(
+      call->incoming_message.slices, call->incoming_message.count);
+  gpr_slice_buffer_reset_and_unref(&call->incoming_message);
+
+  grpc_bbq_push(&call->incoming_queue, byte_buffer);
+
+  GPR_ASSERT(call->incoming_message.count == 0);
+  call->reading_message = 0;
+}
+
+static int begin_message(grpc_call *call, grpc_begin_message msg) {
+  /* can't begin a message when we're still reading a message */
+  if (call->reading_message) {
+    char *message = NULL;
+    gpr_asprintf(
+        &message, "Message terminated early; read %d bytes, expected %d",
+        (int)call->incoming_message.length, (int)call->incoming_message_length);
+    grpc_call_cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message);
+    gpr_free(message);
+    return 0;
+  }
+  /* stash away parameters, and prepare for incoming slices */
+  if (msg.length > grpc_channel_get_max_message_length(call->channel)) {
+    char *message = NULL;
+    gpr_asprintf(
+        &message,
+        "Maximum message length of %d exceeded by a message of length %d",
+        grpc_channel_get_max_message_length(call->channel), msg.length);
+    grpc_call_cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message);
+    gpr_free(message);
+    return 0;
+  } else if (msg.length > 0) {
+    call->reading_message = 1;
+    call->incoming_message_length = msg.length;
+    return 1;
+  } else {
+    finish_message(call);
+    return 1;
+  }
+}
+
+static int add_slice_to_message(grpc_call *call, gpr_slice slice) {
+  if (GPR_SLICE_LENGTH(slice) == 0) {
+    gpr_slice_unref(slice);
+    return 1;
+  }
+  /* we have to be reading a message to know what to do here */
+  if (!call->reading_message) {
+    grpc_call_cancel_with_status(
+        call, GRPC_STATUS_INVALID_ARGUMENT,
+        "Received payload data while not reading a message");
+    return 0;
+  }
+  /* append the slice to the incoming buffer */
+  gpr_slice_buffer_add(&call->incoming_message, slice);
+  if (call->incoming_message.length > call->incoming_message_length) {
+    /* if we got too many bytes, complain */
+    char *message = NULL;
+    gpr_asprintf(
+        &message, "Receiving message overflow; read %d bytes, expected %d",
+        (int)call->incoming_message.length, (int)call->incoming_message_length);
+    grpc_call_cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message);
+    gpr_free(message);
+    return 0;
+  } else if (call->incoming_message.length == call->incoming_message_length) {
+    finish_message(call);
+    return 1;
+  } else {
+    return 1;
+  }
+}
+
+static void call_on_done_recv(void *pc, int success) {
+  grpc_call *call = pc;
+  size_t i;
+  int unref = 0;
+  lock(call);
+  for (i = 0; success && i < call->recv_ops.nops; i++) {
+    grpc_stream_op *op = &call->recv_ops.ops[i];
+    switch (op->type) {
+      case GRPC_NO_OP:
+        break;
+      case GRPC_OP_METADATA:
+        recv_metadata(call, &op->data.metadata);
+        break;
+      case GRPC_OP_BEGIN_MESSAGE:
+        success = begin_message(call, op->data.begin_message);
+        break;
+      case GRPC_OP_SLICE:
+        success = add_slice_to_message(call, op->data.slice);
+        break;
+    }
+  }
+  if (call->recv_state == GRPC_STREAM_RECV_CLOSED) {
+    GPR_ASSERT(call->read_state <= READ_STATE_READ_CLOSED);
+    call->read_state = READ_STATE_READ_CLOSED;
+  }
+  if (call->recv_state == GRPC_STREAM_CLOSED) {
+    GPR_ASSERT(call->read_state <= READ_STATE_STREAM_CLOSED);
+    call->read_state = READ_STATE_STREAM_CLOSED;
+    unref = 1;
+  }
+  if (!success) {
+    abort();
+  }
+  finish_read_ops(call);
+  unlock(call);
+
+  if (unref) {
+    grpc_call_internal_unref(call, 0);
+  }
+}
+
 static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count,
                                                 grpc_metadata *metadata) {
   size_t i;
@@ -595,6 +727,22 @@ static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count,
   return out;
 }
 
+/* Copy the contents of a byte buffer into stream ops */
+static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer,
+                                           grpc_stream_op_buffer *sopb) {
+  size_t i;
+
+  switch (byte_buffer->type) {
+    case GRPC_BB_SLICE_BUFFER:
+      for (i = 0; i < byte_buffer->data.slice_buffer.count; i++) {
+        gpr_slice slice = byte_buffer->data.slice_buffer.slices[i];
+        gpr_slice_ref(slice);
+        grpc_sopb_add_slice(sopb, slice);
+      }
+      break;
+  }
+}
+
 static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
   grpc_ioreq_data data;
   grpc_metadata_batch mdb;
@@ -608,24 +756,25 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
         break;
       }
       data = call->request_data[GRPC_IOREQ_SEND_INITIAL_METADATA];
-      mdb.list = chain_metadata_from_app(
-          call, data.send_metadata.count, data.send_metadata.metadata);
+      mdb.list = chain_metadata_from_app(call, data.send_metadata.count,
+                                         data.send_metadata.metadata);
       mdb.garbage.head = mdb.garbage.tail = NULL;
       mdb.deadline = call->send_deadline;
       for (i = 0; i < call->send_initial_metadata_count; i++) {
-        grpc_metadata_batch_link_head(&mdb,
-                                      &call->send_initial_metadata[i]);
+        grpc_metadata_batch_link_head(&mdb, &call->send_initial_metadata[i]);
       }
       grpc_sopb_add_metadata(&call->send_ops, mdb);
       op->send_ops = &call->send_ops;
       op->bind_pollset = grpc_cq_pollset(call->cq);
       call->last_send_contains |= 1 << GRPC_IOREQ_SEND_INITIAL_METADATA;
       call->write_state = WRITE_STATE_STARTED;
-      /* fall through intended */
+    /* fall through intended */
     case WRITE_STATE_STARTED:
       if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE)) {
         data = call->request_data[GRPC_IOREQ_SEND_MESSAGE];
-        grpc_sopb_add_message(&call->send_ops, data.send_message);
+        grpc_sopb_add_begin_message(
+            &call->send_ops, grpc_byte_buffer_length(data.send_message), 0);
+        copy_byte_buffer_to_stream_ops(data.send_message, &call->send_ops);
         op->send_ops = &call->send_ops;
         call->last_send_contains |= 1 << GRPC_IOREQ_SEND_MESSAGE;
       }
@@ -637,8 +786,8 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
         if (!call->is_client) {
           /* send trailing metadata */
           data = call->request_data[GRPC_IOREQ_SEND_TRAILING_METADATA];
-          mdb.list = chain_metadata_from_app(
-              call, data.send_metadata.count, data.send_metadata.metadata);
+          mdb.list = chain_metadata_from_app(call, data.send_metadata.count,
+                                             data.send_metadata.metadata);
           mdb.garbage.head = mdb.garbage.tail = NULL;
           mdb.deadline = call->send_deadline;
           /* send status */
@@ -656,7 +805,8 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
                 &mdb, &call->details_link,
                 grpc_mdelem_from_metadata_strings(
                     call->metadata_context,
-                    grpc_mdstr_ref(grpc_channel_get_message_string(call->channel)),
+                    grpc_mdstr_ref(
+                        grpc_channel_get_message_string(call->channel)),
                     grpc_mdstr_from_string(call->metadata_context,
                                            data.send_status.details)));
           }
@@ -779,10 +929,6 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
   master->on_complete = completion;
   master->user_data = user_data;
 
-  if (have_ops & (1u << GRPC_IOREQ_RECV_MESSAGE)) {
-    call->need_more_data = 1;
-  }
-
   finish_read_ops(call);
   early_out_write_ops(call);
 
@@ -867,28 +1013,6 @@ static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline) {
   grpc_alarm_init(&call->alarm, deadline, call_alarm, call, gpr_now());
 }
 
-static void set_read_state_locked(grpc_call *call, read_state state) {
-  GPR_ASSERT(call->read_state < state);
-  call->read_state = state;
-  finish_read_ops(call);
-}
-
-static void set_read_state(grpc_call *call, read_state state) {
-  lock(call);
-  set_read_state_locked(call, state);
-  unlock(call);
-}
-
-void grpc_call_read_closed(grpc_call_element *elem) {
-  set_read_state(CALL_FROM_TOP_ELEM(elem), READ_STATE_READ_CLOSED);
-}
-
-void grpc_call_stream_closed(grpc_call_element *elem) {
-  grpc_call *call = CALL_FROM_TOP_ELEM(elem);
-  set_read_state(call, READ_STATE_STREAM_CLOSED);
-  grpc_call_internal_unref(call, 0);
-}
-
 /* we offset status by a small amount when storing it into transport metadata
    as metadata cannot store a 0 value (which is used as OK for grpc_status_codes
    */
@@ -912,35 +1036,13 @@ static gpr_uint32 decode_status(grpc_mdelem *md) {
   return status;
 }
 
-void grpc_call_recv_message(grpc_call_element *elem,
-                            grpc_byte_buffer *byte_buffer) {
-  grpc_call *call = CALL_FROM_TOP_ELEM(elem);
-  lock(call);
-  grpc_bbq_push(&call->incoming_queue, byte_buffer);
-  finish_read_ops(call);
-  unlock(call);
-}
-
-void grpc_call_recv_synthetic_status(grpc_call_element *elem,
-                                     grpc_status_code status,
-                                     const char *message) {
-  grpc_call *call = CALL_FROM_TOP_ELEM(elem);
-  lock(call);
-  set_status_code(call, STATUS_FROM_CORE, status);
-  set_status_details(call, STATUS_FROM_CORE,
-                     grpc_mdstr_from_string(call->metadata_context, message));
-  unlock(call);
-}
-
-int grpc_call_recv_metadata(grpc_call_element *elem, grpc_metadata_batch *md) {
-  grpc_call *call = CALL_FROM_TOP_ELEM(elem);
+static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
   grpc_linked_mdelem *l;
   grpc_metadata_array *dest;
   grpc_metadata *mdusr;
   int is_trailing;
   grpc_mdctx *mdctx = call->metadata_context;
 
-  lock(call);
   is_trailing = call->read_state >= READ_STATE_GOT_INITIAL_METADATA;
   for (l = md->list.head; l != NULL; l = l->next) {
     grpc_mdelem *md = l->md;
@@ -976,9 +1078,8 @@ int grpc_call_recv_metadata(grpc_call_element *elem, grpc_metadata_batch *md) {
     set_deadline_alarm(call, md->deadline);
   }
   if (!is_trailing) {
-    set_read_state_locked(call, READ_STATE_GOT_INITIAL_METADATA);
+    call->read_state = READ_STATE_GOT_INITIAL_METADATA;
   }
-  unlock(call);
 
   grpc_mdctx_lock(mdctx);
   for (l = md->list.head; l; l = l->next) {
@@ -988,13 +1089,43 @@ int grpc_call_recv_metadata(grpc_call_element *elem, grpc_metadata_batch *md) {
     grpc_mdctx_locked_mdelem_unref(mdctx, l->md);
   }
   grpc_mdctx_unlock(mdctx);
+}
+
+#if 0
+void grpc_call_read_closed(grpc_call_element *elem) {
+  set_read_state(CALL_FROM_TOP_ELEM(elem), READ_STATE_READ_CLOSED);
+}
 
-  return !is_trailing;
+void grpc_call_stream_closed(grpc_call_element *elem) {
+  grpc_call *call = CALL_FROM_TOP_ELEM(elem);
+  set_read_state(call, READ_STATE_STREAM_CLOSED);
+  grpc_call_internal_unref(call, 0);
+}
+
+void grpc_call_recv_message(grpc_call_element *elem,
+                            grpc_byte_buffer *byte_buffer) {
+  grpc_call *call = CALL_FROM_TOP_ELEM(elem);
+  lock(call);
+  grpc_bbq_push(&call->incoming_queue, byte_buffer);
+  finish_read_ops(call);
+  unlock(call);
+}
+
+void grpc_call_recv_synthetic_status(grpc_call_element *elem,
+                                     grpc_status_code status,
+                                     const char *message) {
+  grpc_call *call = CALL_FROM_TOP_ELEM(elem);
+  lock(call);
+  set_status_code(call, STATUS_FROM_CORE, status);
+  set_status_details(call, STATUS_FROM_CORE,
+                     grpc_mdstr_from_string(call->metadata_context, message));
+  unlock(call);
 }
 
 grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) {
   return CALL_STACK_FROM_CALL(call);
 }
+#endif
 
 /*
  * BATCH API IMPLEMENTATION
diff --git a/src/core/surface/call.h b/src/core/surface/call.h
index 358e5560a3..199beb1738 100644
--- a/src/core/surface/call.h
+++ b/src/core/surface/call.h
@@ -96,26 +96,12 @@ grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call);
 void grpc_call_internal_ref(grpc_call *call);
 void grpc_call_internal_unref(grpc_call *call, int allow_immediate_deletion);
 
-/* Helpers for grpc_client, grpc_server filters to publish received data to
-   the completion queue/surface layer */
-/* receive metadata - returns 1 if this was initial metadata */
-int grpc_call_recv_metadata(grpc_call_element *surface_element,
-                            grpc_metadata_batch *md);
-void grpc_call_recv_message(grpc_call_element *surface_element,
-                            grpc_byte_buffer *message);
-void grpc_call_read_closed(grpc_call_element *surface_element);
-void grpc_call_stream_closed(grpc_call_element *surface_element);
-
 grpc_call_error grpc_call_start_ioreq_and_call_back(
     grpc_call *call, const grpc_ioreq *reqs, size_t nreqs,
     grpc_ioreq_completion_func on_complete, void *user_data);
 
 grpc_call_stack *grpc_call_get_call_stack(grpc_call *call);
 
-void grpc_call_recv_synthetic_status(grpc_call_element *elem,
-                                     grpc_status_code status,
-                                     const char *message);
-
 /* Given the top call_element, get the call object. */
 grpc_call *grpc_call_from_top_element(grpc_call_element *surface_element);
 
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index 29b042e7c1..f1d71afaf2 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -52,6 +52,7 @@ typedef struct registered_call {
 struct grpc_channel {
   int is_client;
   gpr_refcount refs;
+  gpr_uint32 max_message_length;
   grpc_mdctx *metadata_context;
   grpc_mdstr *grpc_status_string;
   grpc_mdstr *grpc_message_string;
@@ -68,9 +69,13 @@ struct grpc_channel {
 #define CHANNEL_FROM_TOP_ELEM(top_elem) \
   CHANNEL_FROM_CHANNEL_STACK(grpc_channel_stack_from_top_element(top_elem))
 
+/* the protobuf library will (by default) start warning at 100megs */
+#define DEFAULT_MAX_MESSAGE_LENGTH (100 * 1024 * 1024)
+
 grpc_channel *grpc_channel_create_from_filters(
     const grpc_channel_filter **filters, size_t num_filters,
     const grpc_channel_args *args, grpc_mdctx *mdctx, int is_client) {
+  size_t i;
   size_t size =
       sizeof(grpc_channel) + grpc_channel_stack_size(filters, num_filters);
   grpc_channel *channel = gpr_malloc(size);
@@ -88,6 +93,24 @@ grpc_channel *grpc_channel_create_from_filters(
                           CHANNEL_STACK_FROM_CHANNEL(channel));
   gpr_mu_init(&channel->registered_call_mu);
   channel->registered_calls = NULL;
+
+  channel->max_message_length = DEFAULT_MAX_MESSAGE_LENGTH;
+  if (args) {
+    for (i = 0; i < args->num_args; i++) {
+      if (0 == strcmp(args->args[i].key, GRPC_ARG_MAX_MESSAGE_LENGTH)) {
+        if (args->args[i].type != GRPC_ARG_INTEGER) {
+          gpr_log(GPR_ERROR, "%s ignored: it must be an integer",
+                  GRPC_ARG_MAX_MESSAGE_LENGTH);
+        } else if (args->args[i].value.integer < 0) {
+          gpr_log(GPR_ERROR, "%s ignored: it must be >= 0",
+                  GRPC_ARG_MAX_MESSAGE_LENGTH);
+        } else {
+          channel->max_message_length = args->args[i].value.integer;
+        }
+      }
+    }
+  }
+
   return channel;
 }
 
diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h
index d3e51185ee..05d57a905b 100644
--- a/src/core/surface/channel.h
+++ b/src/core/surface/channel.h
@@ -44,6 +44,7 @@ grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel);
 grpc_mdctx *grpc_channel_get_metadata_context(grpc_channel *channel);
 grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel);
 grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel);
+gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel);
 
 void grpc_client_channel_closed(grpc_channel_element *elem);
 
diff --git a/src/core/transport/stream_op.h b/src/core/transport/stream_op.h
index dabe68f3bd..c3901bf608 100644
--- a/src/core/transport/stream_op.h
+++ b/src/core/transport/stream_op.h
@@ -50,9 +50,22 @@ typedef enum grpc_stream_op_code {
      Must be ignored by receivers */
   GRPC_NO_OP,
   GRPC_OP_METADATA,
-  GRPC_OP_MESSAGE
+  /* Begin a message/metadata element/status - as defined by
+     grpc_message_type. */
+  GRPC_OP_BEGIN_MESSAGE,
+  /* Add a slice of data to the current message/metadata element/status.
+     Must not overflow the forward declared length. */
+  GRPC_OP_SLICE
 } grpc_stream_op_code;
 
+/* Arguments for GRPC_OP_BEGIN */
+typedef struct grpc_begin_message {
+  /* How many bytes of data will this message contain */
+  gpr_uint32 length;
+  /* Write flags for the message: see grpc.h GRPC_WRITE_xxx */
+  gpr_uint32 flags;
+} grpc_begin_message;
+
 typedef struct grpc_linked_mdelem {
   grpc_mdelem *md;
   struct grpc_linked_mdelem *next;
@@ -105,8 +118,9 @@ typedef struct grpc_stream_op {
   /* the arguments to this operation. union fields are named according to the
      associated op-code */
   union {
-    grpc_byte_buffer *message;
+    grpc_begin_message begin_message;
     grpc_metadata_batch metadata;
+    gpr_slice slice;
   } data;
 } grpc_stream_op;
 
@@ -134,8 +148,16 @@ void grpc_stream_ops_unref_owned_objects(grpc_stream_op *ops, size_t nops);
 
 /* Append a GRPC_NO_OP to a buffer */
 void grpc_sopb_add_no_op(grpc_stream_op_buffer *sopb);
-void grpc_sopb_add_message(grpc_stream_op_buffer *sopb, grpc_byte_buffer *bb);
+/* Append a GRPC_OP_BEGIN to a buffer */
+void grpc_sopb_add_begin_message(grpc_stream_op_buffer *sopb, gpr_uint32 length,
+                                 gpr_uint32 flags);
 void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb, grpc_metadata_batch metadata);
+/* Append a GRPC_SLICE to a buffer - does not ref/unref the slice */
+void grpc_sopb_add_slice(grpc_stream_op_buffer *sopb, gpr_slice slice);
+/* Append a GRPC_OP_FLOW_CTL_CB to a buffer */
+void grpc_sopb_add_flow_ctl_cb(grpc_stream_op_buffer *sopb,
+                               void (*cb)(void *arg, grpc_op_error error),
+                               void *arg);
 /* Append a buffer to a buffer - does not ref/unref any internal objects */
 void grpc_sopb_append(grpc_stream_op_buffer *sopb, grpc_stream_op *ops,
                       size_t nops);
diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h
index f31011e56a..264245d351 100644
--- a/src/core/transport/transport.h
+++ b/src/core/transport/transport.h
@@ -113,21 +113,6 @@ int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream,
 void grpc_transport_destroy_stream(grpc_transport *transport,
                                    grpc_stream *stream);
 
-/* Enable/disable incoming data for a stream.
-
-   This effectively disables new window becoming available for a given stream,
-   but does not prevent existing window from being consumed by a sender: the
-   caller must still be prepared to receive some additional data after this
-   call.
-
-   Arguments:
-     transport - the transport on which to create this stream
-     stream    - the grpc_stream to destroy (memory is still owned by the
-                 caller, but any child memory must be cleaned up)
-     allow     - is it allowed that new window be opened up? */
-void grpc_transport_set_allow_window_updates(grpc_transport *transport,
-                                             grpc_stream *stream, int allow);
-
 /* Transport op: a set of operations to perform on a transport */
 typedef struct grpc_transport_op {
   grpc_stream_op_buffer *send_ops;
-- 
GitLab