Skip to content
Snippets Groups Projects
Commit 274c8ed0 authored by Mark D. Roth's avatar Mark D. Roth
Browse files

Fix handling of max receive message size on client side.

parent 9ab8ba73
No related branches found
No related tags found
No related merge requests found
...@@ -73,16 +73,22 @@ static void recv_message_ready(grpc_exec_ctx* exec_ctx, void* user_data, ...@@ -73,16 +73,22 @@ static void recv_message_ready(grpc_exec_ctx* exec_ctx, void* user_data,
gpr_asprintf(&message_string, gpr_asprintf(&message_string,
"Received message larger than max (%u vs. %d)", "Received message larger than max (%u vs. %d)",
(*calld->recv_message)->length, chand->max_recv_size); (*calld->recv_message)->length, chand->max_recv_size);
gpr_slice message = gpr_slice_from_copied_string(message_string); grpc_error* new_error = grpc_error_set_int(
GRPC_ERROR_CREATE(message_string), GRPC_ERROR_INT_GRPC_STATUS,
GRPC_STATUS_INVALID_ARGUMENT);
if (error == GRPC_ERROR_NONE) {
error = new_error;
} else {
error = grpc_error_add_child(error, new_error);
GRPC_ERROR_UNREF(new_error);
}
gpr_free(message_string); gpr_free(message_string);
grpc_call_element_send_close_with_message(
exec_ctx, elem, GRPC_STATUS_INVALID_ARGUMENT, &message);
} }
// Invoke the next callback. // Invoke the next callback.
grpc_exec_ctx_sched(exec_ctx, calld->next_recv_message_ready, error, NULL); grpc_exec_ctx_sched(exec_ctx, calld->next_recv_message_ready, error, NULL);
} }
// Start transport op. // Start transport stream op.
static void start_transport_stream_op(grpc_exec_ctx* exec_ctx, static void start_transport_stream_op(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem, grpc_call_element* elem,
grpc_transport_stream_op* op) { grpc_transport_stream_op* op) {
......
...@@ -1103,8 +1103,8 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, ...@@ -1103,8 +1103,8 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
} }
} }
static void process_data_after_md(grpc_exec_ctx *exec_ctx, batch_control *bctl, static void process_data_after_md(grpc_exec_ctx *exec_ctx,
bool success) { batch_control *bctl) {
grpc_call *call = bctl->call; grpc_call *call = bctl->call;
if (call->receiving_stream == NULL) { if (call->receiving_stream == NULL) {
*call->receiving_buffer = NULL; *call->receiving_buffer = NULL;
...@@ -1124,8 +1124,6 @@ static void process_data_after_md(grpc_exec_ctx *exec_ctx, batch_control *bctl, ...@@ -1124,8 +1124,6 @@ static void process_data_after_md(grpc_exec_ctx *exec_ctx, batch_control *bctl,
grpc_closure_init(&call->receiving_slice_ready, receiving_slice_ready, grpc_closure_init(&call->receiving_slice_ready, receiving_slice_ready,
bctl); bctl);
continue_receiving_slices(exec_ctx, bctl); continue_receiving_slices(exec_ctx, bctl);
/* early out */
return;
} }
} }
...@@ -1133,12 +1131,17 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp, ...@@ -1133,12 +1131,17 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
grpc_error *error) { grpc_error *error) {
batch_control *bctl = bctlp; batch_control *bctl = bctlp;
grpc_call *call = bctl->call; grpc_call *call = bctl->call;
if (error != GRPC_ERROR_NONE) {
grpc_status_code status;
const char *msg;
grpc_error_get_status(error, &status, &msg);
close_with_status(exec_ctx, call, status, msg);
}
gpr_mu_lock(&bctl->call->mu); gpr_mu_lock(&bctl->call->mu);
if (bctl->call->has_initial_md_been_received || error != GRPC_ERROR_NONE || if (bctl->call->has_initial_md_been_received || error != GRPC_ERROR_NONE ||
call->receiving_stream == NULL) { call->receiving_stream == NULL) {
gpr_mu_unlock(&bctl->call->mu); gpr_mu_unlock(&bctl->call->mu);
process_data_after_md(exec_ctx, bctlp, error); process_data_after_md(exec_ctx, bctlp);
} else { } else {
call->saved_receiving_stream_ready_bctlp = bctlp; call->saved_receiving_stream_ready_bctlp = bctlp;
gpr_mu_unlock(&bctl->call->mu); gpr_mu_unlock(&bctl->call->mu);
......
...@@ -98,9 +98,12 @@ static void end_test(grpc_end2end_test_fixture *f) { ...@@ -98,9 +98,12 @@ static void end_test(grpc_end2end_test_fixture *f) {
grpc_completion_queue_destroy(f->cq); grpc_completion_queue_destroy(f->cq);
} }
static void test_max_message_length(grpc_end2end_test_config config, // Test with request larger than the limit.
bool send_limit) { // If send_limit is true, applies send limit on client; otherwise, applies
gpr_log(GPR_INFO, "testing with send_limit=%d", send_limit); // recv limit on server.
static void test_max_message_length_on_request(grpc_end2end_test_config config,
bool send_limit) {
gpr_log(GPR_INFO, "testing request with send_limit=%d", send_limit);
grpc_end2end_test_fixture f; grpc_end2end_test_fixture f;
grpc_arg channel_arg; grpc_arg channel_arg;
...@@ -239,9 +242,161 @@ done: ...@@ -239,9 +242,161 @@ done:
config.tear_down_data(&f); config.tear_down_data(&f);
} }
// Test with response larger than the limit.
// If send_limit is true, applies send limit on server; otherwise, applies
// recv limit on client.
static void test_max_message_length_on_response(grpc_end2end_test_config config,
bool send_limit) {
gpr_log(GPR_INFO, "testing response with send_limit=%d", send_limit);
grpc_end2end_test_fixture f;
grpc_arg channel_arg;
grpc_channel_args channel_args;
grpc_call *c = NULL;
grpc_call *s = NULL;
cq_verifier *cqv;
grpc_op ops[6];
grpc_op *op;
gpr_slice response_payload_slice =
gpr_slice_from_copied_string("hello world");
grpc_byte_buffer *response_payload =
grpc_raw_byte_buffer_create(&response_payload_slice, 1);
grpc_byte_buffer *recv_payload = NULL;
grpc_metadata_array initial_metadata_recv;
grpc_metadata_array trailing_metadata_recv;
grpc_metadata_array request_metadata_recv;
grpc_call_details call_details;
grpc_status_code status;
grpc_call_error error;
char *details = NULL;
size_t details_capacity = 0;
int was_cancelled = 2;
channel_arg.key = send_limit ? GRPC_ARG_MAX_SEND_MESSAGE_LENGTH
: GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH;
channel_arg.type = GRPC_ARG_INTEGER;
channel_arg.value.integer = 5;
channel_args.num_args = 1;
channel_args.args = &channel_arg;
f = begin_test(config, "test_max_message_length",
send_limit ? NULL : &channel_args,
send_limit ? &channel_args : NULL);
cqv = cq_verifier_create(f.cq);
c = grpc_channel_create_call(f.client, NULL, GRPC_PROPAGATE_DEFAULTS, f.cq,
"/foo", "foo.test.google.fr:1234",
gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
GPR_ASSERT(c);
grpc_metadata_array_init(&initial_metadata_recv);
grpc_metadata_array_init(&trailing_metadata_recv);
grpc_metadata_array_init(&request_metadata_recv);
grpc_call_details_init(&call_details);
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op->flags = 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
op->flags = 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata = &initial_metadata_recv;
op->flags = 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message = &recv_payload;
op->flags = 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
op->data.recv_status_on_client.status = &status;
op->data.recv_status_on_client.status_details = &details;
op->data.recv_status_on_client.status_details_capacity = &details_capacity;
op->flags = 0;
op->reserved = NULL;
op++;
error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(1), NULL);
GPR_ASSERT(GRPC_CALL_OK == error);
error =
grpc_server_request_call(f.server, &s, &call_details,
&request_metadata_recv, f.cq, f.cq, tag(101));
GPR_ASSERT(GRPC_CALL_OK == error);
CQ_EXPECT_COMPLETION(cqv, tag(101), 1);
cq_verify(cqv);
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op->flags = 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op->data.recv_close_on_server.cancelled = &was_cancelled;
op->flags = 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message = response_payload;
op->flags = 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
op->data.send_status_from_server.trailing_metadata_count = 0;
op->data.send_status_from_server.status = GRPC_STATUS_OK;
op->data.send_status_from_server.status_details = "xyz";
op->flags = 0;
op->reserved = NULL;
op++;
error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(102), NULL);
GPR_ASSERT(GRPC_CALL_OK == error);
CQ_EXPECT_COMPLETION(cqv, tag(102), 1);
CQ_EXPECT_COMPLETION(cqv, tag(1), 1);
cq_verify(cqv);
GPR_ASSERT(0 == strcmp(call_details.method, "/foo"));
GPR_ASSERT(0 == strcmp(call_details.host, "foo.test.google.fr:1234"));
GPR_ASSERT(was_cancelled == 0);
GPR_ASSERT(status == GRPC_STATUS_INVALID_ARGUMENT);
GPR_ASSERT(strcmp(details,
send_limit
? "Sent message larger than max (11 vs. 5)"
: "Received message larger than max (11 vs. 5)") == 0);
gpr_free(details);
grpc_metadata_array_destroy(&initial_metadata_recv);
grpc_metadata_array_destroy(&trailing_metadata_recv);
grpc_metadata_array_destroy(&request_metadata_recv);
grpc_call_details_destroy(&call_details);
grpc_byte_buffer_destroy(response_payload);
grpc_byte_buffer_destroy(recv_payload);
grpc_call_destroy(c);
if (s != NULL) grpc_call_destroy(s);
cq_verifier_destroy(cqv);
end_test(&f);
config.tear_down_data(&f);
}
void max_message_length(grpc_end2end_test_config config) { void max_message_length(grpc_end2end_test_config config) {
test_max_message_length(config, true); test_max_message_length_on_request(config, false /* send_limit */);
test_max_message_length(config, false); test_max_message_length_on_request(config, true /* send_limit */);
test_max_message_length_on_response(config, false /* send_limit */);
test_max_message_length_on_response(config, true /* send_limit */);
} }
void max_message_length_pre_init(void) {} void max_message_length_pre_init(void) {}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment