diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index 4bd8933cb4827bb2aaeeb294c6e64801c8f1bcc0..40778a868c694802e2e143c69a1be776eba7cecc 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -354,15 +354,19 @@ typedef struct grpc_op { /* Propagation bits: this can be bitwise or-ed to form propagation_mask for * grpc_call */ /** Propagate deadline */ -#define GRPC_PROPAGATE_DEADLINE 1 +#define GRPC_PROPAGATE_DEADLINE ((gpr_uint32)1) /** Propagate census context */ -#define GRPC_PROPAGATE_CENSUS_CONTEXT 2 -/* TODO(ctiller): -#define GRPC_PROPAGATE_CANCELLATION 4 -*/ - +#define GRPC_PROPAGATE_CENSUS_CONTEXT ((gpr_uint32)2) +#define GRPC_PROPAGATE_CANCELLATION ((gpr_uint32)4) +#define GRPC_PROPAGATE_AUTH ((gpr_uint32)8) + +/* Default propagation mask: clients of the core API are encouraged to encode + deltas from this in their implementations... ie write: + GRPC_PROPAGATE_DEFAULTS & ~GRPC_PROPAGATE_DEADLINE to disable deadline + propagation. Doing so gives flexibility in the future to define new + propagation types that are default inherited or not. */ #define GRPC_PROPAGATE_DEFAULTS \ - (GRPC_PROPAGATE_DEADLINE | GRPC_PROPAGATE_CENSUS_CONTEXT) + ((gpr_uint32)((0xffff | GRPC_PROPAGATE_DEADLINE | GRPC_PROPAGATE_CENSUS_CONTEXT))) /** Initialize the grpc library. diff --git a/src/core/surface/call.c b/src/core/surface/call.c index aa1060aebd577de3d10ed281843a7e95bf619bff..14118c94968c5f10491e7ec7f231d1de742ae02a 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -144,6 +144,7 @@ struct grpc_call { grpc_completion_queue *cq; grpc_channel *channel; grpc_call *parent; + grpc_call *first_child; grpc_mdctx *metadata_context; /* TODO(ctiller): share with cq if possible? */ gpr_mu mu; @@ -177,6 +178,8 @@ struct grpc_call { gpr_uint8 cancel_alarm; /** bitmask of allocated completion events in completions */ gpr_uint8 allocated_completions; + /** flag indicating that cancellation is inherited */ + gpr_uint8 cancellation_is_inherited; /* flags with bits corresponding to write states allowing us to determine what was sent */ @@ -268,6 +271,11 @@ struct grpc_call { /** completion events - for completion queue use */ grpc_cq_completion completions[MAX_CONCURRENT_COMPLETIONS]; + + /** siblings: children of the same parent form a list, and this list is protected under + parent->mu */ + grpc_call *sibling_next; + grpc_call *sibling_prev; }; #define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1)) @@ -314,24 +322,6 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call, } call->parent = parent_call; call->is_client = server_transport_data == NULL; - if (parent_call != NULL) { - GRPC_CALL_INTERNAL_REF(parent_call, "child"); - GPR_ASSERT(call->is_client); - GPR_ASSERT(!parent_call->is_client); - - if (propagation_mask & GRPC_PROPAGATE_DEADLINE) { - send_deadline = gpr_time_min( - gpr_convert_clock_type(send_deadline, - parent_call->send_deadline.clock_type), - parent_call->send_deadline); - } - if (propagation_mask & GRPC_PROPAGATE_CENSUS_CONTEXT) { - grpc_call_context_set(call, GRPC_CONTEXT_TRACING, - parent_call->context[GRPC_CONTEXT_TRACING].value, - NULL); - } - /* cancellation is done last */ - } for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) { call->request_set[i] = REQSET_EMPTY; } @@ -369,6 +359,39 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call, } grpc_call_stack_init(channel_stack, server_transport_data, initial_op_ptr, CALL_STACK_FROM_CALL(call)); + if (parent_call != NULL) { + GRPC_CALL_INTERNAL_REF(parent_call, "child"); + GPR_ASSERT(call->is_client); + GPR_ASSERT(!parent_call->is_client); + + gpr_mu_lock(&parent_call->mu); + + if (propagation_mask & GRPC_PROPAGATE_DEADLINE) { + send_deadline = gpr_time_min( + gpr_convert_clock_type(send_deadline, + parent_call->send_deadline.clock_type), + parent_call->send_deadline); + } + if (propagation_mask & GRPC_PROPAGATE_CENSUS_CONTEXT) { + grpc_call_context_set(call, GRPC_CONTEXT_TRACING, + parent_call->context[GRPC_CONTEXT_TRACING].value, + NULL); + } + if (propagation_mask & GRPC_PROPAGATE_CANCELLATION) { + call->cancellation_is_inherited = 1; + } + + if (parent_call->first_child == NULL) { + parent_call->first_child = call; + call->sibling_next = call->sibling_prev = call; + } else { + call->sibling_next = parent_call->first_child; + call->sibling_prev = parent_call->first_child->sibling_prev; + call->sibling_next->sibling_prev = call->sibling_prev->sibling_next = call; + } + + gpr_mu_unlock(&parent_call->mu); + } if (gpr_time_cmp(send_deadline, gpr_inf_future(send_deadline.clock_type)) != 0) { set_deadline_alarm(call, send_deadline); @@ -427,6 +450,18 @@ static void destroy_call(void *call, int ignored_success) { size_t i; grpc_call *c = call; grpc_call *parent = c->parent; + if (parent) { + gpr_mu_lock(&parent->mu); + if (call == parent->first_child) { + parent->first_child = c->sibling_next; + if (c == parent->first_child) { + parent->first_child = NULL; + } + c->sibling_prev->sibling_next = c->sibling_next; + c->sibling_next->sibling_prev = c->sibling_prev; + } + gpr_mu_unlock(&parent->mu); + } grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c)); GRPC_CHANNEL_INTERNAL_UNREF(c->channel, "call"); gpr_mu_destroy(&c->mu); @@ -459,9 +494,6 @@ static void destroy_call(void *call, int ignored_success) { GRPC_CQ_INTERNAL_UNREF(c->cq, "bind"); } gpr_free(c); - if (parent) { - GRPC_CALL_INTERNAL_UNREF(parent, "child", 1); - } } #ifdef GRPC_CALL_REF_COUNT_DEBUG @@ -896,6 +928,8 @@ static int add_slice_to_message(grpc_call *call, gpr_slice slice) { static void call_on_done_recv(void *pc, int success) { grpc_call *call = pc; + grpc_call *child_call; + grpc_call *next_child_call; size_t i; GRPC_TIMER_BEGIN(GRPC_PTAG_CALL_ON_DONE_RECV, 0); lock(call); @@ -929,6 +963,19 @@ static void call_on_done_recv(void *pc, int success) { GPR_ASSERT(call->read_state <= READ_STATE_STREAM_CLOSED); call->read_state = READ_STATE_STREAM_CLOSED; call->cancel_alarm |= call->have_alarm; + /* propagate cancellation to any interested children */ + child_call = call->first_child; + if (child_call != NULL) { + do { + next_child_call = child_call->sibling_next; + if (child_call->cancellation_is_inherited) { + GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel"); + grpc_call_cancel(child_call); + GRPC_CALL_INTERNAL_UNREF(child_call, "propagate_cancel", 0); + } + child_call = next_child_call; + } while (child_call != call->first_child); + } GRPC_CALL_INTERNAL_UNREF(call, "closed", 0); } finish_read_ops(call); diff --git a/src/cpp/client/channel.cc b/src/cpp/client/channel.cc index c2f9db20aae8c568afa5df68e03da101e33c783b..463e8bccb28534bbf19cd0e687c41c6165e86b85 100644 --- a/src/cpp/client/channel.cc +++ b/src/cpp/client/channel.cc @@ -60,10 +60,10 @@ Call Channel::CreateCall(const RpcMethod& method, ClientContext* context, CompletionQueue* cq) { auto c_call = method.channel_tag() && context->authority().empty() ? grpc_channel_create_registered_call( - c_channel_, NULL, GRPC_INHERIT_DEFAULTS, cq->cq(), + c_channel_, NULL, GRPC_PROPAGATE_DEFAULTS, cq->cq(), method.channel_tag(), context->raw_deadline()) : grpc_channel_create_call( - c_channel_, NULL, GRPC_INHERIT_DEFAULTS, cq->cq(), + c_channel_, NULL, GRPC_PROPAGATE_DEFAULTS, cq->cq(), method.name(), context->authority().empty() ? target_.c_str() : context->authority().c_str(), diff --git a/src/node/ext/call.cc b/src/node/ext/call.cc index e4451c36f69ba4e0908c5f2c891fe3313d860d38..fe585a0d4ff86b330bc216fe863341540de4510b 100644 --- a/src/node/ext/call.cc +++ b/src/node/ext/call.cc @@ -511,7 +511,7 @@ NAN_METHOD(Call::New) { double deadline = args[2]->NumberValue(); grpc_channel *wrapped_channel = channel->GetWrappedChannel(); grpc_call *wrapped_call = grpc_channel_create_call( - wrapped_channel, NULL, GRPC_INHERIT_DEFAULTS, + wrapped_channel, NULL, GRPC_PROPAGATE_DEFAULTS, CompletionQueueAsyncWorker::GetQueue(), *method, channel->GetHost(), MillisecondsToTimespec(deadline)); call = new Call(wrapped_call);