From 49924e0e62d29499874e93f5dcf5976edcf610f4 Mon Sep 17 00:00:00 2001
From: Craig Tiller <craig.tiller@gmail.com>
Date: Mon, 29 Jun 2015 22:42:33 -0700
Subject: [PATCH] Better handling of cancellation, uri parse errors, and
 disconnection

---
 src/core/channel/client_channel.c          |  4 +++
 src/core/client_config/resolver_registry.c |  7 +++--
 src/core/client_config/subchannel.c        | 14 +--------
 src/core/client_config/uri_parser.c        | 36 ++++++++++++----------
 src/core/client_config/uri_parser.h        |  2 +-
 src/core/surface/server.c                  | 23 ++++++++++++--
 src/core/transport/chttp2_transport.c      |  4 +--
 test/core/client_config/uri_parser_test.c  |  4 +--
 8 files changed, 54 insertions(+), 40 deletions(-)

diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index d465a970b9..e62a262bab 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -242,6 +242,7 @@ static void perform_transport_stream_op(grpc_call_element *elem, grpc_transport_
   channel_data *chand = elem->channel_data;
   grpc_subchannel_call *subchannel_call;
   grpc_lb_policy *lb_policy;
+  grpc_transport_stream_op op2;
   GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
   GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
 
@@ -263,8 +264,11 @@ static void perform_transport_stream_op(grpc_call_element *elem, grpc_transport_
       if (!continuation) {
         if (op->cancel_with_status != GRPC_STATUS_OK) {
           calld->state = CALL_CANCELLED;
+          op2 = calld->waiting_op;
+          memset(&calld->waiting_op, 0, sizeof(calld->waiting_op));
           gpr_mu_unlock(&calld->mu_state);
           handle_op_after_cancellation(elem, op);
+          handle_op_after_cancellation(elem, &op2);
         } else {
           GPR_ASSERT((calld->waiting_op.send_ops == NULL) !=
                      (op->send_ops == NULL));
diff --git a/src/core/client_config/resolver_registry.c b/src/core/client_config/resolver_registry.c
index abdb5f9377..16be2da994 100644
--- a/src/core/client_config/resolver_registry.c
+++ b/src/core/client_config/resolver_registry.c
@@ -100,18 +100,21 @@ grpc_resolver *grpc_resolver_create(
   grpc_resolver_factory *factory = NULL;
   grpc_resolver *resolver;
 
-  uri = grpc_uri_parse(name);
+  uri = grpc_uri_parse(name, 1);
   factory = lookup_factory(uri);
   if (factory == NULL && g_default_resolver_scheme != NULL) {
     grpc_uri_destroy(uri);
     gpr_asprintf(&tmp, "%s%s", g_default_resolver_scheme, name);
-    uri = grpc_uri_parse(tmp);
+    uri = grpc_uri_parse(tmp, 1);
     factory = lookup_factory(uri);
     if (factory == NULL) {
+      grpc_uri_destroy(grpc_uri_parse(name, 0));
+      grpc_uri_destroy(grpc_uri_parse(tmp, 0));
       gpr_log(GPR_ERROR, "don't know how to resolve '%s' or '%s'", name, tmp);
     }
     gpr_free(tmp);
   } else if (factory == NULL) {
+    grpc_uri_destroy(grpc_uri_parse(name, 0));
     gpr_log(GPR_ERROR, "don't know how to resolve '%s'", name);
   }
   resolver =
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index c8c562f29d..cae6db0110 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -409,7 +409,6 @@ static void on_state_changed(void *p, int iomgr_success) {
   grpc_transport_op op;
   grpc_channel_element *elem;
   connection *destroy_connection = NULL;
-  int do_connect = 0;
 
   gpr_mu_lock(mu);
 
@@ -436,6 +435,7 @@ static void on_state_changed(void *p, int iomgr_success) {
       gpr_mu_unlock(mu);
       return;
     case GRPC_CHANNEL_FATAL_FAILURE:
+    case GRPC_CHANNEL_TRANSIENT_FAILURE:
       /* things have gone wrong, deactivate and enter idle */
       if (sw->subchannel->active->refs == 0) {
         destroy_connection = sw->subchannel->active;
@@ -444,15 +444,6 @@ static void on_state_changed(void *p, int iomgr_success) {
       grpc_connectivity_state_set(&c->state_tracker,
                                   GRPC_CHANNEL_TRANSIENT_FAILURE);
       break;
-    case GRPC_CHANNEL_TRANSIENT_FAILURE:
-      /* things are starting to go wrong, reconnect but don't deactivate */
-      /* released by connection */
-      SUBCHANNEL_REF_LOCKED(c, "connecting");
-      grpc_connectivity_state_set(&c->state_tracker,
-                                  GRPC_CHANNEL_TRANSIENT_FAILURE);
-      do_connect = 1;
-      c->connecting = 1;
-      break;
   }
 
 done:
@@ -460,9 +451,6 @@ done:
   destroy = SUBCHANNEL_UNREF_LOCKED(c, "state_watcher");
   gpr_free(sw);
   gpr_mu_unlock(mu);
-  if (do_connect) {
-    start_connect(c);
-  }
   if (destroy) {
     subchannel_destroy(c);
   }
diff --git a/src/core/client_config/uri_parser.c b/src/core/client_config/uri_parser.c
index 43b5b47f55..c5faab5eba 100644
--- a/src/core/client_config/uri_parser.c
+++ b/src/core/client_config/uri_parser.c
@@ -39,20 +39,22 @@
 #include <grpc/support/log.h>
 #include <grpc/support/string_util.h>
 
-static grpc_uri *bad_uri(const char *uri_text, int pos, const char *section) {
+static grpc_uri *bad_uri(const char *uri_text, int pos, const char *section, int suppress_errors) {
   char *line_prefix;
   int pfx_len;
 
-  gpr_asprintf(&line_prefix, "bad uri.%s: '", section);
-  pfx_len = strlen(line_prefix) + pos;
-  gpr_log(GPR_ERROR, "%s%s'", line_prefix, uri_text);
-  gpr_free(line_prefix);
+  if (!suppress_errors) {
+    gpr_asprintf(&line_prefix, "bad uri.%s: '", section);
+    pfx_len = strlen(line_prefix) + pos;
+    gpr_log(GPR_ERROR, "%s%s'", line_prefix, uri_text);
+    gpr_free(line_prefix);
 
-  line_prefix = gpr_malloc(pfx_len + 1);
-  memset(line_prefix, ' ', pfx_len);
-  line_prefix[pfx_len] = 0;
-  gpr_log(GPR_ERROR, "%s^ here", line_prefix);
-  gpr_free(line_prefix);
+    line_prefix = gpr_malloc(pfx_len + 1);
+    memset(line_prefix, ' ', pfx_len);
+    line_prefix[pfx_len] = 0;
+    gpr_log(GPR_ERROR, "%s^ here", line_prefix);
+    gpr_free(line_prefix);
+  }
 
   return NULL;
 }
@@ -64,7 +66,7 @@ static char *copy_fragment(const char *src, int begin, int end) {
   return out;
 }
 
-grpc_uri *grpc_uri_parse(const char *uri_text) {
+grpc_uri *grpc_uri_parse(const char *uri_text, int suppress_errors) {
   grpc_uri *uri;
   int scheme_begin = 0;
   int scheme_end = -1;
@@ -90,7 +92,7 @@ grpc_uri *grpc_uri_parse(const char *uri_text) {
     break;
   }
   if (scheme_end == -1) {
-    return bad_uri(uri_text, i, "scheme");
+    return bad_uri(uri_text, i, "scheme", suppress_errors);
   }
 
   if (uri_text[scheme_end + 1] == '/' && uri_text[scheme_end + 2] == '/') {
@@ -100,17 +102,17 @@ grpc_uri *grpc_uri_parse(const char *uri_text) {
         authority_end = i;
       }
       if (uri_text[i] == '?') {
-        return bad_uri(uri_text, i, "query_not_supported");
+        return bad_uri(uri_text, i, "query_not_supported", suppress_errors);
       }
       if (uri_text[i] == '#') {
-        return bad_uri(uri_text, i, "fragment_not_supported");
+        return bad_uri(uri_text, i, "fragment_not_supported", suppress_errors);
       }
     }
     if (authority_end == -1 && uri_text[i] == 0) {
       authority_end = i;
     }
     if (authority_end == -1) {
-      return bad_uri(uri_text, i, "authority");
+      return bad_uri(uri_text, i, "authority", suppress_errors);
     }
     /* TODO(ctiller): parse the authority correctly */
     path_begin = authority_end;
@@ -120,10 +122,10 @@ grpc_uri *grpc_uri_parse(const char *uri_text) {
 
   for (i = path_begin; uri_text[i] != 0; i++) {
     if (uri_text[i] == '?') {
-      return bad_uri(uri_text, i, "query_not_supported");
+      return bad_uri(uri_text, i, "query_not_supported", suppress_errors);
     }
     if (uri_text[i] == '#') {
-      return bad_uri(uri_text, i, "fragment_not_supported");
+      return bad_uri(uri_text, i, "fragment_not_supported", suppress_errors);
     }
   }
   path_end = i;
diff --git a/src/core/client_config/uri_parser.h b/src/core/client_config/uri_parser.h
index b6821f9621..ce4e6aecb0 100644
--- a/src/core/client_config/uri_parser.h
+++ b/src/core/client_config/uri_parser.h
@@ -41,7 +41,7 @@ typedef struct {
 } grpc_uri;
 
 /** parse a uri, return NULL on failure */
-grpc_uri *grpc_uri_parse(const char *uri_text);
+grpc_uri *grpc_uri_parse(const char *uri_text, int suppress_errors);
 
 /** destroy a uri */
 void grpc_uri_destroy(grpc_uri *uri);
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 383c3d921d..703eeaf2bd 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -204,7 +204,9 @@ struct call_data {
 
 typedef struct {
   grpc_channel **channels;
+  grpc_channel **disconnects;
   size_t num_channels;
+  size_t num_disconnects;
 } channel_broadcaster;
 
 #define SERVER_FROM_CALL_ELEM(elem) \
@@ -223,18 +225,28 @@ static void maybe_finish_shutdown(grpc_server *server);
 static void channel_broadcaster_init(grpc_server *s, channel_broadcaster *cb) {
   channel_data *c;
   size_t count = 0;
+  size_t dc_count = 0;
   for (c = s->root_channel_data.next; c != &s->root_channel_data;
        c = c->next) {
     count ++;
+    if (c->num_calls == 0) {
+      dc_count ++;
+    }
   }
   cb->num_channels = count;
+  cb->num_disconnects = dc_count;
   cb->channels = gpr_malloc(sizeof(*cb->channels) * cb->num_channels);
+  cb->disconnects = gpr_malloc(sizeof(*cb->channels) * cb->num_disconnects);
   count = 0;
+  dc_count = 0;
   for (c = s->root_channel_data.next; c != &s->root_channel_data;
        c = c->next) {
-    cb->channels[count] = c->channel;
+    cb->channels[count++] = c->channel;
     GRPC_CHANNEL_INTERNAL_REF(c->channel, "broadcast");
-    count ++;
+    if (c->num_calls == 0) {
+      cb->disconnects[dc_count++] = c->channel;
+      GRPC_CHANNEL_INTERNAL_REF(c->channel, "broadcast-disconnect");
+    }
   }
 }
 
@@ -274,10 +286,15 @@ static void channel_broadcaster_shutdown(channel_broadcaster *cb, int send_goawa
   size_t i;
 
   for (i = 0; i < cb->num_channels; i++) {
-    send_shutdown(cb->channels[i], send_goaway, send_disconnect);
+    send_shutdown(cb->channels[i], 1, 0);
     GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast");
   }
+  for (i = 0; i < cb->num_disconnects; i++) {
+    send_shutdown(cb->disconnects[i], 0, 1);
+    GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast-disconnect");
+  }
   gpr_free(cb->channels);
+  gpr_free(cb->disconnects);
 }
 
 /* call list */
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 08a767f1d5..e32071e692 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -553,8 +553,7 @@ static void maybe_start_some_streams(
     transport_global->next_stream_id += 2;
 
     if (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID) {
-      connectivity_state_set(transport_global,
-                                  GRPC_CHANNEL_TRANSIENT_FAILURE);
+      connectivity_state_set(transport_global, GRPC_CHANNEL_TRANSIENT_FAILURE);
     }
 
     stream_global->outgoing_window =
@@ -940,6 +939,7 @@ static void schedule_closure_for_connectivity(void *a, grpc_iomgr_closure *closu
 }
 
 static void connectivity_state_set(grpc_chttp2_transport_global *transport_global, grpc_connectivity_state state) {
+  GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_DEBUG, "set connectivity_state=%d", state));
   grpc_connectivity_state_set_with_scheduler(
     &TRANSPORT_FROM_GLOBAL(transport_global)->channel_callback.state_tracker,
     state,
diff --git a/test/core/client_config/uri_parser_test.c b/test/core/client_config/uri_parser_test.c
index 26566d0924..287a9d1add 100644
--- a/test/core/client_config/uri_parser_test.c
+++ b/test/core/client_config/uri_parser_test.c
@@ -41,7 +41,7 @@
 
 static void test_succeeds(const char *uri_text, const char *scheme,
                           const char *authority, const char *path) {
-  grpc_uri *uri = grpc_uri_parse(uri_text);
+  grpc_uri *uri = grpc_uri_parse(uri_text, 0);
   GPR_ASSERT(uri);
   GPR_ASSERT(0 == strcmp(scheme, uri->scheme));
   GPR_ASSERT(0 == strcmp(authority, uri->authority));
@@ -50,7 +50,7 @@ static void test_succeeds(const char *uri_text, const char *scheme,
 }
 
 static void test_fails(const char *uri_text) {
-  GPR_ASSERT(NULL == grpc_uri_parse(uri_text));
+  GPR_ASSERT(NULL == grpc_uri_parse(uri_text, 0));
 }
 
 int main(int argc, char **argv) {
-- 
GitLab