From 274c8ed001aca0dc563ee7e04ecd2f9a486b77ec Mon Sep 17 00:00:00 2001
From: "Mark D. Roth" <roth@google.com>
Date: Tue, 4 Oct 2016 09:21:42 -0700
Subject: [PATCH] Fix handling of max receive message size on client side.

---
 src/core/lib/channel/message_size_filter.c   |  14 +-
 src/core/lib/surface/call.c                  |  15 +-
 test/core/end2end/tests/max_message_length.c | 165 ++++++++++++++++++-
 3 files changed, 179 insertions(+), 15 deletions(-)

diff --git a/src/core/lib/channel/message_size_filter.c b/src/core/lib/channel/message_size_filter.c
index 02fc68fc3a..f067a3a51c 100644
--- a/src/core/lib/channel/message_size_filter.c
+++ b/src/core/lib/channel/message_size_filter.c
@@ -73,16 +73,22 @@ static void recv_message_ready(grpc_exec_ctx* exec_ctx, void* user_data,
     gpr_asprintf(&message_string,
                  "Received message larger than max (%u vs. %d)",
                  (*calld->recv_message)->length, chand->max_recv_size);
-    gpr_slice message = gpr_slice_from_copied_string(message_string);
+    grpc_error* new_error = grpc_error_set_int(
+        GRPC_ERROR_CREATE(message_string), GRPC_ERROR_INT_GRPC_STATUS,
+        GRPC_STATUS_INVALID_ARGUMENT);
+    if (error == GRPC_ERROR_NONE) {
+      error = new_error;
+    } else {
+      error = grpc_error_add_child(error, new_error);
+      GRPC_ERROR_UNREF(new_error);
+    }
     gpr_free(message_string);
-    grpc_call_element_send_close_with_message(
-        exec_ctx, elem, GRPC_STATUS_INVALID_ARGUMENT, &message);
   }
   // Invoke the next callback.
   grpc_exec_ctx_sched(exec_ctx, calld->next_recv_message_ready, error, NULL);
 }
 
-// Start transport op.
+// Start transport stream op.
 static void start_transport_stream_op(grpc_exec_ctx* exec_ctx,
                                       grpc_call_element* elem,
                                       grpc_transport_stream_op* op) {
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 5690bcab1e..b0f66f4f61 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -1103,8 +1103,8 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
   }
 }
 
-static void process_data_after_md(grpc_exec_ctx *exec_ctx, batch_control *bctl,
-                                  bool success) {
+static void process_data_after_md(grpc_exec_ctx *exec_ctx,
+                                  batch_control *bctl) {
   grpc_call *call = bctl->call;
   if (call->receiving_stream == NULL) {
     *call->receiving_buffer = NULL;
@@ -1124,8 +1124,6 @@ static void process_data_after_md(grpc_exec_ctx *exec_ctx, batch_control *bctl,
     grpc_closure_init(&call->receiving_slice_ready, receiving_slice_ready,
                       bctl);
     continue_receiving_slices(exec_ctx, bctl);
-    /* early out */
-    return;
   }
 }
 
@@ -1133,12 +1131,17 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
                                    grpc_error *error) {
   batch_control *bctl = bctlp;
   grpc_call *call = bctl->call;
-
+  if (error != GRPC_ERROR_NONE) {
+    grpc_status_code status;
+    const char *msg;
+    grpc_error_get_status(error, &status, &msg);
+    close_with_status(exec_ctx, call, status, msg);
+  }
   gpr_mu_lock(&bctl->call->mu);
   if (bctl->call->has_initial_md_been_received || error != GRPC_ERROR_NONE ||
       call->receiving_stream == NULL) {
     gpr_mu_unlock(&bctl->call->mu);
-    process_data_after_md(exec_ctx, bctlp, error);
+    process_data_after_md(exec_ctx, bctlp);
   } else {
     call->saved_receiving_stream_ready_bctlp = bctlp;
     gpr_mu_unlock(&bctl->call->mu);
diff --git a/test/core/end2end/tests/max_message_length.c b/test/core/end2end/tests/max_message_length.c
index cdca3e6748..d27ccedb4e 100644
--- a/test/core/end2end/tests/max_message_length.c
+++ b/test/core/end2end/tests/max_message_length.c
@@ -98,9 +98,12 @@ static void end_test(grpc_end2end_test_fixture *f) {
   grpc_completion_queue_destroy(f->cq);
 }
 
-static void test_max_message_length(grpc_end2end_test_config config,
-                                    bool send_limit) {
-  gpr_log(GPR_INFO, "testing with send_limit=%d", send_limit);
+// Test with request larger than the limit.
+// If send_limit is true, applies send limit on client; otherwise, applies
+// recv limit on server.
+static void test_max_message_length_on_request(grpc_end2end_test_config config,
+                                               bool send_limit) {
+  gpr_log(GPR_INFO, "testing request with send_limit=%d", send_limit);
 
   grpc_end2end_test_fixture f;
   grpc_arg channel_arg;
@@ -239,9 +242,161 @@ done:
   config.tear_down_data(&f);
 }
 
+// Test with response larger than the limit.
+// If send_limit is true, applies send limit on server; otherwise, applies
+// recv limit on client.
+static void test_max_message_length_on_response(grpc_end2end_test_config config,
+                                                bool send_limit) {
+  gpr_log(GPR_INFO, "testing response with send_limit=%d", send_limit);
+
+  grpc_end2end_test_fixture f;
+  grpc_arg channel_arg;
+  grpc_channel_args channel_args;
+  grpc_call *c = NULL;
+  grpc_call *s = NULL;
+  cq_verifier *cqv;
+  grpc_op ops[6];
+  grpc_op *op;
+  gpr_slice response_payload_slice =
+      gpr_slice_from_copied_string("hello world");
+  grpc_byte_buffer *response_payload =
+      grpc_raw_byte_buffer_create(&response_payload_slice, 1);
+  grpc_byte_buffer *recv_payload = NULL;
+  grpc_metadata_array initial_metadata_recv;
+  grpc_metadata_array trailing_metadata_recv;
+  grpc_metadata_array request_metadata_recv;
+  grpc_call_details call_details;
+  grpc_status_code status;
+  grpc_call_error error;
+  char *details = NULL;
+  size_t details_capacity = 0;
+  int was_cancelled = 2;
+
+  channel_arg.key = send_limit ? GRPC_ARG_MAX_SEND_MESSAGE_LENGTH
+                               : GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH;
+  channel_arg.type = GRPC_ARG_INTEGER;
+  channel_arg.value.integer = 5;
+
+  channel_args.num_args = 1;
+  channel_args.args = &channel_arg;
+
+  f = begin_test(config, "test_max_message_length",
+                 send_limit ? NULL : &channel_args,
+                 send_limit ? &channel_args : NULL);
+  cqv = cq_verifier_create(f.cq);
+
+  c = grpc_channel_create_call(f.client, NULL, GRPC_PROPAGATE_DEFAULTS, f.cq,
+                               "/foo", "foo.test.google.fr:1234",
+                               gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
+  GPR_ASSERT(c);
+
+  grpc_metadata_array_init(&initial_metadata_recv);
+  grpc_metadata_array_init(&trailing_metadata_recv);
+  grpc_metadata_array_init(&request_metadata_recv);
+  grpc_call_details_init(&call_details);
+
+  memset(ops, 0, sizeof(ops));
+  op = ops;
+  op->op = GRPC_OP_SEND_INITIAL_METADATA;
+  op->data.send_initial_metadata.count = 0;
+  op->flags = 0;
+  op->reserved = NULL;
+  op++;
+  op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
+  op->flags = 0;
+  op->reserved = NULL;
+  op++;
+  op->op = GRPC_OP_RECV_INITIAL_METADATA;
+  op->data.recv_initial_metadata = &initial_metadata_recv;
+  op->flags = 0;
+  op->reserved = NULL;
+  op++;
+  op->op = GRPC_OP_RECV_MESSAGE;
+  op->data.recv_message = &recv_payload;
+  op->flags = 0;
+  op->reserved = NULL;
+  op++;
+  op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
+  op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
+  op->data.recv_status_on_client.status = &status;
+  op->data.recv_status_on_client.status_details = &details;
+  op->data.recv_status_on_client.status_details_capacity = &details_capacity;
+  op->flags = 0;
+  op->reserved = NULL;
+  op++;
+  error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(1), NULL);
+  GPR_ASSERT(GRPC_CALL_OK == error);
+
+  error =
+      grpc_server_request_call(f.server, &s, &call_details,
+                               &request_metadata_recv, f.cq, f.cq, tag(101));
+  GPR_ASSERT(GRPC_CALL_OK == error);
+  CQ_EXPECT_COMPLETION(cqv, tag(101), 1);
+  cq_verify(cqv);
+
+  memset(ops, 0, sizeof(ops));
+  op = ops;
+  op->op = GRPC_OP_SEND_INITIAL_METADATA;
+  op->data.send_initial_metadata.count = 0;
+  op->flags = 0;
+  op->reserved = NULL;
+  op++;
+  op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
+  op->data.recv_close_on_server.cancelled = &was_cancelled;
+  op->flags = 0;
+  op->reserved = NULL;
+  op++;
+  op->op = GRPC_OP_SEND_MESSAGE;
+  op->data.send_message = response_payload;
+  op->flags = 0;
+  op->reserved = NULL;
+  op++;
+  op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
+  op->data.send_status_from_server.trailing_metadata_count = 0;
+  op->data.send_status_from_server.status = GRPC_STATUS_OK;
+  op->data.send_status_from_server.status_details = "xyz";
+  op->flags = 0;
+  op->reserved = NULL;
+  op++;
+  error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(102), NULL);
+  GPR_ASSERT(GRPC_CALL_OK == error);
+
+  CQ_EXPECT_COMPLETION(cqv, tag(102), 1);
+  CQ_EXPECT_COMPLETION(cqv, tag(1), 1);
+  cq_verify(cqv);
+
+  GPR_ASSERT(0 == strcmp(call_details.method, "/foo"));
+  GPR_ASSERT(0 == strcmp(call_details.host, "foo.test.google.fr:1234"));
+  GPR_ASSERT(was_cancelled == 0);
+
+  GPR_ASSERT(status == GRPC_STATUS_INVALID_ARGUMENT);
+  GPR_ASSERT(strcmp(details,
+                    send_limit
+                        ? "Sent message larger than max (11 vs. 5)"
+                        : "Received message larger than max (11 vs. 5)") == 0);
+
+  gpr_free(details);
+  grpc_metadata_array_destroy(&initial_metadata_recv);
+  grpc_metadata_array_destroy(&trailing_metadata_recv);
+  grpc_metadata_array_destroy(&request_metadata_recv);
+  grpc_call_details_destroy(&call_details);
+  grpc_byte_buffer_destroy(response_payload);
+  grpc_byte_buffer_destroy(recv_payload);
+
+  grpc_call_destroy(c);
+  if (s != NULL) grpc_call_destroy(s);
+
+  cq_verifier_destroy(cqv);
+
+  end_test(&f);
+  config.tear_down_data(&f);
+}
+
 void max_message_length(grpc_end2end_test_config config) {
-  test_max_message_length(config, true);
-  test_max_message_length(config, false);
+  test_max_message_length_on_request(config, false /* send_limit */);
+  test_max_message_length_on_request(config, true /* send_limit */);
+  test_max_message_length_on_response(config, false /* send_limit */);
+  test_max_message_length_on_response(config, true /* send_limit */);
 }
 
 void max_message_length_pre_init(void) {}
-- 
GitLab