From 23f777df084f15df8d0aac3684c5f5c88399c44d Mon Sep 17 00:00:00 2001
From: yang-g <yangg@google.com>
Date: Wed, 22 Feb 2017 23:32:26 -0800
Subject: [PATCH] When there is an error from filters (such as max recv message
 size too large), add an error to the batch to avoid emitting new rpc on the
 server side in C++.

---
 src/core/lib/surface/call.c                | 17 +++++++++++------
 src/core/lib/surface/server.c              |  3 ++-
 src/proto/grpc/testing/echo_messages.proto |  1 +
 test/cpp/end2end/end2end_test.cc           |  3 +--
 test/cpp/end2end/test_service_impl.cc      |  4 ++++
 5 files changed, 19 insertions(+), 9 deletions(-)

diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 3c563bcc6f..af32c5b378 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -112,7 +112,7 @@ static received_status unpack_received_status(gpr_atm atm) {
                                  .error = (grpc_error *)(atm & ~(gpr_atm)1)};
 }
 
-#define MAX_ERRORS_PER_BATCH 3
+#define MAX_ERRORS_PER_BATCH 4
 
 typedef struct batch_control {
   grpc_call *call;
@@ -254,7 +254,7 @@ static void set_status_from_error(grpc_exec_ctx *exec_ctx, grpc_call *call,
 static void process_data_after_md(grpc_exec_ctx *exec_ctx, batch_control *bctl);
 static void post_batch_completion(grpc_exec_ctx *exec_ctx, batch_control *bctl);
 static void add_batch_error(grpc_exec_ctx *exec_ctx, batch_control *bctl,
-                            grpc_error *error);
+                            grpc_error *error, bool has_cancelled);
 
 static void add_init_error(grpc_error **composite, grpc_error *new) {
   if (new == GRPC_ERROR_NONE) return;
@@ -1223,6 +1223,11 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
   grpc_call *call = bctl->call;
   gpr_mu_lock(&bctl->call->mu);
   if (error != GRPC_ERROR_NONE) {
+    if (call->receiving_stream != NULL) {
+      grpc_byte_stream_destroy(exec_ctx, call->receiving_stream);
+      call->receiving_stream = NULL;
+    }
+    add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), true);
     cancel_with_error(exec_ctx, call, STATUS_FROM_SURFACE,
                       GRPC_ERROR_REF(error));
   }
@@ -1289,10 +1294,10 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx,
 }
 
 static void add_batch_error(grpc_exec_ctx *exec_ctx, batch_control *bctl,
-                            grpc_error *error) {
+                            grpc_error *error, bool has_cancelled) {
   if (error == GRPC_ERROR_NONE) return;
   int idx = (int)gpr_atm_no_barrier_fetch_add(&bctl->num_errors, 1);
-  if (idx == 0) {
+  if (idx == 0 && !has_cancelled) {
     cancel_with_error(exec_ctx, bctl->call, STATUS_FROM_CORE,
                       GRPC_ERROR_REF(error));
   }
@@ -1306,7 +1311,7 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
 
   gpr_mu_lock(&call->mu);
 
-  add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error));
+  add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false);
   if (error == GRPC_ERROR_NONE) {
     grpc_metadata_batch *md =
         &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
@@ -1343,7 +1348,7 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp,
                          grpc_error *error) {
   batch_control *bctl = bctlp;
 
-  add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error));
+  add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false);
   finish_batch_step(exec_ctx, bctl);
 }
 
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 7210c69fb0..fdb230b3e2 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -540,7 +540,8 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg,
         &calld->kill_zombie_closure, kill_zombie,
         grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
         grpc_schedule_on_exec_ctx);
-    grpc_closure_sched(exec_ctx, &calld->kill_zombie_closure, error);
+    grpc_closure_sched(exec_ctx, &calld->kill_zombie_closure,
+                       GRPC_ERROR_REF(error));
     return;
   }
 
diff --git a/src/proto/grpc/testing/echo_messages.proto b/src/proto/grpc/testing/echo_messages.proto
index b405acf043..efb6f4d493 100644
--- a/src/proto/grpc/testing/echo_messages.proto
+++ b/src/proto/grpc/testing/echo_messages.proto
@@ -50,6 +50,7 @@ message RequestParams {
   bool skip_cancelled_check = 9;
   string expected_transport_security_type = 10;
   DebugInfo debug_info = 11;
+  bool server_die = 12; // Server should not see a request with this set.
 }
 
 message EchoRequest {
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc
index 47e5c5bd77..df78557c43 100644
--- a/test/cpp/end2end/end2end_test.cc
+++ b/test/cpp/end2end/end2end_test.cc
@@ -901,8 +901,7 @@ TEST_P(End2endTest, RpcMaxMessageSize) {
   EchoRequest request;
   EchoResponse response;
   request.set_message(string(kMaxMessageSize_ * 2, 'a'));
-  // cancelled is not guaranteed to appear before the end of the service handler
-  request.mutable_param()->set_skip_cancelled_check(true);
+  request.mutable_param()->set_server_die(true);
 
   ClientContext context;
   Status s = stub_->Echo(&context, request, &response);
diff --git a/test/cpp/end2end/test_service_impl.cc b/test/cpp/end2end/test_service_impl.cc
index 001047778d..59d36e9cb5 100644
--- a/test/cpp/end2end/test_service_impl.cc
+++ b/test/cpp/end2end/test_service_impl.cc
@@ -88,6 +88,10 @@ void CheckServerAuthContext(
 
 Status TestServiceImpl::Echo(ServerContext* context, const EchoRequest* request,
                              EchoResponse* response) {
+  if (request->has_param() && request->param().server_die()) {
+    gpr_log(GPR_ERROR, "The request should not reach application handler.");
+    GPR_ASSERT(0);
+  }
   int server_try_cancel = GetIntValueFromMetadata(
       kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
   if (server_try_cancel > DO_NOT_CANCEL) {
-- 
GitLab