diff --git a/include/grpc++/impl/codegen/async_unary_call.h b/include/grpc++/impl/codegen/async_unary_call.h index dd65cf89d6e7c1732aad742fca645e225ba9a949..b120b37f1fef05487fb267e0231dfef04e78cbf0 100644 --- a/include/grpc++/impl/codegen/async_unary_call.h +++ b/include/grpc++/impl/codegen/async_unary_call.h @@ -68,10 +68,9 @@ class ClientAsyncResponseReader final ClientContext* context, const W& request) { Call call = channel->CreateCall(method, context, cq); - ClientAsyncResponseReader* reader = static_cast<ClientAsyncResponseReader*>( - grpc_call_arena_alloc(call.call(), sizeof(*reader))); - new (&reader->call_) Call(std::move(call)); - reader->context_ = context; + ClientAsyncResponseReader* reader = + new (grpc_call_arena_alloc(call.call(), sizeof(*reader))) + ClientAsyncResponseReader(call, context); reader->init_buf_.SendInitialMetadata(context->send_initial_metadata_, context->initial_metadata_flags()); @@ -107,11 +106,15 @@ class ClientAsyncResponseReader final } private: - ClientContext* context_; + ClientContext* const context_; Call call_; + ClientAsyncResponseReader(Call call, ClientContext* context) + : context_(context), call_(call) {} + // disable operator new static void* operator new(std::size_t size); + static void* operator new(std::size_t size, void* p) { return p; }; SneakyCallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> diff --git a/include/grpc++/impl/codegen/call.h b/include/grpc++/impl/codegen/call.h index 4a52c2cbcfa28c07d3c96fd47215cd1d1f919890..56dd7b9685084a107e357152da0fd53b804ae943 100644 --- a/include/grpc++/impl/codegen/call.h +++ b/include/grpc++/impl/codegen/call.h @@ -570,7 +570,7 @@ class CallOpSetInterface : public CompletionQueueTag { public: /// Fills in grpc_op, starting from ops[*nops] and moving /// upwards. - virtual void FillOps(grpc_op* ops, size_t* nops) = 0; + virtual void FillOps(grpc_call* call, grpc_op* ops, size_t* nops) = 0; }; /// Primary implementaiton of CallOpSetInterface. @@ -598,10 +598,11 @@ class CallOpSet : public CallOpSetInterface, this->Op4::AddOp(ops, nops); this->Op5::AddOp(ops, nops); this->Op6::AddOp(ops, nops); - grpc_call_ref(call); + g_core_codegen_interface->grpc_call_ref(call); + call_ = call; } - bool FinalizeResult(grpc_call* call, void** tag, bool* status) override { + bool FinalizeResult(void** tag, bool* status) override { this->Op1::FinishOp(status); this->Op2::FinishOp(status); this->Op3::FinishOp(status); @@ -609,7 +610,7 @@ class CallOpSet : public CallOpSetInterface, this->Op5::FinishOp(status); this->Op6::FinishOp(status); *tag = return_tag_; - grpc_call_unref(call); + g_core_codegen_interface->grpc_call_unref(call_); return true; } @@ -617,6 +618,7 @@ class CallOpSet : public CallOpSetInterface, private: void* return_tag_; + grpc_call* call_; }; /// A CallOpSet that does not post completions to the completion queue. diff --git a/include/grpc++/impl/codegen/core_codegen.h b/include/grpc++/impl/codegen/core_codegen.h index 754bf14b2592c2eb7b203ea7490a4ac206ad3246..b579849acaa6c1fd5fe30d1b54b9c364432a35a4 100644 --- a/include/grpc++/impl/codegen/core_codegen.h +++ b/include/grpc++/impl/codegen/core_codegen.h @@ -64,6 +64,9 @@ class CoreCodegen : public CoreCodegenInterface { void gpr_cv_signal(gpr_cv* cv) override; void gpr_cv_broadcast(gpr_cv* cv) override; + void grpc_call_ref(grpc_call* call) override; + void grpc_call_unref(grpc_call* call) override; + void grpc_byte_buffer_destroy(grpc_byte_buffer* bb) override; int grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader, diff --git a/include/grpc++/impl/codegen/core_codegen_interface.h b/include/grpc++/impl/codegen/core_codegen_interface.h index 45ea0403031b9c2dd1186a8c5d1386cd5221f50e..12464591a42b37d77819d0e4f25f8270a08936de 100644 --- a/include/grpc++/impl/codegen/core_codegen_interface.h +++ b/include/grpc++/impl/codegen/core_codegen_interface.h @@ -94,6 +94,9 @@ class CoreCodegenInterface { virtual grpc_byte_buffer* grpc_raw_byte_buffer_create(grpc_slice* slice, size_t nslices) = 0; + virtual void grpc_call_ref(grpc_call* call) = 0; + virtual void grpc_call_unref(grpc_call* call) = 0; + virtual grpc_slice grpc_slice_malloc(size_t length) = 0; virtual void grpc_slice_unref(grpc_slice slice) = 0; virtual grpc_slice grpc_slice_split_tail(grpc_slice* s, size_t split) = 0; diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc index c985183ae7600022d36bef593547202772ff76d8..fac1ba9d43c1691c0cdbd4bf156ab59f50e9f723 100644 --- a/src/cpp/client/channel_cc.cc +++ b/src/cpp/client/channel_cc.cc @@ -131,7 +131,7 @@ void Channel::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) { static const size_t MAX_OPS = 8; size_t nops = 0; grpc_op cops[MAX_OPS]; - ops->FillOps(cops, &nops); + ops->FillOps(call->call(), cops, &nops); GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call->call(), cops, nops, ops, nullptr)); } diff --git a/src/cpp/common/core_codegen.cc b/src/cpp/common/core_codegen.cc index 36e4c893540b2c277037c674d355d0e31032183a..902eee568cb8b4849ef98c6cc92d2f50ee93ac3a 100644 --- a/src/cpp/common/core_codegen.cc +++ b/src/cpp/common/core_codegen.cc @@ -91,6 +91,9 @@ void CoreCodegen::grpc_byte_buffer_destroy(grpc_byte_buffer* bb) { ::grpc_byte_buffer_destroy(bb); } +void CoreCodegen::grpc_call_ref(grpc_call* call) { ::grpc_call_ref(call); } +void CoreCodegen::grpc_call_unref(grpc_call* call) { ::grpc_call_unref(call); } + int CoreCodegen::grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader, grpc_byte_buffer* buffer) { return ::grpc_byte_buffer_reader_init(reader, buffer); diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index e874892e73c0a173fb680fb0767a3846e34a2b5c..a611e1a77b2ef45cc2f6399a08f6010f8213d8db 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -591,7 +591,7 @@ void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) { static const size_t MAX_OPS = 8; size_t nops = 0; grpc_op cops[MAX_OPS]; - ops->FillOps(cops, &nops); + ops->FillOps(call->call(), cops, &nops); auto result = grpc_call_start_batch(call->call(), cops, nops, ops, nullptr); GPR_ASSERT(GRPC_CALL_OK == result); } diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index c0c40fda7be53930963f2cc5c16ae607271f614c..cc6cf2353ef2879f8b13cdd268117005a0335990 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -62,7 +62,7 @@ class ServerContext::CompletionOp final : public CallOpSetInterface { finalized_(false), cancelled_(0) {} - void FillOps(grpc_op* ops, size_t* nops) override; + void FillOps(grpc_call* call, grpc_op* ops, size_t* nops) override; bool FinalizeResult(void** tag, bool* status) override; bool CheckCancelled(CompletionQueue* cq) { @@ -100,7 +100,8 @@ void ServerContext::CompletionOp::Unref() { } } -void ServerContext::CompletionOp::FillOps(grpc_op* ops, size_t* nops) { +void ServerContext::CompletionOp::FillOps(grpc_call* call, grpc_op* ops, + size_t* nops) { ops->op = GRPC_OP_RECV_CLOSE_ON_SERVER; ops->data.recv_close_on_server.cancelled = &cancelled_; ops->flags = 0;