Skip to content
Snippets Groups Projects
Commit 7cd1425e authored by Mark D. Roth's avatar Mark D. Roth
Browse files

Fix handling of send_message before send_initial_metadata in compress filter.

parent 4d5f30d9
No related branches found
No related tags found
No related merge requests found
...@@ -255,6 +255,23 @@ static void continue_send_message(grpc_exec_ctx *exec_ctx, ...@@ -255,6 +255,23 @@ static void continue_send_message(grpc_exec_ctx *exec_ctx,
} }
} }
static void handle_send_message_batch(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op_batch *op,
bool has_compression_algorithm) {
call_data *calld = elem->call_data;
if (!skip_compression(elem, op->payload->send_message.send_message->flags,
has_compression_algorithm)) {
calld->send_op = op;
calld->send_length = op->payload->send_message.send_message->length;
calld->send_flags = op->payload->send_message.send_message->flags;
continue_send_message(exec_ctx, elem);
} else {
/* pass control down the stack */
grpc_call_next_op(exec_ctx, elem, op);
}
}
static void compress_start_transport_stream_op_batch( static void compress_start_transport_stream_op_batch(
grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_transport_stream_op_batch *op) { grpc_transport_stream_op_batch *op) {
...@@ -307,8 +324,9 @@ static void compress_start_transport_stream_op_batch( ...@@ -307,8 +324,9 @@ static void compress_start_transport_stream_op_batch(
goto retry_send_im; goto retry_send_im;
} }
if (cur != INITIAL_METADATA_UNSEEN) { if (cur != INITIAL_METADATA_UNSEEN) {
grpc_call_next_op(exec_ctx, elem, handle_send_message_batch(exec_ctx, elem,
(grpc_transport_stream_op_batch *)cur); (grpc_transport_stream_op_batch *)cur,
has_compression_algorithm);
} }
} }
} }
...@@ -325,17 +343,8 @@ static void compress_start_transport_stream_op_batch( ...@@ -325,17 +343,8 @@ static void compress_start_transport_stream_op_batch(
break; break;
case HAS_COMPRESSION_ALGORITHM: case HAS_COMPRESSION_ALGORITHM:
case NO_COMPRESSION_ALGORITHM: case NO_COMPRESSION_ALGORITHM:
if (!skip_compression(elem, handle_send_message_batch(exec_ctx, elem, op,
op->payload->send_message.send_message->flags, cur == HAS_COMPRESSION_ALGORITHM);
cur == HAS_COMPRESSION_ALGORITHM)) {
calld->send_op = op;
calld->send_length = op->payload->send_message.send_message->length;
calld->send_flags = op->payload->send_message.send_message->flags;
continue_send_message(exec_ctx, elem);
} else {
/* pass control down the stack */
grpc_call_next_op(exec_ctx, elem, op);
}
break; break;
default: default:
if (cur & CANCELLED_BIT) { if (cur & CANCELLED_BIT) {
......
...@@ -274,7 +274,8 @@ static void request_with_payload_template( ...@@ -274,7 +274,8 @@ static void request_with_payload_template(
grpc_compression_algorithm expected_algorithm_from_client, grpc_compression_algorithm expected_algorithm_from_client,
grpc_compression_algorithm expected_algorithm_from_server, grpc_compression_algorithm expected_algorithm_from_server,
grpc_metadata *client_init_metadata, bool set_server_level, grpc_metadata *client_init_metadata, bool set_server_level,
grpc_compression_level server_compression_level) { grpc_compression_level server_compression_level,
bool send_message_before_initial_metadata) {
grpc_call *c; grpc_call *c;
grpc_call *s; grpc_call *s;
grpc_slice request_payload_slice; grpc_slice request_payload_slice;
...@@ -330,6 +331,20 @@ static void request_with_payload_template( ...@@ -330,6 +331,20 @@ static void request_with_payload_template(
grpc_metadata_array_init(&request_metadata_recv); grpc_metadata_array_init(&request_metadata_recv);
grpc_call_details_init(&call_details); grpc_call_details_init(&call_details);
if (send_message_before_initial_metadata) {
request_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1);
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message.send_message = request_payload;
op->flags = client_send_flags_bitmask;
op->reserved = NULL;
op++;
error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(2), NULL);
GPR_ASSERT(GRPC_CALL_OK == error);
CQ_EXPECT_COMPLETION(cqv, tag(2), true);
}
memset(ops, 0, sizeof(ops)); memset(ops, 0, sizeof(ops));
op = ops; op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA; op->op = GRPC_OP_SEND_INITIAL_METADATA;
...@@ -394,23 +409,21 @@ static void request_with_payload_template( ...@@ -394,23 +409,21 @@ static void request_with_payload_template(
GPR_ASSERT(GRPC_CALL_OK == error); GPR_ASSERT(GRPC_CALL_OK == error);
for (int i = 0; i < 2; i++) { for (int i = 0; i < 2; i++) {
request_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1);
response_payload = grpc_raw_byte_buffer_create(&response_payload_slice, 1); response_payload = grpc_raw_byte_buffer_create(&response_payload_slice, 1);
memset(ops, 0, sizeof(ops)); if (i > 0 || !send_message_before_initial_metadata) {
op = ops; request_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1);
op->op = GRPC_OP_SEND_MESSAGE; memset(ops, 0, sizeof(ops));
op->data.send_message.send_message = request_payload; op = ops;
op->flags = client_send_flags_bitmask; op->op = GRPC_OP_SEND_MESSAGE;
op->reserved = NULL; op->data.send_message.send_message = request_payload;
op++; op->flags = client_send_flags_bitmask;
op->op = GRPC_OP_RECV_MESSAGE; op->reserved = NULL;
op->data.recv_message.recv_message = &response_payload_recv; op++;
op->flags = 0; error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(2), NULL);
op->reserved = NULL; GPR_ASSERT(GRPC_CALL_OK == error);
op++; CQ_EXPECT_COMPLETION(cqv, tag(2), 1);
error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(2), NULL); }
GPR_ASSERT(GRPC_CALL_OK == error);
memset(ops, 0, sizeof(ops)); memset(ops, 0, sizeof(ops));
op = ops; op = ops;
...@@ -421,6 +434,7 @@ static void request_with_payload_template( ...@@ -421,6 +434,7 @@ static void request_with_payload_template(
op++; op++;
error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(102), NULL); error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(102), NULL);
GPR_ASSERT(GRPC_CALL_OK == error); GPR_ASSERT(GRPC_CALL_OK == error);
CQ_EXPECT_COMPLETION(cqv, tag(102), 1); CQ_EXPECT_COMPLETION(cqv, tag(102), 1);
cq_verify(cqv); cq_verify(cqv);
...@@ -438,8 +452,19 @@ static void request_with_payload_template( ...@@ -438,8 +452,19 @@ static void request_with_payload_template(
op++; op++;
error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(103), NULL); error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(103), NULL);
GPR_ASSERT(GRPC_CALL_OK == error); GPR_ASSERT(GRPC_CALL_OK == error);
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message.recv_message = &response_payload_recv;
op->flags = 0;
op->reserved = NULL;
op++;
error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(3), NULL);
GPR_ASSERT(GRPC_CALL_OK == error);
CQ_EXPECT_COMPLETION(cqv, tag(103), 1); CQ_EXPECT_COMPLETION(cqv, tag(103), 1);
CQ_EXPECT_COMPLETION(cqv, tag(2), 1); CQ_EXPECT_COMPLETION(cqv, tag(3), 1);
cq_verify(cqv); cq_verify(cqv);
GPR_ASSERT(response_payload_recv->type == GRPC_BB_RAW); GPR_ASSERT(response_payload_recv->type == GRPC_BB_RAW);
...@@ -469,7 +494,7 @@ static void request_with_payload_template( ...@@ -469,7 +494,7 @@ static void request_with_payload_template(
op->flags = 0; op->flags = 0;
op->reserved = NULL; op->reserved = NULL;
op++; op++;
error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(3), NULL); error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(4), NULL);
GPR_ASSERT(GRPC_CALL_OK == error); GPR_ASSERT(GRPC_CALL_OK == error);
memset(ops, 0, sizeof(ops)); memset(ops, 0, sizeof(ops));
...@@ -486,7 +511,7 @@ static void request_with_payload_template( ...@@ -486,7 +511,7 @@ static void request_with_payload_template(
GPR_ASSERT(GRPC_CALL_OK == error); GPR_ASSERT(GRPC_CALL_OK == error);
CQ_EXPECT_COMPLETION(cqv, tag(1), 1); CQ_EXPECT_COMPLETION(cqv, tag(1), 1);
CQ_EXPECT_COMPLETION(cqv, tag(3), 1); CQ_EXPECT_COMPLETION(cqv, tag(4), 1);
CQ_EXPECT_COMPLETION(cqv, tag(101), 1); CQ_EXPECT_COMPLETION(cqv, tag(101), 1);
CQ_EXPECT_COMPLETION(cqv, tag(104), 1); CQ_EXPECT_COMPLETION(cqv, tag(104), 1);
cq_verify(cqv); cq_verify(cqv);
...@@ -526,7 +551,7 @@ static void test_invoke_request_with_exceptionally_uncompressed_payload( ...@@ -526,7 +551,7 @@ static void test_invoke_request_with_exceptionally_uncompressed_payload(
config, "test_invoke_request_with_exceptionally_uncompressed_payload", config, "test_invoke_request_with_exceptionally_uncompressed_payload",
GRPC_WRITE_NO_COMPRESS, GRPC_COMPRESS_GZIP, GRPC_COMPRESS_GZIP, GRPC_WRITE_NO_COMPRESS, GRPC_COMPRESS_GZIP, GRPC_COMPRESS_GZIP,
GRPC_COMPRESS_NONE, GRPC_COMPRESS_GZIP, NULL, false, GRPC_COMPRESS_NONE, GRPC_COMPRESS_GZIP, NULL, false,
/* ignored */ GRPC_COMPRESS_LEVEL_NONE); /* ignored */ GRPC_COMPRESS_LEVEL_NONE, false);
} }
static void test_invoke_request_with_uncompressed_payload( static void test_invoke_request_with_uncompressed_payload(
...@@ -534,7 +559,8 @@ static void test_invoke_request_with_uncompressed_payload( ...@@ -534,7 +559,8 @@ static void test_invoke_request_with_uncompressed_payload(
request_with_payload_template( request_with_payload_template(
config, "test_invoke_request_with_uncompressed_payload", 0, config, "test_invoke_request_with_uncompressed_payload", 0,
GRPC_COMPRESS_NONE, GRPC_COMPRESS_NONE, GRPC_COMPRESS_NONE, GRPC_COMPRESS_NONE, GRPC_COMPRESS_NONE, GRPC_COMPRESS_NONE,
GRPC_COMPRESS_NONE, NULL, false, /* ignored */ GRPC_COMPRESS_LEVEL_NONE); GRPC_COMPRESS_NONE, NULL, false, /* ignored */ GRPC_COMPRESS_LEVEL_NONE,
false);
} }
static void test_invoke_request_with_compressed_payload( static void test_invoke_request_with_compressed_payload(
...@@ -542,7 +568,17 @@ static void test_invoke_request_with_compressed_payload( ...@@ -542,7 +568,17 @@ static void test_invoke_request_with_compressed_payload(
request_with_payload_template( request_with_payload_template(
config, "test_invoke_request_with_compressed_payload", 0, config, "test_invoke_request_with_compressed_payload", 0,
GRPC_COMPRESS_GZIP, GRPC_COMPRESS_GZIP, GRPC_COMPRESS_GZIP, GRPC_COMPRESS_GZIP, GRPC_COMPRESS_GZIP, GRPC_COMPRESS_GZIP,
GRPC_COMPRESS_GZIP, NULL, false, /* ignored */ GRPC_COMPRESS_LEVEL_NONE); GRPC_COMPRESS_GZIP, NULL, false, /* ignored */ GRPC_COMPRESS_LEVEL_NONE,
false);
}
static void test_invoke_request_with_send_message_before_initial_metadata(
grpc_end2end_test_config config) {
request_with_payload_template(
config, "test_invoke_request_with_compressed_payload", 0,
GRPC_COMPRESS_GZIP, GRPC_COMPRESS_GZIP, GRPC_COMPRESS_GZIP,
GRPC_COMPRESS_GZIP, NULL, false, /* ignored */ GRPC_COMPRESS_LEVEL_NONE,
true);
} }
static void test_invoke_request_with_server_level( static void test_invoke_request_with_server_level(
...@@ -550,7 +586,7 @@ static void test_invoke_request_with_server_level( ...@@ -550,7 +586,7 @@ static void test_invoke_request_with_server_level(
request_with_payload_template( request_with_payload_template(
config, "test_invoke_request_with_server_level", 0, GRPC_COMPRESS_NONE, config, "test_invoke_request_with_server_level", 0, GRPC_COMPRESS_NONE,
GRPC_COMPRESS_NONE, GRPC_COMPRESS_NONE, GRPC_COMPRESS_NONE /* ignored */, GRPC_COMPRESS_NONE, GRPC_COMPRESS_NONE, GRPC_COMPRESS_NONE /* ignored */,
NULL, true, GRPC_COMPRESS_LEVEL_HIGH); NULL, true, GRPC_COMPRESS_LEVEL_HIGH, false);
} }
static void test_invoke_request_with_compressed_payload_md_override( static void test_invoke_request_with_compressed_payload_md_override(
...@@ -574,21 +610,21 @@ static void test_invoke_request_with_compressed_payload_md_override( ...@@ -574,21 +610,21 @@ static void test_invoke_request_with_compressed_payload_md_override(
config, "test_invoke_request_with_compressed_payload_md_override_1", 0, config, "test_invoke_request_with_compressed_payload_md_override_1", 0,
GRPC_COMPRESS_NONE, GRPC_COMPRESS_NONE, GRPC_COMPRESS_GZIP, GRPC_COMPRESS_NONE, GRPC_COMPRESS_NONE, GRPC_COMPRESS_GZIP,
GRPC_COMPRESS_NONE, &gzip_compression_override, false, GRPC_COMPRESS_NONE, &gzip_compression_override, false,
/*ignored*/ GRPC_COMPRESS_LEVEL_NONE); /*ignored*/ GRPC_COMPRESS_LEVEL_NONE, false);
/* Channel default DEFLATE, call override to GZIP */ /* Channel default DEFLATE, call override to GZIP */
request_with_payload_template( request_with_payload_template(
config, "test_invoke_request_with_compressed_payload_md_override_2", 0, config, "test_invoke_request_with_compressed_payload_md_override_2", 0,
GRPC_COMPRESS_DEFLATE, GRPC_COMPRESS_NONE, GRPC_COMPRESS_GZIP, GRPC_COMPRESS_DEFLATE, GRPC_COMPRESS_NONE, GRPC_COMPRESS_GZIP,
GRPC_COMPRESS_NONE, &gzip_compression_override, false, GRPC_COMPRESS_NONE, &gzip_compression_override, false,
/*ignored*/ GRPC_COMPRESS_LEVEL_NONE); /*ignored*/ GRPC_COMPRESS_LEVEL_NONE, false);
/* Channel default DEFLATE, call override to NONE (aka IDENTITY) */ /* Channel default DEFLATE, call override to NONE (aka IDENTITY) */
request_with_payload_template( request_with_payload_template(
config, "test_invoke_request_with_compressed_payload_md_override_3", 0, config, "test_invoke_request_with_compressed_payload_md_override_3", 0,
GRPC_COMPRESS_DEFLATE, GRPC_COMPRESS_NONE, GRPC_COMPRESS_NONE, GRPC_COMPRESS_DEFLATE, GRPC_COMPRESS_NONE, GRPC_COMPRESS_NONE,
GRPC_COMPRESS_NONE, &identity_compression_override, false, GRPC_COMPRESS_NONE, &identity_compression_override, false,
/*ignored*/ GRPC_COMPRESS_LEVEL_NONE); /*ignored*/ GRPC_COMPRESS_LEVEL_NONE, false);
} }
static void test_invoke_request_with_disabled_algorithm( static void test_invoke_request_with_disabled_algorithm(
...@@ -602,6 +638,7 @@ void compressed_payload(grpc_end2end_test_config config) { ...@@ -602,6 +638,7 @@ void compressed_payload(grpc_end2end_test_config config) {
test_invoke_request_with_exceptionally_uncompressed_payload(config); test_invoke_request_with_exceptionally_uncompressed_payload(config);
test_invoke_request_with_uncompressed_payload(config); test_invoke_request_with_uncompressed_payload(config);
test_invoke_request_with_compressed_payload(config); test_invoke_request_with_compressed_payload(config);
test_invoke_request_with_send_message_before_initial_metadata(config);
test_invoke_request_with_server_level(config); test_invoke_request_with_server_level(config);
test_invoke_request_with_compressed_payload_md_override(config); test_invoke_request_with_compressed_payload_md_override(config);
test_invoke_request_with_disabled_algorithm(config); test_invoke_request_with_disabled_algorithm(config);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment