From f6b6d2984157cc24ee69afb9b025b5d250a5a9ad Mon Sep 17 00:00:00 2001
From: Craig Tiller <ctiller@google.com>
Date: Fri, 2 Sep 2016 09:27:26 -0700
Subject: [PATCH] Address review comments

---
 src/core/lib/iomgr/combiner.c   |  9 +++++++--
 src/core/lib/iomgr/combiner.h   |  2 +-
 src/core/lib/support/mpscq.c    | 11 +++++++----
 src/core/lib/support/mpscq.h    |  4 ++--
 test/core/iomgr/combiner_test.c |  3 ++-
 5 files changed, 19 insertions(+), 10 deletions(-)

diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c
index eb5ad634bd..831bdb4aff 100644
--- a/src/core/lib/iomgr/combiner.c
+++ b/src/core/lib/iomgr/combiner.c
@@ -176,8 +176,10 @@ static bool maybe_finish_one(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
       gpr_log(GPR_DEBUG, "C:%p maybe_finish_one n=%p", lock, n));
   GPR_ASSERT(exec_ctx->active_combiner == lock);
   if (n == NULL) {
-    // queue is in an inconsistant state: use this as a cue that we should
-    // go off and do something else for a while (and come back later)
+    // Queue is in an transiently inconsistent state: a new item is being queued
+    // but is not visible to this thread yet.
+    // Use this as a cue that we should go off and do something else for a while
+    // (and come back later)
     grpc_closure_init(&lock->continue_finishing, continue_finishing_mainline,
                       lock);
     grpc_exec_ctx_sched(exec_ctx, &lock->continue_finishing, GRPC_ERROR_NONE,
@@ -204,6 +206,9 @@ static void finish(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
                                 "C:%p finish[%d] old_state=%" PRIdPTR, lock,
                                 loops, old_state));
     switch (old_state) {
+      default:
+        // we have multiple queued work items: just continue executing them
+        break;
       case 5:  // we're down to one queued item: if it's the final list we
       case 4:  // should do that
         if (!grpc_closure_list_empty(lock->final_list)) {
diff --git a/src/core/lib/iomgr/combiner.h b/src/core/lib/iomgr/combiner.h
index 57df8f0ba8..08acbb7441 100644
--- a/src/core/lib/iomgr/combiner.h
+++ b/src/core/lib/iomgr/combiner.h
@@ -41,7 +41,7 @@
 #include "src/core/lib/support/mpscq.h"
 
 // Provides serialized access to some resource.
-// Each action queued on an aelock is executed serially in a borrowed thread.
+// Each action queued on a combiner is executed serially in a borrowed thread.
 // The actual thread executing actions may change over time (but there will only
 // every be one at a time).
 
diff --git a/src/core/lib/support/mpscq.c b/src/core/lib/support/mpscq.c
index 25b055b172..cdd6335f82 100644
--- a/src/core/lib/support/mpscq.c
+++ b/src/core/lib/support/mpscq.c
@@ -38,7 +38,7 @@
 void gpr_mpscq_init(gpr_mpscq *q) {
   gpr_atm_no_barrier_store(&q->head, (gpr_atm)&q->stub);
   q->tail = &q->stub;
-  gpr_atm_no_barrier_store(&q->stub.next, 0);
+  gpr_atm_no_barrier_store(&q->stub.next, (gpr_atm)NULL);
 }
 
 void gpr_mpscq_destroy(gpr_mpscq *q) {
@@ -47,16 +47,17 @@ void gpr_mpscq_destroy(gpr_mpscq *q) {
 }
 
 void gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n) {
-  gpr_atm_no_barrier_store(&n->next, 0);
+  gpr_atm_no_barrier_store(&n->next, (gpr_atm)NULL);
   gpr_mpscq_node *prev =
       (gpr_mpscq_node *)gpr_atm_full_xchg(&q->head, (gpr_atm)n);
-  gpr_atm_rel_store(&prev->next, (gpr_atm)n);
+  gpr_atm_no_barrier_store(&prev->next, (gpr_atm)n);
 }
 
 gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q) {
   gpr_mpscq_node *tail = q->tail;
   gpr_mpscq_node *next = (gpr_mpscq_node *)gpr_atm_acq_load(&tail->next);
   if (tail == &q->stub) {
+    // indicates the list is actually (ephemerally) empty
     if (next == NULL) return NULL;
     q->tail = next;
     tail = next;
@@ -68,7 +69,8 @@ gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q) {
   }
   gpr_mpscq_node *head = (gpr_mpscq_node *)gpr_atm_acq_load(&q->head);
   if (tail != head) {
-    return 0;
+    // indicates a retry is in order: we're still adding
+    return NULL;
   }
   gpr_mpscq_push(q, &q->stub);
   next = (gpr_mpscq_node *)gpr_atm_acq_load(&tail->next);
@@ -76,5 +78,6 @@ gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q) {
     q->tail = next;
     return tail;
   }
+  // indicates a retry is in order: we're still adding
   return NULL;
 }
diff --git a/src/core/lib/support/mpscq.h b/src/core/lib/support/mpscq.h
index 1201edceb1..977a117952 100644
--- a/src/core/lib/support/mpscq.h
+++ b/src/core/lib/support/mpscq.h
@@ -41,8 +41,8 @@
 // implementation from Dmitry Vyukov here:
 // http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
 
-// List node (include this in a data structure and dangle the rest of the
-// interesting bits off the end)
+// List node (include this in a data structure at the top, and add application
+// fields after it - to simulate inheritance)
 typedef struct gpr_mpscq_node { gpr_atm next; } gpr_mpscq_node;
 
 // Actual queue type
diff --git a/test/core/iomgr/combiner_test.c b/test/core/iomgr/combiner_test.c
index 7cf016d82c..197998c1e5 100644
--- a/test/core/iomgr/combiner_test.c
+++ b/test/core/iomgr/combiner_test.c
@@ -80,7 +80,6 @@ typedef struct {
 
 static void check_one(grpc_exec_ctx *exec_ctx, void *a, grpc_error *error) {
   ex_args *args = a;
-  // gpr_log(GPR_DEBUG, "*%p=%d; step %d", args->ctr, *args->ctr, args->value);
   GPR_ASSERT(*args->ctr == args->value - 1);
   *args->ctr = args->value;
   gpr_free(a);
@@ -99,6 +98,8 @@ static void execute_many_loop(void *a) {
                             grpc_closure_create(check_one, c), GRPC_ERROR_NONE);
       grpc_exec_ctx_flush(&exec_ctx);
     }
+    // sleep for a little bit, to test a combiner draining and another thread
+    // picking it up
     gpr_sleep_until(GRPC_TIMEOUT_MILLIS_TO_DEADLINE(100));
   }
   grpc_exec_ctx_finish(&exec_ctx);
-- 
GitLab