From 20cb627339c20e86814d64ba4837d7bdd6f35195 Mon Sep 17 00:00:00 2001
From: Craig Tiller <ctiller@google.com>
Date: Fri, 21 Apr 2017 15:07:13 -0700
Subject: [PATCH] Fix HTTP proxy tests

- heap allocate the pollset shutdown closure (this may be called asynchronously)
- ensure a poller remains until all endpoints are closed
---
 .../end2end/fixtures/http_proxy_fixture.c     | 59 +++++++++++++------
 test/core/end2end/tests/cancel_after_invoke.c |  9 +--
 2 files changed, 47 insertions(+), 21 deletions(-)

diff --git a/test/core/end2end/fixtures/http_proxy_fixture.c b/test/core/end2end/fixtures/http_proxy_fixture.c
index 451ed268d3..02235d96e7 100644
--- a/test/core/end2end/fixtures/http_proxy_fixture.c
+++ b/test/core/end2end/fixtures/http_proxy_fixture.c
@@ -59,6 +59,7 @@
 #include "src/core/lib/iomgr/sockaddr_utils.h"
 #include "src/core/lib/iomgr/tcp_client.h"
 #include "src/core/lib/iomgr/tcp_server.h"
+#include "src/core/lib/iomgr/timer.h"
 #include "src/core/lib/slice/slice_internal.h"
 #include "test/core/util/port.h"
 
@@ -69,7 +70,7 @@ struct grpc_end2end_http_proxy {
   grpc_channel_args* channel_args;
   gpr_mu* mu;
   grpc_pollset* pollset;
-  gpr_atm shutdown;
+  gpr_refcount users;
 };
 
 //
@@ -77,6 +78,8 @@ struct grpc_end2end_http_proxy {
 //
 
 typedef struct proxy_connection {
+  grpc_end2end_http_proxy* proxy;
+
   grpc_endpoint* client_endpoint;
   grpc_endpoint* server_endpoint;
 
@@ -103,13 +106,26 @@ typedef struct proxy_connection {
   grpc_http_request http_request;
 } proxy_connection;
 
+static void proxy_connection_ref(proxy_connection* conn, const char* reason) {
+  gpr_log(GPR_DEBUG, "proxy_connection_ref: %p %s %" PRIdPTR " --> %" PRIdPTR,
+          conn, reason, gpr_atm_no_barrier_load(&conn->refcount.count),
+          gpr_atm_no_barrier_load(&conn->refcount.count) - 1);
+  gpr_ref(&conn->refcount);
+}
+
 // Helper function to destroy the proxy connection.
 static void proxy_connection_unref(grpc_exec_ctx* exec_ctx,
-                                   proxy_connection* conn) {
+                                   proxy_connection* conn, const char* reason) {
+  gpr_log(GPR_DEBUG, "proxy_connection_unref: %p %s %" PRIdPTR " --> %" PRIdPTR,
+          conn, reason, gpr_atm_no_barrier_load(&conn->refcount.count),
+          gpr_atm_no_barrier_load(&conn->refcount.count) - 1);
   if (gpr_unref(&conn->refcount)) {
+    gpr_log(GPR_DEBUG, "endpoints: %p %p", conn->client_endpoint,
+            conn->server_endpoint);
     grpc_endpoint_destroy(exec_ctx, conn->client_endpoint);
-    if (conn->server_endpoint != NULL)
+    if (conn->server_endpoint != NULL) {
       grpc_endpoint_destroy(exec_ctx, conn->server_endpoint);
+    }
     grpc_pollset_set_destroy(exec_ctx, conn->pollset_set);
     grpc_slice_buffer_destroy_internal(exec_ctx, &conn->client_read_buffer);
     grpc_slice_buffer_destroy_internal(exec_ctx,
@@ -121,6 +137,7 @@ static void proxy_connection_unref(grpc_exec_ctx* exec_ctx,
     grpc_slice_buffer_destroy_internal(exec_ctx, &conn->server_write_buffer);
     grpc_http_parser_destroy(&conn->http_parser);
     grpc_http_request_destroy(&conn->http_request);
+    gpr_unref(&conn->proxy->users);
     gpr_free(conn);
   }
 }
@@ -139,7 +156,7 @@ static void proxy_connection_failed(grpc_exec_ctx* exec_ctx,
     grpc_endpoint_shutdown(exec_ctx, conn->server_endpoint,
                            GRPC_ERROR_REF(error));
   }
-  proxy_connection_unref(exec_ctx, conn);
+  proxy_connection_unref(exec_ctx, conn, "conn_failed");
 }
 
 // Callback for writing proxy data to the client.
@@ -163,7 +180,7 @@ static void on_client_write_done(grpc_exec_ctx* exec_ctx, void* arg,
                         &conn->on_client_write_done);
   } else {
     // No more writes.  Unref the connection.
-    proxy_connection_unref(exec_ctx, conn);
+    proxy_connection_unref(exec_ctx, conn, "write_done");
   }
 }
 
@@ -188,7 +205,7 @@ static void on_server_write_done(grpc_exec_ctx* exec_ctx, void* arg,
                         &conn->on_server_write_done);
   } else {
     // No more writes.  Unref the connection.
-    proxy_connection_unref(exec_ctx, conn);
+    proxy_connection_unref(exec_ctx, conn, "server_write");
   }
 }
 
@@ -214,7 +231,7 @@ static void on_client_read_done(grpc_exec_ctx* exec_ctx, void* arg,
   } else {
     grpc_slice_buffer_move_into(&conn->client_read_buffer,
                                 &conn->server_write_buffer);
-    gpr_ref(&conn->refcount);
+    proxy_connection_ref(conn, "client_read");
     grpc_endpoint_write(exec_ctx, conn->server_endpoint,
                         &conn->server_write_buffer,
                         &conn->on_server_write_done);
@@ -246,7 +263,7 @@ static void on_server_read_done(grpc_exec_ctx* exec_ctx, void* arg,
   } else {
     grpc_slice_buffer_move_into(&conn->server_read_buffer,
                                 &conn->client_write_buffer);
-    gpr_ref(&conn->refcount);
+    proxy_connection_ref(conn, "server_read");
     grpc_endpoint_write(exec_ctx, conn->client_endpoint,
                         &conn->client_write_buffer,
                         &conn->on_client_write_done);
@@ -270,7 +287,9 @@ static void on_write_response_done(grpc_exec_ctx* exec_ctx, void* arg,
   // Start reading from both client and server.  One of the read
   // requests inherits our ref to conn, but we need to take a new ref
   // for the other one.
-  gpr_ref(&conn->refcount);
+  proxy_connection_ref(conn, "client_read");
+  proxy_connection_ref(conn, "server_read");
+  proxy_connection_unref(exec_ctx, conn, "write_response");
   grpc_endpoint_read(exec_ctx, conn->client_endpoint, &conn->client_read_buffer,
                      &conn->on_client_read_done);
   grpc_endpoint_read(exec_ctx, conn->server_endpoint, &conn->server_read_buffer,
@@ -312,6 +331,8 @@ static void on_server_connect_done(grpc_exec_ctx* exec_ctx, void* arg,
 static void on_read_request_done(grpc_exec_ctx* exec_ctx, void* arg,
                                  grpc_error* error) {
   proxy_connection* conn = arg;
+  gpr_log(GPR_DEBUG, "on_read_request_done: %p %s", conn,
+          grpc_error_string(error));
   if (error != GRPC_ERROR_NONE) {
     proxy_connection_failed(exec_ctx, conn, true /* is_client */,
                             "HTTP proxy read request", error);
@@ -376,12 +397,15 @@ static void on_accept(grpc_exec_ctx* exec_ctx, void* arg,
   gpr_free(acceptor);
   grpc_end2end_http_proxy* proxy = arg;
   // Instantiate proxy_connection.
-  proxy_connection* conn = gpr_malloc(sizeof(*conn));
-  memset(conn, 0, sizeof(*conn));
+  proxy_connection* conn = gpr_zalloc(sizeof(*conn));
+  gpr_ref(&proxy->users);
   conn->client_endpoint = endpoint;
+  conn->proxy = proxy;
   gpr_ref_init(&conn->refcount, 1);
   conn->pollset_set = grpc_pollset_set_create();
+  gpr_log(GPR_DEBUG, "on_accept: %p", conn);
   grpc_pollset_set_add_pollset(exec_ctx, conn->pollset_set, proxy->pollset);
+  grpc_endpoint_add_to_pollset_set(exec_ctx, endpoint, conn->pollset_set);
   grpc_closure_init(&conn->on_read_request_done, on_read_request_done, conn,
                     grpc_schedule_on_exec_ctx);
   grpc_closure_init(&conn->on_server_connect_done, on_server_connect_done, conn,
@@ -416,6 +440,7 @@ static void thread_main(void* arg) {
   grpc_end2end_http_proxy* proxy = arg;
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
   do {
+    gpr_ref(&proxy->users);
     const gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
     const gpr_timespec deadline =
         gpr_time_add(now, gpr_time_from_seconds(1, GPR_TIMESPAN));
@@ -426,7 +451,7 @@ static void thread_main(void* arg) {
         grpc_pollset_work(&exec_ctx, proxy->pollset, &worker, now, deadline));
     gpr_mu_unlock(proxy->mu);
     grpc_exec_ctx_flush(&exec_ctx);
-  } while (!gpr_atm_acq_load(&proxy->shutdown));
+  } while (!gpr_unref(&proxy->users));
   grpc_exec_ctx_finish(&exec_ctx);
 }
 
@@ -434,6 +459,7 @@ grpc_end2end_http_proxy* grpc_end2end_http_proxy_create(void) {
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
   grpc_end2end_http_proxy* proxy = gpr_malloc(sizeof(*proxy));
   memset(proxy, 0, sizeof(*proxy));
+  gpr_ref_init(&proxy->users, 1);
   // Construct proxy address.
   const int proxy_port = grpc_pick_unused_port_or_die();
   gpr_join_host_port(&proxy->proxy_name, "localhost", proxy_port);
@@ -474,17 +500,16 @@ static void destroy_pollset(grpc_exec_ctx* exec_ctx, void* arg,
 }
 
 void grpc_end2end_http_proxy_destroy(grpc_end2end_http_proxy* proxy) {
-  gpr_atm_rel_store(&proxy->shutdown, 1);  // Signal proxy thread to shutdown.
+  gpr_unref(&proxy->users);  // Signal proxy thread to shutdown.
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
   gpr_thd_join(proxy->thd);
   grpc_tcp_server_shutdown_listeners(&exec_ctx, proxy->server);
   grpc_tcp_server_unref(&exec_ctx, proxy->server);
   gpr_free(proxy->proxy_name);
   grpc_channel_args_destroy(&exec_ctx, proxy->channel_args);
-  grpc_closure destroyed;
-  grpc_closure_init(&destroyed, destroy_pollset, proxy->pollset,
-                    grpc_schedule_on_exec_ctx);
-  grpc_pollset_shutdown(&exec_ctx, proxy->pollset, &destroyed);
+  grpc_pollset_shutdown(&exec_ctx, proxy->pollset,
+                        grpc_closure_create(destroy_pollset, proxy->pollset,
+                                            grpc_schedule_on_exec_ctx));
   gpr_free(proxy);
   grpc_exec_ctx_finish(&exec_ctx);
 }
diff --git a/test/core/end2end/tests/cancel_after_invoke.c b/test/core/end2end/tests/cancel_after_invoke.c
index f2aca737ab..5bc9ed283b 100644
--- a/test/core/end2end/tests/cancel_after_invoke.c
+++ b/test/core/end2end/tests/cancel_after_invoke.c
@@ -49,11 +49,12 @@ static void *tag(intptr_t t) { return (void *)t; }
 static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config,
                                             const char *test_name,
                                             cancellation_mode mode,
+                                            size_t test_ops,
                                             grpc_channel_args *client_args,
                                             grpc_channel_args *server_args) {
   grpc_end2end_test_fixture f;
-  gpr_log(GPR_INFO, "Running test: %s/%s/%s", test_name, config.name,
-          mode.name);
+  gpr_log(GPR_INFO, "Running test: %s/%s/%s [%" PRIdPTR " ops]", test_name,
+          config.name, mode.name, test_ops);
   f = config.create_fixture(client_args, server_args);
   config.init_server(&f, server_args);
   config.init_client(&f, client_args);
@@ -108,8 +109,8 @@ static void test_cancel_after_invoke(grpc_end2end_test_config config,
   grpc_op ops[6];
   grpc_op *op;
   grpc_call *c;
-  grpc_end2end_test_fixture f =
-      begin_test(config, "test_cancel_after_invoke", mode, NULL, NULL);
+  grpc_end2end_test_fixture f = begin_test(config, "test_cancel_after_invoke",
+                                           mode, test_ops, NULL, NULL);
   cq_verifier *cqv = cq_verifier_create(f.cq);
   grpc_metadata_array initial_metadata_recv;
   grpc_metadata_array trailing_metadata_recv;
-- 
GitLab