From 47a708e252c8f56091c11e63eadba51a995ca7c8 Mon Sep 17 00:00:00 2001
From: Craig Tiller <ctiller@google.com>
Date: Tue, 15 Sep 2015 16:16:06 -0700
Subject: [PATCH] Core compiles with workqueues

---
 src/core/channel/client_channel.c             |  5 +-
 src/core/client_config/connector.h            |  2 +
 .../client_config/lb_policies/pick_first.c    |  4 +-
 .../resolvers/zookeeper_resolver.c            | 34 +++++----
 src/core/client_config/subchannel.c           |  7 +-
 src/core/httpcli/httpcli.c                    | 73 +++++++++----------
 src/core/iomgr/tcp_client_posix.c             |  3 +-
 src/core/iomgr/tcp_posix.c                    |  2 +-
 src/core/iomgr/tcp_server_posix.c             |  9 ++-
 src/core/iomgr/udp_server.c                   |  6 +-
 src/core/iomgr/workqueue_posix.c              |  6 +-
 src/core/security/server_secure_chttp2.c      | 11 ++-
 src/core/surface/call.c                       |  3 +-
 src/core/surface/channel.c                    |  7 +-
 src/core/surface/channel.h                    |  3 +-
 src/core/surface/channel_connectivity.c       |  3 +-
 src/core/surface/channel_create.c             | 17 +++--
 src/core/surface/lame_client.c                |  3 +-
 src/core/surface/secure_channel_create.c      | 21 +++---
 src/core/surface/server.c                     | 31 +++++---
 src/core/surface/server.h                     |  1 +
 src/core/surface/server_chttp2.c              |  9 ++-
 src/core/transport/chttp2/frame_ping.c        |  4 +-
 src/core/transport/chttp2_transport.c         | 10 +--
 src/core/transport/chttp2_transport.h         |  3 +-
 src/core/transport/connectivity_state.c       | 15 ++--
 src/core/transport/connectivity_state.h       |  4 +
 test/core/bad_client/bad_client.c             |  5 +-
 28 files changed, 178 insertions(+), 123 deletions(-)

diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 2624fcdd53..6fefdec2f6 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -670,8 +670,9 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
   grpc_iomgr_closure_init(&chand->on_config_changed, cc_on_config_changed,
                           chand);
 
-  grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
-                               "client_channel");
+  grpc_connectivity_state_init(&chand->state_tracker,
+                               grpc_channel_get_workqueue(master),
+                               GRPC_CHANNEL_IDLE, "client_channel");
 }
 
 /* Destructor for channel_data */
diff --git a/src/core/client_config/connector.h b/src/core/client_config/connector.h
index 39f3467990..bdaeda86ae 100644
--- a/src/core/client_config/connector.h
+++ b/src/core/client_config/connector.h
@@ -57,6 +57,8 @@ typedef struct {
   const grpc_channel_args *channel_args;
   /** metadata context */
   grpc_mdctx *metadata_context;
+  /** workqueue */
+  grpc_workqueue *workqueue;
 } grpc_connect_in_args;
 
 typedef struct {
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c
index 151b6f12f8..06186403e5 100644
--- a/src/core/client_config/lb_policies/pick_first.c
+++ b/src/core/client_config/lb_policies/pick_first.c
@@ -332,8 +332,8 @@ static grpc_lb_policy *create_pick_first(grpc_lb_policy_factory *factory,
   p->num_subchannels = args->num_subchannels;
   p->workqueue = args->workqueue;
   grpc_workqueue_ref(p->workqueue);
-  grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE,
-                               "pick_first");
+  grpc_connectivity_state_init(&p->state_tracker, args->workqueue,
+                               GRPC_CHANNEL_IDLE, "pick_first");
   memcpy(p->subchannels, args->subchannels,
          sizeof(grpc_subchannel *) * args->num_subchannels);
   grpc_iomgr_closure_init(&p->connectivity_changed, pf_connectivity_changed, p);
diff --git a/src/core/client_config/resolvers/zookeeper_resolver.c b/src/core/client_config/resolvers/zookeeper_resolver.c
index 2594e6fae9..bc04203744 100644
--- a/src/core/client_config/resolvers/zookeeper_resolver.c
+++ b/src/core/client_config/resolvers/zookeeper_resolver.c
@@ -61,6 +61,8 @@ typedef struct {
   grpc_subchannel_factory *subchannel_factory;
   /** load balancing policy name */
   char *lb_policy_name;
+  /** work queue */
+  grpc_workqueue *workqueue;
 
   /** mutex guarding the rest of the state */
   gpr_mu mu;
@@ -108,7 +110,7 @@ static void zookeeper_shutdown(grpc_resolver *resolver) {
   gpr_mu_lock(&r->mu);
   if (r->next_completion != NULL) {
     *r->target_config = NULL;
-    grpc_iomgr_add_callback(r->next_completion);
+    grpc_workqueue_push(r->workqueue, r->next_completion, 1);
     r->next_completion = NULL;
   }
   zookeeper_close(r->zookeeper_handle);
@@ -409,7 +411,7 @@ static void zookeeper_maybe_finish_next_locked(zookeeper_resolver *r) {
     if (r->resolved_config != NULL) {
       grpc_client_config_ref(r->resolved_config);
     }
-    grpc_iomgr_add_callback(r->next_completion);
+    grpc_workqueue_push(r->workqueue, r->next_completion, 1);
     r->next_completion = NULL;
     r->published_version = r->resolved_version;
   }
@@ -422,19 +424,19 @@ static void zookeeper_destroy(grpc_resolver *gr) {
     grpc_client_config_unref(r->resolved_config);
   }
   grpc_subchannel_factory_unref(r->subchannel_factory);
+  grpc_workqueue_unref(r->workqueue);
   gpr_free(r->name);
   gpr_free(r->lb_policy_name);
   gpr_free(r);
 }
 
-static grpc_resolver *zookeeper_create(
-    grpc_uri *uri, const char *lb_policy_name,
-    grpc_subchannel_factory *subchannel_factory) {
+static grpc_resolver *zookeeper_create(grpc_resolver_args *args,
+                                       const char *lb_policy_name) {
   zookeeper_resolver *r;
   size_t length;
-  char *path = uri->path;
+  char *path = args->uri->path;
 
-  if (0 == strcmp(uri->authority, "")) {
+  if (0 == strcmp(args->uri->authority, "")) {
     gpr_log(GPR_ERROR, "No authority specified in zookeeper uri");
     return NULL;
   }
@@ -452,14 +454,19 @@ static grpc_resolver *zookeeper_create(
   grpc_resolver_init(&r->base, &zookeeper_resolver_vtable);
   r->name = gpr_strdup(path);
 
-  r->subchannel_factory = subchannel_factory;
+  r->workqueue = args->workqueue;
+  grpc_workqueue_ref(r->workqueue);
+
+  r->subchannel_factory = args->subchannel_factory;
+  grpc_subchannel_factory_ref(r->subchannel_factory);
+
   r->lb_policy_name = gpr_strdup(lb_policy_name);
-  grpc_subchannel_factory_ref(subchannel_factory);
 
   /** Initializes zookeeper client */
   zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
-  r->zookeeper_handle = zookeeper_init(uri->authority, zookeeper_global_watcher,
-                                       GRPC_ZOOKEEPER_SESSION_TIMEOUT, 0, 0, 0);
+  r->zookeeper_handle =
+      zookeeper_init(args->uri->authority, zookeeper_global_watcher,
+                     GRPC_ZOOKEEPER_SESSION_TIMEOUT, 0, 0, 0);
   if (r->zookeeper_handle == NULL) {
     gpr_log(GPR_ERROR, "Unable to connect to zookeeper server");
     return NULL;
@@ -490,9 +497,8 @@ static char *zookeeper_factory_get_default_hostname(
 }
 
 static grpc_resolver *zookeeper_factory_create_resolver(
-    grpc_resolver_factory *factory, grpc_uri *uri,
-    grpc_subchannel_factory *subchannel_factory) {
-  return zookeeper_create(uri, "pick_first", subchannel_factory);
+    grpc_resolver_factory *factory, grpc_resolver_args *args) {
+  return zookeeper_create(args, "pick_first");
 }
 
 static const grpc_resolver_factory_vtable zookeeper_factory_vtable = {
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index 0718ffbb8c..82212d2c6b 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -260,6 +260,7 @@ static void subchannel_destroy(grpc_subchannel *c) {
   grpc_mdctx_unref(c->mdctx);
   grpc_connectivity_state_destroy(&c->state_tracker);
   grpc_connector_unref(c->connector);
+  grpc_workqueue_unref(c->workqueue);
   gpr_free(c);
 }
 
@@ -296,12 +297,14 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
   c->args = grpc_channel_args_copy(args->args);
   c->mdctx = args->mdctx;
   c->master = args->master;
+  c->workqueue = grpc_channel_get_workqueue(c->master);
+  grpc_workqueue_ref(c->workqueue);
   c->pollset_set = grpc_client_channel_get_connecting_pollset_set(parent_elem);
   c->random = random_seed();
   grpc_mdctx_ref(c->mdctx);
   grpc_iomgr_closure_init(&c->connected, subchannel_connected, c);
-  grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE,
-                               "subchannel");
+  grpc_connectivity_state_init(&c->state_tracker, c->workqueue,
+                               GRPC_CHANNEL_IDLE, "subchannel");
   gpr_mu_init(&c->mu);
   return c;
 }
diff --git a/src/core/httpcli/httpcli.c b/src/core/httpcli/httpcli.c
index 1e38479eb1..4bfe3cf973 100644
--- a/src/core/httpcli/httpcli.c
+++ b/src/core/httpcli/httpcli.c
@@ -65,6 +65,7 @@ typedef struct {
   gpr_slice_buffer outgoing;
   grpc_iomgr_closure on_read;
   grpc_iomgr_closure done_write;
+  grpc_workqueue *workqueue;
 } internal_request;
 
 static grpc_httpcli_get_override g_get_override = NULL;
@@ -105,6 +106,7 @@ static void finish(internal_request *req, int success) {
   grpc_iomgr_unregister_object(&req->iomgr_obj);
   gpr_slice_buffer_destroy(&req->incoming);
   gpr_slice_buffer_destroy(&req->outgoing);
+  grpc_workqueue_unref(req->workqueue);
   gpr_free(req);
 }
 
@@ -202,8 +204,8 @@ static void next_address(internal_request *req) {
   }
   addr = &req->addresses->addrs[req->next_address++];
   grpc_tcp_client_connect(on_connected, req, &req->context->pollset_set,
-                          (struct sockaddr *)&addr->addr, addr->len,
-                          req->deadline);
+                          req->workqueue, (struct sockaddr *)&addr->addr,
+                          addr->len, req->deadline);
 }
 
 static void on_resolved(void *arg, grpc_resolved_addresses *addresses) {
@@ -217,19 +219,16 @@ static void on_resolved(void *arg, grpc_resolved_addresses *addresses) {
   next_address(req);
 }
 
-void grpc_httpcli_get(grpc_httpcli_context *context, grpc_pollset *pollset,
-                      const grpc_httpcli_request *request,
-                      gpr_timespec deadline,
-                      grpc_httpcli_response_cb on_response, void *user_data) {
-  internal_request *req;
-  char *name;
-  if (g_get_override &&
-      g_get_override(request, deadline, on_response, user_data)) {
-    return;
-  }
-  req = gpr_malloc(sizeof(internal_request));
+static void internal_request_begin(grpc_httpcli_context *context,
+                                   grpc_pollset *pollset,
+                                   const grpc_httpcli_request *request,
+                                   gpr_timespec deadline,
+                                   grpc_httpcli_response_cb on_response,
+                                   void *user_data, const char *name,
+                                   gpr_slice request_text) {
+  internal_request *req = gpr_malloc(sizeof(internal_request));
   memset(req, 0, sizeof(*req));
-  req->request_text = grpc_httpcli_format_get_request(request);
+  req->request_text = request_text;
   grpc_httpcli_parser_init(&req->parser);
   req->on_response = on_response;
   req->user_data = user_data;
@@ -242,51 +241,47 @@ void grpc_httpcli_get(grpc_httpcli_context *context, grpc_pollset *pollset,
   grpc_iomgr_closure_init(&req->done_write, done_write, req);
   gpr_slice_buffer_init(&req->incoming);
   gpr_slice_buffer_init(&req->outgoing);
-  gpr_asprintf(&name, "HTTP:GET:%s:%s", request->host, request->path);
   grpc_iomgr_register_object(&req->iomgr_obj, name);
-  gpr_free(name);
   req->host = gpr_strdup(request->host);
+  req->workqueue = grpc_workqueue_create();
+  grpc_workqueue_add_to_pollset(req->workqueue, pollset);
 
   grpc_pollset_set_add_pollset(&req->context->pollset_set, req->pollset);
   grpc_resolve_address(request->host, req->handshaker->default_port,
                        on_resolved, req);
 }
 
+void grpc_httpcli_get(grpc_httpcli_context *context, grpc_pollset *pollset,
+                      const grpc_httpcli_request *request,
+                      gpr_timespec deadline,
+                      grpc_httpcli_response_cb on_response, void *user_data) {
+  char *name;
+  if (g_get_override &&
+      g_get_override(request, deadline, on_response, user_data)) {
+    return;
+  }
+  gpr_asprintf(&name, "HTTP:GET:%s:%s", request->host, request->path);
+  internal_request_begin(context, pollset, request, deadline, on_response,
+                         user_data, name,
+                         grpc_httpcli_format_get_request(request));
+  gpr_free(name);
+}
+
 void grpc_httpcli_post(grpc_httpcli_context *context, grpc_pollset *pollset,
                        const grpc_httpcli_request *request,
                        const char *body_bytes, size_t body_size,
                        gpr_timespec deadline,
                        grpc_httpcli_response_cb on_response, void *user_data) {
-  internal_request *req;
   char *name;
   if (g_post_override && g_post_override(request, body_bytes, body_size,
                                          deadline, on_response, user_data)) {
     return;
   }
-  req = gpr_malloc(sizeof(internal_request));
-  memset(req, 0, sizeof(*req));
-  req->request_text =
-      grpc_httpcli_format_post_request(request, body_bytes, body_size);
-  grpc_httpcli_parser_init(&req->parser);
-  req->on_response = on_response;
-  req->user_data = user_data;
-  req->deadline = deadline;
-  req->handshaker =
-      request->handshaker ? request->handshaker : &grpc_httpcli_plaintext;
-  req->context = context;
-  req->pollset = pollset;
-  grpc_iomgr_closure_init(&req->on_read, on_read, req);
-  grpc_iomgr_closure_init(&req->done_write, done_write, req);
-  gpr_slice_buffer_init(&req->incoming);
-  gpr_slice_buffer_init(&req->outgoing);
   gpr_asprintf(&name, "HTTP:POST:%s:%s", request->host, request->path);
-  grpc_iomgr_register_object(&req->iomgr_obj, name);
+  internal_request_begin(
+      context, pollset, request, deadline, on_response, user_data, name,
+      grpc_httpcli_format_post_request(request, body_bytes, body_size));
   gpr_free(name);
-  req->host = gpr_strdup(request->host);
-
-  grpc_pollset_set_add_pollset(&req->context->pollset_set, req->pollset);
-  grpc_resolve_address(request->host, req->handshaker->default_port,
-                       on_resolved, req);
 }
 
 void grpc_httpcli_set_override(grpc_httpcli_get_override get,
diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c
index c3668f6a92..8b1a3b0f9e 100644
--- a/src/core/iomgr/tcp_client_posix.c
+++ b/src/core/iomgr/tcp_client_posix.c
@@ -195,6 +195,7 @@ finish:
 
 void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
                              void *arg, grpc_pollset_set *interested_parties,
+                             grpc_workqueue *workqueue,
                              const struct sockaddr *addr, size_t addr_len,
                              gpr_timespec deadline) {
   int fd;
@@ -236,7 +237,7 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
   addr_str = grpc_sockaddr_to_uri(addr);
   gpr_asprintf(&name, "tcp-client:%s", addr_str);
 
-  fdobj = grpc_fd_create(fd, name);
+  fdobj = grpc_fd_create(fd, workqueue, name);
 
   if (err >= 0) {
     cb(arg, grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str));
diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c
index 68f469c368..c539cf2d34 100644
--- a/src/core/iomgr/tcp_posix.c
+++ b/src/core/iomgr/tcp_posix.c
@@ -261,7 +261,7 @@ static grpc_endpoint_op_status tcp_read(grpc_endpoint *ep,
     tcp->finished_edge = 0;
     grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
   } else {
-    grpc_iomgr_add_delayed_callback(&tcp->read_closure, 1);
+    grpc_workqueue_push(tcp->em_fd->workqueue, &tcp->read_closure, 1);
   }
   /* TODO(ctiller): immediate return */
   return GRPC_ENDPOINT_PENDING;
diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c
index bcbd0afe6b..02d37350f7 100644
--- a/src/core/iomgr/tcp_server_posix.c
+++ b/src/core/iomgr/tcp_server_posix.c
@@ -124,6 +124,9 @@ struct grpc_tcp_server {
   grpc_pollset **pollsets;
   /* number of pollsets in the pollsets array */
   size_t pollset_count;
+
+  /** workqueue for interally created async work */
+  grpc_workqueue *workqueue;
 };
 
 grpc_tcp_server *grpc_tcp_server_create(void) {
@@ -137,6 +140,7 @@ grpc_tcp_server *grpc_tcp_server_create(void) {
   s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP);
   s->nports = 0;
   s->port_capacity = INIT_PORT_CAP;
+  s->workqueue = grpc_workqueue_create();
   return s;
 }
 
@@ -147,6 +151,7 @@ static void finish_shutdown(grpc_tcp_server *s) {
   gpr_mu_destroy(&s->mu);
 
   gpr_free(s->ports);
+  grpc_workqueue_unref(s->workqueue);
   gpr_free(s);
 }
 
@@ -339,7 +344,7 @@ static void on_read(void *arg, int success) {
     addr_str = grpc_sockaddr_to_uri((struct sockaddr *)&addr);
     gpr_asprintf(&name, "tcp-server-connection:%s", addr_str);
 
-    fdobj = grpc_fd_create(fd, name);
+    fdobj = grpc_fd_create(fd, sp->server->workqueue, name);
     /* TODO(ctiller): revise this when we have server-side sharding
        of channels -- we certainly should not be automatically adding every
        incoming channel to every pollset owned by the server */
@@ -387,7 +392,7 @@ static int add_socket_to_server(grpc_tcp_server *s, int fd,
     sp = &s->ports[s->nports++];
     sp->server = s;
     sp->fd = fd;
-    sp->emfd = grpc_fd_create(fd, name);
+    sp->emfd = grpc_fd_create(fd, s->workqueue, name);
     memcpy(sp->addr.untyped, addr, addr_len);
     sp->addr_len = addr_len;
     GPR_ASSERT(sp->emfd);
diff --git a/src/core/iomgr/udp_server.c b/src/core/iomgr/udp_server.c
index ed9eee8726..d4e8e99bce 100644
--- a/src/core/iomgr/udp_server.c
+++ b/src/core/iomgr/udp_server.c
@@ -121,6 +121,8 @@ struct grpc_udp_server {
   grpc_pollset **pollsets;
   /* number of pollsets in the pollsets array */
   size_t pollset_count;
+
+  grpc_workqueue *workqueue;
 };
 
 grpc_udp_server *grpc_udp_server_create(void) {
@@ -135,6 +137,7 @@ grpc_udp_server *grpc_udp_server_create(void) {
   s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP);
   s->nports = 0;
   s->port_capacity = INIT_PORT_CAP;
+  s->workqueue = grpc_workqueue_create();
 
   return s;
 }
@@ -146,6 +149,7 @@ static void finish_shutdown(grpc_udp_server *s) {
   gpr_cv_destroy(&s->cv);
 
   gpr_free(s->ports);
+  grpc_workqueue_unref(s->workqueue);
   gpr_free(s);
 }
 
@@ -310,7 +314,7 @@ static int add_socket_to_server(grpc_udp_server *s, int fd,
     sp = &s->ports[s->nports++];
     sp->server = s;
     sp->fd = fd;
-    sp->emfd = grpc_fd_create(fd, name);
+    sp->emfd = grpc_fd_create(fd, s->workqueue, name);
     memcpy(sp->addr.untyped, addr, addr_len);
     sp->addr_len = addr_len;
     sp->read_cb = read_cb;
diff --git a/src/core/iomgr/workqueue_posix.c b/src/core/iomgr/workqueue_posix.c
index ef1598c711..26626bef3b 100644
--- a/src/core/iomgr/workqueue_posix.c
+++ b/src/core/iomgr/workqueue_posix.c
@@ -35,6 +35,7 @@
 
 #ifdef GPR_POSIX_SOCKET
 
+#include "src/core/iomgr/fd_posix.h"
 #include "src/core/iomgr/workqueue.h"
 
 #include <stdio.h>
@@ -52,8 +53,9 @@ grpc_workqueue *grpc_workqueue_create(void) {
   workqueue->tail = &workqueue->head;
   grpc_wakeup_fd_init(&workqueue->wakeup_fd);
   sprintf(name, "workqueue:%p", (void *)workqueue);
-  workqueue->wakeup_read_fd =
-      grpc_fd_create(GRPC_WAKEUP_FD_GET_READ_FD(&workqueue->wakeup_fd), name);
+  workqueue->wakeup_read_fd = NULL; /* inspected during grpc_fd_create below */
+  workqueue->wakeup_read_fd = grpc_fd_create(
+      GRPC_WAKEUP_FD_GET_READ_FD(&workqueue->wakeup_fd), workqueue, name);
   grpc_iomgr_closure_init(&workqueue->read_closure, on_readable, workqueue);
   grpc_fd_notify_on_read(workqueue->wakeup_read_fd, &workqueue->read_closure);
   return workqueue;
diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c
index 4749f5f516..a6f50712f5 100644
--- a/src/core/security/server_secure_chttp2.c
+++ b/src/core/security/server_secure_chttp2.c
@@ -85,7 +85,7 @@ static void state_unref(grpc_server_secure_state *state) {
 }
 
 static void setup_transport(void *statep, grpc_transport *transport,
-                            grpc_mdctx *mdctx) {
+                            grpc_mdctx *mdctx, grpc_workqueue *workqueue) {
   static grpc_channel_filter const *extra_filters[] = {
       &grpc_server_auth_filter, &grpc_http_server_filter};
   grpc_server_secure_state *state = statep;
@@ -98,7 +98,8 @@ static void setup_transport(void *statep, grpc_transport *transport,
       grpc_server_get_channel_args(state->server), args_to_add,
       GPR_ARRAY_SIZE(args_to_add));
   grpc_server_setup_transport(state->server, transport, extra_filters,
-                              GPR_ARRAY_SIZE(extra_filters), mdctx, args_copy);
+                              GPR_ARRAY_SIZE(extra_filters), mdctx, workqueue,
+                              args_copy);
   grpc_channel_args_destroy(args_copy);
 }
 
@@ -130,15 +131,17 @@ static void on_secure_transport_setup_done(void *statep,
   grpc_server_secure_state *state = statep;
   grpc_transport *transport;
   grpc_mdctx *mdctx;
+  grpc_workqueue *workqueue;
   if (status == GRPC_SECURITY_OK) {
     gpr_mu_lock(&state->mu);
     remove_tcp_from_list_locked(state, wrapped_endpoint);
     if (!state->is_shutdown) {
       mdctx = grpc_mdctx_create();
+      workqueue = grpc_workqueue_create();
       transport = grpc_create_chttp2_transport(
           grpc_server_get_channel_args(state->server), secure_endpoint, mdctx,
-          0);
-      setup_transport(state, transport, mdctx);
+          workqueue, 0);
+      setup_transport(state, transport, mdctx, workqueue);
       grpc_chttp2_transport_start_reading(transport, NULL, 0);
     } else {
       /* We need to consume this here, because the server may already have gone
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 4168c2ef0c..c2b3040319 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -499,7 +499,8 @@ void grpc_call_internal_unref(grpc_call *c, int allow_immediate_deletion) {
     } else {
       c->destroy_closure.cb = destroy_call;
       c->destroy_closure.cb_arg = c;
-      grpc_iomgr_add_callback(&c->destroy_closure);
+      grpc_workqueue_push(grpc_channel_get_workqueue(c->channel),
+                          &c->destroy_closure, 1);
     }
   }
 }
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index a89523b3ab..bf4aee190f 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -79,6 +79,7 @@ struct grpc_channel {
   registered_call *registered_calls;
   grpc_iomgr_closure destroy_closure;
   char *target;
+  grpc_workqueue *workqueue;
 };
 
 #define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c) + 1))
@@ -92,7 +93,8 @@ struct grpc_channel {
 
 grpc_channel *grpc_channel_create_from_filters(
     const char *target, const grpc_channel_filter **filters, size_t num_filters,
-    const grpc_channel_args *args, grpc_mdctx *mdctx, int is_client) {
+    const grpc_channel_args *args, grpc_mdctx *mdctx, grpc_workqueue *workqueue,
+    int is_client) {
   size_t i;
   size_t size =
       sizeof(grpc_channel) + grpc_channel_stack_size(filters, num_filters);
@@ -104,6 +106,7 @@ grpc_channel *grpc_channel_create_from_filters(
   /* decremented by grpc_channel_destroy */
   gpr_ref_init(&channel->refs, 1);
   channel->metadata_context = mdctx;
+  channel->workqueue = workqueue;
   channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status", 0);
   channel->grpc_compression_algorithm_string =
       grpc_mdstr_from_string(mdctx, "grpc-encoding", 0);
@@ -311,7 +314,7 @@ void grpc_channel_internal_unref(grpc_channel *channel) {
   if (gpr_unref(&channel->refs)) {
     channel->destroy_closure.cb = destroy_channel;
     channel->destroy_closure.cb_arg = channel;
-    grpc_iomgr_add_callback(&channel->destroy_closure);
+    grpc_workqueue_push(channel->workqueue, &channel->destroy_closure, 1);
   }
 }
 
diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h
index 593faec7df..9fc821d64b 100644
--- a/src/core/surface/channel.h
+++ b/src/core/surface/channel.h
@@ -40,7 +40,8 @@
 
 grpc_channel *grpc_channel_create_from_filters(
     const char *target, const grpc_channel_filter **filters, size_t count,
-    const grpc_channel_args *args, grpc_mdctx *mdctx, int is_client);
+    const grpc_channel_args *args, grpc_mdctx *mdctx, grpc_workqueue *workqueue,
+    int is_client);
 
 /** Get a (borrowed) pointer to this channels underlying channel stack */
 grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel);
diff --git a/src/core/surface/channel_connectivity.c b/src/core/surface/channel_connectivity.c
index 88a7c16598..12b15f353f 100644
--- a/src/core/surface/channel_connectivity.c
+++ b/src/core/surface/channel_connectivity.c
@@ -176,7 +176,8 @@ void grpc_channel_watch_connectivity_state(
             "grpc_channel_watch_connectivity_state called on something that is "
             "not a client channel, but '%s'",
             client_channel_elem->filter->name);
-    grpc_iomgr_add_delayed_callback(&w->on_complete, 1);
+    grpc_workqueue_push(grpc_channel_get_workqueue(channel), &w->on_complete,
+                        1);
   } else {
     GRPC_CHANNEL_INTERNAL_REF(channel, "watch_connectivity");
     grpc_client_channel_add_interested_party(client_channel_elem,
diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c
index 9e2cf1cf66..7a4ec00abb 100644
--- a/src/core/surface/channel_create.c
+++ b/src/core/surface/channel_create.c
@@ -74,7 +74,8 @@ static void connected(void *arg, grpc_endpoint *tcp) {
   grpc_iomgr_closure *notify;
   if (tcp != NULL) {
     c->result->transport = grpc_create_chttp2_transport(
-        c->args.channel_args, tcp, c->args.metadata_context, 1);
+        c->args.channel_args, tcp, c->args.metadata_context, c->args.workqueue,
+        1);
     grpc_chttp2_transport_start_reading(c->result->transport, NULL, 0);
     GPR_ASSERT(c->result->transport);
     c->result->filters = gpr_malloc(sizeof(grpc_channel_filter *));
@@ -85,7 +86,7 @@ static void connected(void *arg, grpc_endpoint *tcp) {
   }
   notify = c->notify;
   c->notify = NULL;
-  grpc_iomgr_add_callback(notify);
+  notify->cb(notify->cb_arg, 1);
 }
 
 static void connector_connect(grpc_connector *con,
@@ -98,8 +99,9 @@ static void connector_connect(grpc_connector *con,
   c->notify = notify;
   c->args = *args;
   c->result = result;
-  grpc_tcp_client_connect(connected, c, args->interested_parties, args->addr,
-                          args->addr_len, args->deadline);
+  grpc_tcp_client_connect(connected, c, args->interested_parties,
+                          args->workqueue, args->addr, args->addr_len,
+                          args->deadline);
 }
 
 static const grpc_connector_vtable connector_vtable = {
@@ -164,6 +166,7 @@ grpc_channel *grpc_insecure_channel_create(const char *target,
   grpc_resolver *resolver;
   subchannel_factory *f;
   grpc_mdctx *mdctx = grpc_mdctx_create();
+  grpc_workqueue *workqueue = grpc_workqueue_create();
   size_t n = 0;
   GPR_ASSERT(!reserved);
   if (grpc_channel_args_is_census_enabled(args)) {
@@ -173,8 +176,8 @@ grpc_channel *grpc_insecure_channel_create(const char *target,
   filters[n++] = &grpc_client_channel_filter;
   GPR_ASSERT(n <= MAX_FILTERS);
 
-  channel =
-      grpc_channel_create_from_filters(target, filters, n, args, mdctx, 1);
+  channel = grpc_channel_create_from_filters(target, filters, n, args, mdctx,
+                                             workqueue, 1);
 
   f = gpr_malloc(sizeof(*f));
   f->base.vtable = &subchannel_factory_vtable;
@@ -184,7 +187,7 @@ grpc_channel *grpc_insecure_channel_create(const char *target,
   f->merge_args = grpc_channel_args_copy(args);
   f->master = channel;
   GRPC_CHANNEL_INTERNAL_REF(f->master, "subchannel_factory");
-  resolver = grpc_resolver_create(target, &f->base);
+  resolver = grpc_resolver_create(target, &f->base, workqueue);
   if (!resolver) {
     return NULL;
   }
diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c
index 80704cbf67..a5de900eff 100644
--- a/src/core/surface/lame_client.c
+++ b/src/core/surface/lame_client.c
@@ -150,7 +150,8 @@ grpc_channel *grpc_lame_client_channel_create(const char *target,
   channel_data *chand;
   static const grpc_channel_filter *filters[] = {&lame_filter};
   channel = grpc_channel_create_from_filters(target, filters, 1, NULL,
-                                             grpc_mdctx_create(), 1);
+                                             grpc_mdctx_create(),
+                                             grpc_workqueue_create(), 1);
   elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
   GPR_ASSERT(elem->filter == &lame_filter);
   chand = (channel_data *)elem->channel_data;
diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c
index d141260421..ec077af8dd 100644
--- a/src/core/surface/secure_channel_create.c
+++ b/src/core/surface/secure_channel_create.c
@@ -57,7 +57,6 @@ typedef struct {
   gpr_refcount refs;
 
   grpc_channel_security_connector *security_connector;
-  grpc_workqueue *workqueue;
 
   grpc_iomgr_closure *notify;
   grpc_connect_in_args args;
@@ -72,7 +71,6 @@ static void connector_ref(grpc_connector *con) {
 static void connector_unref(grpc_connector *con) {
   connector *c = (connector *)con;
   if (gpr_unref(&c->refs)) {
-    grpc_workqueue_unref(c->workqueue);
     gpr_free(c);
   }
 }
@@ -88,7 +86,8 @@ static void on_secure_transport_setup_done(void *arg,
     memset(c->result, 0, sizeof(*c->result));
   } else {
     c->result->transport = grpc_create_chttp2_transport(
-        c->args.channel_args, secure_endpoint, c->args.metadata_context, 1);
+        c->args.channel_args, secure_endpoint, c->args.metadata_context,
+        c->args.workqueue, 1);
     grpc_chttp2_transport_start_reading(c->result->transport, NULL, 0);
     c->result->filters = gpr_malloc(sizeof(grpc_channel_filter *) * 2);
     c->result->filters[0] = &grpc_http_client_filter;
@@ -124,8 +123,9 @@ static void connector_connect(grpc_connector *con,
   c->notify = notify;
   c->args = *args;
   c->result = result;
-  grpc_tcp_client_connect(connected, c, args->interested_parties, c->workqueue,
-                          args->addr, args->addr_len, args->deadline);
+  grpc_tcp_client_connect(connected, c, args->interested_parties,
+                          args->workqueue, args->addr, args->addr_len,
+                          args->deadline);
 }
 
 static const grpc_connector_vtable connector_vtable = {
@@ -167,8 +167,6 @@ static grpc_subchannel *subchannel_factory_create_subchannel(
   memset(c, 0, sizeof(*c));
   c->base.vtable = &connector_vtable;
   c->security_connector = f->security_connector;
-  c->workqueue = grpc_channel_get_workqueue(f->master);
-  grpc_workqueue_ref(c->workqueue);
   gpr_ref_init(&c->refs, 1);
   args->mdctx = f->mdctx;
   args->args = final_args;
@@ -197,6 +195,7 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
   grpc_channel_args *new_args_from_connector;
   grpc_channel_security_connector *connector;
   grpc_mdctx *mdctx;
+  grpc_workqueue *workqueue;
   grpc_resolver *resolver;
   subchannel_factory *f;
 #define MAX_FILTERS 3
@@ -219,6 +218,7 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
         "Failed to create security connector.");
   }
   mdctx = grpc_mdctx_create();
+  workqueue = grpc_workqueue_create();
 
   connector_arg = grpc_security_connector_to_arg(&connector->base);
   args_copy = grpc_channel_args_copy_and_add(
@@ -231,8 +231,8 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
   filters[n++] = &grpc_client_channel_filter;
   GPR_ASSERT(n <= MAX_FILTERS);
 
-  channel =
-      grpc_channel_create_from_filters(target, filters, n, args_copy, mdctx, 1);
+  channel = grpc_channel_create_from_filters(target, filters, n, args_copy,
+                                             mdctx, workqueue, 1);
 
   f = gpr_malloc(sizeof(*f));
   f->base.vtable = &subchannel_factory_vtable;
@@ -244,8 +244,7 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
   f->merge_args = grpc_channel_args_copy(args_copy);
   f->master = channel;
   GRPC_CHANNEL_INTERNAL_REF(channel, "subchannel_factory");
-  resolver = grpc_resolver_create(target, &f->base,
-                                  grpc_channel_get_workqueue(channel));
+  resolver = grpc_resolver_create(target, &f->base, workqueue);
   if (!resolver) {
     return NULL;
   }
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 3d404f78a4..aba0f94fd4 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -219,6 +219,8 @@ struct grpc_server {
 
   /** when did we print the last shutdown progress message */
   gpr_timespec last_shutdown_message_time;
+
+  grpc_workqueue *workqueue;
 };
 
 #define SERVER_FROM_CALL_ELEM(elem) \
@@ -314,7 +316,7 @@ static void kill_zombie(void *elem, int success) {
 }
 
 static void request_matcher_zombify_all_pending_calls(
-    request_matcher *request_matcher) {
+    request_matcher *request_matcher, grpc_workqueue *workqueue) {
   while (request_matcher->pending_head) {
     call_data *calld = request_matcher->pending_head;
     request_matcher->pending_head = calld->pending_next;
@@ -324,7 +326,7 @@ static void request_matcher_zombify_all_pending_calls(
     grpc_iomgr_closure_init(
         &calld->kill_zombie_closure, kill_zombie,
         grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
-    grpc_iomgr_add_callback(&calld->kill_zombie_closure);
+    grpc_workqueue_push(workqueue, &calld->kill_zombie_closure, 1);
   }
 }
 
@@ -363,6 +365,7 @@ static void server_delete(grpc_server *server) {
   }
   request_matcher_destroy(&server->unregistered_request_matcher);
   gpr_stack_lockfree_destroy(server->request_freelist);
+  grpc_workqueue_unref(server->workqueue);
   gpr_free(server->cqs);
   gpr_free(server->pollsets);
   gpr_free(server->shutdown_tags);
@@ -401,7 +404,8 @@ static void destroy_channel(channel_data *chand) {
   maybe_finish_shutdown(chand->server);
   chand->finish_destroy_channel_closure.cb = finish_destroy_channel;
   chand->finish_destroy_channel_closure.cb_arg = chand;
-  grpc_iomgr_add_callback(&chand->finish_destroy_channel_closure);
+  grpc_workqueue_push(chand->server->workqueue,
+                      &chand->finish_destroy_channel_closure, 1);
 }
 
 static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem,
@@ -414,7 +418,7 @@ static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem,
     calld->state = ZOMBIED;
     gpr_mu_unlock(&calld->mu_state);
     grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
-    grpc_iomgr_add_callback(&calld->kill_zombie_closure);
+    grpc_workqueue_push(server->workqueue, &calld->kill_zombie_closure, 1);
     return;
   }
 
@@ -505,10 +509,11 @@ static void kill_pending_work_locked(grpc_server *server) {
   registered_method *rm;
   request_matcher_kill_requests(server, &server->unregistered_request_matcher);
   request_matcher_zombify_all_pending_calls(
-      &server->unregistered_request_matcher);
+      &server->unregistered_request_matcher, server->workqueue);
   for (rm = server->registered_methods; rm; rm = rm->next) {
     request_matcher_kill_requests(server, &rm->request_matcher);
-    request_matcher_zombify_all_pending_calls(&rm->request_matcher);
+    request_matcher_zombify_all_pending_calls(&rm->request_matcher,
+                                              server->workqueue);
   }
 }
 
@@ -561,6 +566,7 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
 static void server_on_recv(void *ptr, int success) {
   grpc_call_element *elem = ptr;
   call_data *calld = elem->call_data;
+  channel_data *chand = elem->channel_data;
   gpr_timespec op_deadline;
 
   if (success && !calld->got_initial_metadata) {
@@ -595,7 +601,8 @@ static void server_on_recv(void *ptr, int success) {
         calld->state = ZOMBIED;
         gpr_mu_unlock(&calld->mu_state);
         grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
-        grpc_iomgr_add_callback(&calld->kill_zombie_closure);
+        grpc_workqueue_push(chand->server->workqueue,
+                            &calld->kill_zombie_closure, 1);
       } else {
         gpr_mu_unlock(&calld->mu_state);
       }
@@ -606,7 +613,8 @@ static void server_on_recv(void *ptr, int success) {
         calld->state = ZOMBIED;
         gpr_mu_unlock(&calld->mu_state);
         grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
-        grpc_iomgr_add_callback(&calld->kill_zombie_closure);
+        grpc_workqueue_push(chand->server->workqueue,
+                            &calld->kill_zombie_closure, 1);
       } else if (calld->state == PENDING) {
         calld->state = ZOMBIED;
         gpr_mu_unlock(&calld->mu_state);
@@ -799,6 +807,7 @@ grpc_server *grpc_server_create_from_filters(
   gpr_ref_init(&server->internal_refcount, 1);
   server->root_channel_data.next = server->root_channel_data.prev =
       &server->root_channel_data;
+  server->workqueue = grpc_workqueue_create();
 
   /* TODO(ctiller): expose a channel_arg for this */
   server->max_requested_calls = 32768;
@@ -873,6 +882,7 @@ void grpc_server_start(grpc_server *server) {
   server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count);
   for (i = 0; i < server->cq_count; i++) {
     server->pollsets[i] = grpc_cq_pollset(server->cqs[i]);
+    grpc_workqueue_add_to_pollset(server->workqueue, server->pollsets[i]);
   }
 
   for (l = server->listeners; l; l = l->next) {
@@ -883,6 +893,7 @@ void grpc_server_start(grpc_server *server) {
 void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport,
                                  grpc_channel_filter const **extra_filters,
                                  size_t num_extra_filters, grpc_mdctx *mdctx,
+                                 grpc_workqueue *workqueue,
                                  const grpc_channel_args *args) {
   size_t num_filters = s->channel_filter_count + num_extra_filters + 1;
   grpc_channel_filter const **filters =
@@ -917,7 +928,7 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport,
   }
 
   channel = grpc_channel_create_from_filters(NULL, filters, num_filters, args,
-                                             mdctx, 0);
+                                             mdctx, workqueue, 0);
   chand = (channel_data *)grpc_channel_stack_element(
               grpc_channel_get_channel_stack(channel), 0)
               ->channel_data;
@@ -1119,7 +1130,7 @@ static grpc_call_error queue_call_request(grpc_server *server,
         grpc_iomgr_closure_init(
             &calld->kill_zombie_closure, kill_zombie,
             grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
-        grpc_iomgr_add_callback(&calld->kill_zombie_closure);
+        grpc_workqueue_push(server->workqueue, &calld->kill_zombie_closure, 1);
       } else {
         GPR_ASSERT(calld->state == PENDING);
         calld->state = ACTIVATED;
diff --git a/src/core/surface/server.h b/src/core/surface/server.h
index c638d682bb..1d82d07ced 100644
--- a/src/core/surface/server.h
+++ b/src/core/surface/server.h
@@ -58,6 +58,7 @@ void grpc_server_listener_destroy_done(void *server);
 void grpc_server_setup_transport(grpc_server *server, grpc_transport *transport,
                                  grpc_channel_filter const **extra_filters,
                                  size_t num_extra_filters, grpc_mdctx *mdctx,
+                                 grpc_workqueue *workqueue,
                                  const grpc_channel_args *args);
 
 const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server);
diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c
index 4ab845bc00..91cf6ece9c 100644
--- a/src/core/surface/server_chttp2.c
+++ b/src/core/surface/server_chttp2.c
@@ -43,11 +43,11 @@
 #include <grpc/support/useful.h>
 
 static void setup_transport(void *server, grpc_transport *transport,
-                            grpc_mdctx *mdctx) {
+                            grpc_mdctx *mdctx, grpc_workqueue *workqueue) {
   static grpc_channel_filter const *extra_filters[] = {
       &grpc_http_server_filter};
   grpc_server_setup_transport(server, transport, extra_filters,
-                              GPR_ARRAY_SIZE(extra_filters), mdctx,
+                              GPR_ARRAY_SIZE(extra_filters), mdctx, workqueue,
                               grpc_server_get_channel_args(server));
 }
 
@@ -60,9 +60,10 @@ static void new_transport(void *server, grpc_endpoint *tcp) {
    * case.
    */
   grpc_mdctx *mdctx = grpc_mdctx_create();
+  grpc_workqueue *workqueue = grpc_workqueue_create();
   grpc_transport *transport = grpc_create_chttp2_transport(
-      grpc_server_get_channel_args(server), tcp, mdctx, 0);
-  setup_transport(server, transport, mdctx);
+      grpc_server_get_channel_args(server), tcp, mdctx, workqueue, 0);
+  setup_transport(server, transport, mdctx, workqueue);
   grpc_chttp2_transport_start_reading(transport, NULL, 0);
 }
 
diff --git a/src/core/transport/chttp2/frame_ping.c b/src/core/transport/chttp2/frame_ping.c
index 05451c7a8a..10d1e0a523 100644
--- a/src/core/transport/chttp2/frame_ping.c
+++ b/src/core/transport/chttp2/frame_ping.c
@@ -89,7 +89,9 @@ grpc_chttp2_parse_error grpc_chttp2_ping_parser_parse(
       for (ping = transport_parsing->pings.next;
            ping != &transport_parsing->pings; ping = ping->next) {
         if (0 == memcmp(p->opaque_8bytes, ping->id, 8)) {
-          grpc_iomgr_add_delayed_callback(ping->on_recv, 1);
+          /* we know no locks are held here, we may as well just call up
+           * directly */
+          ping->on_recv->cb(ping->on_recv->cb_arg, 1);
         }
         ping->next->prev = ping->prev;
         ping->prev->next = ping->next;
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index deb2fedf0c..705a025cca 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -166,7 +166,7 @@ static void destruct_transport(grpc_chttp2_transport *t) {
      and maybe they hold resources that need to be freed */
   while (t->global.pings.next != &t->global.pings) {
     grpc_chttp2_outstanding_ping *ping = t->global.pings.next;
-    grpc_iomgr_add_delayed_callback(ping->on_recv, 0);
+    ping->on_recv->cb(ping->on_recv->cb_arg, 0);
     ping->next->prev = ping->prev;
     ping->prev->next = ping->next;
     gpr_free(ping);
@@ -209,7 +209,7 @@ static void ref_transport(grpc_chttp2_transport *t) { gpr_ref(&t->refs); }
 static void init_transport(grpc_chttp2_transport *t,
                            const grpc_channel_args *channel_args,
                            grpc_endpoint *ep, grpc_mdctx *mdctx,
-                           gpr_uint8 is_client) {
+                           grpc_workqueue *workqueue, gpr_uint8 is_client) {
   size_t i;
   int j;
 
@@ -242,7 +242,7 @@ static void init_transport(grpc_chttp2_transport *t,
   t->parsing.deframe_state =
       is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0;
   t->writing.is_client = is_client;
-  grpc_connectivity_state_init(&t->channel_callback.state_tracker,
+  grpc_connectivity_state_init(&t->channel_callback.state_tracker, workqueue,
                                GRPC_CHANNEL_READY, "transport");
 
   gpr_slice_buffer_init(&t->global.qbuf);
@@ -1280,9 +1280,9 @@ static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream),
 
 grpc_transport *grpc_create_chttp2_transport(
     const grpc_channel_args *channel_args, grpc_endpoint *ep, grpc_mdctx *mdctx,
-    int is_client) {
+    grpc_workqueue *workqueue, int is_client) {
   grpc_chttp2_transport *t = gpr_malloc(sizeof(grpc_chttp2_transport));
-  init_transport(t, channel_args, ep, mdctx, is_client != 0);
+  init_transport(t, channel_args, ep, mdctx, workqueue, is_client != 0);
   return &t->base;
 }
 
diff --git a/src/core/transport/chttp2_transport.h b/src/core/transport/chttp2_transport.h
index fa0d6e4151..8bd8af6236 100644
--- a/src/core/transport/chttp2_transport.h
+++ b/src/core/transport/chttp2_transport.h
@@ -35,6 +35,7 @@
 #define GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_TRANSPORT_H
 
 #include "src/core/iomgr/endpoint.h"
+#include "src/core/iomgr/workqueue.h"
 #include "src/core/transport/transport.h"
 
 extern int grpc_http_trace;
@@ -42,7 +43,7 @@ extern int grpc_flowctl_trace;
 
 grpc_transport *grpc_create_chttp2_transport(
     const grpc_channel_args *channel_args, grpc_endpoint *ep,
-    grpc_mdctx *metadata_context, int is_client);
+    grpc_mdctx *metadata_context, grpc_workqueue *workqueue, int is_client);
 
 void grpc_chttp2_transport_start_reading(grpc_transport *transport,
                                          gpr_slice *slices, size_t nslices);
diff --git a/src/core/transport/connectivity_state.c b/src/core/transport/connectivity_state.c
index 61d26f06f0..716280505e 100644
--- a/src/core/transport/connectivity_state.c
+++ b/src/core/transport/connectivity_state.c
@@ -56,6 +56,7 @@ const char *grpc_connectivity_state_name(grpc_connectivity_state state) {
 }
 
 void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker,
+                                  grpc_workqueue *workqueue,
                                   grpc_connectivity_state init_state,
                                   const char *name) {
   tracker->current_state = init_state;
@@ -64,16 +65,18 @@ void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker,
 }
 
 void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker) {
+  int success;
   grpc_connectivity_state_watcher *w;
   while ((w = tracker->watchers)) {
     tracker->watchers = w->next;
 
     if (GRPC_CHANNEL_FATAL_FAILURE != *w->current) {
       *w->current = GRPC_CHANNEL_FATAL_FAILURE;
-      grpc_iomgr_add_callback(w->notify);
+      success = 1;
     } else {
-      grpc_iomgr_add_delayed_callback(w->notify, 0);
+      success = 0;
     }
+    grpc_workqueue_push(tracker->workqueue, w->notify, success);
     gpr_free(w);
   }
   gpr_free(tracker->name);
@@ -94,7 +97,7 @@ int grpc_connectivity_state_notify_on_state_change(
   }
   if (tracker->current_state != *current) {
     *current = tracker->current_state;
-    grpc_iomgr_add_callback(notify);
+    grpc_workqueue_push(tracker->workqueue, notify, 1);
   } else {
     grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w));
     w->current = current;
@@ -136,13 +139,13 @@ void grpc_connectivity_state_set_with_scheduler(
   tracker->watchers = new;
 }
 
-static void default_scheduler(void *ignored, grpc_iomgr_closure *closure) {
-  grpc_iomgr_add_callback(closure);
+static void default_scheduler(void *workqueue, grpc_iomgr_closure *closure) {
+  grpc_workqueue_push(workqueue, closure, 1);
 }
 
 void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker,
                                  grpc_connectivity_state state,
                                  const char *reason) {
   grpc_connectivity_state_set_with_scheduler(tracker, state, default_scheduler,
-                                             NULL, reason);
+                                             tracker->workqueue, reason);
 }
diff --git a/src/core/transport/connectivity_state.h b/src/core/transport/connectivity_state.h
index a3b0b80c98..6c61e02623 100644
--- a/src/core/transport/connectivity_state.h
+++ b/src/core/transport/connectivity_state.h
@@ -36,6 +36,7 @@
 
 #include <grpc/grpc.h>
 #include "src/core/iomgr/iomgr.h"
+#include "src/core/iomgr/workqueue.h"
 
 typedef struct grpc_connectivity_state_watcher {
   /** we keep watchers in a linked list */
@@ -53,11 +54,14 @@ typedef struct {
   grpc_connectivity_state_watcher *watchers;
   /** a name to help debugging */
   char *name;
+  /** workqueue for async work */
+  grpc_workqueue *workqueue;
 } grpc_connectivity_state_tracker;
 
 extern int grpc_connectivity_state_trace;
 
 void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker,
+                                  grpc_workqueue *grpc_workqueue,
                                   grpc_connectivity_state init_state,
                                   const char *name);
 void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker);
diff --git a/test/core/bad_client/bad_client.c b/test/core/bad_client/bad_client.c
index 41ac83b7b7..b1d3479fa5 100644
--- a/test/core/bad_client/bad_client.c
+++ b/test/core/bad_client/bad_client.c
@@ -65,12 +65,13 @@ static void done_write(void *arg, int success) {
 }
 
 static void server_setup_transport(void *ts, grpc_transport *transport,
-                                   grpc_mdctx *mdctx) {
+                                   grpc_mdctx *mdctx,
+                                   grpc_workqueue *workqueue) {
   thd_args *a = ts;
   static grpc_channel_filter const *extra_filters[] = {
       &grpc_http_server_filter};
   grpc_server_setup_transport(a->server, transport, extra_filters,
-                              GPR_ARRAY_SIZE(extra_filters), mdctx,
+                              GPR_ARRAY_SIZE(extra_filters), mdctx, workqueue,
                               grpc_server_get_channel_args(a->server));
 }
 
-- 
GitLab