From 6e7b45ed215fe113abed17002fddf5635c97549c Mon Sep 17 00:00:00 2001
From: Craig Tiller <ctiller@google.com>
Date: Fri, 8 Jul 2016 17:25:49 -0700
Subject: [PATCH] Make transport_stream_ops all be heap allocated

---
 src/core/lib/channel/channel_stack.c          | 23 ++++---
 src/core/lib/channel/compress_filter.c        | 14 ++---
 .../security/transport/server_auth_filter.c   | 27 ++++----
 src/core/lib/surface/call.c                   | 63 ++++++++++---------
 4 files changed, 71 insertions(+), 56 deletions(-)

diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c
index 87175d7943..e1375348a9 100644
--- a/src/core/lib/channel/channel_stack.c
+++ b/src/core/lib/channel/channel_stack.c
@@ -32,6 +32,7 @@
  */
 
 #include "src/core/lib/channel/channel_stack.h"
+#include <grpc/support/alloc.h>
 #include <grpc/support/log.h>
 
 #include <stdlib.h>
@@ -259,21 +260,27 @@ grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem) {
       sizeof(grpc_call_stack)));
 }
 
+static void destroy_op(grpc_exec_ctx *exec_ctx, void *op, grpc_error *error) {
+  gpr_free(op);
+}
+
 void grpc_call_element_send_cancel(grpc_exec_ctx *exec_ctx,
                                    grpc_call_element *cur_elem) {
-  grpc_transport_stream_op op;
-  memset(&op, 0, sizeof(op));
-  op.cancel_error = GRPC_ERROR_CANCELLED;
-  grpc_call_next_op(exec_ctx, cur_elem, &op);
+  grpc_transport_stream_op *op = gpr_malloc(sizeof(*op));
+  memset(op, 0, sizeof(*op));
+  op->cancel_error = GRPC_ERROR_CANCELLED;
+  op->on_complete = grpc_closure_create(destroy_op, op);
+  grpc_call_next_op(exec_ctx, cur_elem, op);
 }
 
 void grpc_call_element_send_cancel_with_message(grpc_exec_ctx *exec_ctx,
                                                 grpc_call_element *cur_elem,
                                                 grpc_status_code status,
                                                 gpr_slice *optional_message) {
-  grpc_transport_stream_op op;
-  memset(&op, 0, sizeof(op));
-  grpc_transport_stream_op_add_cancellation_with_message(&op, status,
+  grpc_transport_stream_op *op = gpr_malloc(sizeof(*op));
+  memset(op, 0, sizeof(*op));
+  op->on_complete = grpc_closure_create(destroy_op, op);
+  grpc_transport_stream_op_add_cancellation_with_message(op, status,
                                                          optional_message);
-  grpc_call_next_op(exec_ctx, cur_elem, &op);
+  grpc_call_next_op(exec_ctx, cur_elem, op);
 }
diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c
index 32ebe53ee6..d4b1cf0296 100644
--- a/src/core/lib/channel/compress_filter.c
+++ b/src/core/lib/channel/compress_filter.c
@@ -60,7 +60,7 @@ typedef struct call_data {
   /** If true, contents of \a compression_algorithm are authoritative */
   int has_compression_algorithm;
 
-  grpc_transport_stream_op send_op;
+  grpc_transport_stream_op *send_op;
   uint32_t send_length;
   uint32_t send_flags;
   gpr_slice incoming_slice;
@@ -199,11 +199,11 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx,
 
   grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices,
                                 calld->send_flags);
-  calld->send_op.send_message = &calld->replacement_stream.base;
-  calld->post_send = calld->send_op.on_complete;
-  calld->send_op.on_complete = &calld->send_done;
+  calld->send_op->send_message = &calld->replacement_stream.base;
+  calld->post_send = calld->send_op->on_complete;
+  calld->send_op->on_complete = &calld->send_done;
 
-  grpc_call_next_op(exec_ctx, elem, &calld->send_op);
+  grpc_call_next_op(exec_ctx, elem, calld->send_op);
 }
 
 static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
@@ -220,7 +220,7 @@ static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
 static void continue_send_message(grpc_exec_ctx *exec_ctx,
                                   grpc_call_element *elem) {
   call_data *calld = elem->call_data;
-  while (grpc_byte_stream_next(exec_ctx, calld->send_op.send_message,
+  while (grpc_byte_stream_next(exec_ctx, calld->send_op->send_message,
                                &calld->incoming_slice, ~(size_t)0,
                                &calld->got_slice)) {
     gpr_slice_buffer_add(&calld->slices, calld->incoming_slice);
@@ -243,7 +243,7 @@ static void compress_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
   }
   if (op->send_message != NULL && !skip_compression(elem) &&
       0 == (op->send_message->flags & GRPC_WRITE_NO_COMPRESS)) {
-    calld->send_op = *op;
+    calld->send_op = op;
     calld->send_length = op->send_message->length;
     calld->send_flags = op->send_message->flags;
     continue_send_message(exec_ctx, elem);
diff --git a/src/core/lib/security/transport/server_auth_filter.c b/src/core/lib/security/transport/server_auth_filter.c
index 12e789bde9..1958a72b93 100644
--- a/src/core/lib/security/transport/server_auth_filter.c
+++ b/src/core/lib/security/transport/server_auth_filter.c
@@ -48,7 +48,7 @@ typedef struct call_data {
      up-call on transport_op, and remember to call our on_done_recv member after
      handling it. */
   grpc_closure auth_on_recv;
-  grpc_transport_stream_op transport_op;
+  grpc_transport_stream_op *transport_op;
   grpc_metadata_array md;
   const grpc_metadata *consumed_md;
   size_t num_consumed_md;
@@ -106,6 +106,10 @@ static grpc_mdelem *remove_consumed_md(void *user_data, grpc_mdelem *md) {
   return md;
 }
 
+static void destroy_op(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
+  gpr_free(arg);
+}
+
 /* called from application code */
 static void on_md_processing_done(
     void *user_data, const grpc_metadata *consumed_md, size_t num_consumed_md,
@@ -131,21 +135,22 @@ static void on_md_processing_done(
     grpc_exec_ctx_sched(&exec_ctx, calld->on_done_recv, GRPC_ERROR_NONE, NULL);
   } else {
     gpr_slice message;
-    grpc_transport_stream_op close_op;
-    memset(&close_op, 0, sizeof(close_op));
+    grpc_transport_stream_op *close_op = gpr_malloc(sizeof(*close_op));
+    memset(close_op, 0, sizeof(*close_op));
     grpc_metadata_array_destroy(&calld->md);
     error_details = error_details != NULL
                         ? error_details
                         : "Authentication metadata processing failed.";
     message = gpr_slice_from_copied_string(error_details);
-    calld->transport_op.send_initial_metadata = NULL;
-    if (calld->transport_op.send_message != NULL) {
-      grpc_byte_stream_destroy(&exec_ctx, calld->transport_op.send_message);
-      calld->transport_op.send_message = NULL;
+    calld->transport_op->send_initial_metadata = NULL;
+    if (calld->transport_op->send_message != NULL) {
+      grpc_byte_stream_destroy(&exec_ctx, calld->transport_op->send_message);
+      calld->transport_op->send_message = NULL;
     }
-    calld->transport_op.send_trailing_metadata = NULL;
-    grpc_transport_stream_op_add_close(&close_op, status, &message);
-    grpc_call_next_op(&exec_ctx, elem, &close_op);
+    calld->transport_op->send_trailing_metadata = NULL;
+    close_op->on_complete = grpc_closure_create(destroy_op, close_op);
+    grpc_transport_stream_op_add_close(close_op, status, &message);
+    grpc_call_next_op(&exec_ctx, elem, close_op);
     grpc_exec_ctx_sched(&exec_ctx, calld->on_done_recv,
                         grpc_error_set_int(GRPC_ERROR_CREATE(error_details),
                                            GRPC_ERROR_INT_GRPC_STATUS, status),
@@ -182,7 +187,7 @@ static void set_recv_ops_md_callbacks(grpc_call_element *elem,
     calld->recv_initial_metadata = op->recv_initial_metadata;
     calld->on_done_recv = op->recv_initial_metadata_ready;
     op->recv_initial_metadata_ready = &calld->auth_on_recv;
-    calld->transport_op = *op;
+    calld->transport_op = op;
   }
 }
 
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index e5668be47f..7739a395b9 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -109,6 +109,10 @@ typedef struct batch_control {
   uint8_t recv_message;
   uint8_t recv_final_op;
   uint8_t is_notify_tag_closure;
+
+  /* TODO(ctiller): now that this is inlined, figure out how much of the above
+                    state can be eliminated */
+  grpc_transport_stream_op op;
 } batch_control;
 
 struct grpc_call {
@@ -761,6 +765,7 @@ typedef struct termination_closure {
   grpc_error *error;
   grpc_closure *op_closure;
   enum { TC_CANCEL, TC_CLOSE } type;
+  grpc_transport_stream_op op;
 } termination_closure;
 
 static void done_termination(grpc_exec_ctx *exec_ctx, void *tcp,
@@ -780,26 +785,24 @@ static void done_termination(grpc_exec_ctx *exec_ctx, void *tcp,
 }
 
 static void send_cancel(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) {
-  grpc_transport_stream_op op;
   termination_closure *tc = tcp;
-  memset(&op, 0, sizeof(op));
-  op.cancel_error = tc->error;
+  memset(&tc->op, 0, sizeof(tc->op));
+  tc->op.cancel_error = tc->error;
   /* reuse closure to catch completion */
   grpc_closure_init(&tc->closure, done_termination, tc);
-  op.on_complete = &tc->closure;
-  execute_op(exec_ctx, tc->call, &op);
+  tc->op.on_complete = &tc->closure;
+  execute_op(exec_ctx, tc->call, &tc->op);
 }
 
 static void send_close(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) {
-  grpc_transport_stream_op op;
   termination_closure *tc = tcp;
-  memset(&op, 0, sizeof(op));
-  op.close_error = tc->error;
+  memset(&tc->op, 0, sizeof(tc->op));
+  tc->op.close_error = tc->error;
   /* reuse closure to catch completion */
   grpc_closure_init(&tc->closure, done_termination, tc);
-  tc->op_closure = op.on_complete;
-  op.on_complete = &tc->closure;
-  execute_op(exec_ctx, tc->call, &op);
+  tc->op_closure = tc->op.on_complete;
+  tc->op.on_complete = &tc->closure;
+  execute_op(exec_ctx, tc->call, &tc->op);
 }
 
 static grpc_call_error terminate_with_status(grpc_exec_ctx *exec_ctx,
@@ -1353,7 +1356,6 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
                                         grpc_call *call, const grpc_op *ops,
                                         size_t nops, void *notify_tag,
                                         int is_notify_tag_closure) {
-  grpc_transport_stream_op stream_op;
   size_t i;
   const grpc_op *op;
   batch_control *bctl;
@@ -1364,8 +1366,6 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
 
   GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, notify_tag);
 
-  memset(&stream_op, 0, sizeof(stream_op));
-
   /* TODO(ctiller): this feels like it could be made lock-free */
   gpr_mu_lock(&call->mu);
   bctl = allocate_batch_control(call);
@@ -1374,6 +1374,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
   bctl->notify_tag = notify_tag;
   bctl->is_notify_tag_closure = (uint8_t)(is_notify_tag_closure != 0);
 
+  grpc_transport_stream_op *stream_op = &bctl->op;
+  memset(stream_op, 0, sizeof(*stream_op));
+
   if (nops == 0) {
     GRPC_CALL_INTERNAL_REF(call, "completion");
     bctl->error = GRPC_ERROR_NONE;
@@ -1452,9 +1455,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
         }
         /* TODO(ctiller): just make these the same variable? */
         call->metadata_batch[0][0].deadline = call->send_deadline;
-        stream_op.send_initial_metadata =
+        stream_op->send_initial_metadata =
             &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */];
-        stream_op.send_initial_metadata_flags = op->flags;
+        stream_op->send_initial_metadata_flags = op->flags;
         break;
       case GRPC_OP_SEND_MESSAGE:
         if (!are_write_flags_valid(op->flags)) {
@@ -1474,7 +1477,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
         grpc_slice_buffer_stream_init(
             &call->sending_stream,
             &op->data.send_message->data.raw.slice_buffer, op->flags);
-        stream_op.send_message = &call->sending_stream.base;
+        stream_op->send_message = &call->sending_stream.base;
         break;
       case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
         /* Flag validation: currently allow no flags */
@@ -1492,7 +1495,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
         }
         bctl->send_final_op = 1;
         call->sent_final_op = 1;
-        stream_op.send_trailing_metadata =
+        stream_op->send_trailing_metadata =
             &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
         break;
       case GRPC_OP_SEND_STATUS_FROM_SERVER:
@@ -1539,7 +1542,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
           error = GRPC_CALL_ERROR_INVALID_METADATA;
           goto done_with_error;
         }
-        stream_op.send_trailing_metadata =
+        stream_op->send_trailing_metadata =
             &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
         break;
       case GRPC_OP_RECV_INITIAL_METADATA:
@@ -1557,9 +1560,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
         grpc_closure_init(&call->receiving_initial_metadata_ready,
                           receiving_initial_metadata_ready, bctl);
         bctl->recv_initial_metadata = 1;
-        stream_op.recv_initial_metadata =
+        stream_op->recv_initial_metadata =
             &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
-        stream_op.recv_initial_metadata_ready =
+        stream_op->recv_initial_metadata_ready =
             &call->receiving_initial_metadata_ready;
         num_completion_callbacks_needed++;
         break;
@@ -1576,10 +1579,10 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
         call->receiving_message = 1;
         bctl->recv_message = 1;
         call->receiving_buffer = op->data.recv_message;
-        stream_op.recv_message = &call->receiving_stream;
+        stream_op->recv_message = &call->receiving_stream;
         grpc_closure_init(&call->receiving_stream_ready, receiving_stream_ready,
                           bctl);
-        stream_op.recv_message_ready = &call->receiving_stream_ready;
+        stream_op->recv_message_ready = &call->receiving_stream_ready;
         num_completion_callbacks_needed++;
         break;
       case GRPC_OP_RECV_STATUS_ON_CLIENT:
@@ -1605,9 +1608,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
         call->final_op.client.status_details_capacity =
             op->data.recv_status_on_client.status_details_capacity;
         bctl->recv_final_op = 1;
-        stream_op.recv_trailing_metadata =
+        stream_op->recv_trailing_metadata =
             &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
-        stream_op.collect_stats = &call->stats.transport_stream_stats;
+        stream_op->collect_stats = &call->stats.transport_stream_stats;
         break;
       case GRPC_OP_RECV_CLOSE_ON_SERVER:
         /* Flag validation: currently allow no flags */
@@ -1627,9 +1630,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
         call->final_op.server.cancelled =
             op->data.recv_close_on_server.cancelled;
         bctl->recv_final_op = 1;
-        stream_op.recv_trailing_metadata =
+        stream_op->recv_trailing_metadata =
             &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
-        stream_op.collect_stats = &call->stats.transport_stream_stats;
+        stream_op->collect_stats = &call->stats.transport_stream_stats;
         break;
     }
   }
@@ -1640,12 +1643,12 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
   }
   gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed);
 
-  stream_op.context = call->context;
+  stream_op->context = call->context;
   grpc_closure_init(&bctl->finish_batch, finish_batch, bctl);
-  stream_op.on_complete = &bctl->finish_batch;
+  stream_op->on_complete = &bctl->finish_batch;
   gpr_mu_unlock(&call->mu);
 
-  execute_op(exec_ctx, call, &stream_op);
+  execute_op(exec_ctx, call, stream_op);
 
 done:
   GPR_TIMER_END("grpc_call_start_batch", 0);
-- 
GitLab