diff --git a/include/grpc++/impl/codegen/method_handler_impl.h b/include/grpc++/impl/codegen/method_handler_impl.h index 64c91bee22a44b47b7a55242f72e410cfd79f651..15e24bdcdcb8daba7ba58982df72667f826d3b99 100644 --- a/include/grpc++/impl/codegen/method_handler_impl.h +++ b/include/grpc++/impl/codegen/method_handler_impl.h @@ -141,6 +141,9 @@ class ServerStreamingHandler : public MethodHandler { } ops.ServerSendStatus(param.server_context->trailing_metadata_, status); param.call->PerformOps(&ops); + if (param.server_context->has_pending_ops_) { + param.call->cq()->Pluck(¶m.server_context->pending_ops_); + } param.call->cq()->Pluck(&ops); } @@ -185,6 +188,9 @@ class TemplatedBidiStreamingHandler : public MethodHandler { } ops.ServerSendStatus(param.server_context->trailing_metadata_, status); param.call->PerformOps(&ops); + if (param.server_context->has_pending_ops_) { + param.call->cq()->Pluck(¶m.server_context->pending_ops_); + } param.call->cq()->Pluck(&ops); } diff --git a/include/grpc++/impl/codegen/server_context.h b/include/grpc++/impl/codegen/server_context.h index 96fe645a152a3496c4a3f84da4054b0be00c4940..b5e37fd12b10df788886e1d250073229f2774e7a 100644 --- a/include/grpc++/impl/codegen/server_context.h +++ b/include/grpc++/impl/codegen/server_context.h @@ -25,6 +25,7 @@ #include <grpc/impl/codegen/compression_types.h> +#include <grpc++/impl/codegen/call.h> #include <grpc++/impl/codegen/completion_queue_tag.h> #include <grpc++/impl/codegen/config.h> #include <grpc++/impl/codegen/create_auth_context.h> @@ -288,6 +289,9 @@ class ServerContext { bool compression_level_set_; grpc_compression_level compression_level_; grpc_compression_algorithm compression_algorithm_; + + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> pending_ops_; + bool has_pending_ops_; }; } // namespace grpc diff --git a/include/grpc++/impl/codegen/sync_stream.h b/include/grpc++/impl/codegen/sync_stream.h index 09836f340b07b6ceb0be5590a3ccc6913bbc0add..3fa208963db03dfebdfc618334e45b73733a1dc2 100644 --- a/include/grpc++/impl/codegen/sync_stream.h +++ b/include/grpc++/impl/codegen/sync_stream.h @@ -589,20 +589,27 @@ class ServerWriter final : public ServerWriterInterface<W> { if (options.is_last_message()) { options.set_buffer_hint(); } - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops; - if (!ops.SendMessage(msg, options).ok()) { + if (!ctx_->pending_ops_.SendMessage(msg, options).ok()) { return false; } if (!ctx_->sent_initial_metadata_) { - ops.SendInitialMetadata(ctx_->initial_metadata_, - ctx_->initial_metadata_flags()); + ctx_->pending_ops_.SendInitialMetadata(ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { - ops.set_compression_level(ctx_->compression_level()); + ctx_->pending_ops_.set_compression_level(ctx_->compression_level()); } ctx_->sent_initial_metadata_ = true; } - call_->PerformOps(&ops); - return call_->cq()->Pluck(&ops); + call_->PerformOps(&ctx_->pending_ops_); + // if this is the last message we defer the pluck until AFTER we start + // the trailing md op. This prevents hangs. See + // https://github.com/grpc/grpc/issues/11546 + if (options.is_last_message()) { + ctx_->has_pending_ops_ = true; + return true; + } + ctx_->has_pending_ops_ = false; + return call_->cq()->Pluck(&ctx_->pending_ops_); } private: @@ -654,20 +661,27 @@ class ServerReaderWriterBody final { if (options.is_last_message()) { options.set_buffer_hint(); } - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops; - if (!ops.SendMessage(msg, options).ok()) { + if (!ctx_->pending_ops_.SendMessage(msg, options).ok()) { return false; } if (!ctx_->sent_initial_metadata_) { - ops.SendInitialMetadata(ctx_->initial_metadata_, - ctx_->initial_metadata_flags()); + ctx_->pending_ops_.SendInitialMetadata(ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { - ops.set_compression_level(ctx_->compression_level()); + ctx_->pending_ops_.set_compression_level(ctx_->compression_level()); } ctx_->sent_initial_metadata_ = true; } - call_->PerformOps(&ops); - return call_->cq()->Pluck(&ops); + call_->PerformOps(&ctx_->pending_ops_); + // if this is the last message we defer the pluck until AFTER we start + // the trailing md op. This prevents hangs. See + // https://github.com/grpc/grpc/issues/11546 + if (options.is_last_message()) { + ctx_->has_pending_ops_ = true; + return true; + } + ctx_->has_pending_ops_ = false; + return call_->cq()->Pluck(&ctx_->pending_ops_); } private: diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index 3a6bca13b345d5c59f9d40d51aa4670fc79a475a..4913682f1d16ce886bbe15112d8f4db4dc4fdae6 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -120,7 +120,8 @@ ServerContext::ServerContext() call_(nullptr), cq_(nullptr), sent_initial_metadata_(false), - compression_level_set_(false) {} + compression_level_set_(false), + has_pending_ops_(false) {} ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata_array* arr) : completion_op_(nullptr), @@ -130,7 +131,8 @@ ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata_array* arr) call_(nullptr), cq_(nullptr), sent_initial_metadata_(false), - compression_level_set_(false) { + compression_level_set_(false), + has_pending_ops_(false) { std::swap(*client_metadata_.arr(), *arr); client_metadata_.FillMap(); } diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index da1c9b1f15f7ae348ffdc738fcc26afc78d05ac3..d72dda3f5905a0f51cc3da940130eaa36b15a7d9 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -437,7 +437,7 @@ class End2endServerTryCancelTest : public End2endTest { auto stream = stub_->ResponseStream(&context, request); int num_msgs_read = 0; - while (num_msgs_read < kNumResponseStreamsMsgs) { + while (num_msgs_read < kServerDefaultResponseStreamsToSend) { if (!stream->Read(&response)) { break; } @@ -463,14 +463,14 @@ class End2endServerTryCancelTest : public End2endTest { case CANCEL_DURING_PROCESSING: // Server cancelled while writing messages. Client must have read less // than or equal to the expected number of messages - EXPECT_LE(num_msgs_read, kNumResponseStreamsMsgs); + EXPECT_LE(num_msgs_read, kServerDefaultResponseStreamsToSend); break; case CANCEL_AFTER_PROCESSING: // Even though the Server cancelled after writing all messages, the RPC // may be cancelled before the Client got a chance to read all the // messages. - EXPECT_LE(num_msgs_read, kNumResponseStreamsMsgs); + EXPECT_LE(num_msgs_read, kServerDefaultResponseStreamsToSend); break; default: { @@ -743,12 +743,10 @@ TEST_P(End2endTest, ResponseStream) { request.set_message("hello"); auto stream = stub_->ResponseStream(&context, request); - EXPECT_TRUE(stream->Read(&response)); - EXPECT_EQ(response.message(), request.message() + "0"); - EXPECT_TRUE(stream->Read(&response)); - EXPECT_EQ(response.message(), request.message() + "1"); - EXPECT_TRUE(stream->Read(&response)); - EXPECT_EQ(response.message(), request.message() + "2"); + for (int i = 0; i < kServerDefaultResponseStreamsToSend; ++i) { + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message() + grpc::to_string(i)); + } EXPECT_FALSE(stream->Read(&response)); Status s = stream->Finish(); @@ -763,13 +761,34 @@ TEST_P(End2endTest, ResponseStreamWithCoalescingApi) { request.set_message("hello"); context.AddMetadata(kServerUseCoalescingApi, "1"); + auto stream = stub_->ResponseStream(&context, request); + for (int i = 0; i < kServerDefaultResponseStreamsToSend; ++i) { + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message() + grpc::to_string(i)); + } + EXPECT_FALSE(stream->Read(&response)); + + Status s = stream->Finish(); + EXPECT_TRUE(s.ok()); +} + +// This was added to prevent regression from issue: +// https://github.com/grpc/grpc/issues/11546 +TEST_P(End2endTest, ResponseStreamWithEverythingCoalesced) { + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + request.set_message("hello"); + context.AddMetadata(kServerUseCoalescingApi, "1"); + // We will only send one message, forcing everything (init metadata, message, + // trailing) to be coalesced together. + context.AddMetadata(kServerResponseStreamsToSend, "1"); + auto stream = stub_->ResponseStream(&context, request); EXPECT_TRUE(stream->Read(&response)); EXPECT_EQ(response.message(), request.message() + "0"); - EXPECT_TRUE(stream->Read(&response)); - EXPECT_EQ(response.message(), request.message() + "1"); - EXPECT_TRUE(stream->Read(&response)); - EXPECT_EQ(response.message(), request.message() + "2"); + EXPECT_FALSE(stream->Read(&response)); Status s = stream->Finish(); @@ -785,20 +804,12 @@ TEST_P(End2endTest, BidiStream) { auto stream = stub_->BidiStream(&context); - request.set_message(msg + "0"); - EXPECT_TRUE(stream->Write(request)); - EXPECT_TRUE(stream->Read(&response)); - EXPECT_EQ(response.message(), request.message()); - - request.set_message(msg + "1"); - EXPECT_TRUE(stream->Write(request)); - EXPECT_TRUE(stream->Read(&response)); - EXPECT_EQ(response.message(), request.message()); - - request.set_message(msg + "2"); - EXPECT_TRUE(stream->Write(request)); - EXPECT_TRUE(stream->Read(&response)); - EXPECT_EQ(response.message(), request.message()); + for (int i = 0; i < kServerDefaultResponseStreamsToSend; ++i) { + request.set_message(msg + grpc::to_string(i)); + EXPECT_TRUE(stream->Write(request)); + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message()); + } stream->WritesDone(); EXPECT_FALSE(stream->Read(&response)); @@ -841,6 +852,31 @@ TEST_P(End2endTest, BidiStreamWithCoalescingApi) { EXPECT_TRUE(s.ok()); } +// This was added to prevent regression from issue: +// https://github.com/grpc/grpc/issues/11546 +TEST_P(End2endTest, BidiStreamWithEverythingCoalesced) { + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + context.AddMetadata(kServerFinishAfterNReads, "1"); + context.set_initial_metadata_corked(true); + grpc::string msg("hello"); + + auto stream = stub_->BidiStream(&context); + + request.set_message(msg + "0"); + stream->WriteLast(request, WriteOptions()); + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message()); + + EXPECT_FALSE(stream->Read(&response)); + EXPECT_FALSE(stream->Read(&response)); + + Status s = stream->Finish(); + EXPECT_TRUE(s.ok()); +} + // Talk to the two services with the same name but different package names. // The two stubs are created on the same channel. TEST_P(End2endTest, DiffPackageServices) { diff --git a/test/cpp/end2end/hybrid_end2end_test.cc b/test/cpp/end2end/hybrid_end2end_test.cc index 8a31ab77b2fee911b747f82ba05cd1038eb1abbd..cb515533ed3a651fcc851f6ed52a6b74804cf512 100644 --- a/test/cpp/end2end/hybrid_end2end_test.cc +++ b/test/cpp/end2end/hybrid_end2end_test.cc @@ -521,7 +521,7 @@ class SplitResponseStreamDupPkg stream->NextMessageSize(&next_msg_sz); gpr_log(GPR_INFO, "Split Streamed Next Message Size is %u", next_msg_sz); GPR_ASSERT(stream->Read(&req)); - for (int i = 0; i < kNumResponseStreamsMsgs; i++) { + for (int i = 0; i < kServerDefaultResponseStreamsToSend; i++) { resp.set_message(req.message() + grpc::to_string(i) + "_dup"); GPR_ASSERT(stream->Write(resp)); } @@ -561,7 +561,7 @@ class FullySplitStreamedDupPkg stream->NextMessageSize(&next_msg_sz); gpr_log(GPR_INFO, "Split Streamed Next Message Size is %u", next_msg_sz); GPR_ASSERT(stream->Read(&req)); - for (int i = 0; i < kNumResponseStreamsMsgs; i++) { + for (int i = 0; i < kServerDefaultResponseStreamsToSend; i++) { resp.set_message(req.message() + grpc::to_string(i) + "_dup"); GPR_ASSERT(stream->Write(resp)); } @@ -613,7 +613,7 @@ class FullyStreamedDupPkg : public duplicate::EchoTestService::StreamedService { stream->NextMessageSize(&next_msg_sz); gpr_log(GPR_INFO, "Split Streamed Next Message Size is %u", next_msg_sz); GPR_ASSERT(stream->Read(&req)); - for (int i = 0; i < kNumResponseStreamsMsgs; i++) { + for (int i = 0; i < kServerDefaultResponseStreamsToSend; i++) { resp.set_message(req.message() + grpc::to_string(i) + "_dup"); GPR_ASSERT(stream->Write(resp)); } diff --git a/test/cpp/end2end/test_service_impl.cc b/test/cpp/end2end/test_service_impl.cc index e1260277b42402be554587337c3a8af38c01ad3f..4fa98c24f577188ddd019a78a2b9949bb68fc75a 100644 --- a/test/cpp/end2end/test_service_impl.cc +++ b/test/cpp/end2end/test_service_impl.cc @@ -239,6 +239,10 @@ Status TestServiceImpl::ResponseStream(ServerContext* context, int server_coalescing_api = GetIntValueFromMetadata( kServerUseCoalescingApi, context->client_metadata(), 0); + int server_responses_to_send = GetIntValueFromMetadata( + kServerResponseStreamsToSend, context->client_metadata(), + kServerDefaultResponseStreamsToSend); + if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { ServerTryCancel(context); return Status::CANCELLED; @@ -251,9 +255,9 @@ Status TestServiceImpl::ResponseStream(ServerContext* context, new std::thread(&TestServiceImpl::ServerTryCancel, this, context); } - for (int i = 0; i < kNumResponseStreamsMsgs; i++) { + for (int i = 0; i < server_responses_to_send; i++) { response.set_message(request->message() + grpc::to_string(i)); - if (i == kNumResponseStreamsMsgs - 1 && server_coalescing_api != 0) { + if (i == server_responses_to_send - 1 && server_coalescing_api != 0) { writer->WriteLast(response, WriteOptions()); } else { writer->Write(response); diff --git a/test/cpp/end2end/test_service_impl.h b/test/cpp/end2end/test_service_impl.h index 52f1b991c7d9235a022d4fdef98192e2286cbc3c..e485769bb274b2bcbdfe70f4e157e10e9a1f076e 100644 --- a/test/cpp/end2end/test_service_impl.h +++ b/test/cpp/end2end/test_service_impl.h @@ -29,7 +29,8 @@ namespace grpc { namespace testing { -const int kNumResponseStreamsMsgs = 3; +const int kServerDefaultResponseStreamsToSend = 3; +const char* const kServerResponseStreamsToSend = "server_responses_to_send"; const char* const kServerCancelAfterReads = "cancel_after_reads"; const char* const kServerTryCancelRequest = "server_try_cancel"; const char* const kDebugInfoTrailerKey = "debug-info-bin"; diff --git a/test/cpp/util/grpc_tool_test.cc b/test/cpp/util/grpc_tool_test.cc index 4069c7a91a6ecaa02471e04015fceac80d706dd6..dd00581f2b8e99968d3f6e83af219d318e3cb6fa 100644 --- a/test/cpp/util/grpc_tool_test.cc +++ b/test/cpp/util/grpc_tool_test.cc @@ -87,7 +87,7 @@ DECLARE_bool(l); namespace { -const int kNumResponseStreamsMsgs = 3; +const int kServerDefaultResponseStreamsToSend = 3; class TestCliCredentials final : public grpc::testing::CliCredentials { public: @@ -159,7 +159,7 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { context->AddTrailingMetadata("trailing_key", "trailing_value"); EchoResponse response; - for (int i = 0; i < kNumResponseStreamsMsgs; i++) { + for (int i = 0; i < kServerDefaultResponseStreamsToSend; i++) { response.set_message(request->message() + grpc::to_string(i)); writer->Write(response); } @@ -463,7 +463,7 @@ TEST_F(GrpcToolTest, CallCommandResponseStream) { std::placeholders::_1))); // Expected output: "message: \"Hello{n}\"" - for (int i = 0; i < kNumResponseStreamsMsgs; i++) { + for (int i = 0; i < kServerDefaultResponseStreamsToSend; i++) { grpc::string expected_response_text = "message: \"Hello" + grpc::to_string(i) + "\"\n"; EXPECT_TRUE(NULL != strstr(output_stream.str().c_str(),