Skip to content
Snippets Groups Projects
Commit 087ebf55 authored by Yang Gao's avatar Yang Gao
Browse files

Merge pull request #30 from ctiller/cppapi

Make the C++ API use C API calls to fiddle with status
parents f1f7213a fc4a449c
No related branches found
No related tags found
No related merge requests found
...@@ -96,7 +96,7 @@ class ClientReader : public ClientStreamingInterface, ...@@ -96,7 +96,7 @@ class ClientReader : public ClientStreamingInterface,
virtual bool Read(R* msg) { return context_->Read(msg); } virtual bool Read(R* msg) { return context_->Read(msg); }
virtual void Cancel() { context_->FinishStream(Status::Cancelled, true); } virtual void Cancel() { context_->Cancel(); }
virtual const Status& Wait() { return context_->Wait(); } virtual const Status& Wait() { return context_->Wait(); }
...@@ -122,7 +122,7 @@ class ClientWriter : public ClientStreamingInterface, ...@@ -122,7 +122,7 @@ class ClientWriter : public ClientStreamingInterface,
virtual void WritesDone() { context_->Write(nullptr, true); } virtual void WritesDone() { context_->Write(nullptr, true); }
virtual void Cancel() { context_->FinishStream(Status::Cancelled, true); } virtual void Cancel() { context_->Cancel(); }
// Read the final response and wait for the final status. // Read the final response and wait for the final status.
virtual const Status& Wait() { virtual const Status& Wait() {
...@@ -165,7 +165,7 @@ class ClientReaderWriter : public ClientStreamingInterface, ...@@ -165,7 +165,7 @@ class ClientReaderWriter : public ClientStreamingInterface,
virtual void WritesDone() { context_->Write(nullptr, true); } virtual void WritesDone() { context_->Write(nullptr, true); }
virtual void Cancel() { context_->FinishStream(Status::Cancelled, true); } virtual void Cancel() { context_->Cancel(); }
virtual const Status& Wait() { return context_->Wait(); } virtual const Status& Wait() { return context_->Wait(); }
......
...@@ -53,7 +53,7 @@ class StreamContextInterface { ...@@ -53,7 +53,7 @@ class StreamContextInterface {
virtual bool Read(google::protobuf::Message* msg) = 0; virtual bool Read(google::protobuf::Message* msg) = 0;
virtual bool Write(const google::protobuf::Message* msg, bool is_last) = 0; virtual bool Write(const google::protobuf::Message* msg, bool is_last) = 0;
virtual const Status& Wait() = 0; virtual const Status& Wait() = 0;
virtual void FinishStream(const Status& status, bool send) = 0; virtual void Cancel() = 0;
virtual google::protobuf::Message* request() = 0; virtual google::protobuf::Message* request() = 0;
virtual google::protobuf::Message* response() = 0; virtual google::protobuf::Message* response() = 0;
......
...@@ -365,6 +365,16 @@ grpc_call_error grpc_call_server_end_initial_metadata(grpc_call *call, ...@@ -365,6 +365,16 @@ grpc_call_error grpc_call_server_end_initial_metadata(grpc_call *call,
Can be called multiple times, from any thread. */ Can be called multiple times, from any thread. */
grpc_call_error grpc_call_cancel(grpc_call *call); grpc_call_error grpc_call_cancel(grpc_call *call);
/* Called by clients to cancel an RPC on the server.
Can be called multiple times, from any thread.
If a status has not been received for the call, set it to the status code
and description passed in.
Importantly, this function does not send status nor description to the
remote endpoint. */
grpc_call_error grpc_call_cancel_with_status(grpc_call *call,
grpc_status_code status,
const char *description);
/* Queue a byte buffer for writing. /* Queue a byte buffer for writing.
flags is a bit-field combination of the write flags defined above. flags is a bit-field combination of the write flags defined above.
A write with byte_buffer null is allowed, and will not send any bytes on the A write with byte_buffer null is allowed, and will not send any bytes on the
......
...@@ -178,6 +178,7 @@ struct grpc_call { ...@@ -178,6 +178,7 @@ struct grpc_call {
gpr_uint8 received_metadata; gpr_uint8 received_metadata;
gpr_uint8 have_read; gpr_uint8 have_read;
gpr_uint8 have_alarm; gpr_uint8 have_alarm;
gpr_uint8 got_status_code;
/* The current outstanding read message tag (only valid if have_read == 1) */ /* The current outstanding read message tag (only valid if have_read == 1) */
void *read_tag; void *read_tag;
void *metadata_tag; void *metadata_tag;
...@@ -225,6 +226,7 @@ grpc_call *grpc_call_create(grpc_channel *channel, ...@@ -225,6 +226,7 @@ grpc_call *grpc_call_create(grpc_channel *channel,
call->have_write = 0; call->have_write = 0;
call->have_alarm = 0; call->have_alarm = 0;
call->received_metadata = 0; call->received_metadata = 0;
call->got_status_code = 0;
call->status_code = call->status_code =
server_transport_data != NULL ? GRPC_STATUS_OK : GRPC_STATUS_UNKNOWN; server_transport_data != NULL ? GRPC_STATUS_OK : GRPC_STATUS_UNKNOWN;
call->status_details = NULL; call->status_details = NULL;
...@@ -268,6 +270,19 @@ void grpc_call_destroy(grpc_call *c) { ...@@ -268,6 +270,19 @@ void grpc_call_destroy(grpc_call *c) {
grpc_call_internal_unref(c); grpc_call_internal_unref(c);
} }
static void maybe_set_status_code(grpc_call *call, gpr_uint32 status) {
if (!call->got_status_code) {
call->status_code = status;
call->got_status_code = 1;
}
}
static void maybe_set_status_details(grpc_call *call, grpc_mdstr *status) {
if (!call->status_details) {
call->status_details = grpc_mdstr_ref(status);
}
}
grpc_call_error grpc_call_cancel(grpc_call *c) { grpc_call_error grpc_call_cancel(grpc_call *c) {
grpc_call_element *elem; grpc_call_element *elem;
grpc_call_op op; grpc_call_op op;
...@@ -284,6 +299,21 @@ grpc_call_error grpc_call_cancel(grpc_call *c) { ...@@ -284,6 +299,21 @@ grpc_call_error grpc_call_cancel(grpc_call *c) {
return GRPC_CALL_OK; return GRPC_CALL_OK;
} }
grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
grpc_status_code status,
const char *description) {
grpc_mdstr *details =
description ? grpc_mdstr_from_string(c->metadata_context, description)
: NULL;
gpr_mu_lock(&c->read_mu);
maybe_set_status_code(c, status);
if (details) {
maybe_set_status_details(c, details);
}
gpr_mu_unlock(&c->read_mu);
return grpc_call_cancel(c);
}
void grpc_call_execute_op(grpc_call *call, grpc_call_op *op) { void grpc_call_execute_op(grpc_call *call, grpc_call_op *op) {
grpc_call_element *elem; grpc_call_element *elem;
GPR_ASSERT(op->dir == GRPC_CALL_DOWN); GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
...@@ -799,15 +829,14 @@ void grpc_call_recv_metadata(grpc_call_element *elem, grpc_call_op *op) { ...@@ -799,15 +829,14 @@ void grpc_call_recv_metadata(grpc_call_element *elem, grpc_call_op *op) {
grpc_call *call = CALL_FROM_TOP_ELEM(elem); grpc_call *call = CALL_FROM_TOP_ELEM(elem);
grpc_mdelem *md = op->data.metadata; grpc_mdelem *md = op->data.metadata;
grpc_mdstr *key = md->key; grpc_mdstr *key = md->key;
gpr_log(GPR_DEBUG, "call %p got metadata %s %s", call,
grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value));
if (key == grpc_channel_get_status_string(call->channel)) { if (key == grpc_channel_get_status_string(call->channel)) {
call->status_code = decode_status(md); maybe_set_status_code(call, decode_status(md));
grpc_mdelem_unref(md); grpc_mdelem_unref(md);
op->done_cb(op->user_data, GRPC_OP_OK); op->done_cb(op->user_data, GRPC_OP_OK);
} else if (key == grpc_channel_get_message_string(call->channel)) { } else if (key == grpc_channel_get_message_string(call->channel)) {
if (call->status_details) { maybe_set_status_details(call, md->value);
grpc_mdstr_unref(call->status_details);
}
call->status_details = grpc_mdstr_ref(md->value);
grpc_mdelem_unref(md); grpc_mdelem_unref(md);
op->done_cb(op->user_data, GRPC_OP_OK); op->done_cb(op->user_data, GRPC_OP_OK);
} else { } else {
......
...@@ -112,9 +112,8 @@ bool StreamContext::Read(google::protobuf::Message* msg) { ...@@ -112,9 +112,8 @@ bool StreamContext::Read(google::protobuf::Message* msg) {
if (read_ev->data.read) { if (read_ev->data.read) {
if (!DeserializeProto(read_ev->data.read, msg)) { if (!DeserializeProto(read_ev->data.read, msg)) {
ret = false; ret = false;
FinishStream( grpc_call_cancel_with_status(call(), GRPC_STATUS_DATA_LOSS,
Status(StatusCode::DATA_LOSS, "Failed to parse incoming proto"), "Failed to parse incoming proto");
true);
} }
} else { } else {
ret = false; ret = false;
...@@ -132,9 +131,8 @@ bool StreamContext::Write(const google::protobuf::Message* msg, bool is_last) { ...@@ -132,9 +131,8 @@ bool StreamContext::Write(const google::protobuf::Message* msg, bool is_last) {
if (msg) { if (msg) {
grpc_byte_buffer* out_buf = nullptr; grpc_byte_buffer* out_buf = nullptr;
if (!SerializeProto(*msg, &out_buf)) { if (!SerializeProto(*msg, &out_buf)) {
FinishStream(Status(StatusCode::INVALID_ARGUMENT, grpc_call_cancel_with_status(call(), GRPC_STATUS_INVALID_ARGUMENT,
"Failed to serialize outgoing proto"), "Failed to serialize outgoing proto");
true);
return false; return false;
} }
int flag = is_last ? GRPC_WRITE_BUFFER_HINT : 0; int flag = is_last ? GRPC_WRITE_BUFFER_HINT : 0;
...@@ -172,29 +170,18 @@ const Status& StreamContext::Wait() { ...@@ -172,29 +170,18 @@ const Status& StreamContext::Wait() {
grpc_event_finish(metadata_ev); grpc_event_finish(metadata_ev);
// TODO(yangg) protect states by a mutex, including other places. // TODO(yangg) protect states by a mutex, including other places.
if (!self_halfclosed_ || !peer_halfclosed_) { if (!self_halfclosed_ || !peer_halfclosed_) {
FinishStream(Status::Cancelled, true); Cancel();
} else { }
grpc_event* finish_ev = grpc_event* finish_ev =
grpc_completion_queue_pluck(cq(), finished_tag(), gpr_inf_future); grpc_completion_queue_pluck(cq(), finished_tag(), gpr_inf_future);
GPR_ASSERT(finish_ev->type == GRPC_FINISHED); GPR_ASSERT(finish_ev->type == GRPC_FINISHED);
final_status_ = Status( final_status_ = Status(
static_cast<StatusCode>(finish_ev->data.finished.status), static_cast<StatusCode>(finish_ev->data.finished.status),
finish_ev->data.finished.details ? finish_ev->data.finished.details finish_ev->data.finished.details ? finish_ev->data.finished.details : "");
: "");
grpc_event_finish(finish_ev); grpc_event_finish(finish_ev);
}
return final_status_; return final_status_;
} }
void StreamContext::FinishStream(const Status& status, bool send) { void StreamContext::Cancel() { grpc_call_cancel(call()); }
if (send) {
grpc_call_cancel(call());
}
grpc_event* finish_ev =
grpc_completion_queue_pluck(cq(), finished_tag(), gpr_inf_future);
GPR_ASSERT(finish_ev->type == GRPC_FINISHED);
grpc_event_finish(finish_ev);
final_status_ = status;
}
} // namespace grpc } // namespace grpc
...@@ -48,7 +48,7 @@ namespace grpc { ...@@ -48,7 +48,7 @@ namespace grpc {
class ClientContext; class ClientContext;
class RpcMethod; class RpcMethod;
class StreamContext : public StreamContextInterface { class StreamContext final : public StreamContextInterface {
public: public:
StreamContext(const RpcMethod& method, ClientContext* context, StreamContext(const RpcMethod& method, ClientContext* context,
const google::protobuf::Message* request, const google::protobuf::Message* request,
...@@ -63,7 +63,7 @@ class StreamContext : public StreamContextInterface { ...@@ -63,7 +63,7 @@ class StreamContext : public StreamContextInterface {
bool Read(google::protobuf::Message* msg) override; bool Read(google::protobuf::Message* msg) override;
bool Write(const google::protobuf::Message* msg, bool is_last) override; bool Write(const google::protobuf::Message* msg, bool is_last) override;
const Status& Wait() override; const Status& Wait() override;
void FinishStream(const Status& status, bool send) override; void Cancel() override;
google::protobuf::Message* request() override { return request_; } google::protobuf::Message* request() override { return request_; }
google::protobuf::Message* response() override { return result_; } google::protobuf::Message* response() override { return result_; }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment