diff --git a/include/grpc++/impl/call.h b/include/grpc++/impl/call.h index 141b16ab5bfc5ea2f8e5dbc558da613345ca9ccd..40939e458fae728a3d1b865cf70ee6843e1190f2 100644 --- a/include/grpc++/impl/call.h +++ b/include/grpc++/impl/call.h @@ -63,11 +63,15 @@ class CallOpBuffer final : public CompletionQueueTag { // Does not take ownership. void AddSendInitialMetadata( std::multimap<grpc::string, grpc::string> *metadata); + void AddRecvInitialMetadata( + std::multimap<grpc::string, grpc::string> *metadata); void AddSendMessage(const google::protobuf::Message &message); void AddRecvMessage(google::protobuf::Message *message); void AddClientSendClose(); - void AddClientRecvStatus(Status *status); - void AddServerSendStatus(std::multimap<grpc::string, grpc::string> *metadata, const Status& status); + void AddClientRecvStatus(std::multimap<grpc::string, grpc::string> *metadata, + Status *status); + void AddServerSendStatus(std::multimap<grpc::string, grpc::string> *metadata, + const Status& status); // INTERNAL API: @@ -79,17 +83,28 @@ class CallOpBuffer final : public CompletionQueueTag { private: void *return_tag_ = nullptr; + // Send initial metadata size_t initial_metadata_count_ = 0; grpc_metadata* initial_metadata_ = nullptr; + // Recv initial metadta + std::multimap<grpc::string, grpc::string>* recv_initial_metadata_ = nullptr; + grpc_metadata_array recv_initial_metadata_arr_ = {0, 0, nullptr}; + // Send message const google::protobuf::Message* send_message_ = nullptr; grpc_byte_buffer* send_message_buf_ = nullptr; + // Recv message google::protobuf::Message* recv_message_ = nullptr; grpc_byte_buffer* recv_message_buf_ = nullptr; + // Client send close bool client_send_close_ = false; + // Client recv status + std::multimap<grpc::string, grpc::string>* recv_trailing_metadata_ = nullptr; Status* recv_status_ = nullptr; + grpc_metadata_array recv_trailing_metadata_arr_ = {0, 0, nullptr}; grpc_status_code status_code_ = GRPC_STATUS_OK; char* status_details_ = nullptr; size_t status_details_capacity_ = 0; + // Server send status Status* send_status_ = nullptr; size_t trailing_metadata_count_ = 0; grpc_metadata* trailing_metadata_ = nullptr; diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc index b2cd55fe2459915dbb08e18be6ae21b6dfc91709..22fad2f4398a8c438a0ab492461f46ed1c95c4ab 100644 --- a/src/cpp/common/call.cc +++ b/src/cpp/common/call.cc @@ -41,34 +41,47 @@ namespace grpc { void CallOpBuffer::Reset(void* next_return_tag) { return_tag_ = next_return_tag; + initial_metadata_count_ = 0; - if (initial_metadata_) { - gpr_free(initial_metadata_); - } + gpr_free(initial_metadata_); + + recv_initial_metadata_ = nullptr; + gpr_free(recv_initial_metadata_arr_.metadata); + recv_initial_metadata_arr_ = {0, 0, nullptr}; + send_message_ = nullptr; if (send_message_buf_) { grpc_byte_buffer_destroy(send_message_buf_); send_message_buf_ = nullptr; } + recv_message_ = nullptr; if (recv_message_buf_) { grpc_byte_buffer_destroy(recv_message_buf_); recv_message_buf_ = nullptr; } + client_send_close_ = false; + + recv_trailing_metadata_ = nullptr; recv_status_ = nullptr; + gpr_free(recv_trailing_metadata_arr_.metadata); + recv_trailing_metadata_arr_ = {0, 0, nullptr}; + status_code_ = GRPC_STATUS_OK; - if (status_details_) { - gpr_free(status_details_); - status_details_ = nullptr; - } + gpr_free(status_details_); + status_details_ = nullptr; status_details_capacity_ = 0; + + send_status_ = nullptr; + trailing_metadata_count_ = 0; + trailing_metadata_ = nullptr; } namespace { // TODO(yangg) if the map is changed before we send, the pointers will be a // mess. Make sure it does not happen. -grpc_metadata* FillMetadata( +grpc_metadata* FillMetadataArray( std::multimap<grpc::string, grpc::string>* metadata) { if (metadata->empty()) { return nullptr; } grpc_metadata* metadata_array = (grpc_metadata*)gpr_malloc( @@ -83,6 +96,17 @@ grpc_metadata* FillMetadata( } return metadata_array; } + +void FillMetadataMap(grpc_metadata_array* arr, + std::multimap<grpc::string, grpc::string>* metadata) { + for (size_t i = 0; i < arr->count; i++) { + // TODO(yangg) handle duplicates? + metadata->insert(std::pair<grpc::string, grpc::string>( + arr->metadata[i].key, {arr->metadata[i].value, arr->metadata[i].value_length})); + } + grpc_metadata_array_destroy(arr); + grpc_metadata_array_init(&recv_trailing_metadata_arr_); +} } // namespace void CallOpBuffer::AddSendInitialMetadata( @@ -91,6 +115,11 @@ void CallOpBuffer::AddSendInitialMetadata( initial_metadata_ = FillMetadata(metadata); } +void CallOpBuffer::AddRecvInitialMetadata( + std::multimap<grpc::string, grpc::string>* metadata) { + recv_initial_metadata_ = metadata; +} + void CallOpBuffer::AddSendMessage(const google::protobuf::Message& message) { send_message_ = &message; } @@ -103,13 +132,17 @@ void CallOpBuffer::AddClientSendClose() { client_send_close_ = true; } -void CallOpBuffer::AddClientRecvStatus(Status *status) { +void CallOpBuffer::AddClientRecvStatus( + std::multimap<grpc::string, grpc::string>* metadata, Status *status) { + recv_trailing_metadata_ = metadata; recv_status_ = status; } -void CallOpBuffer::AddServerSendStatus(std::multimap<grpc::string, grpc::string>* metadata, - const Status& status) { - +void CallOpBuffer::AddServerSendStatus( + std::multimap<grpc::string, grpc::string>* metadata, const Status& status) { + trailing_metadata_count_ = metadata->size(); + trailing_metadata_ = FillMetadata(metadata); + send_status_ = &status; } void CallOpBuffer::FillOps(grpc_op *ops, size_t *nops) { @@ -120,6 +153,11 @@ void CallOpBuffer::FillOps(grpc_op *ops, size_t *nops) { ops[*nops].data.send_initial_metadata.metadata = initial_metadata_; (*nops)++; } + if (recv_initial_metadata_) { + ops[*nops].op = GRPC_OP_RECV_INITIAL_METADATA; + ops[*nops].data.recv_initial_metadata = &recv_initial_metadata_arr_; + (*nops)++; + } if (send_message_) { bool success = SerializeProto(*send_message_, &send_message_buf_); if (!success) { @@ -140,10 +178,24 @@ void CallOpBuffer::FillOps(grpc_op *ops, size_t *nops) { } if (recv_status_) { ops[*nops].op = GRPC_OP_RECV_STATUS_ON_CLIENT; - // TODO ops[*nops].data.recv_status_on_client.trailing_metadata = + ops[*nops].data.recv_status_on_client.trailing_metadata = + &recv_trailing_metadata_arr_; ops[*nops].data.recv_status_on_client.status = &status_code_; ops[*nops].data.recv_status_on_client.status_details = &status_details_; - ops[*nops].data.recv_status_on_client.status_details_capacity = &status_details_capacity_; + ops[*nops].data.recv_status_on_client.status_details_capacity = + &status_details_capacity_; + (*nops)++; + } + if (send_status_) { + ops[*nops].op = GRPC_OP_SEND_STATUS_FROM_SERVER; + ops[*nops].data.send_status_from_server.trailing_metadata_count = + trailing_metadata_count_; + ops[*nops].data.send_status_from_server.trailing_metadata = + trailing_metadata_; + ops[*nops].data.send_status_from_server.status = + static_cast<grpc_status_code>(send_status_->code()); + ops[*nops].data.send_status_from_server.status_details = + send_status_->details().c_str(); (*nops)++; } } @@ -158,8 +210,16 @@ void CallOpBuffer::FinalizeResult(void **tag, bool *status) { gpr_free(initial_metadata_); initial_metadata_ = nullptr; } + if (trailing_metadata_count_) { + gpr_free(trailing_metadata_); + trailing_metadata_ = nullptr; + } // Set user-facing tag. *tag = return_tag_; + // Process received initial metadata + if (recv_initial_metadata_) { + FillMetadataMap(&recv_initial_metadata_, recv_initial_metadata_); + } // Parse received message if any. if (recv_message_ && recv_message_buf_) { *status = DeserializeProto(recv_message_buf_, recv_message_); @@ -168,6 +228,7 @@ void CallOpBuffer::FinalizeResult(void **tag, bool *status) { } // Parse received status. if (recv_status_) { + FillMetadataMap(&recv_trailing_metadata_arr_, recv_trailing_metadata_); *recv_status_ = Status( static_cast<StatusCode>(status_code_), status_details_ ? grpc::string(status_details_, status_details_capacity_)