From 3ffd8220a17fd2fdf64adc66b03e4e254880471b Mon Sep 17 00:00:00 2001
From: Craig Tiller <ctiller@google.com>
Date: Mon, 21 Sep 2015 08:21:57 -0700
Subject: [PATCH] Call list progress

---
 src/core/client_config/connector.c            |   5 +-
 .../client_config/resolvers/dns_resolver.c    |   2 +-
 .../resolvers/sockaddr_resolver.c             |   2 +-
 src/core/client_config/subchannel.c           |   3 +-
 src/core/client_config/subchannel_factory.c   |   5 +-
 .../merge_channel_args.c                      |   5 +-
 src/core/httpcli/httpcli.c                    |   8 +-
 src/core/iomgr/tcp_client_posix.c             |   1 -
 src/core/iomgr/workqueue.h                    |   3 +-
 src/core/surface/call.c                       | 299 ++++++++++++------
 src/core/surface/call.h                       |  18 +-
 src/core/surface/channel.c                    |   4 +-
 src/core/surface/channel.h                    |   2 +-
 src/core/surface/completion_queue.h           |   3 +-
 src/core/surface/secure_channel_create.c      |   2 +-
 src/core/surface/server.c                     |   2 +-
 16 files changed, 229 insertions(+), 135 deletions(-)

diff --git a/src/core/client_config/connector.c b/src/core/client_config/connector.c
index 31f0b84efe..a0f346faeb 100644
--- a/src/core/client_config/connector.c
+++ b/src/core/client_config/connector.c
@@ -37,8 +37,9 @@ void grpc_connector_ref(grpc_connector *connector) {
   connector->vtable->ref(connector);
 }
 
-void grpc_connector_unref(grpc_connector *connector) {
-  connector->vtable->unref(connector);
+void grpc_connector_unref(grpc_connector *connector,
+                          grpc_call_list *call_list) {
+  connector->vtable->unref(connector, call_list);
 }
 
 void grpc_connector_connect(grpc_connector *connector,
diff --git a/src/core/client_config/resolvers/dns_resolver.c b/src/core/client_config/resolvers/dns_resolver.c
index 9e9b3d4917..5750db4b43 100644
--- a/src/core/client_config/resolvers/dns_resolver.c
+++ b/src/core/client_config/resolvers/dns_resolver.c
@@ -145,7 +145,7 @@ static void dns_on_resolved(void *arg, grpc_resolved_addresses *addresses,
       args.addr = (struct sockaddr *)(addresses->addrs[i].addr);
       args.addr_len = (size_t)addresses->addrs[i].len;
       subchannels[i] = grpc_subchannel_factory_create_subchannel(
-          r->subchannel_factory, &args);
+          r->subchannel_factory, &args, call_list);
     }
     memset(&lb_policy_args, 0, sizeof(lb_policy_args));
     lb_policy_args.subchannels = subchannels;
diff --git a/src/core/client_config/resolvers/sockaddr_resolver.c b/src/core/client_config/resolvers/sockaddr_resolver.c
index 15eb60b93a..38293b0f13 100644
--- a/src/core/client_config/resolvers/sockaddr_resolver.c
+++ b/src/core/client_config/resolvers/sockaddr_resolver.c
@@ -139,7 +139,7 @@ static void sockaddr_maybe_finish_next_locked(sockaddr_resolver *r,
       args.addr = (struct sockaddr *)&r->addrs[i];
       args.addr_len = r->addrs_len[i];
       subchannels[i] = grpc_subchannel_factory_create_subchannel(
-          r->subchannel_factory, &args);
+          r->subchannel_factory, &args, call_list);
     }
     memset(&lb_policy_args, 0, sizeof(lb_policy_args));
     lb_policy_args.subchannels = subchannels;
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index 937b3cd71c..bae705f9c3 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -263,7 +263,7 @@ static void subchannel_destroy(grpc_subchannel *c, grpc_call_list *call_list) {
   gpr_free(c->addr);
   grpc_mdctx_unref(c->mdctx);
   grpc_connectivity_state_destroy(&c->state_tracker);
-  grpc_connector_unref(c->connector);
+  grpc_connector_unref(c->connector, call_list);
   gpr_free(c);
 }
 
@@ -320,7 +320,6 @@ static void continue_connect(grpc_subchannel *c, grpc_call_list *call_list) {
   args.addr_len = c->addr_len;
   args.deadline = compute_connect_deadline(c);
   args.channel_args = c->args;
-  args.metadata_context = c->mdctx;
 
   grpc_connector_connect(c->connector, &args, &c->connecting_result,
                          &c->connected, call_list);
diff --git a/src/core/client_config/subchannel_factory.c b/src/core/client_config/subchannel_factory.c
index f71386594c..2a569aba13 100644
--- a/src/core/client_config/subchannel_factory.c
+++ b/src/core/client_config/subchannel_factory.c
@@ -41,6 +41,7 @@ void grpc_subchannel_factory_unref(grpc_subchannel_factory *factory) {
 }
 
 grpc_subchannel *grpc_subchannel_factory_create_subchannel(
-    grpc_subchannel_factory *factory, grpc_subchannel_args *args) {
-  return factory->vtable->create_subchannel(factory, args);
+    grpc_subchannel_factory *factory, grpc_subchannel_args *args,
+    grpc_call_list *call_list) {
+  return factory->vtable->create_subchannel(factory, args, call_list);
 }
diff --git a/src/core/client_config/subchannel_factory_decorators/merge_channel_args.c b/src/core/client_config/subchannel_factory_decorators/merge_channel_args.c
index c1b5507fde..b2c9797b1a 100644
--- a/src/core/client_config/subchannel_factory_decorators/merge_channel_args.c
+++ b/src/core/client_config/subchannel_factory_decorators/merge_channel_args.c
@@ -57,13 +57,14 @@ static void merge_args_factory_unref(grpc_subchannel_factory *scf) {
 }
 
 static grpc_subchannel *merge_args_factory_create_subchannel(
-    grpc_subchannel_factory *scf, grpc_subchannel_args *args) {
+    grpc_subchannel_factory *scf, grpc_subchannel_args *args,
+    grpc_call_list *call_list) {
   merge_args_factory *f = (merge_args_factory *)scf;
   grpc_channel_args *final_args =
       grpc_channel_args_merge(args->args, f->merge_args);
   grpc_subchannel *s;
   args->args = final_args;
-  s = grpc_subchannel_factory_create_subchannel(f->wrapped, args);
+  s = grpc_subchannel_factory_create_subchannel(f->wrapped, args, call_list);
   grpc_channel_args_destroy(final_args);
   return s;
 }
diff --git a/src/core/httpcli/httpcli.c b/src/core/httpcli/httpcli.c
index e35fa20953..40ea2e9688 100644
--- a/src/core/httpcli/httpcli.c
+++ b/src/core/httpcli/httpcli.c
@@ -66,7 +66,6 @@ typedef struct {
   grpc_closure on_read;
   grpc_closure done_write;
   grpc_closure connected;
-  grpc_workqueue *workqueue;
 } internal_request;
 
 static grpc_httpcli_get_override g_get_override = NULL;
@@ -111,7 +110,6 @@ static void finish(internal_request *req, int success,
   grpc_iomgr_unregister_object(&req->iomgr_obj);
   gpr_slice_buffer_destroy(&req->incoming);
   gpr_slice_buffer_destroy(&req->outgoing);
-  GRPC_WORKQUEUE_UNREF(req->workqueue, "destroy");
   gpr_free(req);
 }
 
@@ -196,8 +194,8 @@ static void next_address(internal_request *req, grpc_call_list *call_list) {
   addr = &req->addresses->addrs[req->next_address++];
   grpc_closure_init(&req->connected, on_connected, req);
   grpc_tcp_client_connect(&req->connected, &req->ep, &req->context->pollset_set,
-                          req->workqueue, (struct sockaddr *)&addr->addr,
-                          addr->len, req->deadline, call_list);
+                          (struct sockaddr *)&addr->addr, addr->len,
+                          req->deadline, call_list);
 }
 
 static void on_resolved(void *arg, grpc_resolved_addresses *addresses,
@@ -234,8 +232,6 @@ static void internal_request_begin(
   gpr_slice_buffer_init(&req->outgoing);
   grpc_iomgr_register_object(&req->iomgr_obj, name);
   req->host = gpr_strdup(request->host);
-  req->workqueue = grpc_workqueue_create(call_list);
-  grpc_workqueue_add_to_pollset(req->workqueue, pollset);
 
   grpc_pollset_set_add_pollset(&req->context->pollset_set, req->pollset,
                                call_list);
diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c
index a4828201c7..fed832fce6 100644
--- a/src/core/iomgr/tcp_client_posix.c
+++ b/src/core/iomgr/tcp_client_posix.c
@@ -205,7 +205,6 @@ finish:
 
 void grpc_tcp_client_connect(grpc_closure *closure, grpc_endpoint **ep,
                              grpc_pollset_set *interested_parties,
-                             grpc_workqueue *workqueue,
                              const struct sockaddr *addr, size_t addr_len,
                              gpr_timespec deadline, grpc_call_list *call_list) {
   int fd;
diff --git a/src/core/iomgr/workqueue.h b/src/core/iomgr/workqueue.h
index b9d2a87dca..ea7031a9f3 100644
--- a/src/core/iomgr/workqueue.h
+++ b/src/core/iomgr/workqueue.h
@@ -73,7 +73,8 @@ void grpc_workqueue_unref(grpc_workqueue *workqueue, grpc_call_list *call_list);
 
 /** Bind this workqueue to a pollset */
 void grpc_workqueue_add_to_pollset(grpc_workqueue *workqueue,
-                                   grpc_pollset *pollset);
+                                   grpc_pollset *pollset,
+                                   grpc_call_list *call_list);
 
 /** Add a work item to a workqueue */
 void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_closure *closure,
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index b5a4155e02..f2f8f0a6ed 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -163,8 +163,6 @@ struct grpc_call {
   gpr_uint8 bound_pollset;
   /* is an error status set */
   gpr_uint8 error_status_set;
-  /** should the alarm be cancelled */
-  gpr_uint8 cancel_alarm;
   /** bitmask of allocated completion events in completions */
   gpr_uint8 allocated_completions;
   /** flag indicating that cancellation is inherited */
@@ -278,19 +276,25 @@ struct grpc_call {
 #define CALL_FROM_TOP_ELEM(top_elem) \
   CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem))
 
-static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline);
-static void call_on_done_recv(void *call, int success);
-static void call_on_done_send(void *call, int success);
+static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline,
+                               grpc_call_list *call_list);
+static void call_on_done_recv(void *call, int success,
+                              grpc_call_list *call_list);
+static void call_on_done_send(void *call, int success,
+                              grpc_call_list *call_list);
 static int fill_send_ops(grpc_call *call, grpc_transport_stream_op *op);
-static void execute_op(grpc_call *call, grpc_transport_stream_op *op);
-static void recv_metadata(grpc_call *call, grpc_metadata_batch *metadata);
+static void execute_op(grpc_call *call, grpc_transport_stream_op *op,
+                       grpc_call_list *call_list);
+static void recv_metadata(grpc_call *call, grpc_metadata_batch *metadata,
+                          grpc_call_list *call_list);
 static void finish_read_ops(grpc_call *call);
 static grpc_call_error cancel_with_status(grpc_call *c, grpc_status_code status,
                                           const char *description);
-static void finished_loose_op(void *call, int success);
+static void finished_loose_op(void *call, int success,
+                              grpc_call_list *call_list);
 
 static void lock(grpc_call *call);
-static void unlock(grpc_call *call);
+static void unlock(grpc_call *call, grpc_call_list *call_list);
 
 grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
                             gpr_uint32 propagation_mask,
@@ -303,6 +307,7 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
   grpc_transport_stream_op initial_op;
   grpc_transport_stream_op *initial_op_ptr = NULL;
   grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel);
+  grpc_call_list call_list = GRPC_CALL_LIST_INIT;
   grpc_call *call =
       gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size);
   memset(call, 0, sizeof(grpc_call));
@@ -395,19 +400,20 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
   }
   if (gpr_time_cmp(send_deadline, gpr_inf_future(send_deadline.clock_type)) !=
       0) {
-    set_deadline_alarm(call, send_deadline);
+    set_deadline_alarm(call, send_deadline, &call_list);
   }
+  grpc_call_list_run(&call_list);
   return call;
 }
 
-void grpc_call_set_completion_queue(grpc_call *call,
-                                    grpc_completion_queue *cq) {
+void grpc_call_set_completion_queue(grpc_call *call, grpc_completion_queue *cq,
+                                    grpc_call_list *call_list) {
   lock(call);
   call->cq = cq;
   if (cq) {
     GRPC_CQ_INTERNAL_REF(cq, "bind");
   }
-  unlock(call);
+  unlock(call, call_list);
 }
 
 grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call) {
@@ -429,13 +435,14 @@ static grpc_cq_completion *allocate_completion(grpc_call *call) {
   abort();
 }
 
-static void done_completion(void *call, grpc_cq_completion *completion) {
+static void done_completion(void *call, grpc_cq_completion *completion,
+                            grpc_call_list *call_list) {
   grpc_call *c = call;
   gpr_mu_lock(&c->completion_mu);
   c->allocated_completions &=
       (gpr_uint8) ~(1u << (completion - c->completions));
   gpr_mu_unlock(&c->completion_mu);
-  GRPC_CALL_INTERNAL_UNREF(c, "completion", 1);
+  GRPC_CALL_INTERNAL_UNREF(c, "completion", call_list);
 }
 
 #ifdef GRPC_CALL_REF_COUNT_DEBUG
@@ -448,10 +455,10 @@ void grpc_call_internal_ref(grpc_call *c) {
   gpr_ref(&c->internal_refcount);
 }
 
-static void destroy_call(void *call, int ignored_success) {
+static void destroy_call(grpc_call *call, grpc_call_list *call_list) {
   size_t i;
   grpc_call *c = call;
-  grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c));
+  grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c), call_list);
   GRPC_CHANNEL_INTERNAL_UNREF(c->channel, "call");
   gpr_mu_destroy(&c->mu);
   gpr_mu_destroy(&c->completion_mu);
@@ -487,21 +494,14 @@ static void destroy_call(void *call, int ignored_success) {
 
 #ifdef GRPC_CALL_REF_COUNT_DEBUG
 void grpc_call_internal_unref(grpc_call *c, const char *reason,
-                              int allow_immediate_deletion) {
+                              grpc_call_list *call_list) {
   gpr_log(GPR_DEBUG, "CALL: unref %p %d -> %d [%s]", c,
           c->internal_refcount.count, c->internal_refcount.count - 1, reason);
 #else
-void grpc_call_internal_unref(grpc_call *c, int allow_immediate_deletion) {
+void grpc_call_internal_unref(grpc_call *c, grpc_call_list *call_list) {
 #endif
   if (gpr_unref(&c->internal_refcount)) {
-    if (allow_immediate_deletion) {
-      destroy_call(c, 1);
-    } else {
-      c->destroy_closure.cb = destroy_call;
-      c->destroy_closure.cb_arg = c;
-      grpc_workqueue_push(grpc_channel_get_workqueue(c->channel),
-                          &c->destroy_closure, 1);
-    }
+    destroy_call(c, call_list);
   }
 }
 
@@ -600,7 +600,7 @@ static int need_more_data(grpc_call *call) {
          (call->cancel_with_status != GRPC_STATUS_OK) || call->destroy_called;
 }
 
-static void unlock(grpc_call *call) {
+static void unlock(grpc_call *call, grpc_call_list *call_list) {
   grpc_transport_stream_op op;
   completed_request completed_requests[GRPC_IOREQ_OP_COUNT];
   int completing_requests = 0;
@@ -608,7 +608,6 @@ static void unlock(grpc_call *call) {
   int i;
   const size_t MAX_RECV_PEEK_AHEAD = 65536;
   size_t buffered_bytes;
-  int cancel_alarm = 0;
 
   memset(&op, 0, sizeof(op));
 
@@ -616,9 +615,6 @@ static void unlock(grpc_call *call) {
   start_op = op.cancel_with_status != GRPC_STATUS_OK;
   call->cancel_with_status = GRPC_STATUS_OK; /* reset */
 
-  cancel_alarm = call->cancel_alarm;
-  call->cancel_alarm = 0;
-
   if (!call->receiving && need_more_data(call)) {
     if (grpc_bbq_empty(&call->incoming_queue) && call->reading_message) {
       op.max_recv_bytes = call->incoming_message_length -
@@ -655,7 +651,7 @@ static void unlock(grpc_call *call) {
     call->bound_pollset = 1;
     op.bind_pollset = grpc_cq_pollset(call->cq);
     grpc_workqueue_add_to_pollset(grpc_channel_get_workqueue(call->channel),
-                                  op.bind_pollset);
+                                  op.bind_pollset, call_list);
     start_op = 1;
   }
 
@@ -670,12 +666,8 @@ static void unlock(grpc_call *call) {
 
   gpr_mu_unlock(&call->mu);
 
-  if (cancel_alarm) {
-    grpc_alarm_cancel(&call->alarm);
-  }
-
   if (start_op) {
-    execute_op(call, &op);
+    execute_op(call, &op, call_list);
   }
 
   if (completing_requests > 0) {
@@ -685,8 +677,8 @@ static void unlock(grpc_call *call) {
     }
     lock(call);
     call->completing = 0;
-    unlock(call);
-    GRPC_CALL_INTERNAL_UNREF(call, "completing", 0);
+    unlock(call, call_list);
+    GRPC_CALL_INTERNAL_UNREF(call, "completing", call_list);
   }
 }
 
@@ -831,7 +823,8 @@ static void early_out_write_ops(grpc_call *call) {
   }
 }
 
-static void call_on_done_send(void *pc, int success) {
+static void call_on_done_send(void *pc, int success,
+                              grpc_call_list *call_list) {
   grpc_call *call = pc;
   lock(call);
   if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_INITIAL_METADATA)) {
@@ -854,8 +847,8 @@ static void call_on_done_send(void *pc, int success) {
   call->send_ops.nops = 0;
   call->last_send_contains = 0;
   call->sending = 0;
-  unlock(call);
-  GRPC_CALL_INTERNAL_UNREF(call, "sending", 0);
+  unlock(call, call_list);
+  GRPC_CALL_INTERNAL_UNREF(call, "sending", call_list);
 }
 
 static void finish_message(grpc_call *call) {
@@ -961,7 +954,8 @@ static int add_slice_to_message(grpc_call *call, gpr_slice slice) {
   }
 }
 
-static void call_on_done_recv(void *pc, int success) {
+static void call_on_done_recv(void *pc, int success,
+                              grpc_call_list *call_list) {
   grpc_call *call = pc;
   grpc_call *child_call;
   grpc_call *next_child_call;
@@ -976,7 +970,7 @@ static void call_on_done_recv(void *pc, int success) {
         case GRPC_NO_OP:
           break;
         case GRPC_OP_METADATA:
-          recv_metadata(call, &op->data.metadata);
+          recv_metadata(call, &op->data.metadata, call_list);
           break;
         case GRPC_OP_BEGIN_MESSAGE:
           success = begin_message(call, op->data.begin_message);
@@ -997,7 +991,9 @@ static void call_on_done_recv(void *pc, int success) {
     if (call->recv_state == GRPC_STREAM_CLOSED) {
       GPR_ASSERT(call->read_state <= READ_STATE_STREAM_CLOSED);
       call->read_state = READ_STATE_STREAM_CLOSED;
-      call->cancel_alarm |= call->have_alarm;
+      if (call->have_alarm) {
+        grpc_alarm_cancel(&call->alarm, call_list);
+      }
       /* propagate cancellation to any interested children */
       child_call = call->first_child;
       if (child_call != NULL) {
@@ -1006,12 +1002,12 @@ static void call_on_done_recv(void *pc, int success) {
           if (child_call->cancellation_is_inherited) {
             GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel");
             grpc_call_cancel(child_call, NULL);
-            GRPC_CALL_INTERNAL_UNREF(child_call, "propagate_cancel", 0);
+            GRPC_CALL_INTERNAL_UNREF(child_call, "propagate_cancel", call_list);
           }
           child_call = next_child_call;
         } while (child_call != call->first_child);
       }
-      GRPC_CALL_INTERNAL_UNREF(call, "closed", 0);
+      GRPC_CALL_INTERNAL_UNREF(call, "closed", call_list);
     }
     finish_read_ops(call);
   } else {
@@ -1023,9 +1019,9 @@ static void call_on_done_recv(void *pc, int success) {
     finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS_DETAILS, 0);
   }
   call->recv_ops.nops = 0;
-  unlock(call);
+  unlock(call, call_list);
 
-  GRPC_CALL_INTERNAL_UNREF(call, "receiving", 0);
+  GRPC_CALL_INTERNAL_UNREF(call, "receiving", call_list);
   GRPC_TIMER_END(GRPC_PTAG_CALL_ON_DONE_RECV, 0);
 }
 
@@ -1277,17 +1273,19 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
 
 grpc_call_error grpc_call_start_ioreq_and_call_back(
     grpc_call *call, const grpc_ioreq *reqs, size_t nreqs,
-    grpc_ioreq_completion_func on_complete, void *user_data) {
+    grpc_ioreq_completion_func on_complete, void *user_data,
+    grpc_call_list *call_list) {
   grpc_call_error err;
   lock(call);
   err = start_ioreq(call, reqs, nreqs, on_complete, user_data);
-  unlock(call);
+  unlock(call, call_list);
   return err;
 }
 
 void grpc_call_destroy(grpc_call *c) {
   int cancel;
   grpc_call *parent = c->parent;
+  grpc_call_list call_list = GRPC_CALL_LIST_INIT;
 
   if (parent) {
     gpr_mu_lock(&parent->mu);
@@ -1300,17 +1298,20 @@ void grpc_call_destroy(grpc_call *c) {
       c->sibling_next->sibling_prev = c->sibling_prev;
     }
     gpr_mu_unlock(&parent->mu);
-    GRPC_CALL_INTERNAL_UNREF(parent, "child", 1);
+    GRPC_CALL_INTERNAL_UNREF(parent, "child", &call_list);
   }
 
   lock(c);
   GPR_ASSERT(!c->destroy_called);
   c->destroy_called = 1;
-  c->cancel_alarm |= c->have_alarm;
+  if (c->have_alarm) {
+    grpc_alarm_cancel(&c->alarm, &call_list);
+  }
   cancel = c->read_state != READ_STATE_STREAM_CLOSED;
-  unlock(c);
+  unlock(c, &call_list);
   if (cancel) grpc_call_cancel(c, NULL);
-  GRPC_CALL_INTERNAL_UNREF(c, "destroy", 1);
+  GRPC_CALL_INTERNAL_UNREF(c, "destroy", &call_list);
+  grpc_call_list_run(&call_list);
 }
 
 grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) {
@@ -1324,10 +1325,12 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
                                              const char *description,
                                              void *reserved) {
   grpc_call_error r;
-  (void)reserved;
+  grpc_call_list call_list = GRPC_CALL_LIST_INIT;
+  GPR_ASSERT(reserved == NULL);
   lock(c);
   r = cancel_with_status(c, status, description);
-  unlock(c);
+  unlock(c, &call_list);
+  grpc_call_list_run(&call_list);
   return r;
 }
 
@@ -1347,8 +1350,9 @@ static grpc_call_error cancel_with_status(grpc_call *c, grpc_status_code status,
   return GRPC_CALL_OK;
 }
 
-static void finished_loose_op(void *call, int success_ignored) {
-  GRPC_CALL_INTERNAL_UNREF(call, "loose-op", 0);
+static void finished_loose_op(void *call, int success_ignored,
+                              grpc_call_list *call_list) {
+  GRPC_CALL_INTERNAL_UNREF(call, "loose-op", call_list);
 }
 
 typedef struct {
@@ -1356,13 +1360,15 @@ typedef struct {
   grpc_closure closure;
 } finished_loose_op_allocated_args;
 
-static void finished_loose_op_allocated(void *alloc, int success) {
+static void finished_loose_op_allocated(void *alloc, int success,
+                                        grpc_call_list *call_list) {
   finished_loose_op_allocated_args *args = alloc;
-  finished_loose_op(args->call, success);
+  finished_loose_op(args->call, success, call_list);
   gpr_free(args);
 }
 
-static void execute_op(grpc_call *call, grpc_transport_stream_op *op) {
+static void execute_op(grpc_call *call, grpc_transport_stream_op *op,
+                       grpc_call_list *call_list) {
   grpc_call_element *elem;
 
   GPR_ASSERT(op->on_consumed == NULL);
@@ -1380,19 +1386,22 @@ static void execute_op(grpc_call *call, grpc_transport_stream_op *op) {
 
   elem = CALL_ELEM_FROM_CALL(call, 0);
   op->context = call->context;
-  elem->filter->start_transport_stream_op(elem, op);
+  elem->filter->start_transport_stream_op(elem, op, call_list);
 }
 
 char *grpc_call_get_peer(grpc_call *call) {
   grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0);
-  return elem->filter->get_peer(elem);
+  grpc_call_list call_list = GRPC_CALL_LIST_INIT;
+  char *result = elem->filter->get_peer(elem, &call_list);
+  grpc_call_list_run(&call_list);
+  return result;
 }
 
 grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
   return CALL_FROM_TOP_ELEM(elem);
 }
 
-static void call_alarm(void *arg, int success) {
+static void call_alarm(void *arg, int success, grpc_call_list *call_list) {
   grpc_call *call = arg;
   lock(call);
   call->have_alarm = 0;
@@ -1401,11 +1410,12 @@ static void call_alarm(void *arg, int success) {
                        "Deadline Exceeded");
   }
   finish_read_ops(call);
-  unlock(call);
-  GRPC_CALL_INTERNAL_UNREF(call, "alarm", 1);
+  unlock(call, call_list);
+  GRPC_CALL_INTERNAL_UNREF(call, "alarm", call_list);
 }
 
-static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline) {
+static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline,
+                               grpc_call_list *call_list) {
   if (call->have_alarm) {
     gpr_log(GPR_ERROR, "Attempt to set deadline alarm twice");
     assert(0);
@@ -1415,7 +1425,7 @@ static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline) {
   call->have_alarm = 1;
   call->send_deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
   grpc_alarm_init(&call->alarm, call->send_deadline, call_alarm, call,
-                  gpr_now(GPR_CLOCK_MONOTONIC));
+                  gpr_now(GPR_CLOCK_MONOTONIC), call_list);
 }
 
 /* we offset status by a small amount when storing it into transport metadata
@@ -1466,7 +1476,8 @@ static gpr_uint32 decode_compression(grpc_mdelem *md) {
   return algorithm;
 }
 
-static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
+static void recv_metadata(grpc_call *call, grpc_metadata_batch *md,
+                          grpc_call_list *call_list) {
   grpc_linked_mdelem *l;
   grpc_metadata_array *dest;
   grpc_metadata *mdusr;
@@ -1513,7 +1524,7 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
   if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) !=
           0 &&
       !call->is_client) {
-    set_deadline_alarm(call, md->deadline);
+    set_deadline_alarm(call, md->deadline, call_list);
   }
   if (!is_trailing) {
     call->read_state = READ_STATE_GOT_INITIAL_METADATA;
@@ -1571,8 +1582,13 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
   const grpc_op *op;
   grpc_ioreq *req;
   void (*finish_func)(grpc_call *, int, void *) = finish_batch;
+  grpc_call_error error;
+  grpc_call_list call_list = GRPC_CALL_LIST_INIT;
 
-  if (reserved != NULL) return GRPC_CALL_ERROR;
+  if (reserved != NULL) {
+    error = GRPC_CALL_ERROR;
+    goto done;
+  }
 
   GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, tag);
 
@@ -1581,19 +1597,29 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
     GRPC_CALL_INTERNAL_REF(call, "completion");
     grpc_cq_end_op(call->cq, tag, 1, done_completion, call,
                    allocate_completion(call));
-    return GRPC_CALL_OK;
+    error = GRPC_CALL_OK;
+    goto done;
   }
 
   /* rewrite batch ops into ioreq ops */
   for (in = 0, out = 0; in < nops; in++) {
     op = &ops[in];
-    if (op->reserved != NULL) return GRPC_CALL_ERROR;
+    if (op->reserved != NULL) {
+      error = GRPC_CALL_ERROR;
+      goto done;
+    }
     switch (op->op) {
       case GRPC_OP_SEND_INITIAL_METADATA:
         /* Flag validation: currently allow no flags */
-        if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
+        if (op->flags != 0) {
+          error = GRPC_CALL_ERROR_INVALID_FLAGS;
+          goto done;
+        }
         req = &reqs[out++];
-        if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
+        if (out > GRPC_IOREQ_OP_COUNT) {
+          error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
+          goto done;
+        }
         req->op = GRPC_IOREQ_SEND_INITIAL_METADATA;
         req->data.send_metadata.count = op->data.send_initial_metadata.count;
         req->data.send_metadata.metadata =
@@ -1602,36 +1628,55 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
         break;
       case GRPC_OP_SEND_MESSAGE:
         if (!are_write_flags_valid(op->flags)) {
-          return GRPC_CALL_ERROR_INVALID_FLAGS;
+          error = GRPC_CALL_ERROR_INVALID_FLAGS;
+          goto done;
         }
         if (op->data.send_message == NULL) {
-          return GRPC_CALL_ERROR_INVALID_MESSAGE;
+          error = GRPC_CALL_ERROR_INVALID_MESSAGE;
+          goto done;
         }
         req = &reqs[out++];
-        if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
+        if (out > GRPC_IOREQ_OP_COUNT) {
+          error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
+          goto done;
+        }
         req->op = GRPC_IOREQ_SEND_MESSAGE;
         req->data.send_message = op->data.send_message;
         req->flags = op->flags;
         break;
       case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
         /* Flag validation: currently allow no flags */
-        if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
+        if (op->flags != 0) {
+          error = GRPC_CALL_ERROR_INVALID_FLAGS;
+          goto done;
+        }
         if (!call->is_client) {
-          return GRPC_CALL_ERROR_NOT_ON_SERVER;
+          error = GRPC_CALL_ERROR_NOT_ON_SERVER;
+          goto done;
         }
         req = &reqs[out++];
-        if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
+        if (out > GRPC_IOREQ_OP_COUNT) {
+          error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
+          goto done;
+        }
         req->op = GRPC_IOREQ_SEND_CLOSE;
         req->flags = op->flags;
         break;
       case GRPC_OP_SEND_STATUS_FROM_SERVER:
         /* Flag validation: currently allow no flags */
-        if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
+        if (op->flags != 0) {
+          error = GRPC_CALL_ERROR_INVALID_FLAGS;
+          goto done;
+        }
         if (call->is_client) {
-          return GRPC_CALL_ERROR_NOT_ON_CLIENT;
+          error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
+          goto done;
         }
         req = &reqs[out++];
-        if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
+        if (out > GRPC_IOREQ_OP_COUNT) {
+          error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
+          goto done;
+        }
         req->op = GRPC_IOREQ_SEND_TRAILING_METADATA;
         req->flags = op->flags;
         req->data.send_metadata.count =
@@ -1639,7 +1684,10 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
         req->data.send_metadata.metadata =
             op->data.send_status_from_server.trailing_metadata;
         req = &reqs[out++];
-        if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
+        if (out > GRPC_IOREQ_OP_COUNT) {
+          error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
+          goto done;
+        }
         req->op = GRPC_IOREQ_SEND_STATUS;
         req->data.send_status.code = op->data.send_status_from_server.status;
         req->data.send_status.details =
@@ -1649,17 +1697,27 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
                       op->data.send_status_from_server.status_details, 0)
                 : NULL;
         req = &reqs[out++];
-        if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
+        if (out > GRPC_IOREQ_OP_COUNT) {
+          error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
+          goto done;
+        }
         req->op = GRPC_IOREQ_SEND_CLOSE;
         break;
       case GRPC_OP_RECV_INITIAL_METADATA:
         /* Flag validation: currently allow no flags */
-        if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
+        if (op->flags != 0) {
+          error = GRPC_CALL_ERROR_INVALID_FLAGS;
+          goto done;
+        }
         if (!call->is_client) {
-          return GRPC_CALL_ERROR_NOT_ON_SERVER;
+          error = GRPC_CALL_ERROR_NOT_ON_SERVER;
+          goto done;
         }
         req = &reqs[out++];
-        if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
+        if (out > GRPC_IOREQ_OP_COUNT) {
+          error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
+          goto done;
+        }
         req->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
         req->data.recv_metadata = op->data.recv_initial_metadata;
         req->data.recv_metadata->count = 0;
@@ -1667,55 +1725,86 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
         break;
       case GRPC_OP_RECV_MESSAGE:
         /* Flag validation: currently allow no flags */
-        if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
+        if (op->flags != 0) {
+          error = GRPC_CALL_ERROR_INVALID_FLAGS;
+          goto done;
+        }
         req = &reqs[out++];
-        if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
+        if (out > GRPC_IOREQ_OP_COUNT) {
+          error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
+          goto done;
+        }
         req->op = GRPC_IOREQ_RECV_MESSAGE;
         req->data.recv_message = op->data.recv_message;
         req->flags = op->flags;
         break;
       case GRPC_OP_RECV_STATUS_ON_CLIENT:
         /* Flag validation: currently allow no flags */
-        if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
+        if (op->flags != 0) {
+          error = GRPC_CALL_ERROR_INVALID_FLAGS;
+          goto done;
+        }
         if (!call->is_client) {
-          return GRPC_CALL_ERROR_NOT_ON_SERVER;
+          error = GRPC_CALL_ERROR_NOT_ON_SERVER;
+          goto done;
         }
         req = &reqs[out++];
-        if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
+        if (out > GRPC_IOREQ_OP_COUNT) {
+          error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
+          goto done;
+        }
         req->op = GRPC_IOREQ_RECV_STATUS;
         req->flags = op->flags;
         req->data.recv_status.set_value = set_status_value_directly;
         req->data.recv_status.user_data = op->data.recv_status_on_client.status;
         req = &reqs[out++];
-        if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
+        if (out > GRPC_IOREQ_OP_COUNT) {
+          error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
+          goto done;
+        }
         req->op = GRPC_IOREQ_RECV_STATUS_DETAILS;
         req->data.recv_status_details.details =
             op->data.recv_status_on_client.status_details;
         req->data.recv_status_details.details_capacity =
             op->data.recv_status_on_client.status_details_capacity;
         req = &reqs[out++];
-        if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
+        if (out > GRPC_IOREQ_OP_COUNT) {
+          error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
+          goto done;
+        }
         req->op = GRPC_IOREQ_RECV_TRAILING_METADATA;
         req->data.recv_metadata =
             op->data.recv_status_on_client.trailing_metadata;
         req->data.recv_metadata->count = 0;
         req = &reqs[out++];
-        if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
+        if (out > GRPC_IOREQ_OP_COUNT) {
+          error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
+          goto done;
+        }
         req->op = GRPC_IOREQ_RECV_CLOSE;
         finish_func = finish_batch_with_close;
         break;
       case GRPC_OP_RECV_CLOSE_ON_SERVER:
         /* Flag validation: currently allow no flags */
-        if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
+        if (op->flags != 0) {
+          error = GRPC_CALL_ERROR_INVALID_FLAGS;
+          goto done;
+        }
         req = &reqs[out++];
-        if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
+        if (out > GRPC_IOREQ_OP_COUNT) {
+          error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
+          goto done;
+        }
         req->op = GRPC_IOREQ_RECV_STATUS;
         req->flags = op->flags;
         req->data.recv_status.set_value = set_cancelled_value;
         req->data.recv_status.user_data =
             op->data.recv_close_on_server.cancelled;
         req = &reqs[out++];
-        if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
+        if (out > GRPC_IOREQ_OP_COUNT) {
+          error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
+          goto done;
+        }
         req->op = GRPC_IOREQ_RECV_CLOSE;
         finish_func = finish_batch_with_close;
         break;
@@ -1725,7 +1814,11 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
   GRPC_CALL_INTERNAL_REF(call, "completion");
   grpc_cq_begin_op(call->cq);
 
-  return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_func, tag);
+  error = grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_func, tag,
+                                              &call_list);
+done:
+  grpc_call_list_run(&call_list);
+  return error;
 }
 
 void grpc_call_context_set(grpc_call *call, grpc_context_index elem,
diff --git a/src/core/surface/call.h b/src/core/surface/call.h
index 00638e43b5..7a7178bc7b 100644
--- a/src/core/surface/call.h
+++ b/src/core/surface/call.h
@@ -97,28 +97,30 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
                             size_t add_initial_metadata_count,
                             gpr_timespec send_deadline);
 
-void grpc_call_set_completion_queue(grpc_call *call, grpc_completion_queue *cq);
+void grpc_call_set_completion_queue(grpc_call *call, grpc_completion_queue *cq,
+                                    grpc_call_list *call_list);
 grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call);
 
 #ifdef GRPC_CALL_REF_COUNT_DEBUG
 void grpc_call_internal_ref(grpc_call *call, const char *reason);
 void grpc_call_internal_unref(grpc_call *call, const char *reason,
-                              int allow_immediate_deletion);
+                              grpc_call_list *call_list);
 #define GRPC_CALL_INTERNAL_REF(call, reason) \
   grpc_call_internal_ref(call, reason)
-#define GRPC_CALL_INTERNAL_UNREF(call, reason, allow_immediate_deletion) \
-  grpc_call_internal_unref(call, reason, allow_immediate_deletion)
+#define GRPC_CALL_INTERNAL_UNREF(call, reason, call_list) \
+  grpc_call_internal_unref(call, reason, call_list)
 #else
 void grpc_call_internal_ref(grpc_call *call);
-void grpc_call_internal_unref(grpc_call *call, int allow_immediate_deletion);
+void grpc_call_internal_unref(grpc_call *call, grpc_call_list *call_list);
 #define GRPC_CALL_INTERNAL_REF(call, reason) grpc_call_internal_ref(call)
-#define GRPC_CALL_INTERNAL_UNREF(call, reason, allow_immediate_deletion) \
-  grpc_call_internal_unref(call, allow_immediate_deletion)
+#define GRPC_CALL_INTERNAL_UNREF(call, reason, call_list) \
+  grpc_call_internal_unref(call, call_list)
 #endif
 
 grpc_call_error grpc_call_start_ioreq_and_call_back(
     grpc_call *call, const grpc_ioreq *reqs, size_t nreqs,
-    grpc_ioreq_completion_func on_complete, void *user_data);
+    grpc_ioreq_completion_func on_complete, void *user_data,
+    grpc_call_list *call_list);
 
 grpc_call_stack *grpc_call_get_call_stack(grpc_call *call);
 
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index 6f49060be5..fdba09fcce 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -93,7 +93,7 @@ struct grpc_channel {
 grpc_channel *grpc_channel_create_from_filters(
     const char *target, const grpc_channel_filter **filters, size_t num_filters,
     const grpc_channel_args *args, grpc_mdctx *mdctx, grpc_workqueue *workqueue,
-    int is_client) {
+    int is_client, grpc_call_list *call_list) {
   size_t i;
   size_t size =
       sizeof(grpc_channel) + grpc_channel_stack_size(filters, num_filters);
@@ -181,7 +181,7 @@ grpc_channel *grpc_channel_create_from_filters(
 
   grpc_channel_stack_init(filters, num_filters, channel, args,
                           channel->metadata_context,
-                          CHANNEL_STACK_FROM_CHANNEL(channel));
+                          CHANNEL_STACK_FROM_CHANNEL(channel), call_list);
 
   return channel;
 }
diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h
index 9fc821d64b..664ecc1c5a 100644
--- a/src/core/surface/channel.h
+++ b/src/core/surface/channel.h
@@ -41,7 +41,7 @@
 grpc_channel *grpc_channel_create_from_filters(
     const char *target, const grpc_channel_filter **filters, size_t count,
     const grpc_channel_args *args, grpc_mdctx *mdctx, grpc_workqueue *workqueue,
-    int is_client);
+    int is_client, grpc_call_list *call_list);
 
 /** Get a (borrowed) pointer to this channels underlying channel stack */
 grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel);
diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h
index 74dc09e36e..793baff03a 100644
--- a/src/core/surface/completion_queue.h
+++ b/src/core/surface/completion_queue.h
@@ -72,7 +72,8 @@ void grpc_cq_begin_op(grpc_completion_queue *cc);
 
 /* Queue a GRPC_OP_COMPLETED operation */
 void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success,
-                    void (*done)(void *done_arg, grpc_cq_completion *storage),
+                    void (*done)(void *done_arg, grpc_cq_completion *storage,
+                                 grpc_call_list *call_list),
                     void *done_arg, grpc_cq_completion *storage);
 
 grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc);
diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c
index 24d8b3aa2c..4a75d03f0a 100644
--- a/src/core/surface/secure_channel_create.c
+++ b/src/core/surface/secure_channel_create.c
@@ -291,7 +291,7 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
   f->merge_args = grpc_channel_args_copy(args_copy);
   f->master = channel;
   GRPC_CHANNEL_INTERNAL_REF(channel, "subchannel_factory");
-  resolver = grpc_resolver_create(target, &f->base, workqueue);
+  resolver = grpc_resolver_create(target, &f->base);
   if (!resolver) {
     return NULL;
   }
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 75600027ed..e38c6028d9 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -1309,7 +1309,7 @@ static void publish_registered_or_batch(grpc_call *call, int success,
   server_ref(chand->server);
   grpc_cq_end_op(calld->cq_new, rc->tag, success, done_request_event, rc,
                  &rc->completion);
-  GRPC_CALL_INTERNAL_UNREF(call, "server", 0);
+  GRPC_CALL_INTERNAL_UNREF(call, "server", call_list);
 }
 
 const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {
-- 
GitLab