Skip to content
Snippets Groups Projects
Commit eb8e7cd9 authored by Yang Gao's avatar Yang Gao
Browse files

Implement FinalizeResult

parent b84aab79
No related branches found
No related tags found
No related merge requests found
...@@ -75,14 +75,14 @@ class CallOpBuffer final : public CompletionQueueTag { ...@@ -75,14 +75,14 @@ class CallOpBuffer final : public CompletionQueueTag {
void FillOps(grpc_op *ops, size_t *nops); void FillOps(grpc_op *ops, size_t *nops);
// Called by completion queue just prior to returning from Next() or Pluck() // Called by completion queue just prior to returning from Next() or Pluck()
void FinalizeResult(void *tag, bool *status) override; void FinalizeResult(void **tag, bool *status) override;
private: private:
void *return_tag_ = nullptr; void *return_tag_ = nullptr;
size_t initial_metadata_count_ = 0; size_t initial_metadata_count_ = 0;
grpc_metadata* initial_metadata_ = nullptr; grpc_metadata* initial_metadata_ = nullptr;
const google::protobuf::Message* send_message_ = nullptr; const google::protobuf::Message* send_message_ = nullptr;
grpc_byte_buffer* write_buffer_ = nullptr; grpc_byte_buffer* send_message_buf_ = nullptr;
google::protobuf::Message* recv_message_ = nullptr; google::protobuf::Message* recv_message_ = nullptr;
grpc_byte_buffer* recv_message_buf_ = nullptr; grpc_byte_buffer* recv_message_buf_ = nullptr;
bool client_send_close_ = false; bool client_send_close_ = false;
...@@ -90,6 +90,9 @@ class CallOpBuffer final : public CompletionQueueTag { ...@@ -90,6 +90,9 @@ class CallOpBuffer final : public CompletionQueueTag {
grpc_status_code status_code_ = GRPC_STATUS_OK; grpc_status_code status_code_ = GRPC_STATUS_OK;
char* status_details_ = nullptr; char* status_details_ = nullptr;
size_t status_details_capacity_ = 0; size_t status_details_capacity_ = 0;
Status* send_status_ = nullptr;
size_t trailing_metadata_count_ = 0;
grpc_metadata* trailing_metadata_ = nullptr;
}; };
class CCallDeleter { class CCallDeleter {
......
...@@ -819,7 +819,7 @@ void grpc_server_add_listener(grpc_server *server, void *arg, ...@@ -819,7 +819,7 @@ void grpc_server_add_listener(grpc_server *server, void *arg,
static grpc_call_error queue_call_request(grpc_server *server, static grpc_call_error queue_call_request(grpc_server *server,
requested_call *rc) { requested_call *rc) {
call_data *calld; call_data *calld = NULL;
gpr_mu_lock(&server->mu); gpr_mu_lock(&server->mu);
if (server->shutdown) { if (server->shutdown) {
gpr_mu_unlock(&server->mu); gpr_mu_unlock(&server->mu);
...@@ -896,6 +896,9 @@ grpc_call_error grpc_server_request_call_old(grpc_server *server, ...@@ -896,6 +896,9 @@ grpc_call_error grpc_server_request_call_old(grpc_server *server,
static void publish_legacy(grpc_call *call, grpc_op_error status, void *tag); static void publish_legacy(grpc_call *call, grpc_op_error status, void *tag);
static void publish_registered_or_batch(grpc_call *call, grpc_op_error status, static void publish_registered_or_batch(grpc_call *call, grpc_op_error status,
void *tag); void *tag);
static void publish_was_not_set(grpc_call *call, grpc_op_error status, void *tag) {
abort();
}
static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) { static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
gpr_slice slice = value->slice; gpr_slice slice = value->slice;
...@@ -910,7 +913,7 @@ static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) { ...@@ -910,7 +913,7 @@ static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
static void begin_call(grpc_server *server, call_data *calld, static void begin_call(grpc_server *server, call_data *calld,
requested_call *rc) { requested_call *rc) {
grpc_ioreq_completion_func publish; grpc_ioreq_completion_func publish = publish_was_not_set;
grpc_ioreq req[2]; grpc_ioreq req[2];
grpc_ioreq *r = req; grpc_ioreq *r = req;
......
...@@ -46,9 +46,9 @@ void CallOpBuffer::Reset(void* next_return_tag) { ...@@ -46,9 +46,9 @@ void CallOpBuffer::Reset(void* next_return_tag) {
gpr_free(initial_metadata_); gpr_free(initial_metadata_);
} }
send_message_ = nullptr; send_message_ = nullptr;
if (write_buffer_) { if (send_message_buf_) {
grpc_byte_buffer_destroy(write_buffer_); grpc_byte_buffer_destroy(send_message_buf_);
write_buffer_ = nullptr; send_message_buf_ = nullptr;
} }
recv_message_ = nullptr; recv_message_ = nullptr;
if (recv_message_buf_) { if (recv_message_buf_) {
...@@ -107,6 +107,10 @@ void CallOpBuffer::AddClientRecvStatus(Status *status) { ...@@ -107,6 +107,10 @@ void CallOpBuffer::AddClientRecvStatus(Status *status) {
recv_status_ = status; recv_status_ = status;
} }
void CallOpBuffer::AddServerSendStatus(std::multimap<grpc::string, grpc::string>* metadata,
const Status& status) {
}
void CallOpBuffer::FillOps(grpc_op *ops, size_t *nops) { void CallOpBuffer::FillOps(grpc_op *ops, size_t *nops) {
*nops = 0; *nops = 0;
...@@ -117,12 +121,12 @@ void CallOpBuffer::FillOps(grpc_op *ops, size_t *nops) { ...@@ -117,12 +121,12 @@ void CallOpBuffer::FillOps(grpc_op *ops, size_t *nops) {
(*nops)++; (*nops)++;
} }
if (send_message_) { if (send_message_) {
bool success = SerializeProto(*send_message_, &write_buffer_); bool success = SerializeProto(*send_message_, &send_message_buf_);
if (!success) { if (!success) {
// TODO handle parse failure // TODO handle parse failure
} }
ops[*nops].op = GRPC_OP_SEND_MESSAGE; ops[*nops].op = GRPC_OP_SEND_MESSAGE;
ops[*nops].data.send_message = write_buffer_; ops[*nops].data.send_message = send_message_buf_;
(*nops)++; (*nops)++;
} }
if (recv_message_) { if (recv_message_) {
...@@ -136,7 +140,7 @@ void CallOpBuffer::FillOps(grpc_op *ops, size_t *nops) { ...@@ -136,7 +140,7 @@ void CallOpBuffer::FillOps(grpc_op *ops, size_t *nops) {
} }
if (recv_status_) { if (recv_status_) {
ops[*nops].op = GRPC_OP_RECV_STATUS_ON_CLIENT; ops[*nops].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
// ops[*nops].data.recv_status_on_client.trailing_metadata = // TODO ops[*nops].data.recv_status_on_client.trailing_metadata =
ops[*nops].data.recv_status_on_client.status = &status_code_; ops[*nops].data.recv_status_on_client.status = &status_code_;
ops[*nops].data.recv_status_on_client.status_details = &status_details_; ops[*nops].data.recv_status_on_client.status_details = &status_details_;
ops[*nops].data.recv_status_on_client.status_details_capacity = &status_details_capacity_; ops[*nops].data.recv_status_on_client.status_details_capacity = &status_details_capacity_;
...@@ -144,11 +148,32 @@ void CallOpBuffer::FillOps(grpc_op *ops, size_t *nops) { ...@@ -144,11 +148,32 @@ void CallOpBuffer::FillOps(grpc_op *ops, size_t *nops) {
} }
} }
void CallOpBuffer::FinalizeResult(void *tag, bool *status) { void CallOpBuffer::FinalizeResult(void **tag, bool *status) {
// Release send buffers // Release send buffers.
if (write_buffer_) { if (send_message_buf_) {
grpc_byte_buffer_destroy(write_buffer_); grpc_byte_buffer_destroy(send_message_buf_);
write_buffer_ = nullptr; send_message_buf_ = nullptr;
}
if (initial_metadata_) {
gpr_free(initial_metadata_);
initial_metadata_ = nullptr;
}
// Set user-facing tag.
*tag = return_tag_;
// Parse received message if any.
if (recv_message_ && recv_message_buf_) {
*status = DeserializeProto(recv_message_buf_, recv_message_);
grpc_byte_buffer_destroy(recv_message_buf_);
recv_message_buf_ = nullptr;
}
// Parse received status.
if (recv_status_) {
if (status_details_) {
*recv_status_ = Status(static_cast<StatusCode>(status_code_),
grpc::string(status_details_, status_details_capacity_));
} else {
*recv_status_ = Status(static_cast<StatusCode>(status_code_));
}
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment