Skip to content
Snippets Groups Projects
Commit d7f12e30 authored by Craig Tiller's avatar Craig Tiller
Browse files

Fix accept_stream being called post-channel deletion

- Have the server clear the accept_stream callback prior to destroying
  the channel (required a small transport op protocol change)
- Have the transport not enact transport ops until parsing is completed
  (prevents accept_stream from disappearing mid-parse)
parent 40d27ba7
No related branches found
No related tags found
No related merge requests found
...@@ -251,7 +251,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, ...@@ -251,7 +251,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, true, NULL); grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, true, NULL);
GPR_ASSERT(op->set_accept_stream == NULL); GPR_ASSERT(op->set_accept_stream == false);
if (op->bind_pollset != NULL) { if (op->bind_pollset != NULL) {
grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties,
op->bind_pollset); op->bind_pollset);
......
...@@ -107,7 +107,7 @@ static void cuc_start_transport_op(grpc_exec_ctx *exec_ctx, ...@@ -107,7 +107,7 @@ static void cuc_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, true, NULL); grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, true, NULL);
GPR_ASSERT(op->set_accept_stream == NULL); GPR_ASSERT(op->set_accept_stream == false);
GPR_ASSERT(op->bind_pollset == NULL); GPR_ASSERT(op->bind_pollset == NULL);
if (op->on_connectivity_state_change != NULL) { if (op->on_connectivity_state_change != NULL) {
......
...@@ -407,8 +407,15 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand) { ...@@ -407,8 +407,15 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand) {
maybe_finish_shutdown(exec_ctx, chand->server); maybe_finish_shutdown(exec_ctx, chand->server);
chand->finish_destroy_channel_closure.cb = finish_destroy_channel; chand->finish_destroy_channel_closure.cb = finish_destroy_channel;
chand->finish_destroy_channel_closure.cb_arg = chand; chand->finish_destroy_channel_closure.cb_arg = chand;
grpc_exec_ctx_enqueue(exec_ctx, &chand->finish_destroy_channel_closure, true,
NULL); grpc_transport_op op;
memset(&op, 0, sizeof(op));
op.set_accept_stream = true;
op.on_consumed = &chand->finish_destroy_channel_closure;
grpc_channel_next_op(exec_ctx,
grpc_channel_stack_element(
grpc_channel_get_channel_stack(chand->channel), 0),
&op);
} }
static void finish_start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_server *server, static void finish_start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_server *server,
...@@ -971,7 +978,8 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, ...@@ -971,7 +978,8 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
GRPC_CHANNEL_INTERNAL_REF(channel, "connectivity"); GRPC_CHANNEL_INTERNAL_REF(channel, "connectivity");
memset(&op, 0, sizeof(op)); memset(&op, 0, sizeof(op));
op.set_accept_stream = accept_stream; op.set_accept_stream = true;
op.set_accept_stream_fn = accept_stream;
op.set_accept_stream_user_data = chand; op.set_accept_stream_user_data = chand;
op.on_connectivity_state_change = &chand->channel_connectivity_changed; op.on_connectivity_state_change = &chand->channel_connectivity_changed;
op.connectivity_state = &chand->connectivity_state; op.connectivity_state = &chand->connectivity_state;
......
...@@ -358,6 +358,9 @@ struct grpc_chttp2_transport { ...@@ -358,6 +358,9 @@ struct grpc_chttp2_transport {
/** connectivity tracking */ /** connectivity tracking */
grpc_connectivity_state_tracker state_tracker; grpc_connectivity_state_tracker state_tracker;
} channel_callback; } channel_callback;
/** Transport op to be applied post-parsing */
grpc_transport_op *post_parsing_op;
}; };
typedef struct { typedef struct {
......
...@@ -951,12 +951,10 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, ...@@ -951,12 +951,10 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx,
unlock(exec_ctx, t); unlock(exec_ctx, t);
} }
static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_transport_op *op) { grpc_transport_op *op) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; bool close_transport = false;
int close_transport = 0;
lock(t);
grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, true, NULL); grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, true, NULL);
...@@ -975,8 +973,8 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, ...@@ -975,8 +973,8 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
close_transport = !grpc_chttp2_has_streams(t); close_transport = !grpc_chttp2_has_streams(t);
} }
if (op->set_accept_stream != NULL) { if (op->set_accept_stream) {
t->channel_callback.accept_stream = op->set_accept_stream; t->channel_callback.accept_stream = op->set_accept_stream_fn;
t->channel_callback.accept_stream_user_data = t->channel_callback.accept_stream_user_data =
op->set_accept_stream_user_data; op->set_accept_stream_user_data;
} }
...@@ -997,15 +995,29 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, ...@@ -997,15 +995,29 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
close_transport_locked(exec_ctx, t); close_transport_locked(exec_ctx, t);
} }
unlock(exec_ctx, t);
if (close_transport) { if (close_transport) {
lock(t);
close_transport_locked(exec_ctx, t); close_transport_locked(exec_ctx, t);
unlock(exec_ctx, t);
} }
} }
static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_transport_op *op) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
lock(t);
/* Let's be overly cautious: don't change any state while we're parsing */
if (t->parsing_active) {
GPR_ASSERT(t->post_parsing_op == NULL);
t->post_parsing_op = gpr_malloc(sizeof(*op));
memcpy(t->post_parsing_op, op, sizeof(*op));
} else {
perform_transport_op_locked(exec_ctx, t, op);
}
unlock(exec_ctx, t);
}
/******************************************************************************* /*******************************************************************************
* INPUT PROCESSING * INPUT PROCESSING
*/ */
...@@ -1401,6 +1413,13 @@ static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, bool success) { ...@@ -1401,6 +1413,13 @@ static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, bool success) {
/* handle higher level things */ /* handle higher level things */
grpc_chttp2_publish_reads(exec_ctx, transport_global, transport_parsing); grpc_chttp2_publish_reads(exec_ctx, transport_global, transport_parsing);
t->parsing_active = 0; t->parsing_active = 0;
/* handle delayed transport ops (if there is one) */
if (t->post_parsing_op) {
grpc_transport_op *op = t->post_parsing_op;
t->post_parsing_op = NULL;
perform_transport_op_locked(exec_ctx, t, op);
gpr_free(op);
}
/* if a stream is in the stream map, and gets cancelled, we need to ensure /* if a stream is in the stream map, and gets cancelled, we need to ensure
* we are not parsing before continuing the cancellation to keep things in * we are not parsing before continuing the cancellation to keep things in
* a sane state */ * a sane state */
......
...@@ -139,8 +139,10 @@ typedef struct grpc_transport_op { ...@@ -139,8 +139,10 @@ typedef struct grpc_transport_op {
gpr_slice *goaway_message; gpr_slice *goaway_message;
/** set the callback for accepting new streams; /** set the callback for accepting new streams;
this is a permanent callback, unlike the other one-shot closures */ this is a permanent callback, unlike the other one-shot closures */
void (*set_accept_stream)(grpc_exec_ctx *exec_ctx, void *user_data, bool set_accept_stream;
grpc_transport *transport, const void *server_data); void (*set_accept_stream_fn)(grpc_exec_ctx *exec_ctx, void *user_data,
grpc_transport *transport,
const void *server_data);
void *set_accept_stream_user_data; void *set_accept_stream_user_data;
/** add this transport to a pollset */ /** add this transport to a pollset */
grpc_pollset *bind_pollset; grpc_pollset *bind_pollset;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment