Skip to content
Snippets Groups Projects
Commit c7df0df6 authored by Craig Tiller's avatar Craig Tiller
Browse files

Implement cancellation propagation, define auth propagation

parent 6728bef3
No related branches found
No related tags found
No related merge requests found
...@@ -354,15 +354,19 @@ typedef struct grpc_op { ...@@ -354,15 +354,19 @@ typedef struct grpc_op {
/* Propagation bits: this can be bitwise or-ed to form propagation_mask for /* Propagation bits: this can be bitwise or-ed to form propagation_mask for
* grpc_call */ * grpc_call */
/** Propagate deadline */ /** Propagate deadline */
#define GRPC_PROPAGATE_DEADLINE 1 #define GRPC_PROPAGATE_DEADLINE ((gpr_uint32)1)
/** Propagate census context */ /** Propagate census context */
#define GRPC_PROPAGATE_CENSUS_CONTEXT 2 #define GRPC_PROPAGATE_CENSUS_CONTEXT ((gpr_uint32)2)
/* TODO(ctiller): #define GRPC_PROPAGATE_CANCELLATION ((gpr_uint32)4)
#define GRPC_PROPAGATE_CANCELLATION 4 #define GRPC_PROPAGATE_AUTH ((gpr_uint32)8)
*/
/* Default propagation mask: clients of the core API are encouraged to encode
deltas from this in their implementations... ie write:
GRPC_PROPAGATE_DEFAULTS & ~GRPC_PROPAGATE_DEADLINE to disable deadline
propagation. Doing so gives flexibility in the future to define new
propagation types that are default inherited or not. */
#define GRPC_PROPAGATE_DEFAULTS \ #define GRPC_PROPAGATE_DEFAULTS \
(GRPC_PROPAGATE_DEADLINE | GRPC_PROPAGATE_CENSUS_CONTEXT) ((gpr_uint32)((0xffff | GRPC_PROPAGATE_DEADLINE | GRPC_PROPAGATE_CENSUS_CONTEXT)))
/** Initialize the grpc library. /** Initialize the grpc library.
......
...@@ -144,6 +144,7 @@ struct grpc_call { ...@@ -144,6 +144,7 @@ struct grpc_call {
grpc_completion_queue *cq; grpc_completion_queue *cq;
grpc_channel *channel; grpc_channel *channel;
grpc_call *parent; grpc_call *parent;
grpc_call *first_child;
grpc_mdctx *metadata_context; grpc_mdctx *metadata_context;
/* TODO(ctiller): share with cq if possible? */ /* TODO(ctiller): share with cq if possible? */
gpr_mu mu; gpr_mu mu;
...@@ -177,6 +178,8 @@ struct grpc_call { ...@@ -177,6 +178,8 @@ struct grpc_call {
gpr_uint8 cancel_alarm; gpr_uint8 cancel_alarm;
/** bitmask of allocated completion events in completions */ /** bitmask of allocated completion events in completions */
gpr_uint8 allocated_completions; gpr_uint8 allocated_completions;
/** flag indicating that cancellation is inherited */
gpr_uint8 cancellation_is_inherited;
/* flags with bits corresponding to write states allowing us to determine /* flags with bits corresponding to write states allowing us to determine
what was sent */ what was sent */
...@@ -268,6 +271,11 @@ struct grpc_call { ...@@ -268,6 +271,11 @@ struct grpc_call {
/** completion events - for completion queue use */ /** completion events - for completion queue use */
grpc_cq_completion completions[MAX_CONCURRENT_COMPLETIONS]; grpc_cq_completion completions[MAX_CONCURRENT_COMPLETIONS];
/** siblings: children of the same parent form a list, and this list is protected under
parent->mu */
grpc_call *sibling_next;
grpc_call *sibling_prev;
}; };
#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1)) #define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
...@@ -314,24 +322,6 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call, ...@@ -314,24 +322,6 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
} }
call->parent = parent_call; call->parent = parent_call;
call->is_client = server_transport_data == NULL; call->is_client = server_transport_data == NULL;
if (parent_call != NULL) {
GRPC_CALL_INTERNAL_REF(parent_call, "child");
GPR_ASSERT(call->is_client);
GPR_ASSERT(!parent_call->is_client);
if (propagation_mask & GRPC_PROPAGATE_DEADLINE) {
send_deadline = gpr_time_min(
gpr_convert_clock_type(send_deadline,
parent_call->send_deadline.clock_type),
parent_call->send_deadline);
}
if (propagation_mask & GRPC_PROPAGATE_CENSUS_CONTEXT) {
grpc_call_context_set(call, GRPC_CONTEXT_TRACING,
parent_call->context[GRPC_CONTEXT_TRACING].value,
NULL);
}
/* cancellation is done last */
}
for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) { for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
call->request_set[i] = REQSET_EMPTY; call->request_set[i] = REQSET_EMPTY;
} }
...@@ -369,6 +359,39 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call, ...@@ -369,6 +359,39 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
} }
grpc_call_stack_init(channel_stack, server_transport_data, initial_op_ptr, grpc_call_stack_init(channel_stack, server_transport_data, initial_op_ptr,
CALL_STACK_FROM_CALL(call)); CALL_STACK_FROM_CALL(call));
if (parent_call != NULL) {
GRPC_CALL_INTERNAL_REF(parent_call, "child");
GPR_ASSERT(call->is_client);
GPR_ASSERT(!parent_call->is_client);
gpr_mu_lock(&parent_call->mu);
if (propagation_mask & GRPC_PROPAGATE_DEADLINE) {
send_deadline = gpr_time_min(
gpr_convert_clock_type(send_deadline,
parent_call->send_deadline.clock_type),
parent_call->send_deadline);
}
if (propagation_mask & GRPC_PROPAGATE_CENSUS_CONTEXT) {
grpc_call_context_set(call, GRPC_CONTEXT_TRACING,
parent_call->context[GRPC_CONTEXT_TRACING].value,
NULL);
}
if (propagation_mask & GRPC_PROPAGATE_CANCELLATION) {
call->cancellation_is_inherited = 1;
}
if (parent_call->first_child == NULL) {
parent_call->first_child = call;
call->sibling_next = call->sibling_prev = call;
} else {
call->sibling_next = parent_call->first_child;
call->sibling_prev = parent_call->first_child->sibling_prev;
call->sibling_next->sibling_prev = call->sibling_prev->sibling_next = call;
}
gpr_mu_unlock(&parent_call->mu);
}
if (gpr_time_cmp(send_deadline, gpr_inf_future(send_deadline.clock_type)) != if (gpr_time_cmp(send_deadline, gpr_inf_future(send_deadline.clock_type)) !=
0) { 0) {
set_deadline_alarm(call, send_deadline); set_deadline_alarm(call, send_deadline);
...@@ -427,6 +450,18 @@ static void destroy_call(void *call, int ignored_success) { ...@@ -427,6 +450,18 @@ static void destroy_call(void *call, int ignored_success) {
size_t i; size_t i;
grpc_call *c = call; grpc_call *c = call;
grpc_call *parent = c->parent; grpc_call *parent = c->parent;
if (parent) {
gpr_mu_lock(&parent->mu);
if (call == parent->first_child) {
parent->first_child = c->sibling_next;
if (c == parent->first_child) {
parent->first_child = NULL;
}
c->sibling_prev->sibling_next = c->sibling_next;
c->sibling_next->sibling_prev = c->sibling_prev;
}
gpr_mu_unlock(&parent->mu);
}
grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c)); grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c));
GRPC_CHANNEL_INTERNAL_UNREF(c->channel, "call"); GRPC_CHANNEL_INTERNAL_UNREF(c->channel, "call");
gpr_mu_destroy(&c->mu); gpr_mu_destroy(&c->mu);
...@@ -459,9 +494,6 @@ static void destroy_call(void *call, int ignored_success) { ...@@ -459,9 +494,6 @@ static void destroy_call(void *call, int ignored_success) {
GRPC_CQ_INTERNAL_UNREF(c->cq, "bind"); GRPC_CQ_INTERNAL_UNREF(c->cq, "bind");
} }
gpr_free(c); gpr_free(c);
if (parent) {
GRPC_CALL_INTERNAL_UNREF(parent, "child", 1);
}
} }
#ifdef GRPC_CALL_REF_COUNT_DEBUG #ifdef GRPC_CALL_REF_COUNT_DEBUG
...@@ -896,6 +928,8 @@ static int add_slice_to_message(grpc_call *call, gpr_slice slice) { ...@@ -896,6 +928,8 @@ static int add_slice_to_message(grpc_call *call, gpr_slice slice) {
static void call_on_done_recv(void *pc, int success) { static void call_on_done_recv(void *pc, int success) {
grpc_call *call = pc; grpc_call *call = pc;
grpc_call *child_call;
grpc_call *next_child_call;
size_t i; size_t i;
GRPC_TIMER_BEGIN(GRPC_PTAG_CALL_ON_DONE_RECV, 0); GRPC_TIMER_BEGIN(GRPC_PTAG_CALL_ON_DONE_RECV, 0);
lock(call); lock(call);
...@@ -929,6 +963,19 @@ static void call_on_done_recv(void *pc, int success) { ...@@ -929,6 +963,19 @@ static void call_on_done_recv(void *pc, int success) {
GPR_ASSERT(call->read_state <= READ_STATE_STREAM_CLOSED); GPR_ASSERT(call->read_state <= READ_STATE_STREAM_CLOSED);
call->read_state = READ_STATE_STREAM_CLOSED; call->read_state = READ_STATE_STREAM_CLOSED;
call->cancel_alarm |= call->have_alarm; call->cancel_alarm |= call->have_alarm;
/* propagate cancellation to any interested children */
child_call = call->first_child;
if (child_call != NULL) {
do {
next_child_call = child_call->sibling_next;
if (child_call->cancellation_is_inherited) {
GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel");
grpc_call_cancel(child_call);
GRPC_CALL_INTERNAL_UNREF(child_call, "propagate_cancel", 0);
}
child_call = next_child_call;
} while (child_call != call->first_child);
}
GRPC_CALL_INTERNAL_UNREF(call, "closed", 0); GRPC_CALL_INTERNAL_UNREF(call, "closed", 0);
} }
finish_read_ops(call); finish_read_ops(call);
......
...@@ -60,10 +60,10 @@ Call Channel::CreateCall(const RpcMethod& method, ClientContext* context, ...@@ -60,10 +60,10 @@ Call Channel::CreateCall(const RpcMethod& method, ClientContext* context,
CompletionQueue* cq) { CompletionQueue* cq) {
auto c_call = method.channel_tag() && context->authority().empty() auto c_call = method.channel_tag() && context->authority().empty()
? grpc_channel_create_registered_call( ? grpc_channel_create_registered_call(
c_channel_, NULL, GRPC_INHERIT_DEFAULTS, cq->cq(), c_channel_, NULL, GRPC_PROPAGATE_DEFAULTS, cq->cq(),
method.channel_tag(), context->raw_deadline()) method.channel_tag(), context->raw_deadline())
: grpc_channel_create_call( : grpc_channel_create_call(
c_channel_, NULL, GRPC_INHERIT_DEFAULTS, cq->cq(), c_channel_, NULL, GRPC_PROPAGATE_DEFAULTS, cq->cq(),
method.name(), context->authority().empty() method.name(), context->authority().empty()
? target_.c_str() ? target_.c_str()
: context->authority().c_str(), : context->authority().c_str(),
......
...@@ -511,7 +511,7 @@ NAN_METHOD(Call::New) { ...@@ -511,7 +511,7 @@ NAN_METHOD(Call::New) {
double deadline = args[2]->NumberValue(); double deadline = args[2]->NumberValue();
grpc_channel *wrapped_channel = channel->GetWrappedChannel(); grpc_channel *wrapped_channel = channel->GetWrappedChannel();
grpc_call *wrapped_call = grpc_channel_create_call( grpc_call *wrapped_call = grpc_channel_create_call(
wrapped_channel, NULL, GRPC_INHERIT_DEFAULTS, wrapped_channel, NULL, GRPC_PROPAGATE_DEFAULTS,
CompletionQueueAsyncWorker::GetQueue(), *method, channel->GetHost(), CompletionQueueAsyncWorker::GetQueue(), *method, channel->GetHost(),
MillisecondsToTimespec(deadline)); MillisecondsToTimespec(deadline));
call = new Call(wrapped_call); call = new Call(wrapped_call);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment