From 9ccbc4d5e51bcaa0adc3ca6837d93b7c162b3ca6 Mon Sep 17 00:00:00 2001
From: "Mark D. Roth" <roth@google.com>
Date: Wed, 15 Mar 2017 08:30:04 -0700
Subject: [PATCH] Don't use combiner lock for on_complete callback.

---
 src/core/ext/client_channel/client_channel.c | 20 ++++++----
 src/core/ext/client_channel/retry_throttle.c | 42 ++++++++------------
 src/core/ext/client_channel/retry_throttle.h | 10 ++---
 3 files changed, 32 insertions(+), 40 deletions(-)

diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c
index ef273669e8..e0b84ddd66 100644
--- a/src/core/ext/client_channel/client_channel.c
+++ b/src/core/ext/client_channel/client_channel.c
@@ -423,7 +423,8 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
             grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVER_URI);
         GPR_ASSERT(channel_arg != NULL);
         GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
-        grpc_uri *uri = grpc_uri_parse(channel_arg->value.string, true);
+        grpc_uri *uri =
+            grpc_uri_parse(exec_ctx, channel_arg->value.string, true);
         GPR_ASSERT(uri->path[0] != '\0');
         parsing_state.server_name =
             uri->path[0] == '/' ? uri->path + 1 : uri->path;
@@ -738,6 +739,7 @@ typedef struct client_channel_call_data {
   grpc_slice path;  // Request path.
   gpr_timespec call_start_time;
   gpr_timespec deadline;
+  grpc_server_retry_throttle_data *retry_throttle_data;
   method_parameters *method_params;
 
   grpc_error *cancel_error;
@@ -814,7 +816,7 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) {
   gpr_free(ops);
 }
 
-// Sets calld->method_params.
+// Sets calld->method_params and calld->retry_throttle_data.
 // If the method params specify a timeout, populates
 // *per_method_deadline and returns true.
 static bool set_call_method_params_from_service_config_locked(
@@ -822,6 +824,10 @@ static bool set_call_method_params_from_service_config_locked(
     gpr_timespec *per_method_deadline) {
   channel_data *chand = elem->channel_data;
   call_data *calld = elem->call_data;
+  if (chand->retry_throttle_data != NULL) {
+    calld->retry_throttle_data =
+        grpc_server_retry_throttle_data_ref(chand->retry_throttle_data);
+  }
   if (chand->method_params_table != NULL) {
     calld->method_params = grpc_method_config_table_get(
         exec_ctx, chand->method_params_table, calld->path);
@@ -1135,19 +1141,18 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx,
 static void on_complete_locked(grpc_exec_ctx *exec_ctx, void *arg,
                                grpc_error *error) {
   grpc_call_element *elem = arg;
-  channel_data *chand = elem->channel_data;
   call_data *calld = elem->call_data;
-  if (chand->retry_throttle_data != NULL) {
+  if (calld->retry_throttle_data != NULL) {
     if (error == GRPC_ERROR_NONE) {
       grpc_server_retry_throttle_data_record_success(
-          &chand->retry_throttle_data);
+          calld->retry_throttle_data);
     } else {
       // TODO(roth): In a subsequent PR, check the return value here and
       // decide whether or not to retry.  Note that we should only
       // record failures whose statuses match the configured retryable
       // or non-fatal status codes.
       grpc_server_retry_throttle_data_record_failure(
-          &chand->retry_throttle_data);
+          calld->retry_throttle_data);
     }
   }
   grpc_closure_run(exec_ctx, calld->original_on_complete,
@@ -1160,14 +1165,13 @@ static void start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, void *arg,
 
   grpc_transport_stream_op *op = arg;
   grpc_call_element *elem = op->handler_private.args[0];
-  channel_data *chand = elem->channel_data;
   call_data *calld = elem->call_data;
 
   if (op->recv_trailing_metadata != NULL) {
     GPR_ASSERT(op->on_complete != NULL);
     calld->original_on_complete = op->on_complete;
     grpc_closure_init(&calld->on_complete, on_complete_locked, elem,
-                      grpc_combiner_scheduler(chand->combiner, false));
+                      grpc_schedule_on_exec_ctx);
     op->on_complete = &calld->on_complete;
   }
 
diff --git a/src/core/ext/client_channel/retry_throttle.c b/src/core/ext/client_channel/retry_throttle.c
index 2aa52e4903..7b813c33df 100644
--- a/src/core/ext/client_channel/retry_throttle.c
+++ b/src/core/ext/client_channel/retry_throttle.c
@@ -64,26 +64,18 @@ static void get_replacement_throttle_data_if_needed(
         (grpc_server_retry_throttle_data*)gpr_atm_acq_load(
             &(*throttle_data)->replacement);
     if (new_throttle_data == NULL) return;
-    // Reset *throttle_data to its replacement, updating refcounts as
-    // appropriate.
-    // Note: It's safe to do this here, because the caller ensures that
-    // this will only be called with a given value of throttle_data from
-    // one thread at a time.
-    grpc_server_retry_throttle_data_ref(new_throttle_data);
-    grpc_server_retry_throttle_data* old_throttle_data = *throttle_data;
     *throttle_data = new_throttle_data;
-    grpc_server_retry_throttle_data_unref(old_throttle_data);
   }
 }
 
 bool grpc_server_retry_throttle_data_record_failure(
-    grpc_server_retry_throttle_data** throttle_data) {
+    grpc_server_retry_throttle_data* throttle_data) {
   // First, check if we are stale and need to be replaced.
-  get_replacement_throttle_data_if_needed(throttle_data);
+  get_replacement_throttle_data_if_needed(&throttle_data);
   // We decrement milli_tokens by 1000 (1 token) for each failure.
   const int delta = -1000;
   const int old_value = (int)gpr_atm_full_fetch_add(
-      &(*throttle_data)->milli_tokens, (gpr_atm)delta);
+      &throttle_data->milli_tokens, (gpr_atm)delta);
   // If the above change takes us below 0, then re-add the excess.  Note
   // that between these two atomic operations, the value will be
   // artificially low by as much as 1000, but this window should be
@@ -91,41 +83,42 @@ bool grpc_server_retry_throttle_data_record_failure(
   int new_value = old_value - 1000;
   if (new_value < 0) {
     const int excess_value = new_value - (old_value < 0 ? old_value : 0);
-    gpr_atm_full_fetch_add(&(*throttle_data)->milli_tokens,
+    gpr_atm_full_fetch_add(&throttle_data->milli_tokens,
                            (gpr_atm)-excess_value);
     new_value = 0;
   }
   // Retries are allowed as long as the new value is above the threshold
   // (max_milli_tokens / 2).
-  return new_value > (*throttle_data)->max_milli_tokens / 2;
+  return new_value > throttle_data->max_milli_tokens / 2;
 }
 
 void grpc_server_retry_throttle_data_record_success(
-    grpc_server_retry_throttle_data** throttle_data) {
+    grpc_server_retry_throttle_data* throttle_data) {
   // First, check if we are stale and need to be replaced.
-  get_replacement_throttle_data_if_needed(throttle_data);
+  get_replacement_throttle_data_if_needed(&throttle_data);
   // We increment milli_tokens by milli_token_ratio for each success.
-  const int delta = (*throttle_data)->milli_token_ratio;
+  const int delta = throttle_data->milli_token_ratio;
   const int old_value = (int)gpr_atm_full_fetch_add(
-      &(*throttle_data)->milli_tokens, (gpr_atm)delta);
+      &throttle_data->milli_tokens, (gpr_atm)delta);
   // If the above change takes us over max_milli_tokens, then subtract
   // the excess.  Note that between these two atomic operations, the
   // value will be artificially high by as much as milli_token_ratio,
   // but this window should be brief.
-  const int new_value = old_value + (*throttle_data)->milli_token_ratio;
-  if (new_value > (*throttle_data)->max_milli_tokens) {
+  const int new_value = old_value + throttle_data->milli_token_ratio;
+  if (new_value > throttle_data->max_milli_tokens) {
     const int excess_value =
-        new_value - (old_value > (*throttle_data)->max_milli_tokens
+        new_value - (old_value > throttle_data->max_milli_tokens
                          ? old_value
-                         : (*throttle_data)->max_milli_tokens);
-    gpr_atm_full_fetch_add(&(*throttle_data)->milli_tokens,
+                         : throttle_data->max_milli_tokens);
+    gpr_atm_full_fetch_add(&throttle_data->milli_tokens,
                            (gpr_atm)-excess_value);
   }
 }
 
-void grpc_server_retry_throttle_data_ref(
+grpc_server_retry_throttle_data* grpc_server_retry_throttle_data_ref(
     grpc_server_retry_throttle_data* throttle_data) {
   gpr_ref(&throttle_data->refs);
+  return throttle_data;
 }
 
 void grpc_server_retry_throttle_data_unref(
@@ -189,8 +182,7 @@ static void destroy_server_retry_throttle_data(void* value) {
 
 static void* copy_server_retry_throttle_data(void* value) {
   grpc_server_retry_throttle_data* throttle_data = value;
-  grpc_server_retry_throttle_data_ref(throttle_data);
-  return value;
+  return grpc_server_retry_throttle_data_ref(throttle_data);
 }
 
 static const gpr_avl_vtable avl_vtable = {
diff --git a/src/core/ext/client_channel/retry_throttle.h b/src/core/ext/client_channel/retry_throttle.h
index 4209bb7fb6..f9971faf65 100644
--- a/src/core/ext/client_channel/retry_throttle.h
+++ b/src/core/ext/client_channel/retry_throttle.h
@@ -40,17 +40,13 @@
 typedef struct grpc_server_retry_throttle_data grpc_server_retry_throttle_data;
 
 /// Records a failure.  Returns true if it's okay to send a retry.
-/// Updates \a throttle_data if the original value is stale and has been
-/// replaced.  Not thread safe; caller must synchronize.
 bool grpc_server_retry_throttle_data_record_failure(
-    grpc_server_retry_throttle_data** throttle_data);
+    grpc_server_retry_throttle_data* throttle_data);
 /// Records a success.
-/// Updates \a throttle_data if the original value is stale and has been
-/// replaced.  Not thread safe; caller must synchronize.
 void grpc_server_retry_throttle_data_record_success(
-    grpc_server_retry_throttle_data** throttle_data);
+    grpc_server_retry_throttle_data* throttle_data);
 
-void grpc_server_retry_throttle_data_ref(
+grpc_server_retry_throttle_data* grpc_server_retry_throttle_data_ref(
     grpc_server_retry_throttle_data* throttle_data);
 void grpc_server_retry_throttle_data_unref(
     grpc_server_retry_throttle_data* throttle_data);
-- 
GitLab