From c46beaaa29af84f676bde9d013a85dffed1d58c9 Mon Sep 17 00:00:00 2001
From: Craig Tiller <ctiller@google.com>
Date: Wed, 24 Feb 2016 09:17:19 -0800
Subject: [PATCH] Add an implementation firewall against pollset_set

So multiple implementations can exist in one binary
---
 src/core/channel/client_channel.c             | 45 +++++++++++--------
 .../client_config/lb_policies/pick_first.c    | 31 +++++++------
 .../client_config/lb_policies/round_robin.c   | 25 ++++++-----
 src/core/client_config/lb_policy.c            |  4 +-
 src/core/client_config/lb_policy.h            |  3 +-
 src/core/client_config/subchannel.c           | 30 ++++++-------
 src/core/httpcli/httpcli.c                    | 22 ++++-----
 src/core/httpcli/httpcli.h                    |  3 +-
 src/core/iomgr/pollset_set.h                  | 10 +----
 src/core/iomgr/pollset_set_posix.c            | 23 +++++++++-
 src/core/iomgr/pollset_set_posix.h            | 17 +------
 src/core/iomgr/pollset_set_windows.c          |  4 +-
 src/core/iomgr/pollset_set_windows.h          |  2 +-
 src/core/iomgr/tcp_client_posix.c             | 12 ++---
 src/core/iomgr/tcp_posix.c                    |  5 ++-
 15 files changed, 128 insertions(+), 108 deletions(-)

diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 7176c01b05..a96b49ac12 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -78,8 +78,8 @@ typedef struct client_channel_channel_data {
   int exit_idle_when_lb_policy_arrives;
   /** owning stack */
   grpc_channel_stack *owning_stack;
-  /** interested parties */
-  grpc_pollset_set interested_parties;
+  /** interested parties (owned) */
+  grpc_pollset_set *interested_parties;
 } channel_data;
 
 /** We create one watcher for each new lb_policy that is returned from a
@@ -183,8 +183,8 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
   chand->incoming_configuration = NULL;
 
   if (lb_policy != NULL) {
-    grpc_pollset_set_add_pollset_set(exec_ctx, &lb_policy->interested_parties,
-                                     &chand->interested_parties);
+    grpc_pollset_set_add_pollset_set(exec_ctx, lb_policy->interested_parties,
+                                     chand->interested_parties);
   }
 
   gpr_mu_lock(&chand->mu_config);
@@ -231,9 +231,8 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
   }
 
   if (old_lb_policy != NULL) {
-    grpc_pollset_set_del_pollset_set(exec_ctx,
-                                     &old_lb_policy->interested_parties,
-                                     &chand->interested_parties);
+    grpc_pollset_set_del_pollset_set(
+        exec_ctx, old_lb_policy->interested_parties, chand->interested_parties);
     GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel");
   }
 
@@ -254,7 +253,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
 
   GPR_ASSERT(op->set_accept_stream == NULL);
   if (op->bind_pollset != NULL) {
-    grpc_pollset_set_add_pollset(exec_ctx, &chand->interested_parties,
+    grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties,
                                  op->bind_pollset);
   }
 
@@ -284,8 +283,8 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
     chand->resolver = NULL;
     if (chand->lb_policy != NULL) {
       grpc_pollset_set_del_pollset_set(exec_ctx,
-                                       &chand->lb_policy->interested_parties,
-                                       &chand->interested_parties);
+                                       chand->lb_policy->interested_parties,
+                                       chand->interested_parties);
       GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
       chand->lb_policy = NULL;
     }
@@ -411,7 +410,7 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx,
 
   grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
                                "client_channel");
-  grpc_pollset_set_init(&chand->interested_parties);
+  chand->interested_parties = grpc_pollset_set_create();
 }
 
 /* Destructor for channel_data */
@@ -425,12 +424,12 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
   }
   if (chand->lb_policy != NULL) {
     grpc_pollset_set_del_pollset_set(exec_ctx,
-                                     &chand->lb_policy->interested_parties,
-                                     &chand->interested_parties);
+                                     chand->lb_policy->interested_parties,
+                                     chand->interested_parties);
     GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
   }
   grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker);
-  grpc_pollset_set_destroy(&chand->interested_parties);
+  grpc_pollset_set_destroy(chand->interested_parties);
   gpr_mu_destroy(&chand->mu_config);
 }
 
@@ -441,9 +440,17 @@ static void cc_set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
 }
 
 const grpc_channel_filter grpc_client_channel_filter = {
-    cc_start_transport_stream_op, cc_start_transport_op, sizeof(call_data),
-    init_call_elem, cc_set_pollset, destroy_call_elem, sizeof(channel_data),
-    init_channel_elem, destroy_channel_elem, cc_get_peer, "client-channel",
+    cc_start_transport_stream_op,
+    cc_start_transport_op,
+    sizeof(call_data),
+    init_call_elem,
+    cc_set_pollset,
+    destroy_call_elem,
+    sizeof(channel_data),
+    init_channel_elem,
+    destroy_channel_elem,
+    cc_get_peer,
+    "client-channel",
 };
 
 void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx,
@@ -501,7 +508,7 @@ static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg,
                                        bool iomgr_success) {
   external_connectivity_watcher *w = arg;
   grpc_closure *follow_up = w->on_complete;
-  grpc_pollset_set_del_pollset(exec_ctx, &w->chand->interested_parties,
+  grpc_pollset_set_del_pollset(exec_ctx, w->chand->interested_parties,
                                w->pollset);
   GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack,
                            "external_connectivity_watcher");
@@ -517,7 +524,7 @@ void grpc_client_channel_watch_connectivity_state(
   w->chand = chand;
   w->pollset = pollset;
   w->on_complete = on_complete;
-  grpc_pollset_set_add_pollset(exec_ctx, &chand->interested_parties, pollset);
+  grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, pollset);
   grpc_closure_init(&w->my_closure, on_external_watch_complete, w);
   GRPC_CHANNEL_STACK_REF(w->chand->owning_stack,
                          "external_connectivity_watcher");
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c
index 459bbebb68..9f38f398d8 100644
--- a/src/core/client_config/lb_policies/pick_first.c
+++ b/src/core/client_config/lb_policies/pick_first.c
@@ -31,8 +31,8 @@
  *
  */
 
-#include "src/core/client_config/lb_policy_factory.h"
 #include "src/core/client_config/lb_policies/pick_first.h"
+#include "src/core/client_config/lb_policy_factory.h"
 
 #include <string.h>
 
@@ -119,7 +119,7 @@ void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
   while (pp != NULL) {
     pending_pick *next = pp->next;
     *pp->target = NULL;
-    grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties,
+    grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
                                  pp->pollset);
     grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL);
     gpr_free(pp);
@@ -137,7 +137,7 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
   while (pp != NULL) {
     pending_pick *next = pp->next;
     if (pp->target == target) {
-      grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties,
+      grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
                                    pp->pollset);
       *target = NULL;
       grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
@@ -158,7 +158,7 @@ static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) {
   GRPC_LB_POLICY_WEAK_REF(&p->base, "pick_first_connectivity");
   grpc_subchannel_notify_on_state_change(
       exec_ctx, p->subchannels[p->checking_subchannel],
-      &p->base.interested_parties, &p->checking_connectivity,
+      p->base.interested_parties, &p->checking_connectivity,
       &p->connectivity_changed);
 }
 
@@ -195,8 +195,7 @@ int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset,
     if (!p->started_picking) {
       start_picking(exec_ctx, p);
     }
-    grpc_pollset_set_add_pollset(exec_ctx, &p->base.interested_parties,
-                                 pollset);
+    grpc_pollset_set_add_pollset(exec_ctx, p->base.interested_parties, pollset);
     pp = gpr_malloc(sizeof(*pp));
     pp->next = p->pending_picks;
     pp->pollset = pollset;
@@ -253,7 +252,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
                                 p->checking_connectivity, "selected_changed");
     if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) {
       grpc_connected_subchannel_notify_on_state_change(
-          exec_ctx, selected, &p->base.interested_parties,
+          exec_ctx, selected, p->base.interested_parties,
           &p->checking_connectivity, &p->connectivity_changed);
     } else {
       GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
@@ -278,13 +277,13 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
         while ((pp = p->pending_picks)) {
           p->pending_picks = pp->next;
           *pp->target = selected;
-          grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties,
+          grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
                                        pp->pollset);
           grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL);
           gpr_free(pp);
         }
         grpc_connected_subchannel_notify_on_state_change(
-            exec_ctx, selected, &p->base.interested_parties,
+            exec_ctx, selected, p->base.interested_parties,
             &p->checking_connectivity, &p->connectivity_changed);
         break;
       case GRPC_CHANNEL_TRANSIENT_FAILURE:
@@ -298,7 +297,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
         if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
           grpc_subchannel_notify_on_state_change(
               exec_ctx, p->subchannels[p->checking_subchannel],
-              &p->base.interested_parties, &p->checking_connectivity,
+              p->base.interested_parties, &p->checking_connectivity,
               &p->connectivity_changed);
         } else {
           goto loop;
@@ -311,7 +310,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
                                     "connecting_changed");
         grpc_subchannel_notify_on_state_change(
             exec_ctx, p->subchannels[p->checking_subchannel],
-            &p->base.interested_parties, &p->checking_connectivity,
+            p->base.interested_parties, &p->checking_connectivity,
             &p->connectivity_changed);
         break;
       case GRPC_CHANNEL_FATAL_FAILURE:
@@ -379,8 +378,14 @@ void pf_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
 }
 
 static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
-    pf_destroy, pf_shutdown, pf_pick, pf_cancel_pick, pf_ping_one, pf_exit_idle,
-    pf_check_connectivity, pf_notify_on_state_change};
+    pf_destroy,
+    pf_shutdown,
+    pf_pick,
+    pf_cancel_pick,
+    pf_ping_one,
+    pf_exit_idle,
+    pf_check_connectivity,
+    pf_notify_on_state_change};
 
 static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {}
 
diff --git a/src/core/client_config/lb_policies/round_robin.c b/src/core/client_config/lb_policies/round_robin.c
index b1171c45b0..114ece6e4d 100644
--- a/src/core/client_config/lb_policies/round_robin.c
+++ b/src/core/client_config/lb_policies/round_robin.c
@@ -260,7 +260,7 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
   while (pp != NULL) {
     pending_pick *next = pp->next;
     if (pp->target == target) {
-      grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties,
+      grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
                                    pp->pollset);
       *target = NULL;
       grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
@@ -285,7 +285,7 @@ static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) {
     subchannel_data *sd = p->subchannels[i];
     sd->connectivity_state = GRPC_CHANNEL_IDLE;
     grpc_subchannel_notify_on_state_change(
-        exec_ctx, sd->subchannel, &p->base.interested_parties,
+        exec_ctx, sd->subchannel, p->base.interested_parties,
         &sd->connectivity_state, &sd->connectivity_changed_closure);
     GRPC_LB_POLICY_WEAK_REF(&p->base, "round_robin_connectivity");
   }
@@ -322,8 +322,7 @@ int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset,
     if (!p->started_picking) {
       start_picking(exec_ctx, p);
     }
-    grpc_pollset_set_add_pollset(exec_ctx, &p->base.interested_parties,
-                                 pollset);
+    grpc_pollset_set_add_pollset(exec_ctx, p->base.interested_parties, pollset);
     pp = gpr_malloc(sizeof(*pp));
     pp->next = p->pending_picks;
     pp->pollset = pollset;
@@ -374,13 +373,13 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
                     "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)",
                     selected->subchannel, selected);
           }
-          grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties,
+          grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
                                        pp->pollset);
           grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL);
           gpr_free(pp);
         }
         grpc_subchannel_notify_on_state_change(
-            exec_ctx, sd->subchannel, &p->base.interested_parties,
+            exec_ctx, sd->subchannel, p->base.interested_parties,
             &sd->connectivity_state, &sd->connectivity_changed_closure);
         break;
       case GRPC_CHANNEL_CONNECTING:
@@ -389,13 +388,13 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
                                     sd->connectivity_state,
                                     "connecting_changed");
         grpc_subchannel_notify_on_state_change(
-            exec_ctx, sd->subchannel, &p->base.interested_parties,
+            exec_ctx, sd->subchannel, p->base.interested_parties,
             &sd->connectivity_state, &sd->connectivity_changed_closure);
         break;
       case GRPC_CHANNEL_TRANSIENT_FAILURE:
         /* renew state notification */
         grpc_subchannel_notify_on_state_change(
-            exec_ctx, sd->subchannel, &p->base.interested_parties,
+            exec_ctx, sd->subchannel, p->base.interested_parties,
             &sd->connectivity_state, &sd->connectivity_changed_closure);
 
         /* remove from ready list if still present */
@@ -484,8 +483,14 @@ static void rr_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
 }
 
 static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = {
-    rr_destroy, rr_shutdown, rr_pick, rr_cancel_pick, rr_ping_one, rr_exit_idle,
-    rr_check_connectivity, rr_notify_on_state_change};
+    rr_destroy,
+    rr_shutdown,
+    rr_pick,
+    rr_cancel_pick,
+    rr_ping_one,
+    rr_exit_idle,
+    rr_check_connectivity,
+    rr_notify_on_state_change};
 
 static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {}
 
diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c
index d4672f6b25..5ff623e006 100644
--- a/src/core/client_config/lb_policy.c
+++ b/src/core/client_config/lb_policy.c
@@ -39,7 +39,7 @@ void grpc_lb_policy_init(grpc_lb_policy *policy,
                          const grpc_lb_policy_vtable *vtable) {
   policy->vtable = vtable;
   gpr_atm_no_barrier_store(&policy->ref_pair, 1 << WEAK_REF_BITS);
-  grpc_pollset_set_init(&policy->interested_parties);
+  policy->interested_parties = grpc_pollset_set_create();
 }
 
 #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG
@@ -93,7 +93,7 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx,
   gpr_atm old_val =
       ref_mutate(policy, -(gpr_atm)1, 1 REF_MUTATE_PASS_ARGS("WEAK_UNREF"));
   if (old_val == 1) {
-    grpc_pollset_set_destroy(&policy->interested_parties);
+    grpc_pollset_set_destroy(policy->interested_parties);
     policy->vtable->destroy(exec_ctx, policy);
   }
 }
diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h
index db5238c8ca..4fbb12da39 100644
--- a/src/core/client_config/lb_policy.h
+++ b/src/core/client_config/lb_policy.h
@@ -48,7 +48,8 @@ typedef void (*grpc_lb_completion)(void *cb_arg, grpc_subchannel *subchannel,
 struct grpc_lb_policy {
   const grpc_lb_policy_vtable *vtable;
   gpr_atm ref_pair;
-  grpc_pollset_set interested_parties;
+  /* owned pointer to interested parties in load balancing decisions */
+  grpc_pollset_set *interested_parties;
 };
 
 struct grpc_lb_policy_vtable {
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index 6599c75dba..291ad3472c 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -108,7 +108,7 @@ struct grpc_subchannel {
 
   /** pollset_set tracking who's interested in a connection
       being setup */
-  grpc_pollset_set pollset_set;
+  grpc_pollset_set *pollset_set;
 
   /** active connection, or null; of type grpc_connected_subchannel */
   gpr_atm connected_subchannel;
@@ -184,8 +184,8 @@ static void connection_destroy(grpc_exec_ctx *exec_ctx, void *arg,
   gpr_free(c);
 }
 
-void grpc_connected_subchannel_ref(grpc_connected_subchannel *c
-                                       GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+void grpc_connected_subchannel_ref(
+    grpc_connected_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
   GRPC_CHANNEL_STACK_REF(CHANNEL_STACK_FROM_CONNECTION(c), REF_REASON);
 }
 
@@ -209,7 +209,7 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg,
   gpr_slice_unref(c->initial_connect_string);
   grpc_connectivity_state_destroy(exec_ctx, &c->state_tracker);
   grpc_connector_unref(exec_ctx, c->connector);
-  grpc_pollset_set_destroy(&c->pollset_set);
+  grpc_pollset_set_destroy(c->pollset_set);
   grpc_subchannel_key_destroy(exec_ctx, c->key);
   gpr_free(c);
 }
@@ -226,8 +226,8 @@ static gpr_atm ref_mutate(grpc_subchannel *c, gpr_atm delta,
   return old_val;
 }
 
-grpc_subchannel *grpc_subchannel_ref(grpc_subchannel *c
-                                         GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+grpc_subchannel *grpc_subchannel_ref(
+    grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
   gpr_atm old_refs;
   old_refs = ref_mutate(c, (1 << INTERNAL_REF_BITS),
                         0 REF_MUTATE_PURPOSE("STRONG_REF"));
@@ -235,8 +235,8 @@ grpc_subchannel *grpc_subchannel_ref(grpc_subchannel *c
   return c;
 }
 
-grpc_subchannel *grpc_subchannel_weak_ref(grpc_subchannel *c
-                                              GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+grpc_subchannel *grpc_subchannel_weak_ref(
+    grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
   gpr_atm old_refs;
   old_refs = ref_mutate(c, 1, 0 REF_MUTATE_PURPOSE("WEAK_REF"));
   GPR_ASSERT(old_refs != 0);
@@ -326,7 +326,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
   }
   c->addr = gpr_malloc(args->addr_len);
   memcpy(c->addr, args->addr, args->addr_len);
-  grpc_pollset_set_init(&c->pollset_set);
+  c->pollset_set = grpc_pollset_set_create();
   c->addr_len = args->addr_len;
   grpc_set_initial_connect_string(&c->addr, &c->addr_len,
                                   &c->initial_connect_string);
@@ -345,7 +345,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
 static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
   grpc_connect_in_args args;
 
-  args.interested_parties = &c->pollset_set;
+  args.interested_parties = c->pollset_set;
   args.addr = c->addr;
   args.addr_len = c->addr_len;
   args.deadline = compute_connect_deadline(c);
@@ -379,7 +379,7 @@ static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg,
   external_state_watcher *w = arg;
   grpc_closure *follow_up = w->notify;
   if (w->pollset_set != NULL) {
-    grpc_pollset_set_del_pollset_set(exec_ctx, &w->subchannel->pollset_set,
+    grpc_pollset_set_del_pollset_set(exec_ctx, w->subchannel->pollset_set,
                                      w->pollset_set);
   }
   gpr_mu_lock(&w->subchannel->mu);
@@ -415,7 +415,7 @@ void grpc_subchannel_notify_on_state_change(
     w->notify = notify;
     grpc_closure_init(&w->closure, on_external_state_watcher_done, w);
     if (interested_parties != NULL) {
-      grpc_pollset_set_add_pollset_set(exec_ctx, &c->pollset_set,
+      grpc_pollset_set_add_pollset_set(exec_ctx, c->pollset_set,
                                        interested_parties);
     }
     GRPC_SUBCHANNEL_WEAK_REF(c, "external_state_watcher");
@@ -573,7 +573,7 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
   GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher");
   GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
   grpc_connected_subchannel_notify_on_state_change(
-      exec_ctx, con, &c->pollset_set, &sw_subchannel->connectivity_state,
+      exec_ctx, con, c->pollset_set, &sw_subchannel->connectivity_state,
       &sw_subchannel->closure);
 
   /* signal completion */
@@ -690,8 +690,8 @@ static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call,
   GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0);
 }
 
-void grpc_subchannel_call_ref(grpc_subchannel_call *c
-                                  GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+void grpc_subchannel_call_ref(
+    grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
   GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON);
 }
 
diff --git a/src/core/httpcli/httpcli.c b/src/core/httpcli/httpcli.c
index 71237bb614..68fa8c8cf5 100644
--- a/src/core/httpcli/httpcli.c
+++ b/src/core/httpcli/httpcli.c
@@ -31,20 +31,20 @@
  *
  */
 
-#include "src/core/iomgr/sockaddr.h"
 #include "src/core/httpcli/httpcli.h"
+#include "src/core/iomgr/sockaddr.h"
 
 #include <string.h>
 
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+#include "src/core/httpcli/format_request.h"
+#include "src/core/httpcli/parser.h"
 #include "src/core/iomgr/endpoint.h"
 #include "src/core/iomgr/resolve_address.h"
 #include "src/core/iomgr/tcp_client.h"
-#include "src/core/httpcli/format_request.h"
-#include "src/core/httpcli/parser.h"
 #include "src/core/support/string.h"
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/string_util.h>
 
 typedef struct {
   gpr_slice request_text;
@@ -84,18 +84,18 @@ const grpc_httpcli_handshaker grpc_httpcli_plaintext = {"http",
                                                         plaintext_handshake};
 
 void grpc_httpcli_context_init(grpc_httpcli_context *context) {
-  grpc_pollset_set_init(&context->pollset_set);
+  context->pollset_set = grpc_pollset_set_create();
 }
 
 void grpc_httpcli_context_destroy(grpc_httpcli_context *context) {
-  grpc_pollset_set_destroy(&context->pollset_set);
+  grpc_pollset_set_destroy(context->pollset_set);
 }
 
 static void next_address(grpc_exec_ctx *exec_ctx, internal_request *req);
 
 static void finish(grpc_exec_ctx *exec_ctx, internal_request *req,
                    int success) {
-  grpc_pollset_set_del_pollset(exec_ctx, &req->context->pollset_set,
+  grpc_pollset_set_del_pollset(exec_ctx, req->context->pollset_set,
                                req->pollset);
   req->on_response(exec_ctx, req->user_data, success ? &req->parser.r : NULL);
   grpc_httpcli_parser_destroy(&req->parser);
@@ -197,7 +197,7 @@ static void next_address(grpc_exec_ctx *exec_ctx, internal_request *req) {
   addr = &req->addresses->addrs[req->next_address++];
   grpc_closure_init(&req->connected, on_connected, req);
   grpc_tcp_client_connect(
-      exec_ctx, &req->connected, &req->ep, &req->context->pollset_set,
+      exec_ctx, &req->connected, &req->ep, req->context->pollset_set,
       (struct sockaddr *)&addr->addr, addr->len, req->deadline);
 }
 
@@ -237,7 +237,7 @@ static void internal_request_begin(
   req->host = gpr_strdup(request->host);
   req->ssl_host_override = gpr_strdup(request->ssl_host_override);
 
-  grpc_pollset_set_add_pollset(exec_ctx, &req->context->pollset_set,
+  grpc_pollset_set_add_pollset(exec_ctx, req->context->pollset_set,
                                req->pollset);
   grpc_resolve_address(request->host, req->handshaker->default_port,
                        on_resolved, req);
diff --git a/src/core/httpcli/httpcli.h b/src/core/httpcli/httpcli.h
index 30875d71f1..86e17c1d69 100644
--- a/src/core/httpcli/httpcli.h
+++ b/src/core/httpcli/httpcli.h
@@ -39,6 +39,7 @@
 #include <grpc/support/time.h>
 
 #include "src/core/iomgr/endpoint.h"
+#include "src/core/iomgr/iomgr_internal.h"
 #include "src/core/iomgr/pollset_set.h"
 
 /* User agent this library reports */
@@ -56,7 +57,7 @@ typedef struct grpc_httpcli_header {
    TODO(ctiller): allow caching and capturing multiple requests for the
                   same content and combining them */
 typedef struct grpc_httpcli_context {
-  grpc_pollset_set pollset_set;
+  grpc_pollset_set *pollset_set;
 } grpc_httpcli_context;
 
 typedef struct {
diff --git a/src/core/iomgr/pollset_set.h b/src/core/iomgr/pollset_set.h
index 09c04438f7..9591bf0d32 100644
--- a/src/core/iomgr/pollset_set.h
+++ b/src/core/iomgr/pollset_set.h
@@ -41,15 +41,9 @@
    fd's (etc) that have been registered with the set_set to that pollset.
    Registering fd's automatically adds them to all current pollsets. */
 
-#ifdef GPR_POSIX_SOCKET
-#include "src/core/iomgr/pollset_set_posix.h"
-#endif
+typedef struct grpc_pollset_set grpc_pollset_set;
 
-#ifdef GPR_WIN32
-#include "src/core/iomgr/pollset_set_windows.h"
-#endif
-
-void grpc_pollset_set_init(grpc_pollset_set *pollset_set);
+grpc_pollset_set *grpc_pollset_set_create(void);
 void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set);
 void grpc_pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
                                   grpc_pollset_set *pollset_set,
diff --git a/src/core/iomgr/pollset_set_posix.c b/src/core/iomgr/pollset_set_posix.c
index 3610e36b58..9dc9aff4a8 100644
--- a/src/core/iomgr/pollset_set_posix.c
+++ b/src/core/iomgr/pollset_set_posix.c
@@ -42,11 +42,29 @@
 #include <grpc/support/useful.h>
 
 #include "src/core/iomgr/pollset_posix.h"
-#include "src/core/iomgr/pollset_set.h"
+#include "src/core/iomgr/pollset_set_posix.h"
 
-void grpc_pollset_set_init(grpc_pollset_set *pollset_set) {
+struct grpc_pollset_set {
+  gpr_mu mu;
+
+  size_t pollset_count;
+  size_t pollset_capacity;
+  grpc_pollset **pollsets;
+
+  size_t pollset_set_count;
+  size_t pollset_set_capacity;
+  struct grpc_pollset_set **pollset_sets;
+
+  size_t fd_count;
+  size_t fd_capacity;
+  grpc_fd **fds;
+};
+
+grpc_pollset_set *grpc_pollset_set_create(void) {
+  grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set));
   memset(pollset_set, 0, sizeof(*pollset_set));
   gpr_mu_init(&pollset_set->mu);
+  return pollset_set;
 }
 
 void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set) {
@@ -58,6 +76,7 @@ void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set) {
   gpr_free(pollset_set->pollsets);
   gpr_free(pollset_set->pollset_sets);
   gpr_free(pollset_set->fds);
+  gpr_free(pollset_set);
 }
 
 void grpc_pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/iomgr/pollset_set_posix.h b/src/core/iomgr/pollset_set_posix.h
index 5ee83b5dd6..7d1aaf4181 100644
--- a/src/core/iomgr/pollset_set_posix.h
+++ b/src/core/iomgr/pollset_set_posix.h
@@ -35,22 +35,7 @@
 #define GRPC_INTERNAL_CORE_IOMGR_POLLSET_SET_POSIX_H
 
 #include "src/core/iomgr/fd_posix.h"
-
-typedef struct grpc_pollset_set {
-  gpr_mu mu;
-
-  size_t pollset_count;
-  size_t pollset_capacity;
-  grpc_pollset **pollsets;
-
-  size_t pollset_set_count;
-  size_t pollset_set_capacity;
-  struct grpc_pollset_set **pollset_sets;
-
-  size_t fd_count;
-  size_t fd_capacity;
-  grpc_fd **fds;
-} grpc_pollset_set;
+#include "src/core/iomgr/pollset_set.h"
 
 void grpc_pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
                              grpc_pollset_set *pollset_set, grpc_fd *fd);
diff --git a/src/core/iomgr/pollset_set_windows.c b/src/core/iomgr/pollset_set_windows.c
index 157b46ec32..9cf8fd4472 100644
--- a/src/core/iomgr/pollset_set_windows.c
+++ b/src/core/iomgr/pollset_set_windows.c
@@ -35,9 +35,9 @@
 
 #ifdef GPR_WINSOCK_SOCKET
 
-#include "src/core/iomgr/pollset_set.h"
+#include "src/core/iomgr/pollset_set_windows.h"
 
-void grpc_pollset_set_init(grpc_pollset_set* pollset_set) {}
+grpc_pollset_set* grpc_pollset_set_create(pollset_set) { return NULL; }
 
 void grpc_pollset_set_destroy(grpc_pollset_set* pollset_set) {}
 
diff --git a/src/core/iomgr/pollset_set_windows.h b/src/core/iomgr/pollset_set_windows.h
index cada0d2b61..aa5abe9133 100644
--- a/src/core/iomgr/pollset_set_windows.h
+++ b/src/core/iomgr/pollset_set_windows.h
@@ -34,6 +34,6 @@
 #ifndef GRPC_INTERNAL_CORE_IOMGR_POLLSET_SET_WINDOWS_H
 #define GRPC_INTERNAL_CORE_IOMGR_POLLSET_SET_WINDOWS_H
 
-typedef struct grpc_pollset_set { void *unused; } grpc_pollset_set;
+#include "src/core/iomgr/pollset_set.h"
 
 #endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_WINDOWS_H */
diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c
index c76c2e3b0f..15727856ab 100644
--- a/src/core/iomgr/tcp_client_posix.c
+++ b/src/core/iomgr/tcp_client_posix.c
@@ -42,17 +42,19 @@
 #include <string.h>
 #include <unistd.h>
 
-#include "src/core/iomgr/timer.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/time.h>
+
 #include "src/core/iomgr/iomgr_posix.h"
 #include "src/core/iomgr/pollset_posix.h"
+#include "src/core/iomgr/pollset_set_posix.h"
 #include "src/core/iomgr/sockaddr_utils.h"
 #include "src/core/iomgr/socket_utils_posix.h"
 #include "src/core/iomgr/tcp_posix.h"
+#include "src/core/iomgr/timer.h"
 #include "src/core/support/string.h"
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/string_util.h>
-#include <grpc/support/time.h>
 
 extern int grpc_tcp_trace;
 
diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c
index fba3563427..e8f73811ce 100644
--- a/src/core/iomgr/tcp_posix.c
+++ b/src/core/iomgr/tcp_posix.c
@@ -53,6 +53,7 @@
 
 #include "src/core/debug/trace.h"
 #include "src/core/iomgr/pollset_posix.h"
+#include "src/core/iomgr/pollset_set_posix.h"
 #include "src/core/profiling/timers.h"
 #include "src/core/support/string.h"
 
@@ -296,7 +297,7 @@ static flush_result tcp_flush(grpc_tcp *tcp) {
     unwind_slice_idx = tcp->outgoing_slice_idx;
     unwind_byte_idx = tcp->outgoing_byte_idx;
     for (iov_size = 0; tcp->outgoing_slice_idx != tcp->outgoing_buffer->count &&
-                           iov_size != MAX_WRITE_IOVEC;
+                       iov_size != MAX_WRITE_IOVEC;
          iov_size++) {
       iov[iov_size].iov_base =
           GPR_SLICE_START_PTR(
@@ -445,7 +446,7 @@ static char *tcp_get_peer(grpc_endpoint *ep) {
 }
 
 static const grpc_endpoint_vtable vtable = {
-    tcp_read, tcp_write, tcp_add_to_pollset, tcp_add_to_pollset_set,
+    tcp_read,     tcp_write,   tcp_add_to_pollset, tcp_add_to_pollset_set,
     tcp_shutdown, tcp_destroy, tcp_get_peer};
 
 grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size,
-- 
GitLab