From d248c245463b7b2de1a4007b8029b076f05fe88c Mon Sep 17 00:00:00 2001 From: Craig Tiller <ctiller@google.com> Date: Wed, 14 Jan 2015 11:49:12 -0800 Subject: [PATCH] Update C++ code to set status via the C api. This prevents mismatches from breaking tests. --- include/grpc++/stream.h | 6 ++-- include/grpc++/stream_context_interface.h | 2 +- include/grpc/grpc.h | 8 +++++ src/core/surface/call.c | 33 +++++++++++++++----- src/cpp/stream/stream_context.cc | 38 ++++++++--------------- src/cpp/stream/stream_context.h | 4 +-- third_party/libevent | 1 + 7 files changed, 54 insertions(+), 38 deletions(-) create mode 160000 third_party/libevent diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h index 49f88a6f13..b8982f4d93 100644 --- a/include/grpc++/stream.h +++ b/include/grpc++/stream.h @@ -96,7 +96,7 @@ class ClientReader : public ClientStreamingInterface, virtual bool Read(R* msg) { return context_->Read(msg); } - virtual void Cancel() { context_->FinishStream(Status::Cancelled, true); } + virtual void Cancel() { context_->Cancel(); } virtual const Status& Wait() { return context_->Wait(); } @@ -122,7 +122,7 @@ class ClientWriter : public ClientStreamingInterface, virtual void WritesDone() { context_->Write(nullptr, true); } - virtual void Cancel() { context_->FinishStream(Status::Cancelled, true); } + virtual void Cancel() { context_->Cancel(); } // Read the final response and wait for the final status. virtual const Status& Wait() { @@ -165,7 +165,7 @@ class ClientReaderWriter : public ClientStreamingInterface, virtual void WritesDone() { context_->Write(nullptr, true); } - virtual void Cancel() { context_->FinishStream(Status::Cancelled, true); } + virtual void Cancel() { context_->Cancel(); } virtual const Status& Wait() { return context_->Wait(); } diff --git a/include/grpc++/stream_context_interface.h b/include/grpc++/stream_context_interface.h index 535c0048e6..a84119800b 100644 --- a/include/grpc++/stream_context_interface.h +++ b/include/grpc++/stream_context_interface.h @@ -53,7 +53,7 @@ class StreamContextInterface { virtual bool Read(google::protobuf::Message* msg) = 0; virtual bool Write(const google::protobuf::Message* msg, bool is_last) = 0; virtual const Status& Wait() = 0; - virtual void FinishStream(const Status& status, bool send) = 0; + virtual void Cancel() = 0; virtual google::protobuf::Message* request() = 0; virtual google::protobuf::Message* response() = 0; diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index d8fd03b899..f9d8cce847 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -365,6 +365,14 @@ grpc_call_error grpc_call_server_end_initial_metadata(grpc_call *call, Can be called multiple times, from any thread. */ grpc_call_error grpc_call_cancel(grpc_call *call); +/* Called by clients to cancel an RPC on the server. + Can be called multiple times, from any thread. + If a status has not been received for the call, set it to the status code + and description passed in. + Importantly, this function does not send status nor description to the + remote endpoint. */ +grpc_call_error grpc_call_cancel_with_status(grpc_call *call, grpc_status_code status, const char *description); + /* Queue a byte buffer for writing. flags is a bit-field combination of the write flags defined above. A write with byte_buffer null is allowed, and will not send any bytes on the diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 44ccfd11a7..4ce958171b 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -270,6 +270,19 @@ void grpc_call_destroy(grpc_call *c) { grpc_call_internal_unref(c); } +static void maybe_set_status_code(grpc_call *call, gpr_uint32 status) { + if (!call->got_status_code) { + call->status_code = status; + call->got_status_code = 1; + } +} + +static void maybe_set_status_details(grpc_call *call, grpc_mdstr *status) { + if (!call->status_details) { + call->status_details = grpc_mdstr_ref(status); + } +} + grpc_call_error grpc_call_cancel(grpc_call *c) { grpc_call_element *elem; grpc_call_op op; @@ -286,6 +299,17 @@ grpc_call_error grpc_call_cancel(grpc_call *c) { return GRPC_CALL_OK; } +grpc_call_error grpc_call_cancel_with_status(grpc_call *c, grpc_status_code status, const char *description) { + grpc_mdstr *details = description? grpc_mdstr_from_string(c->metadata_context, description) : NULL; + gpr_mu_lock(&c->read_mu); + maybe_set_status_code(c, status); + if (details) { + maybe_set_status_details(c, details); + } + gpr_mu_unlock(&c->read_mu); + return grpc_call_cancel(c); +} + void grpc_call_execute_op(grpc_call *call, grpc_call_op *op) { grpc_call_element *elem; GPR_ASSERT(op->dir == GRPC_CALL_DOWN); @@ -803,16 +827,11 @@ void grpc_call_recv_metadata(grpc_call_element *elem, grpc_call_op *op) { grpc_mdstr *key = md->key; gpr_log(GPR_DEBUG, "call %p got metadata %s %s", call, grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value)); if (key == grpc_channel_get_status_string(call->channel)) { - if (!call->got_status_code) { - call->status_code = decode_status(md); - call->got_status_code = 1; - } + maybe_set_status_code(call, decode_status(md)); grpc_mdelem_unref(md); op->done_cb(op->user_data, GRPC_OP_OK); } else if (key == grpc_channel_get_message_string(call->channel)) { - if (!call->status_details) { - call->status_details = grpc_mdstr_ref(md->value); - } + maybe_set_status_details(call, md->value); grpc_mdelem_unref(md); op->done_cb(op->user_data, GRPC_OP_OK); } else { diff --git a/src/cpp/stream/stream_context.cc b/src/cpp/stream/stream_context.cc index 7936a30dfd..d46ba6e56a 100644 --- a/src/cpp/stream/stream_context.cc +++ b/src/cpp/stream/stream_context.cc @@ -112,9 +112,7 @@ bool StreamContext::Read(google::protobuf::Message* msg) { if (read_ev->data.read) { if (!DeserializeProto(read_ev->data.read, msg)) { ret = false; - FinishStream( - Status(StatusCode::DATA_LOSS, "Failed to parse incoming proto"), - true); + grpc_call_cancel_with_status(call(), GRPC_STATUS_DATA_LOSS, "Failed to parse incoming proto"); } } else { ret = false; @@ -132,9 +130,7 @@ bool StreamContext::Write(const google::protobuf::Message* msg, bool is_last) { if (msg) { grpc_byte_buffer* out_buf = nullptr; if (!SerializeProto(*msg, &out_buf)) { - FinishStream(Status(StatusCode::INVALID_ARGUMENT, - "Failed to serialize outgoing proto"), - true); + grpc_call_cancel_with_status(call(), GRPC_STATUS_INVALID_ARGUMENT, "Failed to serialize outgoing proto"); return false; } int flag = is_last ? GRPC_WRITE_BUFFER_HINT : 0; @@ -172,29 +168,21 @@ const Status& StreamContext::Wait() { grpc_event_finish(metadata_ev); // TODO(yangg) protect states by a mutex, including other places. if (!self_halfclosed_ || !peer_halfclosed_) { - FinishStream(Status::Cancelled, true); - } else { - grpc_event* finish_ev = - grpc_completion_queue_pluck(cq(), finished_tag(), gpr_inf_future); - GPR_ASSERT(finish_ev->type == GRPC_FINISHED); - final_status_ = Status( - static_cast<StatusCode>(finish_ev->data.finished.status), - finish_ev->data.finished.details ? finish_ev->data.finished.details - : ""); - grpc_event_finish(finish_ev); - } - return final_status_; -} - -void StreamContext::FinishStream(const Status& status, bool send) { - if (send) { - grpc_call_cancel(call()); - } + Cancel(); + } grpc_event* finish_ev = grpc_completion_queue_pluck(cq(), finished_tag(), gpr_inf_future); GPR_ASSERT(finish_ev->type == GRPC_FINISHED); + final_status_ = Status( + static_cast<StatusCode>(finish_ev->data.finished.status), + finish_ev->data.finished.details ? finish_ev->data.finished.details + : ""); grpc_event_finish(finish_ev); - final_status_ = status; + return final_status_; +} + +void StreamContext::Cancel() { + grpc_call_cancel(call()); } } // namespace grpc diff --git a/src/cpp/stream/stream_context.h b/src/cpp/stream/stream_context.h index f70fe6daa3..4781f27a77 100644 --- a/src/cpp/stream/stream_context.h +++ b/src/cpp/stream/stream_context.h @@ -48,7 +48,7 @@ namespace grpc { class ClientContext; class RpcMethod; -class StreamContext : public StreamContextInterface { +class StreamContext final : public StreamContextInterface { public: StreamContext(const RpcMethod& method, ClientContext* context, const google::protobuf::Message* request, @@ -63,7 +63,7 @@ class StreamContext : public StreamContextInterface { bool Read(google::protobuf::Message* msg) override; bool Write(const google::protobuf::Message* msg, bool is_last) override; const Status& Wait() override; - void FinishStream(const Status& status, bool send) override; + void Cancel() override; google::protobuf::Message* request() override { return request_; } google::protobuf::Message* response() override { return result_; } diff --git a/third_party/libevent b/third_party/libevent new file mode 160000 index 0000000000..f7d92c6392 --- /dev/null +++ b/third_party/libevent @@ -0,0 +1 @@ +Subproject commit f7d92c63928a1460f3d99b9bc418bd3b686a0dca -- GitLab