diff --git a/include/grpc++/impl/codegen/method_handler_impl.h b/include/grpc++/impl/codegen/method_handler_impl.h index 64c91bee22a44b47b7a55242f72e410cfd79f651..5afa4e9f7e9247789c93891c0b2e465fd90ffa38 100644 --- a/include/grpc++/impl/codegen/method_handler_impl.h +++ b/include/grpc++/impl/codegen/method_handler_impl.h @@ -141,6 +141,9 @@ class ServerStreamingHandler : public MethodHandler { } ops.ServerSendStatus(param.server_context->trailing_metadata_, status); param.call->PerformOps(&ops); + if (param.server_context->has_hanging_ops_) { + param.call->cq()->Pluck(¶m.server_context->hanging_ops_); + } param.call->cq()->Pluck(&ops); } diff --git a/include/grpc++/impl/codegen/server_context.h b/include/grpc++/impl/codegen/server_context.h index 96fe645a152a3496c4a3f84da4054b0be00c4940..d7bd7323f26074a57abe8eab39ccf7a97872792e 100644 --- a/include/grpc++/impl/codegen/server_context.h +++ b/include/grpc++/impl/codegen/server_context.h @@ -25,6 +25,7 @@ #include <grpc/impl/codegen/compression_types.h> +#include <grpc++/impl/codegen/call.h> #include <grpc++/impl/codegen/completion_queue_tag.h> #include <grpc++/impl/codegen/config.h> #include <grpc++/impl/codegen/create_auth_context.h> @@ -272,6 +273,8 @@ class ServerContext { uint32_t initial_metadata_flags() const { return 0; } + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> hanging_ops_; + bool has_hanging_ops_; CompletionOp* completion_op_; bool has_notify_when_done_tag_; void* async_notify_when_done_tag_; diff --git a/include/grpc++/impl/codegen/sync_stream.h b/include/grpc++/impl/codegen/sync_stream.h index 09836f340b07b6ceb0be5590a3ccc6913bbc0add..b7d81b33825f2e80d5479f91620244a8f75da0bb 100644 --- a/include/grpc++/impl/codegen/sync_stream.h +++ b/include/grpc++/impl/codegen/sync_stream.h @@ -589,20 +589,27 @@ class ServerWriter final : public ServerWriterInterface<W> { if (options.is_last_message()) { options.set_buffer_hint(); } - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops; - if (!ops.SendMessage(msg, options).ok()) { + if (!ctx_->hanging_ops_.SendMessage(msg, options).ok()) { return false; } if (!ctx_->sent_initial_metadata_) { - ops.SendInitialMetadata(ctx_->initial_metadata_, - ctx_->initial_metadata_flags()); + ctx_->hanging_ops_.SendInitialMetadata(ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { - ops.set_compression_level(ctx_->compression_level()); + ctx_->hanging_ops_.set_compression_level(ctx_->compression_level()); } ctx_->sent_initial_metadata_ = true; } - call_->PerformOps(&ops); - return call_->cq()->Pluck(&ops); + call_->PerformOps(&ctx_->hanging_ops_); + // if this is the last message we defer the pluck until AFTER we start + // the trailing md op. This prevents hangs. See + // https://github.com/grpc/grpc/issues/11546 + if (options.is_last_message()) { + ctx_->has_hanging_ops_ = true; + return true; + } + ctx_->has_hanging_ops_ = false; + return call_->cq()->Pluck(&ctx_->hanging_ops_); } private: