From ccd27fd9b2cc3c2f83dc9a7632f6a2d5f0438c45 Mon Sep 17 00:00:00 2001
From: ctiller <ctiller@google.com>
Date: Thu, 11 Dec 2014 09:12:02 -0800
Subject: [PATCH] Wait for name resolutions to complete before shutting down
 iomgr.

This at least avoids crashing when shutting down during name resolution.
There is still a memory leak to track down before I add a test that exposes
this.

This CL also makes some tiny cleanups and debuggability improvements.
	Change on 2014/12/11 by ctiller <ctiller@google.com>
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=81882486
---
 src/core/channel/client_setup.c        |  2 +-
 src/core/iomgr/iomgr_libevent.c        | 32 +++++++++++++++++++++-----
 src/core/iomgr/iomgr_libevent.h        |  2 ++
 src/core/iomgr/resolve_address_posix.c |  9 +++++++-
 src/core/iomgr/tcp_client_posix.c      |  8 +++++++
 5 files changed, 45 insertions(+), 8 deletions(-)

diff --git a/src/core/channel/client_setup.c b/src/core/channel/client_setup.c
index 29fe915add..8060b2a36d 100644
--- a/src/core/channel/client_setup.c
+++ b/src/core/channel/client_setup.c
@@ -211,7 +211,7 @@ void grpc_client_setup_request_finish(grpc_client_setup_request *r,
   if (retry) {
     /* TODO(klempner): Replace these values with further consideration. 2x is
        probably too aggressive of a backoff. */
-    gpr_timespec max_backoff = gpr_time_from_micros(120000000);
+    gpr_timespec max_backoff = gpr_time_from_minutes(2);
     GPR_ASSERT(!s->in_alarm);
     s->in_alarm = 1;
     grpc_alarm_init(&s->backoff_alarm, backoff_alarm_done, s);
diff --git a/src/core/iomgr/iomgr_libevent.c b/src/core/iomgr/iomgr_libevent.c
index 1af03dcf12..7206e1bc1f 100644
--- a/src/core/iomgr/iomgr_libevent.c
+++ b/src/core/iomgr/iomgr_libevent.c
@@ -59,6 +59,7 @@ gpr_cv grpc_iomgr_cv;
 static grpc_libevent_activation_data *g_activation_queue;
 static int g_num_pollers;
 static int g_num_fds;
+static int g_num_address_resolutions;
 static gpr_timespec g_last_poll_completed;
 static int g_shutdown_backup_poller;
 static gpr_event g_backup_poller_done;
@@ -69,6 +70,18 @@ static grpc_fd *g_fds_to_free;
 int evthread_use_threads(void);
 static void grpc_fd_impl_destroy(grpc_fd *impl);
 
+void grpc_iomgr_ref_address_resolution(int delta) {
+  gpr_mu_lock(&grpc_iomgr_mu);
+  gpr_log(GPR_DEBUG, "num_address_resolutions = %d + %d",
+          g_num_address_resolutions, delta);
+  GPR_ASSERT(!g_shutdown_backup_poller);
+  g_num_address_resolutions += delta;
+  if (0 == g_num_address_resolutions) {
+    gpr_cv_broadcast(&grpc_iomgr_cv);
+  }
+  gpr_mu_unlock(&grpc_iomgr_mu);
+}
+
 /* If anything is in the work queue, process one item and return 1.
    Return 0 if there were no work items to complete.
    Requires grpc_iomgr_mu locked, may unlock and relock during the call. */
@@ -86,6 +99,10 @@ static int maybe_do_queue_work() {
         g_activation_queue;
   }
   work->next = work->prev = NULL;
+  /* force status to cancelled from ok when shutting down */
+  if (g_shutdown_backup_poller && work->status == GRPC_CALLBACK_SUCCESS) {
+    work->status = GRPC_CALLBACK_CANCELLED;
+  }
   gpr_mu_unlock(&grpc_iomgr_mu);
 
   work->cb(work->arg, work->status);
@@ -225,6 +242,7 @@ void grpc_iomgr_init() {
   g_activation_queue = NULL;
   g_num_pollers = 0;
   g_num_fds = 0;
+  g_num_address_resolutions = 0;
   g_last_poll_completed = gpr_now();
   g_shutdown_backup_poller = 0;
   g_fds_to_free = NULL;
@@ -256,17 +274,19 @@ void grpc_iomgr_shutdown() {
 
   /* broadcast shutdown */
   gpr_mu_lock(&grpc_iomgr_mu);
-  while (g_num_fds) {
+  while (g_num_fds > 0 || g_num_address_resolutions > 0) {
     gpr_log(GPR_INFO,
-            "waiting for %d fds to be destroyed before closing event manager",
-            g_num_fds);
+            "waiting for %d fds and %d name resolutions to be destroyed before "
+            "closing event manager",
+            g_num_fds, g_num_address_resolutions);
     if (gpr_cv_wait(&grpc_iomgr_cv, &grpc_iomgr_mu, fd_shutdown_deadline)) {
       gpr_log(GPR_ERROR,
-              "not all fds destroyed before shutdown deadline: memory leaks "
+              "not all fds or name resolutions destroyed before shutdown "
+              "deadline: memory leaks "
               "are likely");
       break;
-    } else if (g_num_fds == 0) {
-      gpr_log(GPR_INFO, "all fds closed");
+    } else if (g_num_fds == 0 && g_num_address_resolutions == 0) {
+      gpr_log(GPR_INFO, "all fds closed, all name resolutions finished");
     }
   }
 
diff --git a/src/core/iomgr/iomgr_libevent.h b/src/core/iomgr/iomgr_libevent.h
index 77e7b59989..d04deb82e4 100644
--- a/src/core/iomgr/iomgr_libevent.h
+++ b/src/core/iomgr/iomgr_libevent.h
@@ -204,4 +204,6 @@ struct grpc_alarm {
   gpr_atm triggered; /* To be used atomically if alarm triggered */
 };
 
+void grpc_iomgr_ref_address_resolution(int delta);
+
 #endif /* __GRPC_INTERNAL_IOMGR_IOMGR_LIBEVENT_H__ */
diff --git a/src/core/iomgr/resolve_address_posix.c b/src/core/iomgr/resolve_address_posix.c
index d3ea3780ce..a0a04297eb 100644
--- a/src/core/iomgr/resolve_address_posix.c
+++ b/src/core/iomgr/resolve_address_posix.c
@@ -41,6 +41,7 @@
 #include <unistd.h>
 #include <string.h>
 
+#include "src/core/iomgr/iomgr_libevent.h"
 #include "src/core/iomgr/sockaddr_utils.h"
 #include "src/core/iomgr/socket_utils_posix.h"
 #include <grpc/support/alloc.h>
@@ -192,10 +193,15 @@ done:
 /* Thread function to asynch-ify grpc_blocking_resolve_address */
 static void do_request(void *rp) {
   request *r = rp;
-  r->cb(r->arg, grpc_blocking_resolve_address(r->name, r->default_port));
+  grpc_resolved_addresses *resolved =
+      grpc_blocking_resolve_address(r->name, r->default_port);
+  void *arg = r->arg;
+  grpc_resolve_cb cb = r->cb;
   gpr_free(r->name);
   gpr_free(r->default_port);
   gpr_free(r);
+  cb(arg, resolved);
+  grpc_iomgr_ref_address_resolution(-1);
 }
 
 void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) {
@@ -207,6 +213,7 @@ void grpc_resolve_address(const char *name, const char *default_port,
                           grpc_resolve_cb cb, void *arg) {
   request *r = gpr_malloc(sizeof(request));
   gpr_thd_id id;
+  grpc_iomgr_ref_address_resolution(1);
   r->name = gpr_strdup(name);
   r->default_port = gpr_strdup(default_port);
   r->cb = cb;
diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c
index 8d2d7ab081..69e7a5326a 100644
--- a/src/core/iomgr/tcp_client_posix.c
+++ b/src/core/iomgr/tcp_client_posix.c
@@ -109,6 +109,14 @@ static void on_writable(void *acp, grpc_iomgr_cb_status status) {
         grpc_fd_notify_on_write(ac->fd, on_writable, ac, ac->deadline);
         return;
       } else {
+        switch (so_error) {
+          case ECONNREFUSED:
+            gpr_log(GPR_ERROR, "socket error: connection refused");
+            break;
+          default:
+            gpr_log(GPR_ERROR, "socket error: %d", so_error);
+            break;
+        }
         goto error;
       }
     } else {
-- 
GitLab