diff --git a/include/grpc++/impl/codegen/async_stream.h b/include/grpc++/impl/codegen/async_stream.h index 1a5cbbd45daf37a27967f2692760cb48bd2e65f6..8f529895cac567d099da3b8c594b88591dc8b204 100644 --- a/include/grpc++/impl/codegen/async_stream.h +++ b/include/grpc++/impl/codegen/async_stream.h @@ -101,6 +101,39 @@ class AsyncWriterInterface { /// \param[in] msg The message to be written. /// \param[in] tag The tag identifying the operation. virtual void Write(const W& msg, void* tag) = 0; + + /// Request the writing of \a msg using WriteOptions \a options with + /// identifying tag \a tag. + /// + /// Only one write may be outstanding at any given time. This means that + /// after calling Write, one must wait to receive \a tag from the completion + /// queue BEFORE calling Write again. + /// WriteOptions \a options is used to set the write options of this message. + /// This is thread-safe with respect to \a Read + /// + /// \param[in] msg The message to be written. + /// \param[in] options The WriteOptions to be used to write this message. + /// \param[in] tag The tag identifying the operation. + virtual void Write(const W& msg, WriteOptions options, void* tag) = 0; + + /// Request the writing of \a msg and coalesce it with the writing + /// of trailing metadata, using WriteOptions \a options with identifying tag + /// \a tag. + /// + /// For client, WriteLast is equivalent of performing Write and WritesDone in + /// a single step. + /// For server, WriteLast buffers the \a msg. The writing of \a msg is held + /// until Finish is called, where \a msg and trailing metadata are coalesced + /// and write is initiated. Note that WriteLast can only buffer \a msg up to + /// the flow control window size. If \a msg size is larger than the window + /// size, it will be sent on wire without buffering. + /// + /// \param[in] msg The message to be written. + /// \param[in] options The WriteOptions to be used to write this message. + /// \param[in] tag The tag identifying the operation. + void WriteLast(const W& msg, WriteOptions options, void* tag) { + Write(msg, options.set_last_message(), tag); + } }; template <class R> @@ -183,11 +216,17 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> { : context_(context), call_(channel->CreateCall(method, context, cq)) { finish_ops_.RecvMessage(response); finish_ops_.AllowNoMessage(); - - init_ops_.set_output_tag(tag); - init_ops_.SendInitialMetadata(context->send_initial_metadata_, - context->initial_metadata_flags()); - call_.PerformOps(&init_ops_); + // if corked bit is set in context, we buffer up the initial metadata to + // coalesce with later message to be sent. No op is performed. + if (context_->initial_metadata_corked_) { + write_ops_.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); + } else { + write_ops_.set_output_tag(tag); + write_ops_.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); + call_.PerformOps(&write_ops_); + } } void ReadInitialMetadata(void* tag) override { @@ -205,10 +244,21 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> { call_.PerformOps(&write_ops_); } + void Write(const W& msg, WriteOptions options, void* tag) override { + write_ops_.set_output_tag(tag); + if (options.is_last_message()) { + options.set_buffer_hint(); + write_ops_.ClientSendClose(); + } + // TODO(ctiller): don't assert + GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); + call_.PerformOps(&write_ops_); + } + void WritesDone(void* tag) override { - writes_done_ops_.set_output_tag(tag); - writes_done_ops_.ClientSendClose(); - call_.PerformOps(&writes_done_ops_); + write_ops_.set_output_tag(tag); + write_ops_.ClientSendClose(); + call_.PerformOps(&write_ops_); } void Finish(Status* status, void* tag) override { @@ -223,10 +273,9 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> { private: ClientContext* context_; Call call_; - CallOpSet<CallOpSendInitialMetadata> init_ops_; CallOpSet<CallOpRecvInitialMetadata> meta_ops_; - CallOpSet<CallOpSendMessage> write_ops_; - CallOpSet<CallOpClientSendClose> writes_done_ops_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> + write_ops_; CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage, CallOpClientRecvStatus> finish_ops_; @@ -253,10 +302,17 @@ class ClientAsyncReaderWriter final const RpcMethod& method, ClientContext* context, void* tag) : context_(context), call_(channel->CreateCall(method, context, cq)) { - init_ops_.set_output_tag(tag); - init_ops_.SendInitialMetadata(context->send_initial_metadata_, - context->initial_metadata_flags()); - call_.PerformOps(&init_ops_); + if (context_->initial_metadata_corked_) { + // if corked bit is set in context, we buffer up the initial metadata to + // coalesce with later message to be sent. No op is performed. + write_ops_.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); + } else { + write_ops_.set_output_tag(tag); + write_ops_.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); + call_.PerformOps(&write_ops_); + } } void ReadInitialMetadata(void* tag) override { @@ -283,10 +339,21 @@ class ClientAsyncReaderWriter final call_.PerformOps(&write_ops_); } + void Write(const W& msg, WriteOptions options, void* tag) override { + write_ops_.set_output_tag(tag); + if (options.is_last_message()) { + options.set_buffer_hint(); + write_ops_.ClientSendClose(); + } + // TODO(ctiller): don't assert + GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); + call_.PerformOps(&write_ops_); + } + void WritesDone(void* tag) override { - writes_done_ops_.set_output_tag(tag); - writes_done_ops_.ClientSendClose(); - call_.PerformOps(&writes_done_ops_); + write_ops_.set_output_tag(tag); + write_ops_.ClientSendClose(); + call_.PerformOps(&write_ops_); } void Finish(Status* status, void* tag) override { @@ -301,11 +368,10 @@ class ClientAsyncReaderWriter final private: ClientContext* context_; Call call_; - CallOpSet<CallOpSendInitialMetadata> init_ops_; CallOpSet<CallOpRecvInitialMetadata> meta_ops_; CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_; - CallOpSet<CallOpSendMessage> write_ops_; - CallOpSet<CallOpClientSendClose> writes_done_ops_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> + write_ops_; CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_; }; @@ -395,6 +461,20 @@ class ServerAsyncWriterInterface : public ServerAsyncStreamingInterface, public AsyncWriterInterface<W> { public: virtual void Finish(const Status& status, void* tag) = 0; + + /// Request the writing of \a msg and coalesce it with trailing metadata which + /// contains \a status, using WriteOptions options with identifying tag \a + /// tag. + /// + /// WriteAndFinish is equivalent of performing WriteLast and Finish in a + /// single step. + /// + /// \param[in] msg The message to be written. + /// \param[in] options The WriteOptions to be used to write this message. + /// \param[in] status The Status that server returns to client. + /// \param[in] tag The tag identifying the operation. + virtual void WriteAndFinish(const W& msg, WriteOptions options, + const Status& status, void* tag) = 0; }; template <class W> @@ -418,29 +498,37 @@ class ServerAsyncWriter final : public ServerAsyncWriterInterface<W> { void Write(const W& msg, void* tag) override { write_ops_.set_output_tag(tag); - if (!ctx_->sent_initial_metadata_) { - write_ops_.SendInitialMetadata(ctx_->initial_metadata_, - ctx_->initial_metadata_flags()); - if (ctx_->compression_level_set()) { - write_ops_.set_compression_level(ctx_->compression_level()); - } - ctx_->sent_initial_metadata_ = true; - } + EnsureInitialMetadataSent(&write_ops_); // TODO(ctiller): don't assert GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok()); call_.PerformOps(&write_ops_); } + void Write(const W& msg, WriteOptions options, void* tag) override { + write_ops_.set_output_tag(tag); + if (options.is_last_message()) { + options.set_buffer_hint(); + } + + EnsureInitialMetadataSent(&write_ops_); + // TODO(ctiller): don't assert + GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); + call_.PerformOps(&write_ops_); + } + + void WriteAndFinish(const W& msg, WriteOptions options, const Status& status, + void* tag) override { + write_ops_.set_output_tag(tag); + EnsureInitialMetadataSent(&write_ops_); + options.set_buffer_hint(); + GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); + write_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); + call_.PerformOps(&write_ops_); + } + void Finish(const Status& status, void* tag) override { finish_ops_.set_output_tag(tag); - if (!ctx_->sent_initial_metadata_) { - finish_ops_.SendInitialMetadata(ctx_->initial_metadata_, - ctx_->initial_metadata_flags()); - if (ctx_->compression_level_set()) { - finish_ops_.set_compression_level(ctx_->compression_level()); - } - ctx_->sent_initial_metadata_ = true; - } + EnsureInitialMetadataSent(&finish_ops_); finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); call_.PerformOps(&finish_ops_); } @@ -448,10 +536,24 @@ class ServerAsyncWriter final : public ServerAsyncWriterInterface<W> { private: void BindCall(Call* call) override { call_ = *call; } + template <class T> + void EnsureInitialMetadataSent(T* ops) { + if (!ctx_->sent_initial_metadata_) { + ops->SendInitialMetadata(ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + ops->set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + } + } + Call call_; ServerContext* ctx_; CallOpSet<CallOpSendInitialMetadata> meta_ops_; - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpServerSendStatus> + write_ops_; CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_; }; @@ -462,6 +564,20 @@ class ServerAsyncReaderWriterInterface : public ServerAsyncStreamingInterface, public AsyncReaderInterface<R> { public: virtual void Finish(const Status& status, void* tag) = 0; + + /// Request the writing of \a msg and coalesce it with trailing metadata which + /// contains \a status, using WriteOptions options with identifying tag \a + /// tag. + /// + /// WriteAndFinish is equivalent of performing WriteLast and Finish in a + /// single step. + /// + /// \param[in] msg The message to be written. + /// \param[in] options The WriteOptions to be used to write this message. + /// \param[in] status The Status that server returns to client. + /// \param[in] tag The tag identifying the operation. + virtual void WriteAndFinish(const W& msg, WriteOptions options, + const Status& status, void* tag) = 0; }; template <class W, class R> @@ -492,29 +608,36 @@ class ServerAsyncReaderWriter final void Write(const W& msg, void* tag) override { write_ops_.set_output_tag(tag); - if (!ctx_->sent_initial_metadata_) { - write_ops_.SendInitialMetadata(ctx_->initial_metadata_, - ctx_->initial_metadata_flags()); - if (ctx_->compression_level_set()) { - write_ops_.set_compression_level(ctx_->compression_level()); - } - ctx_->sent_initial_metadata_ = true; - } + EnsureInitialMetadataSent(&write_ops_); // TODO(ctiller): don't assert GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok()); call_.PerformOps(&write_ops_); } + void Write(const W& msg, WriteOptions options, void* tag) override { + write_ops_.set_output_tag(tag); + if (options.is_last_message()) { + options.set_buffer_hint(); + } + EnsureInitialMetadataSent(&write_ops_); + GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); + call_.PerformOps(&write_ops_); + } + + void WriteAndFinish(const W& msg, WriteOptions options, const Status& status, + void* tag) override { + write_ops_.set_output_tag(tag); + EnsureInitialMetadataSent(&write_ops_); + options.set_buffer_hint(); + GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); + write_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); + call_.PerformOps(&write_ops_); + } + void Finish(const Status& status, void* tag) override { finish_ops_.set_output_tag(tag); - if (!ctx_->sent_initial_metadata_) { - finish_ops_.SendInitialMetadata(ctx_->initial_metadata_, - ctx_->initial_metadata_flags()); - if (ctx_->compression_level_set()) { - finish_ops_.set_compression_level(ctx_->compression_level()); - } - ctx_->sent_initial_metadata_ = true; - } + EnsureInitialMetadataSent(&finish_ops_); + finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); call_.PerformOps(&finish_ops_); } @@ -524,11 +647,25 @@ class ServerAsyncReaderWriter final void BindCall(Call* call) override { call_ = *call; } + template <class T> + void EnsureInitialMetadataSent(T* ops) { + if (!ctx_->sent_initial_metadata_) { + ops->SendInitialMetadata(ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + ops->set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + } + } + Call call_; ServerContext* ctx_; CallOpSet<CallOpSendInitialMetadata> meta_ops_; CallOpSet<CallOpRecvMessage<R>> read_ops_; - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpServerSendStatus> + write_ops_; CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_; }; diff --git a/include/grpc++/impl/codegen/call.h b/include/grpc++/impl/codegen/call.h index 19a5ca2b2eead03e22543717fb7ea36812c57610..a3f2be6bb1e32a6e003254c3cdecac1ba0bbf249 100644 --- a/include/grpc++/impl/codegen/call.h +++ b/include/grpc++/impl/codegen/call.h @@ -84,8 +84,9 @@ inline grpc_metadata* FillMetadataArray( /// Per-message write options. class WriteOptions { public: - WriteOptions() : flags_(0) {} - WriteOptions(const WriteOptions& other) : flags_(other.flags_) {} + WriteOptions() : flags_(0), last_message_(false) {} + WriteOptions(const WriteOptions& other) + : flags_(other.flags_), last_message_(other.last_message_) {} /// Clear all flags. inline void Clear() { flags_ = 0; } @@ -141,6 +142,43 @@ class WriteOptions { /// \sa GRPC_WRITE_BUFFER_HINT inline bool get_buffer_hint() const { return GetBit(GRPC_WRITE_BUFFER_HINT); } + /// corked bit: aliases set_buffer_hint currently, with the intent that + /// set_buffer_hint will be removed in the future + inline WriteOptions& set_corked() { + SetBit(GRPC_WRITE_BUFFER_HINT); + return *this; + } + + inline WriteOptions& clear_corked() { + ClearBit(GRPC_WRITE_BUFFER_HINT); + return *this; + } + + inline bool is_corked() const { return GetBit(GRPC_WRITE_BUFFER_HINT); } + + /// last-message bit: indicates this is the last message in a stream + /// client-side: makes Write the equivalent of performing Write, WritesDone + /// in a single step + /// server-side: hold the Write until the service handler returns (sync api) + /// or until Finish is called (async api) + inline WriteOptions& set_last_message() { + last_message_ = true; + return *this; + } + + /// Clears flag indicating that this is the last message in a stream, + /// disabling coalescing. + inline WriteOptions& clear_last_messsage() { + last_message_ = false; + return *this; + } + + /// Get value for the flag indicating that this is the last message, and + /// should be coalesced with trailing metadata. + /// + /// \sa GRPC_WRITE_LAST_MESSAGE + bool is_last_message() const { return last_message_; } + WriteOptions& operator=(const WriteOptions& rhs) { flags_ = rhs.flags_; return *this; @@ -154,6 +192,7 @@ class WriteOptions { bool GetBit(const uint32_t mask) const { return (flags_ & mask) != 0; } uint32_t flags_; + bool last_message_; }; /// Default argument for CallOpSet. I is unused by the class, but can be @@ -224,7 +263,7 @@ class CallOpSendMessage { /// after use. template <class M> Status SendMessage(const M& message, - const WriteOptions& options) GRPC_MUST_USE_RESULT; + WriteOptions options) GRPC_MUST_USE_RESULT; template <class M> Status SendMessage(const M& message) GRPC_MUST_USE_RESULT; @@ -252,8 +291,7 @@ class CallOpSendMessage { }; template <class M> -Status CallOpSendMessage::SendMessage(const M& message, - const WriteOptions& options) { +Status CallOpSendMessage::SendMessage(const M& message, WriteOptions options) { write_options_ = options; return SerializationTraits<M>::Serialize(message, &send_buf_, &own_buf_); } diff --git a/include/grpc++/impl/codegen/client_context.h b/include/grpc++/impl/codegen/client_context.h index b91c7f65d434749447ee5fcca10d622f59925ef5..3c50e6ba9d4bddd4de822ced0e74e25b30914927 100644 --- a/include/grpc++/impl/codegen/client_context.h +++ b/include/grpc++/impl/codegen/client_context.h @@ -281,6 +281,17 @@ class ClientContext { /// \param algorithm The compression algorithm used for the client call. void set_compression_algorithm(grpc_compression_algorithm algorithm); + /// Flag whether the initial metadata should be \a corked + /// + /// If \a corked is true, then the initial metadata will be colasced with the + /// write of first message in the stream. + /// + /// \param corked The flag indicating whether the initial metadata is to be + /// corked or not. + void set_initial_metadata_corked(bool corked) { + initial_metadata_corked_ = corked; + } + /// Return the peer uri in a string. /// /// \warning This value is never authenticated or subject to any security @@ -357,7 +368,8 @@ class ClientContext { (cacheable_ ? GRPC_INITIAL_METADATA_CACHEABLE_REQUEST : 0) | (wait_for_ready_explicitly_set_ ? GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET - : 0); + : 0) | + (initial_metadata_corked_ ? GRPC_INITIAL_METADATA_CORKED : 0); } grpc::string authority() { return authority_; } @@ -384,6 +396,7 @@ class ClientContext { PropagationOptions propagation_options_; grpc_compression_algorithm compression_algorithm_; + bool initial_metadata_corked_; }; } // namespace grpc diff --git a/include/grpc++/impl/codegen/sync_stream.h b/include/grpc++/impl/codegen/sync_stream.h index 4d9b074e95f92b8db5a7641b277813056cf5dfd1..ae3b8e441d1edc21266e5cd92232a34f08ad2dbb 100644 --- a/include/grpc++/impl/codegen/sync_stream.h +++ b/include/grpc++/impl/codegen/sync_stream.h @@ -100,22 +100,40 @@ class WriterInterface { public: virtual ~WriterInterface() {} - /// Blocking write \a msg to the stream with options. + /// Blocking write \a msg to the stream with WriteOptions \a options. /// This is thread-safe with respect to \a Read /// /// \param msg The message to be written to the stream. - /// \param options Options affecting the write operation. + /// \param options The WriteOptions affecting the write operation. /// /// \return \a true on success, \a false when the stream has been closed. - virtual bool Write(const W& msg, const WriteOptions& options) = 0; + virtual bool Write(const W& msg, WriteOptions options) = 0; - /// Blocking write \a msg to the stream with default options. + /// Blocking write \a msg to the stream with default write options. /// This is thread-safe with respect to \a Read /// /// \param msg The message to be written to the stream. /// /// \return \a true on success, \a false when the stream has been closed. inline bool Write(const W& msg) { return Write(msg, WriteOptions()); } + + /// Write \a msg and coalesce it with the writing of trailing metadata, using + /// WriteOptions \a options. + /// + /// For client, WriteLast is equivalent of performing Write and WritesDone in + /// a single step. \a msg and trailing metadata are coalesced and sent on wire + /// by calling this function. + /// For server, WriteLast buffers the \a msg. The writing of \a msg is held + /// until the service handler returns, where \a msg and trailing metadata are + /// coalesced and sent on wire. Note that WriteLast can only buffer \a msg up + /// to the flow control window size. If \a msg size is larger than the window + /// size, it will be sent on wire without buffering. + /// + /// \param[in] msg The message to be written to the stream. + /// \param[in] options The WriteOptions to be used to write this message. + void WriteLast(const W& msg, WriteOptions options) { + Write(msg, options.set_last_message()); + } }; /// Client-side interface for streaming reads of message of type \a R. @@ -213,11 +231,13 @@ class ClientWriter : public ClientWriterInterface<W> { finish_ops_.RecvMessage(response); finish_ops_.AllowNoMessage(); - CallOpSet<CallOpSendInitialMetadata> ops; - ops.SendInitialMetadata(context->send_initial_metadata_, - context->initial_metadata_flags()); - call_.PerformOps(&ops); - cq_.Pluck(&ops); + if (!context_->initial_metadata_corked_) { + CallOpSet<CallOpSendInitialMetadata> ops; + ops.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); + call_.PerformOps(&ops); + cq_.Pluck(&ops); + } } void WaitForInitialMetadata() { @@ -230,11 +250,24 @@ class ClientWriter : public ClientWriterInterface<W> { } using WriterInterface<W>::Write; - bool Write(const W& msg, const WriteOptions& options) override { - CallOpSet<CallOpSendMessage> ops; + bool Write(const W& msg, WriteOptions options) override { + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpClientSendClose> + ops; + + if (options.is_last_message()) { + options.set_buffer_hint(); + ops.ClientSendClose(); + } + if (context_->initial_metadata_corked_) { + ops.SendInitialMetadata(context_->send_initial_metadata_, + context_->initial_metadata_flags()); + context_->set_initial_metadata_corked(false); + } if (!ops.SendMessage(msg, options).ok()) { return false; } + call_.PerformOps(&ops); return cq_.Pluck(&ops); } @@ -293,11 +326,13 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> { ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method, ClientContext* context) : context_(context), call_(channel->CreateCall(method, context, &cq_)) { - CallOpSet<CallOpSendInitialMetadata> ops; - ops.SendInitialMetadata(context->send_initial_metadata_, - context->initial_metadata_flags()); - call_.PerformOps(&ops); - cq_.Pluck(&ops); + if (!context_->initial_metadata_corked_) { + CallOpSet<CallOpSendInitialMetadata> ops; + ops.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); + call_.PerformOps(&ops); + cq_.Pluck(&ops); + } } void WaitForInitialMetadata() override { @@ -325,9 +360,24 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> { } using WriterInterface<W>::Write; - bool Write(const W& msg, const WriteOptions& options) override { - CallOpSet<CallOpSendMessage> ops; - if (!ops.SendMessage(msg, options).ok()) return false; + bool Write(const W& msg, WriteOptions options) override { + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpClientSendClose> + ops; + + if (options.is_last_message()) { + options.set_buffer_hint(); + ops.ClientSendClose(); + } + if (context_->initial_metadata_corked_) { + ops.SendInitialMetadata(context_->send_initial_metadata_, + context_->initial_metadata_flags()); + context_->set_initial_metadata_corked(false); + } + if (!ops.SendMessage(msg, options).ok()) { + return false; + } + call_.PerformOps(&ops); return cq_.Pluck(&ops); } @@ -423,7 +473,10 @@ class ServerWriter final : public ServerWriterInterface<W> { } using WriterInterface<W>::Write; - bool Write(const W& msg, const WriteOptions& options) override { + bool Write(const W& msg, WriteOptions options) override { + if (options.is_last_message()) { + options.set_buffer_hint(); + } CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops; if (!ops.SendMessage(msg, options).ok()) { return false; @@ -485,7 +538,10 @@ class ServerReaderWriterBody final { return call_->cq()->Pluck(&ops) && ops.got_message; } - bool Write(const W& msg, const WriteOptions& options) { + bool Write(const W& msg, WriteOptions options) { + if (options.is_last_message()) { + options.set_buffer_hint(); + } CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops; if (!ops.SendMessage(msg, options).ok()) { return false; @@ -523,7 +579,7 @@ class ServerReaderWriter final : public ServerReaderWriterInterface<W, R> { bool Read(R* msg) override { return body_.Read(msg); } using WriterInterface<W>::Write; - bool Write(const W& msg, const WriteOptions& options) override { + bool Write(const W& msg, WriteOptions options) override { return body_.Write(msg, options); } @@ -562,8 +618,7 @@ class ServerUnaryStreamer final } using WriterInterface<ResponseType>::Write; - bool Write(const ResponseType& response, - const WriteOptions& options) override { + bool Write(const ResponseType& response, WriteOptions options) override { if (write_done_ || !read_done_) { return false; } @@ -604,8 +659,7 @@ class ServerSplitStreamer final } using WriterInterface<ResponseType>::Write; - bool Write(const ResponseType& response, - const WriteOptions& options) override { + bool Write(const ResponseType& response, WriteOptions options) override { return read_done_ && body_.Write(response, options); } diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h index 887c176f1a4212065f1887396939bb08991382b6..baea961fcb3e4eae0d3a1ddae35a1b6a35926739 100644 --- a/include/grpc/impl/codegen/grpc_types.h +++ b/include/grpc/impl/codegen/grpc_types.h @@ -316,13 +316,16 @@ typedef enum grpc_call_error { /** Signal that GRPC_INITIAL_METADATA_WAIT_FOR_READY was explicitly set by the calling application. */ #define GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET (0x00000080u) +/** Signal that the initial metadata should be corked */ +#define GRPC_INITIAL_METADATA_CORKED (0x00000100u) /** Mask of all valid flags */ -#define GRPC_INITIAL_METADATA_USED_MASK \ - (GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST | \ - GRPC_INITIAL_METADATA_WAIT_FOR_READY | \ - GRPC_INITIAL_METADATA_CACHEABLE_REQUEST | \ - GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET) +#define GRPC_INITIAL_METADATA_USED_MASK \ + (GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST | \ + GRPC_INITIAL_METADATA_WAIT_FOR_READY | \ + GRPC_INITIAL_METADATA_CACHEABLE_REQUEST | \ + GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET | \ + GRPC_INITIAL_METADATA_CORKED) /** A single metadata element */ typedef struct grpc_metadata { diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 8676a3752e8d14bec71fb535f563cbd5875e95ea..967470be516c6c128301a904ecd10d8b6c91056d 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -1210,8 +1210,13 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } } else { GPR_ASSERT(s->id != 0); - grpc_chttp2_become_writable(exec_ctx, t, s, - GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED, + grpc_chttp2_stream_write_type write_type = + GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED; + if (op->send_message != NULL && + (op->send_message->flags & GRPC_WRITE_BUFFER_HINT)) { + write_type = GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK; + } + grpc_chttp2_become_writable(exec_ctx, t, s, write_type, "op.send_initial_metadata"); } } else { diff --git a/src/cpp/client/client_context.cc b/src/cpp/client/client_context.cc index c073741dac0a7bd23ff5cb824670f69f73fb1e19..3d884cf62e4278380a99409fa74d44c4581b9fd5 100644 --- a/src/cpp/client/client_context.cc +++ b/src/cpp/client/client_context.cc @@ -67,7 +67,8 @@ ClientContext::ClientContext() call_canceled_(false), deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)), census_context_(nullptr), - propagate_from_call_(nullptr) { + propagate_from_call_(nullptr), + initial_metadata_corked_(false) { g_client_callbacks->DefaultConstructor(this); } diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 32e8a417958075f896497dae8cbc9ccbb5fdd62e..0b5215ef8e4556e1b447702c4dee83dfb1d8549c 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -484,6 +484,81 @@ TEST_P(AsyncEnd2endTest, SimpleClientStreaming) { EXPECT_TRUE(recv_status.ok()); } +// Two pings and a final pong. +TEST_P(AsyncEnd2endTest, SimpleClientStreamingWithCoalescingApi) { + ResetStub(); + + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + ClientContext cli_ctx; + ServerContext srv_ctx; + ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx); + + send_request.set_message(GetParam().message_content); + cli_ctx.set_initial_metadata_corked(true); + // tag:1 never comes up since no op is performed + std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream( + stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1))); + + service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), + tag(2)); + + cli_stream->Write(send_request, tag(3)); + + // 65536(64KB) is the default flow control window size. Should change this + // number when default flow control window size changes. For the write of + // send_request larger than the flow control window size, tag:3 will not come + // up until server read is initiated. For write of send_request smaller than + // the flow control window size, the request can take the free ride with + // initial metadata due to coalescing, thus write tag:3 will come up here. + if (GetParam().message_content.length() < 65536) { + Verifier(GetParam().disable_blocking) + .Expect(2, true) + .Expect(3, true) + .Verify(cq_.get()); + } else { + Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); + } + + srv_stream.Read(&recv_request, tag(4)); + + if (GetParam().message_content.length() < 65536) { + Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get()); + } else { + Verifier(GetParam().disable_blocking) + .Expect(3, true) + .Expect(4, true) + .Verify(cq_.get()); + } + + EXPECT_EQ(send_request.message(), recv_request.message()); + + cli_stream->WriteLast(send_request, WriteOptions(), tag(5)); + srv_stream.Read(&recv_request, tag(6)); + Verifier(GetParam().disable_blocking) + .Expect(5, true) + .Expect(6, true) + .Verify(cq_.get()); + EXPECT_EQ(send_request.message(), recv_request.message()); + + srv_stream.Read(&recv_request, tag(7)); + Verifier(GetParam().disable_blocking).Expect(7, false).Verify(cq_.get()); + + send_response.set_message(recv_request.message()); + srv_stream.Finish(send_response, Status::OK, tag(8)); + cli_stream->Finish(&recv_status, tag(9)); + Verifier(GetParam().disable_blocking) + .Expect(8, true) + .Expect(9, true) + .Verify(cq_.get()); + + EXPECT_EQ(send_response.message(), recv_response.message()); + EXPECT_TRUE(recv_status.ok()); +} + // One ping, two pongs. TEST_P(AsyncEnd2endTest, SimpleServerStreaming) { ResetStub(); @@ -540,6 +615,112 @@ TEST_P(AsyncEnd2endTest, SimpleServerStreaming) { EXPECT_TRUE(recv_status.ok()); } +// One ping, two pongs. Using WriteAndFinish API +TEST_P(AsyncEnd2endTest, SimpleServerStreamingWithCoalescingApiWAF) { + ResetStub(); + + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + ClientContext cli_ctx; + ServerContext srv_ctx; + ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx); + + send_request.set_message(GetParam().message_content); + std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream( + stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1))); + + service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, + cq_.get(), cq_.get(), tag(2)); + + Verifier(GetParam().disable_blocking) + .Expect(1, true) + .Expect(2, true) + .Verify(cq_.get()); + EXPECT_EQ(send_request.message(), recv_request.message()); + + send_response.set_message(recv_request.message()); + srv_stream.Write(send_response, tag(3)); + cli_stream->Read(&recv_response, tag(4)); + Verifier(GetParam().disable_blocking) + .Expect(3, true) + .Expect(4, true) + .Verify(cq_.get()); + EXPECT_EQ(send_response.message(), recv_response.message()); + + srv_stream.WriteAndFinish(send_response, WriteOptions(), Status::OK, tag(5)); + cli_stream->Read(&recv_response, tag(6)); + Verifier(GetParam().disable_blocking) + .Expect(5, true) + .Expect(6, true) + .Verify(cq_.get()); + EXPECT_EQ(send_response.message(), recv_response.message()); + + cli_stream->Read(&recv_response, tag(7)); + Verifier(GetParam().disable_blocking).Expect(7, false).Verify(cq_.get()); + + cli_stream->Finish(&recv_status, tag(8)); + Verifier(GetParam().disable_blocking).Expect(8, true).Verify(cq_.get()); + + EXPECT_TRUE(recv_status.ok()); +} + +// One ping, two pongs. Using WriteLast API +TEST_P(AsyncEnd2endTest, SimpleServerStreamingWithCoalescingApiWL) { + ResetStub(); + + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + ClientContext cli_ctx; + ServerContext srv_ctx; + ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx); + + send_request.set_message(GetParam().message_content); + std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream( + stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1))); + + service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, + cq_.get(), cq_.get(), tag(2)); + + Verifier(GetParam().disable_blocking) + .Expect(1, true) + .Expect(2, true) + .Verify(cq_.get()); + EXPECT_EQ(send_request.message(), recv_request.message()); + + send_response.set_message(recv_request.message()); + srv_stream.Write(send_response, tag(3)); + cli_stream->Read(&recv_response, tag(4)); + Verifier(GetParam().disable_blocking) + .Expect(3, true) + .Expect(4, true) + .Verify(cq_.get()); + EXPECT_EQ(send_response.message(), recv_response.message()); + + srv_stream.WriteLast(send_response, WriteOptions(), tag(5)); + cli_stream->Read(&recv_response, tag(6)); + srv_stream.Finish(Status::OK, tag(7)); + Verifier(GetParam().disable_blocking) + .Expect(5, true) + .Expect(6, true) + .Expect(7, true) + .Verify(cq_.get()); + EXPECT_EQ(send_response.message(), recv_response.message()); + + cli_stream->Read(&recv_response, tag(8)); + Verifier(GetParam().disable_blocking).Expect(8, false).Verify(cq_.get()); + + cli_stream->Finish(&recv_status, tag(9)); + Verifier(GetParam().disable_blocking).Expect(9, true).Verify(cq_.get()); + + EXPECT_TRUE(recv_status.ok()); +} + // One ping, one pong. TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) { ResetStub(); @@ -599,6 +780,144 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) { EXPECT_TRUE(recv_status.ok()); } +// One ping, one pong. Using server:WriteAndFinish api +TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWAF) { + ResetStub(); + + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + ClientContext cli_ctx; + ServerContext srv_ctx; + ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx); + + send_request.set_message(GetParam().message_content); + cli_ctx.set_initial_metadata_corked(true); + std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>> + cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1))); + + service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), + tag(2)); + + cli_stream->WriteLast(send_request, WriteOptions(), tag(3)); + + // 65536(64KB) is the default flow control window size. Should change this + // number when default flow control window size changes. For the write of + // send_request larger than the flow control window size, tag:3 will not come + // up until server read is initiated. For write of send_request smaller than + // the flow control window size, the request can take the free ride with + // initial metadata due to coalescing, thus write tag:3 will come up here. + if (GetParam().message_content.length() < 65536) { + Verifier(GetParam().disable_blocking) + .Expect(2, true) + .Expect(3, true) + .Verify(cq_.get()); + } else { + Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); + } + + srv_stream.Read(&recv_request, tag(4)); + + if (GetParam().message_content.length() < 65536) { + Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get()); + } else { + Verifier(GetParam().disable_blocking) + .Expect(3, true) + .Expect(4, true) + .Verify(cq_.get()); + } + EXPECT_EQ(send_request.message(), recv_request.message()); + + srv_stream.Read(&recv_request, tag(5)); + Verifier(GetParam().disable_blocking).Expect(5, false).Verify(cq_.get()); + + send_response.set_message(recv_request.message()); + srv_stream.WriteAndFinish(send_response, WriteOptions(), Status::OK, tag(6)); + cli_stream->Read(&recv_response, tag(7)); + Verifier(GetParam().disable_blocking) + .Expect(6, true) + .Expect(7, true) + .Verify(cq_.get()); + EXPECT_EQ(send_response.message(), recv_response.message()); + + cli_stream->Finish(&recv_status, tag(8)); + Verifier(GetParam().disable_blocking).Expect(8, true).Verify(cq_.get()); + + EXPECT_TRUE(recv_status.ok()); +} + +// One ping, one pong. Using server:WriteLast api +TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWL) { + ResetStub(); + + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + ClientContext cli_ctx; + ServerContext srv_ctx; + ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx); + + send_request.set_message(GetParam().message_content); + cli_ctx.set_initial_metadata_corked(true); + std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>> + cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1))); + + service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), + tag(2)); + + cli_stream->WriteLast(send_request, WriteOptions(), tag(3)); + + // 65536(64KB) is the default flow control window size. Should change this + // number when default flow control window size changes. For the write of + // send_request larger than the flow control window size, tag:3 will not come + // up until server read is initiated. For write of send_request smaller than + // the flow control window size, the request can take the free ride with + // initial metadata due to coalescing, thus write tag:3 will come up here. + if (GetParam().message_content.length() < 65536) { + Verifier(GetParam().disable_blocking) + .Expect(2, true) + .Expect(3, true) + .Verify(cq_.get()); + } else { + Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); + } + + srv_stream.Read(&recv_request, tag(4)); + + if (GetParam().message_content.length() < 65536) { + Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get()); + } else { + Verifier(GetParam().disable_blocking) + .Expect(3, true) + .Expect(4, true) + .Verify(cq_.get()); + } + EXPECT_EQ(send_request.message(), recv_request.message()); + + srv_stream.Read(&recv_request, tag(5)); + Verifier(GetParam().disable_blocking).Expect(5, false).Verify(cq_.get()); + + send_response.set_message(recv_request.message()); + srv_stream.WriteLast(send_response, WriteOptions(), tag(6)); + srv_stream.Finish(Status::OK, tag(7)); + cli_stream->Read(&recv_response, tag(8)); + Verifier(GetParam().disable_blocking) + .Expect(6, true) + .Expect(7, true) + .Expect(8, true) + .Verify(cq_.get()); + EXPECT_EQ(send_response.message(), recv_response.message()); + + cli_stream->Finish(&recv_status, tag(9)); + Verifier(GetParam().disable_blocking).Expect(9, true).Verify(cq_.get()); + + EXPECT_TRUE(recv_status.ok()); +} + // Metadata tests TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) { ResetStub(); diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index df78557c43783cb73f40137217597e42c2f5c5ea..d3a83b188f944c3377772f7f2cd07ec6a66a8179 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -702,6 +702,21 @@ TEST_P(End2endTest, RequestStreamOneRequest) { EXPECT_TRUE(s.ok()); } +TEST_P(End2endTest, RequestStreamOneRequestWithCoalescingApi) { + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + + context.set_initial_metadata_corked(true); + auto stream = stub_->RequestStream(&context, &response); + request.set_message("hello"); + stream->WriteLast(request, WriteOptions()); + Status s = stream->Finish(); + EXPECT_EQ(response.message(), request.message()); + EXPECT_TRUE(s.ok()); +} + TEST_P(End2endTest, RequestStreamTwoRequests) { ResetStub(); EchoRequest request; @@ -718,6 +733,22 @@ TEST_P(End2endTest, RequestStreamTwoRequests) { EXPECT_TRUE(s.ok()); } +TEST_P(End2endTest, RequestStreamTwoRequestsWithCoalescingApi) { + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + + context.set_initial_metadata_corked(true); + auto stream = stub_->RequestStream(&context, &response); + request.set_message("hello"); + EXPECT_TRUE(stream->Write(request)); + stream->WriteLast(request, WriteOptions()); + Status s = stream->Finish(); + EXPECT_EQ(response.message(), "hellohello"); + EXPECT_TRUE(s.ok()); +} + TEST_P(End2endTest, ResponseStream) { ResetStub(); EchoRequest request; @@ -738,6 +769,27 @@ TEST_P(End2endTest, ResponseStream) { EXPECT_TRUE(s.ok()); } +TEST_P(End2endTest, ResponseStreamWithCoalescingApi) { + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + request.set_message("hello"); + context.AddMetadata(kServerUseCoalescingApi, "1"); + + auto stream = stub_->ResponseStream(&context, request); + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message() + "0"); + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message() + "1"); + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message() + "2"); + EXPECT_FALSE(stream->Read(&response)); + + Status s = stream->Finish(); + EXPECT_TRUE(s.ok()); +} + TEST_P(End2endTest, BidiStream) { ResetStub(); EchoRequest request; @@ -770,6 +822,39 @@ TEST_P(End2endTest, BidiStream) { EXPECT_TRUE(s.ok()); } +TEST_P(End2endTest, BidiStreamWithCoalescingApi) { + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + context.AddMetadata(kServerFinishAfterNReads, "3"); + context.set_initial_metadata_corked(true); + grpc::string msg("hello"); + + auto stream = stub_->BidiStream(&context); + + request.set_message(msg + "0"); + EXPECT_TRUE(stream->Write(request)); + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message()); + + request.set_message(msg + "1"); + EXPECT_TRUE(stream->Write(request)); + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message()); + + request.set_message(msg + "2"); + stream->WriteLast(request, WriteOptions()); + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message()); + + EXPECT_FALSE(stream->Read(&response)); + EXPECT_FALSE(stream->Read(&response)); + + Status s = stream->Finish(); + EXPECT_TRUE(s.ok()); +} + // Talk to the two services with the same name but different package names. // The two stubs are created on the same channel. TEST_P(End2endTest, DiffPackageServices) { diff --git a/test/cpp/end2end/mock_test.cc b/test/cpp/end2end/mock_test.cc index d6664da5a0f9c2a7fc3c6628170b567bc1dacaae..fdb2732e503d304cbb209905595238bee2fcc8ca 100644 --- a/test/cpp/end2end/mock_test.cc +++ b/test/cpp/end2end/mock_test.cc @@ -89,7 +89,7 @@ class MockClientReaderWriter<EchoRequest, EchoResponse> final return true; } - bool Write(const EchoRequest& msg, const WriteOptions& options) override { + bool Write(const EchoRequest& msg, WriteOptions options) override { gpr_log(GPR_INFO, "mock recv msg %s", msg.message().c_str()); last_message_ = msg.message(); return true; diff --git a/test/cpp/end2end/test_service_impl.cc b/test/cpp/end2end/test_service_impl.cc index 59d36e9cb56a51c44b2777867348cd84a59c2b47..11729c425cda646d799016ae057e4bc61ab5c487 100644 --- a/test/cpp/end2end/test_service_impl.cc +++ b/test/cpp/end2end/test_service_impl.cc @@ -246,6 +246,9 @@ Status TestServiceImpl::ResponseStream(ServerContext* context, int server_try_cancel = GetIntValueFromMetadata( kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); + int server_coalescing_api = GetIntValueFromMetadata( + kServerUseCoalescingApi, context->client_metadata(), 0); + if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { ServerTryCancel(context); return Status::CANCELLED; @@ -260,7 +263,11 @@ Status TestServiceImpl::ResponseStream(ServerContext* context, for (int i = 0; i < kNumResponseStreamsMsgs; i++) { response.set_message(request->message() + grpc::to_string(i)); - writer->Write(response); + if (i == kNumResponseStreamsMsgs - 1 && server_coalescing_api != 0) { + writer->WriteLast(response, WriteOptions()); + } else { + writer->Write(response); + } } if (server_try_cancel_thd != nullptr) { @@ -305,10 +312,21 @@ Status TestServiceImpl::BidiStream( new std::thread(&TestServiceImpl::ServerTryCancel, this, context); } + // kServerFinishAfterNReads suggests after how many reads, the server should + // write the last message and send status (coalesced using WriteLast) + int server_write_last = GetIntValueFromMetadata( + kServerFinishAfterNReads, context->client_metadata(), 0); + + int read_counts = 0; while (stream->Read(&request)) { + read_counts++; gpr_log(GPR_INFO, "recv msg %s", request.message().c_str()); response.set_message(request.message()); - stream->Write(response); + if (read_counts == server_write_last) { + stream->WriteLast(response, WriteOptions()); + } else { + stream->Write(response); + } } if (server_try_cancel_thd != nullptr) { diff --git a/test/cpp/end2end/test_service_impl.h b/test/cpp/end2end/test_service_impl.h index 88e0be7bca77369f78c5a2db25e9b241b8e26492..b1f02f93f658ee7f9cfb0d13318b23f35b28ec18 100644 --- a/test/cpp/end2end/test_service_impl.h +++ b/test/cpp/end2end/test_service_impl.h @@ -48,6 +48,8 @@ const int kNumResponseStreamsMsgs = 3; const char* const kServerCancelAfterReads = "cancel_after_reads"; const char* const kServerTryCancelRequest = "server_try_cancel"; const char* const kDebugInfoTrailerKey = "debug-info-bin"; +const char* const kServerFinishAfterNReads = "server_finish_after_n_reads"; +const char* const kServerUseCoalescingApi = "server_use_coalescing_api"; typedef enum { DO_NOT_CANCEL = 0, diff --git a/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc b/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc index 00e37f7912137e3442a46ab0d185842b4018f4e3..42a5381e273c6251f55a2dfd341476208b129eca 100644 --- a/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc +++ b/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc @@ -240,6 +240,173 @@ static void BM_StreamingPingPongMsgs(benchmark::State& state) { state.SetBytesProcessed(msg_size * state.iterations() * 2); } +// Repeatedly makes Streaming Bidi calls (exchanging a configurable number of +// messages in each call) in a loop on a single channel. Different from +// BM_StreamingPingPong we are using stream coalescing api, e.g. WriteLast, +// WriteAndFinish, set_initial_metadata_corked. These apis aim at saving +// sendmsg syscalls for streaming by coalescing 1. initial metadata with first +// message; 2. final streaming message with trailing metadata. +// +// First parmeter (i.e state.range(0)): Message size (in bytes) to use +// Second parameter (i.e state.range(1)): Number of ping pong messages. +// Note: One ping-pong means two messages (one from client to server and +// the other from server to client): +// Third parameter (i.e state.range(2)): Switch between using WriteAndFinish +// API and WriteLast API for server. +template <class Fixture, class ClientContextMutator, class ServerContextMutator> +static void BM_StreamingPingPongWithCoalescingApi(benchmark::State& state) { + const int msg_size = state.range(0); + const int max_ping_pongs = state.range(1); + // This options is used to test out server API: WriteLast and WriteAndFinish + // respectively, since we can not use both of them on server side at the same + // time. Value 1 means we are testing out the WriteAndFinish API, and + // otherwise we are testing out the WriteLast API. + const int write_and_finish = state.range(2); + + EchoTestService::AsyncService service; + std::unique_ptr<Fixture> fixture(new Fixture(&service)); + { + EchoResponse send_response; + EchoResponse recv_response; + EchoRequest send_request; + EchoRequest recv_request; + + if (msg_size > 0) { + send_request.set_message(std::string(msg_size, 'a')); + send_response.set_message(std::string(msg_size, 'b')); + } + + std::unique_ptr<EchoTestService::Stub> stub( + EchoTestService::NewStub(fixture->channel())); + + while (state.KeepRunning()) { + ServerContext svr_ctx; + ServerContextMutator svr_ctx_mut(&svr_ctx); + ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx); + service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(), + fixture->cq(), tag(0)); + + ClientContext cli_ctx; + ClientContextMutator cli_ctx_mut(&cli_ctx); + cli_ctx.set_initial_metadata_corked(true); + // tag:1 here will never comes up, since we are not performing any op due + // to initial metadata coalescing. + auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1)); + + void* t; + bool ok; + int need_tags; + + // Send 'max_ping_pongs' number of ping pong messages + int ping_pong_cnt = 0; + while (ping_pong_cnt < max_ping_pongs) { + if (ping_pong_cnt == max_ping_pongs - 1) { + request_rw->WriteLast(send_request, WriteOptions(), tag(2)); + } else { + request_rw->Write(send_request, tag(2)); // Start client send + } + + need_tags = (1 << 2) | (1 << 3) | (1 << 4) | (1 << 5); + + if (ping_pong_cnt == 0) { + // wait for the server call structure (call_hook, etc.) to be + // initialized (async stream between client side and server side + // established). It is necessary when client init metadata is + // coalesced + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + while ((int)(intptr_t)t != 0) { + // In some cases tag:2 comes before tag:0 (write tag comes out + // first), this while loop is to make sure get tag:0. + int i = (int)(intptr_t)t; + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + } + } + + response_rw.Read(&recv_request, tag(3)); // Start server recv + request_rw->Read(&recv_response, tag(4)); // Start client recv + + while (need_tags) { + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + GPR_ASSERT(ok); + int i = (int)(intptr_t)t; + + // If server recv is complete, start the server send operation + if (i == 3) { + if (ping_pong_cnt == max_ping_pongs - 1) { + if (write_and_finish == 1) { + response_rw.WriteAndFinish(send_response, WriteOptions(), + Status::OK, tag(5)); + } else { + response_rw.WriteLast(send_response, WriteOptions(), tag(5)); + // WriteLast buffers the write, so neither server write op nor + // client read op will finish inside the while loop. + need_tags &= ~(1 << 4); + need_tags &= ~(1 << 5); + } + } else { + response_rw.Write(send_response, tag(5)); + } + } + + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); + } + + ping_pong_cnt++; + } + + if (max_ping_pongs == 0) { + need_tags = (1 << 6) | (1 << 7) | (1 << 8); + } else { + if (write_and_finish == 1) { + need_tags = (1 << 8); + } else { + // server's buffered write and the client's read of the buffered write + // tags should come up. + need_tags = (1 << 4) | (1 << 5) | (1 << 7) | (1 << 8); + } + } + + // No message write or initial metadata write happened yet. + if (max_ping_pongs == 0) { + request_rw->WritesDone(tag(6)); + // wait for server call data structure(call_hook, etc.) to be + // initialized, since initial metadata is corked. + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + while ((int)(intptr_t)t != 0) { + int i = (int)(intptr_t)t; + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + } + response_rw.Finish(Status::OK, tag(7)); + } else { + if (write_and_finish != 1) { + response_rw.Finish(Status::OK, tag(7)); + } + } + + Status recv_status; + request_rw->Finish(&recv_status, tag(8)); + + while (need_tags) { + GPR_ASSERT(fixture->cq()->Next(&t, &ok)); + int i = (int)(intptr_t)t; + GPR_ASSERT(need_tags & (1 << i)); + need_tags &= ~(1 << i); + } + + GPR_ASSERT(recv_status.ok()); + } + } + + fixture->Finish(state); + fixture.reset(); + state.SetBytesProcessed(msg_size * state.iterations() * max_ping_pongs * 2); +} + /******************************************************************************* * CONFIGURATIONS */ @@ -270,6 +437,30 @@ BENCHMARK_TEMPLATE(BM_StreamingPingPongMsgs, InProcessCHTTP2, NoOpMutator, BENCHMARK_TEMPLATE(BM_StreamingPingPongMsgs, TCP, NoOpMutator, NoOpMutator) ->Range(0, 128 * 1024 * 1024); +// Generate Args for StreamingPingPongWithCoalescingApi benchmarks. Currently +// generates args for only "small streams" (i.e streams with 0, 1 or 2 messages) +static void StreamingPingPongWithCoalescingApiArgs( + benchmark::internal::Benchmark* b) { + int msg_size = 0; + + b->Args( + {0, 0, 0}); // spl case: 0 ping-pong msgs (msg_size doesn't matter here) + b->Args( + {0, 0, 1}); // spl case: 0 ping-pong msgs (msg_size doesn't matter here) + + for (msg_size = 0; msg_size <= 128 * 1024 * 1024; + msg_size == 0 ? msg_size++ : msg_size *= 8) { + b->Args({msg_size, 1, 0}); + b->Args({msg_size, 2, 0}); + b->Args({msg_size, 1, 1}); + b->Args({msg_size, 2, 1}); + } +} + +BENCHMARK_TEMPLATE(BM_StreamingPingPongWithCoalescingApi, InProcessCHTTP2, + NoOpMutator, NoOpMutator) + ->Apply(StreamingPingPongWithCoalescingApiArgs); + } // namespace testing } // namespace grpc