From eefa549fb57498d6724bf10d9a4197e12e17e41a Mon Sep 17 00:00:00 2001
From: Craig Tiller <ctiller@google.com>
Date: Mon, 8 Jun 2015 12:36:04 -0700
Subject: [PATCH] Split channel level and call level callback serialization

This effectively doubles our qps/core on qps_test on my machine.
---
 src/core/transport/chttp2_transport.c | 64 +++++++++++++++------------
 1 file changed, 35 insertions(+), 29 deletions(-)

diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 630504565b..1a59999598 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -230,7 +230,8 @@ struct transport {
   /* basic state management - what are we doing at the moment? */
   gpr_uint8 reading;
   gpr_uint8 writing;
-  gpr_uint8 calling_back;
+  gpr_uint8 calling_back_channel;
+  gpr_uint8 calling_back_ops;
   gpr_uint8 destroying;
   gpr_uint8 closed;
   error_state error_state;
@@ -357,7 +358,7 @@ static void push_setting(transport *t, grpc_chttp2_setting_id id,
                          gpr_uint32 value);
 
 static int prepare_callbacks(transport *t);
-static void run_callbacks(transport *t, const grpc_transport_callbacks *cb);
+static void run_callbacks(transport *t);
 static void call_cb_closed(transport *t, const grpc_transport_callbacks *cb);
 
 static int prepare_write(transport *t);
@@ -565,7 +566,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
   }
 
   gpr_mu_lock(&t->mu);
-  t->calling_back = 1;
+  t->calling_back_channel = 1;
   ref_transport(t); /* matches unref at end of this function */
   gpr_mu_unlock(&t->mu);
 
@@ -574,7 +575,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
   lock(t);
   t->cb = sr.callbacks;
   t->cb_user_data = sr.user_data;
-  t->calling_back = 0;
+  t->calling_back_channel = 0;
   if (t->destroying) gpr_cv_signal(&t->cv);
   unlock(t);
 
@@ -595,7 +596,7 @@ static void destroy_transport(grpc_transport *gt) {
      We need to be not writing as cancellation finalization may produce some
      callbacks that NEED to be made to close out some streams when t->writing
      becomes 0. */
-  while (t->calling_back || t->writing) {
+  while (t->calling_back_channel || t->writing) {
     gpr_cv_wait(&t->cv, &t->mu, gpr_inf_future);
   }
   drop_connection(t);
@@ -830,28 +831,29 @@ static void unlock(transport *t) {
   finish_reads(t);
 
   /* gather any callbacks that need to be made */
-  if (!t->calling_back) {
-    t->calling_back = perform_callbacks = prepare_callbacks(t);
-    if (cb) {
-      if (t->error_state == ERROR_STATE_SEEN && !t->writing) {
-        call_closed = 1;
-        t->calling_back = 1;
-        t->cb = NULL; /* no more callbacks */
-        t->error_state = ERROR_STATE_NOTIFIED;
-      }
-      if (t->num_pending_goaways) {
-        goaways = t->pending_goaways;
-        num_goaways = t->num_pending_goaways;
-        t->pending_goaways = NULL;
-        t->num_pending_goaways = 0;
-        t->cap_pending_goaways = 0;
-        t->calling_back = 1;
-      }
-    }
+  if (!t->calling_back_ops) {
+    t->calling_back_ops = perform_callbacks = prepare_callbacks(t);
+    if (perform_callbacks) ref_transport(t);
   }
 
-  if (perform_callbacks || call_closed || num_goaways) {
-    ref_transport(t);
+  if (!t->calling_back_channel && cb) {
+    if (t->error_state == ERROR_STATE_SEEN && !t->writing) {
+      call_closed = 1;
+      t->calling_back_channel = 1;
+      t->cb = NULL; /* no more callbacks */
+      t->error_state = ERROR_STATE_NOTIFIED;
+    }
+    if (t->num_pending_goaways) {
+      goaways = t->pending_goaways;
+      num_goaways = t->num_pending_goaways;
+      t->pending_goaways = NULL;
+      t->num_pending_goaways = 0;
+      t->cap_pending_goaways = 0;
+      t->calling_back_channel = 1;
+    }
+    if (call_closed || num_goaways) {
+      ref_transport(t);
+    }
   }
 
   /* finally unlock */
@@ -865,7 +867,11 @@ static void unlock(transport *t) {
   }
 
   if (perform_callbacks) {
-    run_callbacks(t, cb);
+    run_callbacks(t);
+    lock(t);
+    t->calling_back_ops = 0;
+    unlock(t);
+    unref_transport(t);
   }
 
   if (call_closed) {
@@ -878,9 +884,9 @@ static void unlock(transport *t) {
     perform_write(t, ep);
   }
 
-  if (perform_callbacks || call_closed || num_goaways) {
+  if (call_closed || num_goaways) {
     lock(t);
-    t->calling_back = 0;
+    t->calling_back_channel = 0;
     if (t->destroying) gpr_cv_signal(&t->cv);
     unlock(t);
     unref_transport(t);
@@ -2101,7 +2107,7 @@ static int prepare_callbacks(transport *t) {
   return t->executing_callbacks.count > 0;
 }
 
-static void run_callbacks(transport *t, const grpc_transport_callbacks *cb) {
+static void run_callbacks(transport *t) {
   size_t i;
   for (i = 0; i < t->executing_callbacks.count; i++) {
     op_closure c = t->executing_callbacks.callbacks[i];
-- 
GitLab