From 117a300027b18a1e0a950da707e4710a08b981d3 Mon Sep 17 00:00:00 2001
From: Yuchen Zeng <zyc@google.com>
Date: Thu, 17 Nov 2016 19:43:36 -0800
Subject: [PATCH] Move ev_driver from ares_dns_resolver to grpc_ares_request

---
 .../resolver/dns/c_ares/dns_resolver_ares.c   |  12 +-
 .../resolver/dns/c_ares/grpc_ares_ev_driver.h |   3 +-
 .../dns/c_ares/grpc_ares_ev_driver_posix.c    | 118 ++++++++++--------
 .../resolver/dns/c_ares/grpc_ares_wrapper.c   |  62 ++++++---
 .../resolver/dns/c_ares/grpc_ares_wrapper.h   |   6 +-
 5 files changed, 110 insertions(+), 91 deletions(-)

diff --git a/src/core/ext/resolver/dns/c_ares/dns_resolver_ares.c b/src/core/ext/resolver/dns/c_ares/dns_resolver_ares.c
index 64d4a5933e..e5691942a4 100644
--- a/src/core/ext/resolver/dns/c_ares/dns_resolver_ares.c
+++ b/src/core/ext/resolver/dns/c_ares/dns_resolver_ares.c
@@ -41,7 +41,6 @@
 #include "src/core/ext/client_channel/http_connect_handshaker.h"
 #include "src/core/ext/client_channel/lb_policy_registry.h"
 #include "src/core/ext/client_channel/resolver_registry.h"
-#include "src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver.h"
 #include "src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper.h"
 #include "src/core/lib/channel/channel_args.h"
 #include "src/core/lib/iomgr/combiner.h"
@@ -65,8 +64,6 @@ typedef struct {
   char *default_port;
   /** channel args. */
   grpc_channel_args *channel_args;
-  /** the event driver to drive the lookups */
-  grpc_ares_ev_driver *ev_driver;
 
   /** Closures used by the combiner */
   grpc_closure dns_ares_shutdown_locked;
@@ -281,7 +278,7 @@ static void dns_ares_start_resolving_locked(grpc_exec_ctx *exec_ctx,
   r->resolving = true;
   r->addresses = NULL;
   grpc_resolve_address_ares(
-      exec_ctx, r->name_to_resolve, r->default_port, r->ev_driver,
+      exec_ctx, r->name_to_resolve, r->default_port, r->base.pollset_set,
       grpc_closure_create(dns_ares_on_resolved, r), &r->addresses);
 }
 
@@ -301,7 +298,6 @@ static void dns_ares_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
 static void dns_ares_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
   gpr_log(GPR_DEBUG, "dns_ares_destroy");
   ares_dns_resolver *r = (ares_dns_resolver *)gr;
-  grpc_ares_ev_driver_destroy(exec_ctx, r->ev_driver);
   grpc_combiner_destroy(exec_ctx, r->combiner);
   grpc_ares_cleanup();
   if (r->resolved_result != NULL) {
@@ -340,12 +336,6 @@ static grpc_resolver *dns_ares_create(grpc_resolver_args *args,
   r = gpr_malloc(sizeof(ares_dns_resolver));
   memset(r, 0, sizeof(*r));
   grpc_resolver_init(&r->base, &dns_ares_resolver_vtable);
-  error = grpc_ares_ev_driver_create(&r->ev_driver, r->base.pollset_set);
-  if (error != GRPC_ERROR_NONE) {
-    GRPC_LOG_IF_ERROR("grpc_ares_ev_driver_create() failed", error);
-    gpr_free(r);
-    return NULL;
-  }
   r->combiner = grpc_combiner_create(NULL);
   r->name_to_resolve = proxy_name == NULL ? gpr_strdup(path) : proxy_name;
   r->default_port = gpr_strdup(default_port);
diff --git a/src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver.h b/src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver.h
index 09ca047c3b..7165df0afc 100644
--- a/src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver.h
+++ b/src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver.h
@@ -58,7 +58,6 @@ grpc_error *grpc_ares_ev_driver_create(grpc_ares_ev_driver **ev_driver,
 /* Destroys \a ev_driver asynchronously. Pending lookups made on \a ev_driver
    will be cancelled and their on_done callbacks will be invoked with a status
    of ARES_ECANCELLED. */
-void grpc_ares_ev_driver_destroy(grpc_exec_ctx *exec_ctx,
-                                 grpc_ares_ev_driver *ev_driver);
+void grpc_ares_ev_driver_destroy(grpc_ares_ev_driver *ev_driver);
 
 #endif /* GRPC_CORE_EXT_RESOLVER_DNS_C_ARES_GRPC_ARES_EV_DRIVER_H */
diff --git a/src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c b/src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c
index 8d8f5f0580..a4733dcb4b 100644
--- a/src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c
+++ b/src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c
@@ -86,6 +86,8 @@ struct grpc_ares_ev_driver {
   fd_node *fds;
   /** is this event driver currently working? */
   bool working;
+  /** is this event driver being shut down */
+  bool shutting_down;
 };
 
 static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx,
@@ -145,19 +147,19 @@ grpc_error *grpc_ares_ev_driver_create(grpc_ares_ev_driver **ev_driver,
   (*ev_driver)->pollset_set = pollset_set;
   (*ev_driver)->fds = NULL;
   (*ev_driver)->working = false;
+  (*ev_driver)->shutting_down = false;
   return GRPC_ERROR_NONE;
 }
 
-void grpc_ares_ev_driver_destroy(grpc_exec_ctx *exec_ctx,
-                                 grpc_ares_ev_driver *ev_driver) {
-  // Shutdown all the working fds, invoke their registered on_readable_cb and
-  // on_writable_cb.
+void grpc_ares_ev_driver_destroy(  // grpc_exec_ctx *exec_ctx,
+    grpc_ares_ev_driver *ev_driver) {
+  // It's not safe to shut down remaining fds here directly, becauses
+  // ares_host_callback does not provide an exec_ctx. We mark the event driver
+  // as being shut down. If the event driver is working,
+  // grpc_ares_notify_on_event_locked will shut down the fds; if it's not
+  // working, grpc_ares_ev_driver_unref will release it directly.
   gpr_mu_lock(&ev_driver->mu);
-  fd_node *fdn;
-  for (fdn = ev_driver->fds; fdn; fdn = fdn->next) {
-    grpc_fd_shutdown(exec_ctx, fdn->grpc_fd);
-    fdn = fdn->next;
-  }
+  ev_driver->shutting_down = true;
   gpr_mu_unlock(&ev_driver->mu);
   grpc_ares_ev_driver_unref(ev_driver);
 }
@@ -243,53 +245,57 @@ void *grpc_ares_ev_driver_get_channel(grpc_ares_ev_driver *ev_driver) {
 static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx,
                                              grpc_ares_ev_driver *ev_driver) {
   fd_node *new_list = NULL;
-  ares_socket_t socks[ARES_GETSOCK_MAXNUM];
-  int socks_bitmask =
-      ares_getsock(ev_driver->channel, socks, ARES_GETSOCK_MAXNUM);
-  for (size_t i = 0; i < ARES_GETSOCK_MAXNUM; i++) {
-    if (ARES_GETSOCK_READABLE(socks_bitmask, i) ||
-        ARES_GETSOCK_WRITABLE(socks_bitmask, i)) {
-      fd_node *fdn = pop_fd_node(&ev_driver->fds, socks[i]);
-      // Create a new fd_node if sock[i] is not in the fd_node list.
-      if (fdn == NULL) {
-        char *fd_name;
-        gpr_asprintf(&fd_name, "ares_ev_driver-%" PRIuPTR, i);
-        fdn = gpr_malloc(sizeof(fd_node));
-        gpr_log(GPR_DEBUG, "new fd: %d", socks[i]);
-        fdn->grpc_fd = grpc_fd_create(socks[i], fd_name);
-        fdn->ev_driver = ev_driver;
-        fdn->readable_registered = false;
-        fdn->writable_registered = false;
-        gpr_mu_init(&fdn->mu);
-        grpc_closure_init(&fdn->read_closure, on_readable_cb, fdn);
-        grpc_closure_init(&fdn->write_closure, on_writable_cb, fdn);
-        grpc_pollset_set_add_fd(exec_ctx, ev_driver->pollset_set, fdn->grpc_fd);
-        gpr_free(fd_name);
-      }
-      fdn->next = new_list;
-      new_list = fdn;
-      gpr_mu_lock(&fdn->mu);
-      // Register read_closure if the socket is readable and read_closure has
-      // not been registered with this socket.
-      if (ARES_GETSOCK_READABLE(socks_bitmask, i) &&
-          !fdn->readable_registered) {
-        grpc_ares_ev_driver_ref(ev_driver);
-        gpr_log(GPR_DEBUG, "notify read on: %d",
-                grpc_fd_wrapped_fd(fdn->grpc_fd));
-        grpc_fd_notify_on_read(exec_ctx, fdn->grpc_fd, &fdn->read_closure);
-        fdn->readable_registered = true;
+  if (!ev_driver->shutting_down) {
+    ares_socket_t socks[ARES_GETSOCK_MAXNUM];
+    int socks_bitmask =
+        ares_getsock(ev_driver->channel, socks, ARES_GETSOCK_MAXNUM);
+    for (size_t i = 0; i < ARES_GETSOCK_MAXNUM; i++) {
+      if (ARES_GETSOCK_READABLE(socks_bitmask, i) ||
+          ARES_GETSOCK_WRITABLE(socks_bitmask, i)) {
+        fd_node *fdn = pop_fd_node(&ev_driver->fds, socks[i]);
+        // Create a new fd_node if sock[i] is not in the fd_node list.
+        if (fdn == NULL) {
+          char *fd_name;
+          gpr_asprintf(&fd_name, "ares_ev_driver-%" PRIuPTR, i);
+          fdn = gpr_malloc(sizeof(fd_node));
+          gpr_log(GPR_DEBUG, "new fd: %d", socks[i]);
+          fdn->grpc_fd = grpc_fd_create(socks[i], fd_name);
+          fdn->ev_driver = ev_driver;
+          fdn->readable_registered = false;
+          fdn->writable_registered = false;
+          gpr_mu_init(&fdn->mu);
+          grpc_closure_init(&fdn->read_closure, on_readable_cb, fdn);
+          grpc_closure_init(&fdn->write_closure, on_writable_cb, fdn);
+          grpc_pollset_set_add_fd(exec_ctx, ev_driver->pollset_set,
+                                  fdn->grpc_fd);
+          gpr_free(fd_name);
+        }
+        fdn->next = new_list;
+        new_list = fdn;
+        gpr_mu_lock(&fdn->mu);
+        // Register read_closure if the socket is readable and read_closure has
+        // not been registered with this socket.
+        if (ARES_GETSOCK_READABLE(socks_bitmask, i) &&
+            !fdn->readable_registered) {
+          grpc_ares_ev_driver_ref(ev_driver);
+          gpr_log(GPR_DEBUG, "notify read on: %d",
+                  grpc_fd_wrapped_fd(fdn->grpc_fd));
+          grpc_fd_notify_on_read(exec_ctx, fdn->grpc_fd, &fdn->read_closure);
+          fdn->readable_registered = true;
+        }
+        // Register write_closure if the socket is writable and write_closure
+        // has
+        // not been registered with this socket.
+        if (ARES_GETSOCK_WRITABLE(socks_bitmask, i) &&
+            !fdn->writable_registered) {
+          gpr_log(GPR_DEBUG, "notify write on: %d",
+                  grpc_fd_wrapped_fd(fdn->grpc_fd));
+          grpc_ares_ev_driver_ref(ev_driver);
+          grpc_fd_notify_on_write(exec_ctx, fdn->grpc_fd, &fdn->write_closure);
+          fdn->writable_registered = true;
+        }
+        gpr_mu_unlock(&fdn->mu);
       }
-      // Register write_closure if the socket is writable and write_closure has
-      // not been registered with this socket.
-      if (ARES_GETSOCK_WRITABLE(socks_bitmask, i) &&
-          !fdn->writable_registered) {
-        gpr_log(GPR_DEBUG, "notify write on: %d",
-                grpc_fd_wrapped_fd(fdn->grpc_fd));
-        grpc_ares_ev_driver_ref(ev_driver);
-        grpc_fd_notify_on_write(exec_ctx, fdn->grpc_fd, &fdn->write_closure);
-        fdn->writable_registered = true;
-      }
-      gpr_mu_unlock(&fdn->mu);
     }
   }
   // Any remaining fds in ev_driver->fds were not returned by ares_getsock() and
@@ -311,12 +317,14 @@ static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx,
 
 void grpc_ares_ev_driver_start(grpc_exec_ctx *exec_ctx,
                                grpc_ares_ev_driver *ev_driver) {
+  grpc_ares_ev_driver_ref(ev_driver);
   gpr_mu_lock(&ev_driver->mu);
   if (!ev_driver->working) {
     ev_driver->working = true;
     grpc_ares_notify_on_event_locked(exec_ctx, ev_driver);
   }
   gpr_mu_unlock(&ev_driver->mu);
+  grpc_ares_ev_driver_unref(ev_driver);
 }
 
 #endif /* !GRPC_NATIVE_ADDRESS_RESOLVE && GRPC_POSIX_SOCKET */
diff --git a/src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper.c b/src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper.c
index 9c83a8bf39..f90222b2e6 100644
--- a/src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper.c
+++ b/src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper.c
@@ -96,6 +96,32 @@ static uint16_t strhtons(const char *port) {
   return htons((unsigned short)atoi(port));
 }
 
+static void grpc_ares_request_unref(grpc_exec_ctx *exec_ctx,
+                                    grpc_ares_request *r) {
+  // If there are no pending queries, invoke on_done callback and destroy the
+  // request
+  if (gpr_unref(&r->pending_queries)) {
+    if (exec_ctx == NULL) {
+      // A new exec_ctx is created here, as the c-ares interface does not
+      // provide one in ares_host_callback. It's safe to schedule on_done with
+      // the newly created exec_ctx, since the caller has been warned not to
+      // acquire locks in on_done. ares_dns_resolver is using combiner to
+      // protect resources needed by on_done.
+      grpc_exec_ctx new_exec_ctx = GRPC_EXEC_CTX_INIT;
+      grpc_exec_ctx_sched(&new_exec_ctx, r->on_done, r->error, NULL);
+      grpc_exec_ctx_finish(&new_exec_ctx);
+    } else {
+      grpc_exec_ctx_sched(exec_ctx, r->on_done, r->error, NULL);
+    }
+    gpr_mu_destroy(&r->mu);
+    grpc_ares_ev_driver_destroy(r->ev_driver);
+    gpr_free(r->host);
+    gpr_free(r->port);
+    gpr_free(r->default_port);
+    gpr_free(r);
+  }
+}
+
 static void on_done_cb(void *arg, int status, int timeouts,
                        struct hostent *hostent) {
   grpc_ares_request *r = (grpc_ares_request *)arg;
@@ -165,28 +191,13 @@ static void on_done_cb(void *arg, int status, int timeouts,
     }
   }
   gpr_mu_unlock(&r->mu);
-  // If there are no pending queries, invoke on_done callback and destroy the
-  // request
-  if (gpr_unref(&r->pending_queries)) {
-    // A new exec_ctx is created here, as the c-ares interface does not provide
-    // one in this callback. It's safe to schedule on_done with the newly
-    // created exec_ctx, since the caller has been warned not to acquire locks
-    // in on_done. ares_dns_resolver is using combiner to protect resources
-    // needed by on_done.
-    grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
-    grpc_exec_ctx_sched(&exec_ctx, r->on_done, r->error, NULL);
-    grpc_exec_ctx_finish(&exec_ctx);
-    gpr_mu_destroy(&r->mu);
-    gpr_free(r->host);
-    gpr_free(r->port);
-    gpr_free(r->default_port);
-    gpr_free(r);
-  }
+  grpc_ares_request_unref(NULL, r);
 }
 
 void grpc_resolve_address_ares_impl(grpc_exec_ctx *exec_ctx, const char *name,
                                     const char *default_port,
-                                    grpc_ares_ev_driver *ev_driver,
+                                    // grpc_ares_ev_driver *ev_driver,
+                                    grpc_pollset_set *interested_parties,
                                     grpc_closure *on_done,
                                     grpc_resolved_addresses **addrs) {
   grpc_error *err;
@@ -216,6 +227,13 @@ void grpc_resolve_address_ares_impl(grpc_exec_ctx *exec_ctx, const char *name,
     port = gpr_strdup(default_port);
   }
 
+  grpc_ares_ev_driver *ev_driver;
+  err = grpc_ares_ev_driver_create(&ev_driver, interested_parties);
+  if (err != GRPC_ERROR_NONE) {
+    GRPC_LOG_IF_ERROR("grpc_ares_ev_driver_create() failed", err);
+    goto error_cleanup;
+  }
+
   grpc_ares_request *r = gpr_malloc(sizeof(grpc_ares_request));
   gpr_mu_init(&r->mu);
   r->ev_driver = ev_driver;
@@ -228,13 +246,16 @@ void grpc_resolve_address_ares_impl(grpc_exec_ctx *exec_ctx, const char *name,
   r->error = GRPC_ERROR_NONE;
   ares_channel *channel =
       (ares_channel *)grpc_ares_ev_driver_get_channel(r->ev_driver);
-  gpr_ref_init(&r->pending_queries, 1);
+  // An extra reference is put here to avoid destroying the request in
+  // on_done_cb before calling grpc_ares_ev_driver_start.
+  gpr_ref_init(&r->pending_queries, 2);
   if (grpc_ipv6_loopback_available()) {
     gpr_ref(&r->pending_queries);
     ares_gethostbyname(*channel, r->host, AF_INET6, on_done_cb, r);
   }
   ares_gethostbyname(*channel, r->host, AF_INET, on_done_cb, r);
   grpc_ares_ev_driver_start(exec_ctx, ev_driver);
+  grpc_ares_request_unref(exec_ctx, r);
   return;
 
 error_cleanup:
@@ -244,7 +265,8 @@ error_cleanup:
 
 void (*grpc_resolve_address_ares)(
     grpc_exec_ctx *exec_ctx, const char *name, const char *default_port,
-    grpc_ares_ev_driver *ev_driver, grpc_closure *on_done,
+    grpc_pollset_set *interested_parties, grpc_closure *on_done,
+    // grpc_ares_ev_driver *ev_driver, grpc_closure *on_done,
     grpc_resolved_addresses **addrs) = grpc_resolve_address_ares_impl;
 
 grpc_error *grpc_ares_init(void) {
diff --git a/src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper.h b/src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper.h
index ffeff86344..3968a445ab 100644
--- a/src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper.h
+++ b/src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper.h
@@ -43,12 +43,12 @@
 /* Asynchronously resolve addr. Use default_port if a port isn't designated in
    addr, otherwise use the port in addr. grpc_ares_init() must be called at
    least once before this function. \a on_done may be called directly in this
-   function without being scheduled with \a exec_ctx, it should not try to
-   acquire locks that are being held by the caller. */
+   function without being scheduled with \a exec_ctx, it must not try to acquire
+   locks that are being held by the caller. */
 extern void (*grpc_resolve_address_ares)(grpc_exec_ctx *exec_ctx,
                                          const char *addr,
                                          const char *default_port,
-                                         grpc_ares_ev_driver *ev_driver,
+                                         grpc_pollset_set *interested_parties,
                                          grpc_closure *on_done,
                                          grpc_resolved_addresses **addresses);
 
-- 
GitLab