From 296984b46c5e130c7aed4bdae1fc910108e3cbce Mon Sep 17 00:00:00 2001
From: Craig Tiller <ctiller@google.com>
Date: Wed, 30 Nov 2016 12:25:02 -0800
Subject: [PATCH] Break infinite connection retry loop

Previously, we'd keep retrying a connection until the channel closed.
With this change, we only retry connecting *if there's a watcher on the
subchannel connection state*. This ultimately means that if the
lb_policy doesn't care if the subchannel connects, it'll stop trying.
---
 src/core/ext/client_channel/subchannel.c    | 135 ++++++++++++--------
 src/core/lib/transport/connectivity_state.c |  13 +-
 src/core/lib/transport/connectivity_state.h |   5 +-
 3 files changed, 95 insertions(+), 58 deletions(-)

diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c
index a148b2a0e1..bb3d167bec 100644
--- a/src/core/ext/client_channel/subchannel.c
+++ b/src/core/ext/client_channel/subchannel.c
@@ -119,9 +119,9 @@ struct grpc_subchannel {
   gpr_mu mu;
 
   /** have we seen a disconnection? */
-  int disconnected;
+  bool disconnected;
   /** are we connecting */
-  int connecting;
+  bool connecting;
   /** connectivity state tracking */
   grpc_connectivity_state_tracker state_tracker;
 
@@ -132,7 +132,9 @@ struct grpc_subchannel {
   /** backoff state */
   gpr_backoff backoff_state;
   /** do we have an active alarm? */
-  int have_alarm;
+  bool have_alarm;
+  /** have we started the backoff loop */
+  bool backoff_begun;
   /** our alarm */
   grpc_timer alarm;
 };
@@ -264,7 +266,7 @@ static void disconnect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
   grpc_subchannel_index_unregister(exec_ctx, c->key, c);
   gpr_mu_lock(&c->mu);
   GPR_ASSERT(!c->disconnected);
-  c->disconnected = 1;
+  c->disconnected = true;
   grpc_connector_shutdown(exec_ctx, c->connector);
   con = GET_CONNECTED_SUBCHANNEL(c, no_barrier);
   if (con != NULL) {
@@ -386,12 +388,6 @@ static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
                          &c->connected);
 }
 
-static void start_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
-  c->next_attempt =
-      gpr_backoff_begin(&c->backoff_state, gpr_now(GPR_CLOCK_MONOTONIC));
-  continue_connect(exec_ctx, c);
-}
-
 grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c,
                                                            grpc_error **error) {
   grpc_connectivity_state state;
@@ -418,6 +414,73 @@ static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg,
   follow_up->cb(exec_ctx, follow_up->cb_arg, error);
 }
 
+static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
+  grpc_subchannel *c = arg;
+  gpr_mu_lock(&c->mu);
+  c->have_alarm = 0;
+  if (c->disconnected) {
+    error = GRPC_ERROR_CREATE_REFERENCING("Disconnected", &error, 1);
+  } else {
+    GRPC_ERROR_REF(error);
+  }
+  if (error == GRPC_ERROR_NONE) {
+    gpr_log(GPR_INFO, "Failed to connect to channel, retrying");
+    c->next_attempt =
+        gpr_backoff_step(&c->backoff_state, gpr_now(GPR_CLOCK_MONOTONIC));
+    continue_connect(exec_ctx, c);
+    gpr_mu_unlock(&c->mu);
+  } else {
+    gpr_mu_unlock(&c->mu);
+    GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
+  }
+  GRPC_ERROR_UNREF(error);
+}
+
+static void maybe_start_connecting_locked(grpc_exec_ctx *exec_ctx,
+                                          grpc_subchannel *c) {
+  if (c->disconnected) {
+    /* Don't try to connect if we're already disconnected */
+    return;
+  }
+
+  if (c->connecting) {
+    /* Already connecting: don't restart */
+    return;
+  }
+
+  if (GET_CONNECTED_SUBCHANNEL(c, no_barrier) != NULL) {
+    /* Already connected: don't restart */
+    return;
+  }
+
+  if (!grpc_connectivity_state_has_watchers(&c->state_tracker)) {
+    /* Nobody is interested in connecting: so don't just yet */
+    return;
+  }
+
+  c->connecting = true;
+  GRPC_SUBCHANNEL_WEAK_REF(c, "connecting");
+
+  gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
+  if (!c->backoff_begun) {
+    c->backoff_begun = true;
+    c->next_attempt = gpr_backoff_begin(&c->backoff_state, now);
+    continue_connect(exec_ctx, c);
+  } else {
+    GPR_ASSERT(!c->have_alarm);
+    c->have_alarm = 1;
+    gpr_timespec time_til_next = gpr_time_sub(c->next_attempt, now);
+    if (gpr_time_cmp(time_til_next, gpr_time_0(time_til_next.clock_type)) <=
+        0) {
+      gpr_log(GPR_INFO, "Retry immediately");
+    } else {
+      gpr_log(GPR_INFO, "Retry in %" PRId64 ".%09d seconds",
+              time_til_next.tv_sec, time_til_next.tv_nsec);
+    }
+    grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now);
+  }
+}
+
 void grpc_subchannel_notify_on_state_change(
     grpc_exec_ctx *exec_ctx, grpc_subchannel *c,
     grpc_pollset_set *interested_parties, grpc_connectivity_state *state,
@@ -449,13 +512,9 @@ void grpc_subchannel_notify_on_state_change(
     w->next = &c->root_external_state_watcher;
     w->prev = w->next->prev;
     w->next->prev = w->prev->next = w;
-    if (grpc_connectivity_state_notify_on_state_change(
-            exec_ctx, &c->state_tracker, state, &w->closure)) {
-      c->connecting = 1;
-      /* released by connection */
-      GRPC_SUBCHANNEL_WEAK_REF(c, "connecting");
-      start_connect(exec_ctx, c);
-    }
+    grpc_connectivity_state_notify_on_state_change(exec_ctx, &c->state_tracker,
+                                                   state, &w->closure);
+    maybe_start_connecting_locked(exec_ctx, c);
     gpr_mu_unlock(&c->mu);
   }
 }
@@ -575,7 +634,6 @@ static void publish_transport_locked(grpc_exec_ctx *exec_ctx,
                     Re-evaluate if we really need this. */
   gpr_atm_full_barrier();
   GPR_ASSERT(gpr_atm_rel_cas(&c->connected_subchannel, 0, (gpr_atm)con));
-  c->connecting = 0;
 
   /* setup subchannel watching connected subchannel for changes; subchannel
      ref
@@ -592,28 +650,6 @@ static void publish_transport_locked(grpc_exec_ctx *exec_ctx,
                               GRPC_ERROR_NONE, "connected");
 }
 
-static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
-  grpc_subchannel *c = arg;
-  gpr_mu_lock(&c->mu);
-  c->have_alarm = 0;
-  if (c->disconnected) {
-    error = GRPC_ERROR_CREATE_REFERENCING("Disconnected", &error, 1);
-  } else {
-    GRPC_ERROR_REF(error);
-  }
-  if (error == GRPC_ERROR_NONE) {
-    gpr_log(GPR_INFO, "Failed to connect to channel, retrying");
-    c->next_attempt =
-        gpr_backoff_step(&c->backoff_state, gpr_now(GPR_CLOCK_MONOTONIC));
-    continue_connect(exec_ctx, c);
-    gpr_mu_unlock(&c->mu);
-  } else {
-    gpr_mu_unlock(&c->mu);
-    GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
-  }
-  GRPC_ERROR_UNREF(error);
-}
-
 static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
                                  grpc_error *error) {
   grpc_subchannel *c = arg;
@@ -621,35 +657,28 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
 
   GRPC_SUBCHANNEL_WEAK_REF(c, "connected");
   gpr_mu_lock(&c->mu);
+  c->connecting = false;
   if (c->connecting_result.transport != NULL) {
     publish_transport_locked(exec_ctx, c);
   } else if (c->disconnected) {
     GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
   } else {
-    gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
-    GPR_ASSERT(!c->have_alarm);
-    c->have_alarm = 1;
     grpc_connectivity_state_set(
         exec_ctx, &c->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
         grpc_error_set_int(
             GRPC_ERROR_CREATE_REFERENCING("Connect Failed", &error, 1),
             GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE),
         "connect_failed");
-    gpr_timespec time_til_next = gpr_time_sub(c->next_attempt, now);
+
     const char *errmsg = grpc_error_string(error);
     gpr_log(GPR_INFO, "Connect failed: %s", errmsg);
-    if (gpr_time_cmp(time_til_next, gpr_time_0(time_til_next.clock_type)) <=
-        0) {
-      gpr_log(GPR_INFO, "Retry immediately");
-    } else {
-      gpr_log(GPR_INFO, "Retry in %" PRId64 ".%09d seconds",
-              time_til_next.tv_sec, time_til_next.tv_nsec);
-    }
-    grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now);
     grpc_error_free_string(errmsg);
+
+    maybe_start_connecting_locked(exec_ctx, c);
+    GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
   }
   gpr_mu_unlock(&c->mu);
-  GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
+  GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connected");
   grpc_channel_args_destroy(delete_channel_args);
 }
 
diff --git a/src/core/lib/transport/connectivity_state.c b/src/core/lib/transport/connectivity_state.c
index 89072879d9..4f49d7cf7d 100644
--- a/src/core/lib/transport/connectivity_state.c
+++ b/src/core/lib/transport/connectivity_state.c
@@ -100,7 +100,12 @@ grpc_connectivity_state grpc_connectivity_state_check(
   return tracker->current_state;
 }
 
-int grpc_connectivity_state_notify_on_state_change(
+bool grpc_connectivity_state_has_watchers(
+    grpc_connectivity_state_tracker *connectivity_state) {
+  return connectivity_state->watchers != NULL;
+}
+
+bool grpc_connectivity_state_notify_on_state_change(
     grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker,
     grpc_connectivity_state *current, grpc_closure *notify) {
   if (grpc_connectivity_state_trace) {
@@ -119,7 +124,7 @@ int grpc_connectivity_state_notify_on_state_change(
       grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_CANCELLED, NULL);
       tracker->watchers = w->next;
       gpr_free(w);
-      return 0;
+      return false;
     }
     while (w != NULL) {
       grpc_connectivity_state_watcher *rm_candidate = w->next;
@@ -127,11 +132,11 @@ int grpc_connectivity_state_notify_on_state_change(
         grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_CANCELLED, NULL);
         w->next = w->next->next;
         gpr_free(rm_candidate);
-        return 0;
+        return false;
       }
       w = w->next;
     }
-    return 0;
+    return false;
   } else {
     if (tracker->current_state != *current) {
       *current = tracker->current_state;
diff --git a/src/core/lib/transport/connectivity_state.h b/src/core/lib/transport/connectivity_state.h
index 7a2fa52c10..769c675b79 100644
--- a/src/core/lib/transport/connectivity_state.h
+++ b/src/core/lib/transport/connectivity_state.h
@@ -75,13 +75,16 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx,
                                  grpc_error *associated_error,
                                  const char *reason);
 
+bool grpc_connectivity_state_has_watchers(
+    grpc_connectivity_state_tracker *tracker);
+
 grpc_connectivity_state grpc_connectivity_state_check(
     grpc_connectivity_state_tracker *tracker, grpc_error **current_error);
 
 /** Return 1 if the channel should start connecting, 0 otherwise.
     If current==NULL cancel notify if it is already queued (success==0 in that
     case) */
-int grpc_connectivity_state_notify_on_state_change(
+bool grpc_connectivity_state_notify_on_state_change(
     grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker,
     grpc_connectivity_state *current, grpc_closure *notify);
 
-- 
GitLab