From e4d2748f2fec4b189cdb7d13e25df0be95888ba2 Mon Sep 17 00:00:00 2001
From: Craig Tiller <ctiller@google.com>
Date: Tue, 3 May 2016 12:19:33 -0700
Subject: [PATCH] Fix async_end2end_test flow control
Completion queues + flow control + single threading is hard.
We need a read outstanding on a call to grant flow control tokens to the
remote end.
To do that we need to request a read *before* we wait for the write to
be finished, otherwise, in the case of a large write we'll block waiting
for flow control tokens.
Built on #6402
---
test/cpp/end2end/async_end2end_test.cc | 131 ++++++++++++++-----------
1 file changed, 73 insertions(+), 58 deletions(-)
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc
index 4de181b901..0232a9fa31 100644
--- a/test/cpp/end2end/async_end2end_test.cc
+++ b/test/cpp/end2end/async_end2end_test.cc
@@ -281,10 +281,11 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3));
- Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get());
-
response_reader->Finish(&recv_response, &recv_status, tag(4));
- Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
@@ -345,12 +346,9 @@ TEST_P(AsyncEnd2endTest, AsyncNextRpc) {
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3));
- Verifier(GetParam().disable_blocking)
- .Expect(3, true)
- .Verify(cq_.get(), std::chrono::system_clock::time_point::max());
-
response_reader->Finish(&recv_response, &recv_status, tag(4));
Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
.Expect(4, true)
.Verify(cq_.get(), std::chrono::system_clock::time_point::max());
@@ -384,31 +382,35 @@ TEST_P(AsyncEnd2endTest, SimpleClientStreaming) {
.Verify(cq_.get());
cli_stream->Write(send_request, tag(3));
- Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get());
-
srv_stream.Read(&recv_request, tag(4));
- Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
cli_stream->Write(send_request, tag(5));
- Verifier(GetParam().disable_blocking).Expect(5, true).Verify(cq_.get());
-
srv_stream.Read(&recv_request, tag(6));
- Verifier(GetParam().disable_blocking).Expect(6, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(5, true)
+ .Expect(6, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
cli_stream->WritesDone(tag(7));
- Verifier(GetParam().disable_blocking).Expect(7, true).Verify(cq_.get());
-
srv_stream.Read(&recv_request, tag(8));
- Verifier(GetParam().disable_blocking).Expect(8, false).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(7, true)
+ .Expect(8, false)
+ .Verify(cq_.get());
send_response.set_message(recv_request.message());
srv_stream.Finish(send_response, Status::OK, tag(9));
- Verifier(GetParam().disable_blocking).Expect(9, true).Verify(cq_.get());
-
cli_stream->Finish(&recv_status, tag(10));
- Verifier(GetParam().disable_blocking).Expect(10, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(9, true)
+ .Expect(10, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
@@ -442,24 +444,27 @@ TEST_P(AsyncEnd2endTest, SimpleServerStreaming) {
send_response.set_message(recv_request.message());
srv_stream.Write(send_response, tag(3));
- Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get());
-
cli_stream->Read(&recv_response, tag(4));
- Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
srv_stream.Write(send_response, tag(5));
- Verifier(GetParam().disable_blocking).Expect(5, true).Verify(cq_.get());
-
cli_stream->Read(&recv_response, tag(6));
- Verifier(GetParam().disable_blocking).Expect(6, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(5, true)
+ .Expect(6, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
srv_stream.Finish(Status::OK, tag(7));
- Verifier(GetParam().disable_blocking).Expect(7, true).Verify(cq_.get());
-
cli_stream->Read(&recv_response, tag(8));
- Verifier(GetParam().disable_blocking).Expect(8, false).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(7, true)
+ .Expect(8, false)
+ .Verify(cq_.get());
cli_stream->Finish(&recv_status, tag(9));
Verifier(GetParam().disable_blocking).Expect(9, true).Verify(cq_.get());
@@ -493,31 +498,35 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) {
.Verify(cq_.get());
cli_stream->Write(send_request, tag(3));
- Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get());
-
srv_stream.Read(&recv_request, tag(4));
- Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
send_response.set_message(recv_request.message());
srv_stream.Write(send_response, tag(5));
- Verifier(GetParam().disable_blocking).Expect(5, true).Verify(cq_.get());
-
cli_stream->Read(&recv_response, tag(6));
- Verifier(GetParam().disable_blocking).Expect(6, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(5, true)
+ .Expect(6, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
cli_stream->WritesDone(tag(7));
- Verifier(GetParam().disable_blocking).Expect(7, true).Verify(cq_.get());
-
srv_stream.Read(&recv_request, tag(8));
- Verifier(GetParam().disable_blocking).Expect(8, false).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(7, true)
+ .Expect(8, false)
+ .Verify(cq_.get());
srv_stream.Finish(Status::OK, tag(9));
- Verifier(GetParam().disable_blocking).Expect(9, true).Verify(cq_.get());
-
cli_stream->Finish(&recv_status, tag(10));
- Verifier(GetParam().disable_blocking).Expect(10, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(9, true)
+ .Expect(10, true)
+ .Verify(cq_.get());
EXPECT_TRUE(recv_status.ok());
}
@@ -562,11 +571,11 @@ TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) {
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3));
-
- Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get());
-
response_reader->Finish(&recv_response, &recv_status, tag(4));
- Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
@@ -612,10 +621,11 @@ TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) {
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(5));
- Verifier(GetParam().disable_blocking).Expect(5, true).Verify(cq_.get());
-
response_reader->Finish(&recv_response, &recv_status, tag(6));
- Verifier(GetParam().disable_blocking).Expect(6, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(5, true)
+ .Expect(6, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
@@ -652,11 +662,13 @@ TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
srv_ctx.AddTrailingMetadata(meta1.first, meta1.second);
srv_ctx.AddTrailingMetadata(meta2.first, meta2.second);
response_writer.Finish(send_response, Status::OK, tag(4));
+ response_reader->Finish(&recv_response, &recv_status, tag(5));
- Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(4, true)
+ .Expect(5, true)
+ .Verify(cq_.get());
- response_reader->Finish(&recv_response, &recv_status, tag(5));
- Verifier(GetParam().disable_blocking).Expect(5, true).Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
@@ -730,11 +742,13 @@ TEST_P(AsyncEnd2endTest, MetadataRpc) {
srv_ctx.AddTrailingMetadata(meta5.first, meta5.second);
srv_ctx.AddTrailingMetadata(meta6.first, meta6.second);
response_writer.Finish(send_response, Status::OK, tag(5));
+ response_reader->Finish(&recv_response, &recv_status, tag(6));
- Verifier(GetParam().disable_blocking).Expect(5, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(5, true)
+ .Expect(6, true)
+ .Verify(cq_.get());
- response_reader->Finish(&recv_response, &recv_status, tag(6));
- Verifier(GetParam().disable_blocking).Expect(6, true).Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
@@ -807,12 +821,13 @@ TEST_P(AsyncEnd2endTest, ServerCheckDone) {
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3));
- Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get());
- Verifier(GetParam().disable_blocking).Expect(5, true).Verify(cq_.get());
- EXPECT_FALSE(srv_ctx.IsCancelled());
-
response_reader->Finish(&recv_response, &recv_status, tag(4));
- Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Expect(5, true)
+ .Verify(cq_.get());
+ EXPECT_FALSE(srv_ctx.IsCancelled());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
--
GitLab