From 4b678bd7227952da8d3a690db64baf0ff1af1d2f Mon Sep 17 00:00:00 2001
From: Craig Tiller <ctiller@google.com>
Date: Tue, 2 Jun 2015 16:12:24 -0700
Subject: [PATCH] Fix threading problem on early orphaning

---
 src/core/iomgr/fd_posix.c         | 13 ++++++++-----
 src/core/iomgr/fd_posix.h         |  5 ++++-
 src/core/iomgr/tcp_client_posix.c |  9 +++++----
 src/core/iomgr/tcp_posix.c        |  2 +-
 src/core/iomgr/tcp_server_posix.c |  2 +-
 test/core/iomgr/fd_posix_test.c   |  8 ++++----
 6 files changed, 23 insertions(+), 16 deletions(-)

diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
index 6c666bd420..2ac1866a66 100644
--- a/src/core/iomgr/fd_posix.c
+++ b/src/core/iomgr/fd_posix.c
@@ -114,7 +114,8 @@ static void destroy(grpc_fd *fd) {
 #define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
 static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file, int line) {
   gpr_log(GPR_DEBUG, "FD %d %p  ref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
-          fd->refst, fd->refst + n, reason, file, line);
+          gpr_atm_no_barrier_load(&fd->refst),
+          gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
 #else
 #define REF_BY(fd, n, reason) ref_by(fd, n)
 #define UNREF_BY(fd, n, reason) unref_by(fd, n)
@@ -127,7 +128,8 @@ static void ref_by(grpc_fd *fd, int n) {
 static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file, int line) {
   gpr_atm old;
   gpr_log(GPR_DEBUG, "FD %d %p unref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
-          fd->refst, fd->refst - n, reason, file, line);
+          gpr_atm_no_barrier_load(&fd->refst),
+          gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
 #else
 static void unref_by(grpc_fd *fd, int n) {
   gpr_atm old;
@@ -195,14 +197,15 @@ static void wake_all_watchers_locked(grpc_fd *fd) {
   }
 }
 
-void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done) {
+void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done,
+                    const char *reason) {
   fd->on_done_closure = on_done;
   shutdown(fd->fd, SHUT_RDWR);
-  REF_BY(fd, 1, "orphan"); /* remove active status, but keep referenced */
+  REF_BY(fd, 1, reason); /* remove active status, but keep referenced */
   gpr_mu_lock(&fd->watcher_mu);
   wake_all_watchers_locked(fd);
   gpr_mu_unlock(&fd->watcher_mu);
-  UNREF_BY(fd, 2, "orphan"); /* drop the reference */
+  UNREF_BY(fd, 2, reason); /* drop the reference */
 }
 
 /* increment refcount by two to avoid changing the orphan bit */
diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h
index 1de5d088c5..ae85550aba 100644
--- a/src/core/iomgr/fd_posix.h
+++ b/src/core/iomgr/fd_posix.h
@@ -109,7 +109,8 @@ grpc_fd *grpc_fd_create(int fd, const char *name);
    If on_done is NULL, no callback will be made.
    Requires: *fd initialized; no outstanding notify_on_read or
    notify_on_write. */
-void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done);
+void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done,
+                    const char *reason);
 
 /* Begin polling on an fd.
    Registers that the given pollset is interested in this fd - so that if read
@@ -159,6 +160,8 @@ void grpc_fd_become_readable(grpc_fd *fd, int allow_synchronous_callback);
 void grpc_fd_become_writable(grpc_fd *fd, int allow_synchronous_callback);
 
 /* Reference counting for fds */
+#define GRPC_FD_REF_COUNT_DEBUG
+
 #ifdef GRPC_FD_REF_COUNT_DEBUG
 void grpc_fd_ref(grpc_fd *fd, const char *reason, const char *file, int line);
 void grpc_fd_unref(grpc_fd *fd, const char *reason, const char *file, int line);
diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c
index 93c14c4aab..2cd4aa2f45 100644
--- a/src/core/iomgr/tcp_client_posix.c
+++ b/src/core/iomgr/tcp_client_posix.c
@@ -164,7 +164,7 @@ static void on_writable(void *acp, int success) {
 finish:
   gpr_mu_lock(&ac->mu);
   if (!ep) {
-    grpc_fd_orphan(ac->fd, NULL);
+    grpc_fd_orphan(ac->fd, NULL, "tcp_client_orphan");
   }
   done = (--ac->refs == 0);
   gpr_mu_unlock(&ac->mu);
@@ -220,7 +220,6 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
   gpr_asprintf(&name, "tcp-client:%s", addr_str);
 
   fdobj = grpc_fd_create(fd, name);
-  grpc_pollset_set_add_fd(interested_parties, fdobj);
 
   if (err >= 0) {
     cb(arg,
@@ -229,12 +228,14 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
   }
 
   if (errno != EWOULDBLOCK && errno != EINPROGRESS) {
-    gpr_log(GPR_ERROR, "connect error to '%s': %s", strerror(errno));
-    grpc_fd_orphan(fdobj, NULL);
+    gpr_log(GPR_ERROR, "connect error to '%s': %s", addr_str, strerror(errno));
+    grpc_fd_orphan(fdobj, NULL, "tcp_client_connect_error");
     cb(arg, NULL);
     goto done;
   }
 
+  grpc_pollset_set_add_fd(interested_parties, fdobj);
+
   ac = gpr_malloc(sizeof(async_connect));
   ac->cb = cb;
   ac->cb_arg = arg;
diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c
index 4fbbaa7c7d..e3289f6806 100644
--- a/src/core/iomgr/tcp_posix.c
+++ b/src/core/iomgr/tcp_posix.c
@@ -295,7 +295,7 @@ static void grpc_tcp_shutdown(grpc_endpoint *ep) {
 static void grpc_tcp_unref(grpc_tcp *tcp) {
   int refcount_zero = gpr_unref(&tcp->refcount);
   if (refcount_zero) {
-    grpc_fd_orphan(tcp->em_fd, NULL);
+    grpc_fd_orphan(tcp->em_fd, NULL, "tcp_unref_orphan");
     gpr_free(tcp);
   }
 }
diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c
index fe71bdfe6f..759a493795 100644
--- a/src/core/iomgr/tcp_server_posix.c
+++ b/src/core/iomgr/tcp_server_posix.c
@@ -178,7 +178,7 @@ static void deactivated_all_ports(grpc_tcp_server *s) {
       }
       sp->destroyed_closure.cb = destroyed_port;
       sp->destroyed_closure.cb_arg = s;
-      grpc_fd_orphan(sp->emfd, &sp->destroyed_closure);
+      grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, "tcp_listener_shutdown");
     }
     gpr_mu_unlock(&s->mu);
   } else {
diff --git a/test/core/iomgr/fd_posix_test.c b/test/core/iomgr/fd_posix_test.c
index 2d1ae45bc5..b1be316a4e 100644
--- a/test/core/iomgr/fd_posix_test.c
+++ b/test/core/iomgr/fd_posix_test.c
@@ -120,7 +120,7 @@ static void session_shutdown_cb(void *arg, /*session*/
                                 int success) {
   session *se = arg;
   server *sv = se->sv;
-  grpc_fd_orphan(se->em_fd, NULL);
+  grpc_fd_orphan(se->em_fd, NULL, "a");
   gpr_free(se);
   /* Start to shutdown listen fd. */
   grpc_fd_shutdown(sv->em_fd);
@@ -175,7 +175,7 @@ static void session_read_cb(void *arg, /*session*/
 static void listen_shutdown_cb(void *arg /*server*/, int success) {
   server *sv = arg;
 
-  grpc_fd_orphan(sv->em_fd, NULL);
+  grpc_fd_orphan(sv->em_fd, NULL, "b");
 
   gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
   sv->done = 1;
@@ -284,7 +284,7 @@ static void client_init(client *cl) {
 /* Called when a client upload session is ready to shutdown. */
 static void client_session_shutdown_cb(void *arg /*client*/, int success) {
   client *cl = arg;
-  grpc_fd_orphan(cl->em_fd, NULL);
+  grpc_fd_orphan(cl->em_fd, NULL, "c");
   cl->done = 1;
   grpc_pollset_kick(&g_pollset);
 }
@@ -472,7 +472,7 @@ static void test_grpc_fd_change(void) {
   GPR_ASSERT(b.cb_that_ran == second_read_callback);
   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
 
-  grpc_fd_orphan(em_fd, NULL);
+  grpc_fd_orphan(em_fd, NULL, "d");
   destroy_change_data(&a);
   destroy_change_data(&b);
   close(sv[1]);
-- 
GitLab