Skip to content
Snippets Groups Projects
Commit f1258c49 authored by Yang Gao's avatar Yang Gao
Browse files

save before the change

parent 06ed31e9
No related branches found
No related tags found
No related merge requests found
...@@ -63,11 +63,15 @@ class CallOpBuffer final : public CompletionQueueTag { ...@@ -63,11 +63,15 @@ class CallOpBuffer final : public CompletionQueueTag {
// Does not take ownership. // Does not take ownership.
void AddSendInitialMetadata( void AddSendInitialMetadata(
std::multimap<grpc::string, grpc::string> *metadata); std::multimap<grpc::string, grpc::string> *metadata);
void AddRecvInitialMetadata(
std::multimap<grpc::string, grpc::string> *metadata);
void AddSendMessage(const google::protobuf::Message &message); void AddSendMessage(const google::protobuf::Message &message);
void AddRecvMessage(google::protobuf::Message *message); void AddRecvMessage(google::protobuf::Message *message);
void AddClientSendClose(); void AddClientSendClose();
void AddClientRecvStatus(Status *status); void AddClientRecvStatus(std::multimap<grpc::string, grpc::string> *metadata,
void AddServerSendStatus(std::multimap<grpc::string, grpc::string> *metadata, const Status& status); Status *status);
void AddServerSendStatus(std::multimap<grpc::string, grpc::string> *metadata,
const Status& status);
// INTERNAL API: // INTERNAL API:
...@@ -79,17 +83,28 @@ class CallOpBuffer final : public CompletionQueueTag { ...@@ -79,17 +83,28 @@ class CallOpBuffer final : public CompletionQueueTag {
private: private:
void *return_tag_ = nullptr; void *return_tag_ = nullptr;
// Send initial metadata
size_t initial_metadata_count_ = 0; size_t initial_metadata_count_ = 0;
grpc_metadata* initial_metadata_ = nullptr; grpc_metadata* initial_metadata_ = nullptr;
// Recv initial metadta
std::multimap<grpc::string, grpc::string>* recv_initial_metadata_ = nullptr;
grpc_metadata_array recv_initial_metadata_arr_ = {0, 0, nullptr};
// Send message
const google::protobuf::Message* send_message_ = nullptr; const google::protobuf::Message* send_message_ = nullptr;
grpc_byte_buffer* send_message_buf_ = nullptr; grpc_byte_buffer* send_message_buf_ = nullptr;
// Recv message
google::protobuf::Message* recv_message_ = nullptr; google::protobuf::Message* recv_message_ = nullptr;
grpc_byte_buffer* recv_message_buf_ = nullptr; grpc_byte_buffer* recv_message_buf_ = nullptr;
// Client send close
bool client_send_close_ = false; bool client_send_close_ = false;
// Client recv status
std::multimap<grpc::string, grpc::string>* recv_trailing_metadata_ = nullptr;
Status* recv_status_ = nullptr; Status* recv_status_ = nullptr;
grpc_metadata_array recv_trailing_metadata_arr_ = {0, 0, nullptr};
grpc_status_code status_code_ = GRPC_STATUS_OK; grpc_status_code status_code_ = GRPC_STATUS_OK;
char* status_details_ = nullptr; char* status_details_ = nullptr;
size_t status_details_capacity_ = 0; size_t status_details_capacity_ = 0;
// Server send status
Status* send_status_ = nullptr; Status* send_status_ = nullptr;
size_t trailing_metadata_count_ = 0; size_t trailing_metadata_count_ = 0;
grpc_metadata* trailing_metadata_ = nullptr; grpc_metadata* trailing_metadata_ = nullptr;
......
...@@ -41,34 +41,47 @@ namespace grpc { ...@@ -41,34 +41,47 @@ namespace grpc {
void CallOpBuffer::Reset(void* next_return_tag) { void CallOpBuffer::Reset(void* next_return_tag) {
return_tag_ = next_return_tag; return_tag_ = next_return_tag;
initial_metadata_count_ = 0; initial_metadata_count_ = 0;
if (initial_metadata_) { gpr_free(initial_metadata_);
gpr_free(initial_metadata_);
} recv_initial_metadata_ = nullptr;
gpr_free(recv_initial_metadata_arr_.metadata);
recv_initial_metadata_arr_ = {0, 0, nullptr};
send_message_ = nullptr; send_message_ = nullptr;
if (send_message_buf_) { if (send_message_buf_) {
grpc_byte_buffer_destroy(send_message_buf_); grpc_byte_buffer_destroy(send_message_buf_);
send_message_buf_ = nullptr; send_message_buf_ = nullptr;
} }
recv_message_ = nullptr; recv_message_ = nullptr;
if (recv_message_buf_) { if (recv_message_buf_) {
grpc_byte_buffer_destroy(recv_message_buf_); grpc_byte_buffer_destroy(recv_message_buf_);
recv_message_buf_ = nullptr; recv_message_buf_ = nullptr;
} }
client_send_close_ = false; client_send_close_ = false;
recv_trailing_metadata_ = nullptr;
recv_status_ = nullptr; recv_status_ = nullptr;
gpr_free(recv_trailing_metadata_arr_.metadata);
recv_trailing_metadata_arr_ = {0, 0, nullptr};
status_code_ = GRPC_STATUS_OK; status_code_ = GRPC_STATUS_OK;
if (status_details_) { gpr_free(status_details_);
gpr_free(status_details_); status_details_ = nullptr;
status_details_ = nullptr;
}
status_details_capacity_ = 0; status_details_capacity_ = 0;
send_status_ = nullptr;
trailing_metadata_count_ = 0;
trailing_metadata_ = nullptr;
} }
namespace { namespace {
// TODO(yangg) if the map is changed before we send, the pointers will be a // TODO(yangg) if the map is changed before we send, the pointers will be a
// mess. Make sure it does not happen. // mess. Make sure it does not happen.
grpc_metadata* FillMetadata( grpc_metadata* FillMetadataArray(
std::multimap<grpc::string, grpc::string>* metadata) { std::multimap<grpc::string, grpc::string>* metadata) {
if (metadata->empty()) { return nullptr; } if (metadata->empty()) { return nullptr; }
grpc_metadata* metadata_array = (grpc_metadata*)gpr_malloc( grpc_metadata* metadata_array = (grpc_metadata*)gpr_malloc(
...@@ -83,6 +96,17 @@ grpc_metadata* FillMetadata( ...@@ -83,6 +96,17 @@ grpc_metadata* FillMetadata(
} }
return metadata_array; return metadata_array;
} }
void FillMetadataMap(grpc_metadata_array* arr,
std::multimap<grpc::string, grpc::string>* metadata) {
for (size_t i = 0; i < arr->count; i++) {
// TODO(yangg) handle duplicates?
metadata->insert(std::pair<grpc::string, grpc::string>(
arr->metadata[i].key, {arr->metadata[i].value, arr->metadata[i].value_length}));
}
grpc_metadata_array_destroy(arr);
grpc_metadata_array_init(&recv_trailing_metadata_arr_);
}
} // namespace } // namespace
void CallOpBuffer::AddSendInitialMetadata( void CallOpBuffer::AddSendInitialMetadata(
...@@ -91,6 +115,11 @@ void CallOpBuffer::AddSendInitialMetadata( ...@@ -91,6 +115,11 @@ void CallOpBuffer::AddSendInitialMetadata(
initial_metadata_ = FillMetadata(metadata); initial_metadata_ = FillMetadata(metadata);
} }
void CallOpBuffer::AddRecvInitialMetadata(
std::multimap<grpc::string, grpc::string>* metadata) {
recv_initial_metadata_ = metadata;
}
void CallOpBuffer::AddSendMessage(const google::protobuf::Message& message) { void CallOpBuffer::AddSendMessage(const google::protobuf::Message& message) {
send_message_ = &message; send_message_ = &message;
} }
...@@ -103,13 +132,17 @@ void CallOpBuffer::AddClientSendClose() { ...@@ -103,13 +132,17 @@ void CallOpBuffer::AddClientSendClose() {
client_send_close_ = true; client_send_close_ = true;
} }
void CallOpBuffer::AddClientRecvStatus(Status *status) { void CallOpBuffer::AddClientRecvStatus(
std::multimap<grpc::string, grpc::string>* metadata, Status *status) {
recv_trailing_metadata_ = metadata;
recv_status_ = status; recv_status_ = status;
} }
void CallOpBuffer::AddServerSendStatus(std::multimap<grpc::string, grpc::string>* metadata, void CallOpBuffer::AddServerSendStatus(
const Status& status) { std::multimap<grpc::string, grpc::string>* metadata, const Status& status) {
trailing_metadata_count_ = metadata->size();
trailing_metadata_ = FillMetadata(metadata);
send_status_ = &status;
} }
void CallOpBuffer::FillOps(grpc_op *ops, size_t *nops) { void CallOpBuffer::FillOps(grpc_op *ops, size_t *nops) {
...@@ -120,6 +153,11 @@ void CallOpBuffer::FillOps(grpc_op *ops, size_t *nops) { ...@@ -120,6 +153,11 @@ void CallOpBuffer::FillOps(grpc_op *ops, size_t *nops) {
ops[*nops].data.send_initial_metadata.metadata = initial_metadata_; ops[*nops].data.send_initial_metadata.metadata = initial_metadata_;
(*nops)++; (*nops)++;
} }
if (recv_initial_metadata_) {
ops[*nops].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[*nops].data.recv_initial_metadata = &recv_initial_metadata_arr_;
(*nops)++;
}
if (send_message_) { if (send_message_) {
bool success = SerializeProto(*send_message_, &send_message_buf_); bool success = SerializeProto(*send_message_, &send_message_buf_);
if (!success) { if (!success) {
...@@ -140,10 +178,24 @@ void CallOpBuffer::FillOps(grpc_op *ops, size_t *nops) { ...@@ -140,10 +178,24 @@ void CallOpBuffer::FillOps(grpc_op *ops, size_t *nops) {
} }
if (recv_status_) { if (recv_status_) {
ops[*nops].op = GRPC_OP_RECV_STATUS_ON_CLIENT; ops[*nops].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
// TODO ops[*nops].data.recv_status_on_client.trailing_metadata = ops[*nops].data.recv_status_on_client.trailing_metadata =
&recv_trailing_metadata_arr_;
ops[*nops].data.recv_status_on_client.status = &status_code_; ops[*nops].data.recv_status_on_client.status = &status_code_;
ops[*nops].data.recv_status_on_client.status_details = &status_details_; ops[*nops].data.recv_status_on_client.status_details = &status_details_;
ops[*nops].data.recv_status_on_client.status_details_capacity = &status_details_capacity_; ops[*nops].data.recv_status_on_client.status_details_capacity =
&status_details_capacity_;
(*nops)++;
}
if (send_status_) {
ops[*nops].op = GRPC_OP_SEND_STATUS_FROM_SERVER;
ops[*nops].data.send_status_from_server.trailing_metadata_count =
trailing_metadata_count_;
ops[*nops].data.send_status_from_server.trailing_metadata =
trailing_metadata_;
ops[*nops].data.send_status_from_server.status =
static_cast<grpc_status_code>(send_status_->code());
ops[*nops].data.send_status_from_server.status_details =
send_status_->details().c_str();
(*nops)++; (*nops)++;
} }
} }
...@@ -158,8 +210,16 @@ void CallOpBuffer::FinalizeResult(void **tag, bool *status) { ...@@ -158,8 +210,16 @@ void CallOpBuffer::FinalizeResult(void **tag, bool *status) {
gpr_free(initial_metadata_); gpr_free(initial_metadata_);
initial_metadata_ = nullptr; initial_metadata_ = nullptr;
} }
if (trailing_metadata_count_) {
gpr_free(trailing_metadata_);
trailing_metadata_ = nullptr;
}
// Set user-facing tag. // Set user-facing tag.
*tag = return_tag_; *tag = return_tag_;
// Process received initial metadata
if (recv_initial_metadata_) {
FillMetadataMap(&recv_initial_metadata_, recv_initial_metadata_);
}
// Parse received message if any. // Parse received message if any.
if (recv_message_ && recv_message_buf_) { if (recv_message_ && recv_message_buf_) {
*status = DeserializeProto(recv_message_buf_, recv_message_); *status = DeserializeProto(recv_message_buf_, recv_message_);
...@@ -168,6 +228,7 @@ void CallOpBuffer::FinalizeResult(void **tag, bool *status) { ...@@ -168,6 +228,7 @@ void CallOpBuffer::FinalizeResult(void **tag, bool *status) {
} }
// Parse received status. // Parse received status.
if (recv_status_) { if (recv_status_) {
FillMetadataMap(&recv_trailing_metadata_arr_, recv_trailing_metadata_);
*recv_status_ = Status( *recv_status_ = Status(
static_cast<StatusCode>(status_code_), static_cast<StatusCode>(status_code_),
status_details_ ? grpc::string(status_details_, status_details_capacity_) status_details_ ? grpc::string(status_details_, status_details_capacity_)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment