From af084dc37c2d31f199cfa0516473fbe308ed9ba4 Mon Sep 17 00:00:00 2001
From: David Garcia Quintas <dgq@google.com>
Date: Tue, 27 Jun 2017 13:42:54 -0700
Subject: [PATCH] Fixes to pick first

---
 .../lb_policy/pick_first/pick_first.c         | 45 ++++++++++++++++---
 .../lb_policy/round_robin/round_robin.c       | 30 ++++++-------
 .../resolver/fake/fake_resolver.c             |  3 ++
 3 files changed, 57 insertions(+), 21 deletions(-)

diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c
index 307e3bad67..d0acd7a901 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c
@@ -95,6 +95,9 @@ static void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
   gpr_free(p->subchannels);
   gpr_free(p->new_subchannels);
   gpr_free(p);
+  if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
+    gpr_log(GPR_DEBUG, "Pick First %p destroyed.", (void *)p);
+  }
 }
 
 static void pf_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
@@ -268,11 +271,20 @@ static void stop_connectivity_watchers(grpc_exec_ctx *exec_ctx,
                                        pick_first_lb_policy *p) {
   if (p->num_subchannels > 0) {
     GPR_ASSERT(p->selected == NULL);
+    if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
+      gpr_log(GPR_DEBUG, "Pick First %p unsubscribing from subchannel %p",
+              (void *)p, (void *)p->subchannels[p->checking_subchannel]);
+    }
     grpc_subchannel_notify_on_state_change(
         exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL,
         &p->connectivity_changed);
     p->updating_subchannels = true;
   } else if (p->selected != NULL) {
+    if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
+      gpr_log(GPR_DEBUG,
+              "Pick First %p unsubscribing from selected subchannel %p",
+              (void *)p, (void *)p->selected);
+    }
     grpc_connected_subchannel_notify_on_state_change(
         exec_ctx, p->selected, NULL, NULL, &p->connectivity_changed);
     p->updating_selected = true;
@@ -451,12 +463,25 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
   grpc_subchannel *selected_subchannel;
   pending_pick *pp;
 
+  if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
+    gpr_log(
+        GPR_DEBUG,
+        "Pick First %p connectivity changed. Updating selected: %d; Updating "
+        "subchannels: %d; Checking %lu index (%lu total); State: %d; ",
+        (void *)p, p->updating_selected, p->updating_subchannels,
+        (unsigned long)p->checking_subchannel,
+        (unsigned long)p->num_subchannels, p->checking_connectivity);
+  }
   bool restart = false;
-  if (p->updating_selected && error == GRPC_ERROR_CANCELLED) {
+  if (p->updating_selected && error != GRPC_ERROR_NONE) {
     /* Captured the unsubscription for p->selected */
     GPR_ASSERT(p->selected != NULL);
     GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected,
                                     "pf_update_connectivity");
+    if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
+      gpr_log(GPR_DEBUG, "Pick First %p unreffing selected subchannel %p",
+              (void *)p, (void *)p->selected);
+    }
     p->updating_selected = false;
     if (p->num_new_subchannels == 0) {
       p->selected = NULL;
@@ -464,12 +489,16 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
     }
     restart = true;
   }
-  if (p->updating_subchannels && error == GRPC_ERROR_CANCELLED) {
+  if (p->updating_subchannels && error != GRPC_ERROR_NONE) {
     /* Captured the unsubscription for the checking subchannel */
     GPR_ASSERT(p->selected == NULL);
     for (size_t i = 0; i < p->num_subchannels; i++) {
       GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i],
                             "pf_update_connectivity");
+      if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
+        gpr_log(GPR_DEBUG, "Pick First %p unreffing subchannel %p", (void *)p,
+                (void *)p->subchannels[i]);
+      }
     }
     gpr_free(p->subchannels);
     p->subchannels = NULL;
@@ -481,14 +510,12 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
   if (restart) {
     p->selected = NULL;
     p->selected_key = NULL;
-
     GPR_ASSERT(p->new_subchannels != NULL);
     GPR_ASSERT(p->num_new_subchannels > 0);
     p->num_subchannels = p->num_new_subchannels;
     p->subchannels = p->new_subchannels;
     p->num_new_subchannels = 0;
     p->new_subchannels = NULL;
-
     if (p->started_picking) {
       /* If we were picking, continue to do so over the new subchannels,
        * starting from the 0th index. */
@@ -542,7 +569,9 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
             "picked_first");
 
         if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
-          gpr_log(GPR_INFO, "Selected subchannel %p", (void *)p->selected);
+          gpr_log(GPR_INFO,
+                  "Pick First %p selected subchannel %p (connected %p)",
+                  (void *)p, (void *)selected_subchannel, (void *)p->selected);
         }
         p->selected_key = grpc_subchannel_get_key(selected_subchannel);
         /* drop the pick list: we are connected now */
@@ -568,7 +597,8 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
         p->checking_subchannel =
             (p->checking_subchannel + 1) % p->num_subchannels;
         if (p->checking_subchannel == 0) {
-          /* only trigger transient failure when we've tried all alternatives */
+          /* only trigger transient failure when we've tried all alternatives
+           */
           grpc_connectivity_state_set(
               exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
               GRPC_ERROR_REF(error), "connecting_transient_failure");
@@ -652,6 +682,9 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
                                          grpc_lb_policy_args *args) {
   GPR_ASSERT(args->client_channel_factory != NULL);
   pick_first_lb_policy *p = gpr_zalloc(sizeof(*p));
+  if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
+    gpr_log(GPR_DEBUG, "Pick First %p created.", (void *)p);
+  }
   pf_update_locked(exec_ctx, &p->base, args);
   grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable, args->combiner);
   GRPC_CLOSURE_INIT(&p->connectivity_changed, pf_connectivity_changed_locked, p,
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
index b648bba3fb..8e9d6b0f47 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
@@ -478,29 +478,30 @@ static grpc_connectivity_state update_lb_connectivity_status_locked(
   if (subchannel_list->num_ready > 0) { /* 1) READY */
     grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY,
                                 GRPC_ERROR_NONE, "rr_ready");
-    new_state =  GRPC_CHANNEL_READY;
+    new_state = GRPC_CHANNEL_READY;
   } else if (sd->curr_connectivity_state ==
              GRPC_CHANNEL_CONNECTING) { /* 2) CONNECTING */
     grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
                                 GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE,
                                 "rr_connecting");
-    new_state =  GRPC_CHANNEL_CONNECTING;
+    new_state = GRPC_CHANNEL_CONNECTING;
   } else if (p->subchannel_list->num_shutdown ==
              p->subchannel_list->num_subchannels) { /* 3) SHUTDOWN */
     grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
-                                GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "rr_shutdown");
-    new_state =  GRPC_CHANNEL_SHUTDOWN;
+                                GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error),
+                                "rr_shutdown");
+    new_state = GRPC_CHANNEL_SHUTDOWN;
   } else if (subchannel_list->num_transient_failures ==
              p->subchannel_list->num_subchannels) { /* 4) TRANSIENT_FAILURE */
     grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
-                                GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error),
-                                "rr_transient_failure");
-    new_state =  GRPC_CHANNEL_TRANSIENT_FAILURE;
+                                GRPC_CHANNEL_TRANSIENT_FAILURE,
+                                GRPC_ERROR_REF(error), "rr_transient_failure");
+    new_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
   } else if (subchannel_list->num_idle ==
              p->subchannel_list->num_subchannels) { /* 5) IDLE */
     grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_IDLE,
                                 GRPC_ERROR_NONE, "rr_idle");
-    new_state =  GRPC_CHANNEL_IDLE;
+    new_state = GRPC_CHANNEL_IDLE;
   }
   GRPC_ERROR_UNREF(error);
   return new_state;
@@ -581,14 +582,14 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
         GPR_ASSERT(!sd->subchannel_list->shutting_down);
         if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
           const unsigned long num_subchannels =
-              p->subchannel_list != NULL ? p->subchannel_list->num_subchannels
-                                         : 0;
+              p->subchannel_list != NULL
+                  ? (unsigned long)p->subchannel_list->num_subchannels
+                  : 0;
           gpr_log(GPR_DEBUG,
                   "[RR %p] phasing out subchannel list %p (size %lu) in favor "
                   "of %p (size %lu)",
                   (void *)p, (void *)p->subchannel_list, num_subchannels,
-                  (void *)sd->subchannel_list,
-                  (unsigned long)sd->subchannel_list->num_subchannels);
+                  (void *)sd->subchannel_list, num_subchannels);
         }
         if (p->subchannel_list != NULL) {
           // dispose of the current subchannel_list
@@ -666,9 +667,8 @@ static void rr_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
     grpc_connected_subchannel_ping(exec_ctx, target, closure);
     GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, target, "rr_picked");
   } else {
-    GRPC_CLOSURE_SCHED(
-        exec_ctx, closure,
-        GRPC_ERROR_CREATE_FROM_STATIC_STRING("Round Robin not connected"));
+    GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+                                              "Round Robin not connected"));
   }
 }
 
diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c
index 56ed4371a9..479ba393a2 100644
--- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c
+++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c
@@ -101,6 +101,9 @@ static void fake_resolver_maybe_finish_next_locked(grpc_exec_ctx* exec_ctx,
 static void fake_resolver_channel_saw_error_locked(grpc_exec_ctx* exec_ctx,
                                                    grpc_resolver* resolver) {
   fake_resolver* r = (fake_resolver*)resolver;
+  gpr_log(
+      GPR_INFO,
+      "FOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOO");
   if (r->next_results == NULL && r->results_upon_error != NULL) {
     // Pretend we re-resolved.
     r->next_results = grpc_channel_args_copy(r->results_upon_error);
-- 
GitLab