diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h index 49f88a6f135c42713fba7402b04efc653f3d5bd7..b8982f4d93de22d670c3221d2f32b54ae8dbec4b 100644 --- a/include/grpc++/stream.h +++ b/include/grpc++/stream.h @@ -96,7 +96,7 @@ class ClientReader : public ClientStreamingInterface, virtual bool Read(R* msg) { return context_->Read(msg); } - virtual void Cancel() { context_->FinishStream(Status::Cancelled, true); } + virtual void Cancel() { context_->Cancel(); } virtual const Status& Wait() { return context_->Wait(); } @@ -122,7 +122,7 @@ class ClientWriter : public ClientStreamingInterface, virtual void WritesDone() { context_->Write(nullptr, true); } - virtual void Cancel() { context_->FinishStream(Status::Cancelled, true); } + virtual void Cancel() { context_->Cancel(); } // Read the final response and wait for the final status. virtual const Status& Wait() { @@ -165,7 +165,7 @@ class ClientReaderWriter : public ClientStreamingInterface, virtual void WritesDone() { context_->Write(nullptr, true); } - virtual void Cancel() { context_->FinishStream(Status::Cancelled, true); } + virtual void Cancel() { context_->Cancel(); } virtual const Status& Wait() { return context_->Wait(); } diff --git a/include/grpc++/stream_context_interface.h b/include/grpc++/stream_context_interface.h index 535c0048e65af31556bb14e310180621a08a743b..a84119800b77f2d2558e8501d55815230464f79c 100644 --- a/include/grpc++/stream_context_interface.h +++ b/include/grpc++/stream_context_interface.h @@ -53,7 +53,7 @@ class StreamContextInterface { virtual bool Read(google::protobuf::Message* msg) = 0; virtual bool Write(const google::protobuf::Message* msg, bool is_last) = 0; virtual const Status& Wait() = 0; - virtual void FinishStream(const Status& status, bool send) = 0; + virtual void Cancel() = 0; virtual google::protobuf::Message* request() = 0; virtual google::protobuf::Message* response() = 0; diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index d8fd03b8999d7b1b5cb34bb69485edbae12b4cda..f9d8cce8475ecf8ca6dde36b4b378f18704994da 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -365,6 +365,14 @@ grpc_call_error grpc_call_server_end_initial_metadata(grpc_call *call, Can be called multiple times, from any thread. */ grpc_call_error grpc_call_cancel(grpc_call *call); +/* Called by clients to cancel an RPC on the server. + Can be called multiple times, from any thread. + If a status has not been received for the call, set it to the status code + and description passed in. + Importantly, this function does not send status nor description to the + remote endpoint. */ +grpc_call_error grpc_call_cancel_with_status(grpc_call *call, grpc_status_code status, const char *description); + /* Queue a byte buffer for writing. flags is a bit-field combination of the write flags defined above. A write with byte_buffer null is allowed, and will not send any bytes on the diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 44ccfd11a76454965b7ac0d3d94f109ef324c9dd..4ce958171b54d648824fd23753cd2276ed6d56c9 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -270,6 +270,19 @@ void grpc_call_destroy(grpc_call *c) { grpc_call_internal_unref(c); } +static void maybe_set_status_code(grpc_call *call, gpr_uint32 status) { + if (!call->got_status_code) { + call->status_code = status; + call->got_status_code = 1; + } +} + +static void maybe_set_status_details(grpc_call *call, grpc_mdstr *status) { + if (!call->status_details) { + call->status_details = grpc_mdstr_ref(status); + } +} + grpc_call_error grpc_call_cancel(grpc_call *c) { grpc_call_element *elem; grpc_call_op op; @@ -286,6 +299,17 @@ grpc_call_error grpc_call_cancel(grpc_call *c) { return GRPC_CALL_OK; } +grpc_call_error grpc_call_cancel_with_status(grpc_call *c, grpc_status_code status, const char *description) { + grpc_mdstr *details = description? grpc_mdstr_from_string(c->metadata_context, description) : NULL; + gpr_mu_lock(&c->read_mu); + maybe_set_status_code(c, status); + if (details) { + maybe_set_status_details(c, details); + } + gpr_mu_unlock(&c->read_mu); + return grpc_call_cancel(c); +} + void grpc_call_execute_op(grpc_call *call, grpc_call_op *op) { grpc_call_element *elem; GPR_ASSERT(op->dir == GRPC_CALL_DOWN); @@ -803,16 +827,11 @@ void grpc_call_recv_metadata(grpc_call_element *elem, grpc_call_op *op) { grpc_mdstr *key = md->key; gpr_log(GPR_DEBUG, "call %p got metadata %s %s", call, grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value)); if (key == grpc_channel_get_status_string(call->channel)) { - if (!call->got_status_code) { - call->status_code = decode_status(md); - call->got_status_code = 1; - } + maybe_set_status_code(call, decode_status(md)); grpc_mdelem_unref(md); op->done_cb(op->user_data, GRPC_OP_OK); } else if (key == grpc_channel_get_message_string(call->channel)) { - if (!call->status_details) { - call->status_details = grpc_mdstr_ref(md->value); - } + maybe_set_status_details(call, md->value); grpc_mdelem_unref(md); op->done_cb(op->user_data, GRPC_OP_OK); } else { diff --git a/src/cpp/stream/stream_context.cc b/src/cpp/stream/stream_context.cc index 7936a30dfd7c66f060a8bc45528c8b57f2b3f2a3..d46ba6e56ae2807106f264a6974a96fce5915363 100644 --- a/src/cpp/stream/stream_context.cc +++ b/src/cpp/stream/stream_context.cc @@ -112,9 +112,7 @@ bool StreamContext::Read(google::protobuf::Message* msg) { if (read_ev->data.read) { if (!DeserializeProto(read_ev->data.read, msg)) { ret = false; - FinishStream( - Status(StatusCode::DATA_LOSS, "Failed to parse incoming proto"), - true); + grpc_call_cancel_with_status(call(), GRPC_STATUS_DATA_LOSS, "Failed to parse incoming proto"); } } else { ret = false; @@ -132,9 +130,7 @@ bool StreamContext::Write(const google::protobuf::Message* msg, bool is_last) { if (msg) { grpc_byte_buffer* out_buf = nullptr; if (!SerializeProto(*msg, &out_buf)) { - FinishStream(Status(StatusCode::INVALID_ARGUMENT, - "Failed to serialize outgoing proto"), - true); + grpc_call_cancel_with_status(call(), GRPC_STATUS_INVALID_ARGUMENT, "Failed to serialize outgoing proto"); return false; } int flag = is_last ? GRPC_WRITE_BUFFER_HINT : 0; @@ -172,29 +168,21 @@ const Status& StreamContext::Wait() { grpc_event_finish(metadata_ev); // TODO(yangg) protect states by a mutex, including other places. if (!self_halfclosed_ || !peer_halfclosed_) { - FinishStream(Status::Cancelled, true); - } else { - grpc_event* finish_ev = - grpc_completion_queue_pluck(cq(), finished_tag(), gpr_inf_future); - GPR_ASSERT(finish_ev->type == GRPC_FINISHED); - final_status_ = Status( - static_cast<StatusCode>(finish_ev->data.finished.status), - finish_ev->data.finished.details ? finish_ev->data.finished.details - : ""); - grpc_event_finish(finish_ev); - } - return final_status_; -} - -void StreamContext::FinishStream(const Status& status, bool send) { - if (send) { - grpc_call_cancel(call()); - } + Cancel(); + } grpc_event* finish_ev = grpc_completion_queue_pluck(cq(), finished_tag(), gpr_inf_future); GPR_ASSERT(finish_ev->type == GRPC_FINISHED); + final_status_ = Status( + static_cast<StatusCode>(finish_ev->data.finished.status), + finish_ev->data.finished.details ? finish_ev->data.finished.details + : ""); grpc_event_finish(finish_ev); - final_status_ = status; + return final_status_; +} + +void StreamContext::Cancel() { + grpc_call_cancel(call()); } } // namespace grpc diff --git a/src/cpp/stream/stream_context.h b/src/cpp/stream/stream_context.h index f70fe6daa347ac2bb4210628fea4c8aebc4e64d9..4781f27a77f35c08701f40b3102afdc607d41e22 100644 --- a/src/cpp/stream/stream_context.h +++ b/src/cpp/stream/stream_context.h @@ -48,7 +48,7 @@ namespace grpc { class ClientContext; class RpcMethod; -class StreamContext : public StreamContextInterface { +class StreamContext final : public StreamContextInterface { public: StreamContext(const RpcMethod& method, ClientContext* context, const google::protobuf::Message* request, @@ -63,7 +63,7 @@ class StreamContext : public StreamContextInterface { bool Read(google::protobuf::Message* msg) override; bool Write(const google::protobuf::Message* msg, bool is_last) override; const Status& Wait() override; - void FinishStream(const Status& status, bool send) override; + void Cancel() override; google::protobuf::Message* request() override { return request_; } google::protobuf::Message* response() override { return result_; } diff --git a/third_party/libevent b/third_party/libevent new file mode 160000 index 0000000000000000000000000000000000000000..f7d92c63928a1460f3d99b9bc418bd3b686a0dca --- /dev/null +++ b/third_party/libevent @@ -0,0 +1 @@ +Subproject commit f7d92c63928a1460f3d99b9bc418bd3b686a0dca