Skip to content
Snippets Groups Projects
Commit 968ca530 authored by Yang Gao's avatar Yang Gao
Browse files

Add trailing metadata to client context and use it.

parent dcf9c0e5
No related branches found
No related tags found
No related merge requests found
...@@ -50,6 +50,12 @@ struct grpc_completion_queue; ...@@ -50,6 +50,12 @@ struct grpc_completion_queue;
namespace grpc { namespace grpc {
class CallOpBuffer; class CallOpBuffer;
template <class R> class ClientReader;
template <class W> class ClientWriter;
template <class R, class W> class ClientReaderWriter;
template <class R> class ServerReader;
template <class W> class ServerWriter;
template <class R, class W> class ServerReaderWriter;
class ClientContext { class ClientContext {
public: public:
...@@ -71,7 +77,12 @@ class ClientContext { ...@@ -71,7 +77,12 @@ class ClientContext {
friend class CallOpBuffer; friend class CallOpBuffer;
friend class Channel; friend class Channel;
friend class StreamContext; template <class R> friend class ::grpc::ClientReader;
template <class W> friend class ::grpc::ClientWriter;
template <class R, class W> friend class ::grpc::ClientReaderWriter;
template <class R> friend class ::grpc::ServerReader;
template <class W> friend class ::grpc::ServerWriter;
template <class R, class W> friend class ::grpc::ServerReaderWriter;
grpc_call *call() { return call_; } grpc_call *call() { return call_; }
void set_call(grpc_call *call) { void set_call(grpc_call *call) {
...@@ -87,7 +98,9 @@ class ClientContext { ...@@ -87,7 +98,9 @@ class ClientContext {
grpc_call *call_; grpc_call *call_;
grpc_completion_queue *cq_; grpc_completion_queue *cq_;
gpr_timespec absolute_deadline_; gpr_timespec absolute_deadline_;
std::multimap<grpc::string, grpc::string> metadata_; std::multimap<grpc::string, grpc::string> send_initial_metadata_;
std::multimap<grpc::string, grpc::string> recv_initial_metadata_;
std::multimap<grpc::string, grpc::string> trailing_metadata_;
}; };
} // namespace grpc } // namespace grpc
......
...@@ -35,6 +35,7 @@ ...@@ -35,6 +35,7 @@
#define __GRPCPP_STREAM_H__ #define __GRPCPP_STREAM_H__
#include <grpc++/channel_interface.h> #include <grpc++/channel_interface.h>
#include <grpc++/client_context.h>
#include <grpc++/completion_queue.h> #include <grpc++/completion_queue.h>
#include <grpc++/impl/call.h> #include <grpc++/impl/call.h>
#include <grpc++/status.h> #include <grpc++/status.h>
...@@ -87,7 +88,7 @@ class ClientReader final : public ClientStreamingInterface, ...@@ -87,7 +88,7 @@ class ClientReader final : public ClientStreamingInterface,
ClientReader(ChannelInterface *channel, const RpcMethod &method, ClientReader(ChannelInterface *channel, const RpcMethod &method,
ClientContext *context, ClientContext *context,
const google::protobuf::Message &request) const google::protobuf::Message &request)
: call_(channel->CreateCall(method, context, &cq_)) { : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
CallOpBuffer buf; CallOpBuffer buf;
buf.AddSendMessage(request); buf.AddSendMessage(request);
buf.AddClientSendClose(); buf.AddClientSendClose();
...@@ -105,13 +106,14 @@ class ClientReader final : public ClientStreamingInterface, ...@@ -105,13 +106,14 @@ class ClientReader final : public ClientStreamingInterface,
virtual Status Finish() override { virtual Status Finish() override {
CallOpBuffer buf; CallOpBuffer buf;
Status status; Status status;
buf.AddClientRecvStatus(nullptr, &status); // TODO metadata buf.AddClientRecvStatus(&context_->trailing_metadata_, &status);
call_.PerformOps(&buf); call_.PerformOps(&buf);
GPR_ASSERT(cq_.Pluck(&buf)); GPR_ASSERT(cq_.Pluck(&buf));
return status; return status;
} }
private: private:
ClientContext* context_;
CompletionQueue cq_; CompletionQueue cq_;
Call call_; Call call_;
}; };
...@@ -124,7 +126,7 @@ class ClientWriter final : public ClientStreamingInterface, ...@@ -124,7 +126,7 @@ class ClientWriter final : public ClientStreamingInterface,
ClientWriter(ChannelInterface *channel, const RpcMethod &method, ClientWriter(ChannelInterface *channel, const RpcMethod &method,
ClientContext *context, ClientContext *context,
google::protobuf::Message *response) google::protobuf::Message *response)
: response_(response), : context_(context), response_(response),
call_(channel->CreateCall(method, context, &cq_)) {} call_(channel->CreateCall(method, context, &cq_)) {}
virtual bool Write(const W& msg) override { virtual bool Write(const W& msg) override {
...@@ -146,13 +148,14 @@ class ClientWriter final : public ClientStreamingInterface, ...@@ -146,13 +148,14 @@ class ClientWriter final : public ClientStreamingInterface,
CallOpBuffer buf; CallOpBuffer buf;
Status status; Status status;
buf.AddRecvMessage(response_); buf.AddRecvMessage(response_);
buf.AddClientRecvStatus(nullptr, &status); // TODO metadata buf.AddClientRecvStatus(&context_->trailing_metadata_, &status);
call_.PerformOps(&buf); call_.PerformOps(&buf);
GPR_ASSERT(cq_.Pluck(&buf)); GPR_ASSERT(cq_.Pluck(&buf));
return status; return status;
} }
private: private:
ClientContext* context_;
google::protobuf::Message *const response_; google::protobuf::Message *const response_;
CompletionQueue cq_; CompletionQueue cq_;
Call call_; Call call_;
...@@ -167,7 +170,7 @@ class ClientReaderWriter final : public ClientStreamingInterface, ...@@ -167,7 +170,7 @@ class ClientReaderWriter final : public ClientStreamingInterface,
// Blocking create a stream. // Blocking create a stream.
ClientReaderWriter(ChannelInterface *channel, ClientReaderWriter(ChannelInterface *channel,
const RpcMethod &method, ClientContext *context) const RpcMethod &method, ClientContext *context)
: call_(channel->CreateCall(method, context, &cq_)) {} : context_(context), call_(channel->CreateCall(method, context, &cq_)) {}
virtual bool Read(R *msg) override { virtual bool Read(R *msg) override {
CallOpBuffer buf; CallOpBuffer buf;
...@@ -193,13 +196,14 @@ class ClientReaderWriter final : public ClientStreamingInterface, ...@@ -193,13 +196,14 @@ class ClientReaderWriter final : public ClientStreamingInterface,
virtual Status Finish() override { virtual Status Finish() override {
CallOpBuffer buf; CallOpBuffer buf;
Status status; Status status;
buf.AddClientRecvStatus(nullptr, &status); // TODO metadata buf.AddClientRecvStatus(&context_->trailing_metadata_, &status);
call_.PerformOps(&buf); call_.PerformOps(&buf);
GPR_ASSERT(cq_.Pluck(&buf)); GPR_ASSERT(cq_.Pluck(&buf));
return status; return status;
} }
private: private:
ClientContext* context_;
CompletionQueue cq_; CompletionQueue cq_;
Call call_; Call call_;
}; };
......
...@@ -72,7 +72,7 @@ system_clock::time_point ClientContext::absolute_deadline() { ...@@ -72,7 +72,7 @@ system_clock::time_point ClientContext::absolute_deadline() {
void ClientContext::AddMetadata(const grpc::string &meta_key, void ClientContext::AddMetadata(const grpc::string &meta_key,
const grpc::string &meta_value) { const grpc::string &meta_value) {
metadata_.insert(std::make_pair(meta_key, meta_value)); send_initial_metadata_.insert(std::make_pair(meta_key, meta_value));
} }
void ClientContext::StartCancel() {} void ClientContext::StartCancel() {}
......
...@@ -119,7 +119,7 @@ void CallOpBuffer::AddSendInitialMetadata( ...@@ -119,7 +119,7 @@ void CallOpBuffer::AddSendInitialMetadata(
} }
void CallOpBuffer::AddSendInitialMetadata(ClientContext *ctx) { void CallOpBuffer::AddSendInitialMetadata(ClientContext *ctx) {
AddSendInitialMetadata(&ctx->metadata_); AddSendInitialMetadata(&ctx->send_initial_metadata_);
} }
void CallOpBuffer::AddSendMessage(const google::protobuf::Message& message) { void CallOpBuffer::AddSendMessage(const google::protobuf::Message& message) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment