Skip to content
Snippets Groups Projects
Commit 83fb070f authored by Craig Tiller's avatar Craig Tiller
Browse files

Read path improvements

parent 31123db7
Branches
Tags
No related merge requests found
...@@ -62,7 +62,7 @@ typedef enum { ...@@ -62,7 +62,7 @@ typedef enum {
GRPC_CHTTP2_LIST_WRITTEN, GRPC_CHTTP2_LIST_WRITTEN,
GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE, GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE,
GRPC_CHTTP2_LIST_PARSING_SEEN, GRPC_CHTTP2_LIST_PARSING_SEEN,
GRPC_CHTTP2_LIST_CANCELLED_WAITING_FOR_PARSING, GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_PARSING,
GRPC_CHTTP2_LIST_INCOMING_WINDOW_UPDATED, GRPC_CHTTP2_LIST_INCOMING_WINDOW_UPDATED,
/** streams that are waiting to start because there are too many concurrent /** streams that are waiting to start because there are too many concurrent
streams on the connection */ streams on the connection */
...@@ -585,10 +585,10 @@ int grpc_chttp2_list_pop_waiting_for_concurrency( ...@@ -585,10 +585,10 @@ int grpc_chttp2_list_pop_waiting_for_concurrency(
grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global); grpc_chttp2_stream_global **stream_global);
void grpc_chttp2_list_add_cancelled_waiting_for_parsing( void grpc_chttp2_list_add_closed_waiting_for_parsing(
grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global); grpc_chttp2_stream_global *stream_global);
int grpc_chttp2_list_pop_cancelled_waiting_for_parsing( int grpc_chttp2_list_pop_closed_waiting_for_parsing(
grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global); grpc_chttp2_stream_global **stream_global);
...@@ -610,6 +610,8 @@ void grpc_chttp2_schedule_closure( ...@@ -610,6 +610,8 @@ void grpc_chttp2_schedule_closure(
grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream( grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream(
grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id); grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id);
void grpc_chttp2_parsing_remove_stream(
grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id);
grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream( grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream(
grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id); grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id);
......
...@@ -175,6 +175,10 @@ void grpc_chttp2_publish_reads( ...@@ -175,6 +175,10 @@ void grpc_chttp2_publish_reads(
/* updating closed status */ /* updating closed status */
if (stream_parsing->received_close) { if (stream_parsing->received_close) {
stream_global->read_closed = 1; stream_global->read_closed = 1;
if (stream_global->write_state != WRITE_STATE_OPEN) {
stream_global->in_stream_map = 0;
grpc_chttp2_parsing_remove_stream(transport_parsing, stream_parsing->id);
}
grpc_chttp2_list_add_read_write_state_changed(transport_global, grpc_chttp2_list_add_read_write_state_changed(transport_global,
stream_global); stream_global);
} }
......
...@@ -250,20 +250,20 @@ int grpc_chttp2_list_pop_waiting_for_concurrency( ...@@ -250,20 +250,20 @@ int grpc_chttp2_list_pop_waiting_for_concurrency(
return r; return r;
} }
void grpc_chttp2_list_add_cancelled_waiting_for_parsing( void grpc_chttp2_list_add_closed_waiting_for_parsing(
grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) { grpc_chttp2_stream_global *stream_global) {
stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global),
STREAM_FROM_GLOBAL(stream_global), STREAM_FROM_GLOBAL(stream_global),
GRPC_CHTTP2_LIST_CANCELLED_WAITING_FOR_PARSING); GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_PARSING);
} }
int grpc_chttp2_list_pop_cancelled_waiting_for_parsing( int grpc_chttp2_list_pop_closed_waiting_for_parsing(
grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global) { grpc_chttp2_stream_global **stream_global) {
grpc_chttp2_stream *stream; grpc_chttp2_stream *stream;
int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream,
GRPC_CHTTP2_LIST_CANCELLED_WAITING_FOR_PARSING); GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_PARSING);
*stream_global = &stream->global; *stream_global = &stream->global;
return r; return r;
} }
......
...@@ -78,9 +78,8 @@ static const grpc_transport_vtable vtable; ...@@ -78,9 +78,8 @@ static const grpc_transport_vtable vtable;
static void lock(grpc_chttp2_transport *t); static void lock(grpc_chttp2_transport *t);
static void unlock(grpc_chttp2_transport *t); static void unlock(grpc_chttp2_transport *t);
static void unlock_check_cancellations(grpc_chttp2_transport *t);
static void unlock_check_channel_callbacks(grpc_chttp2_transport *t); static void unlock_check_channel_callbacks(grpc_chttp2_transport *t);
static void unlock_check_reads(grpc_chttp2_transport *t); static void unlock_check_read_write_state(grpc_chttp2_transport *t);
/* forward declarations of various callbacks that we'll build closures around */ /* forward declarations of various callbacks that we'll build closures around */
static void writing_action(void *t, int iomgr_success_ignored); static void writing_action(void *t, int iomgr_success_ignored);
...@@ -198,6 +197,7 @@ static void init_transport(grpc_chttp2_transport *t, ...@@ -198,6 +197,7 @@ static void init_transport(grpc_chttp2_transport *t,
t->global.incoming_window = DEFAULT_WINDOW; t->global.incoming_window = DEFAULT_WINDOW;
t->global.connection_window_target = DEFAULT_CONNECTION_WINDOW_TARGET; t->global.connection_window_target = DEFAULT_CONNECTION_WINDOW_TARGET;
t->global.ping_counter = 1; t->global.ping_counter = 1;
t->global.pings.next = t->global.pings.prev = &t->global.pings;
t->parsing.is_client = is_client; t->parsing.is_client = is_client;
t->parsing.str_grpc_timeout = t->parsing.str_grpc_timeout =
grpc_mdstr_from_string(t->metadata_context, "grpc-timeout"); grpc_mdstr_from_string(t->metadata_context, "grpc-timeout");
...@@ -382,6 +382,7 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) { ...@@ -382,6 +382,7 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) {
GPR_ASSERT(s->global.published_state == GRPC_STREAM_CLOSED || GPR_ASSERT(s->global.published_state == GRPC_STREAM_CLOSED ||
s->global.id == 0); s->global.id == 0);
GPR_ASSERT(!s->global.in_stream_map);
grpc_chttp2_unregister_stream(t, s); grpc_chttp2_unregister_stream(t, s);
gpr_mu_unlock(&t->mu); gpr_mu_unlock(&t->mu);
...@@ -404,6 +405,12 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream( ...@@ -404,6 +405,12 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream(
return s ? &s->parsing : NULL; return s ? &s->parsing : NULL;
} }
void grpc_chttp2_parsing_remove_stream(
grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id) {
grpc_chttp2_transport *t = TRANSPORT_FROM_PARSING(transport_parsing);
grpc_chttp2_stream_map_delete(&t->parsing_stream_map, id);
}
grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream( grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream(
grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id) { grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id) {
grpc_chttp2_stream *accepting; grpc_chttp2_stream *accepting;
...@@ -448,8 +455,7 @@ static void unlock(grpc_chttp2_transport *t) { ...@@ -448,8 +455,7 @@ static void unlock(grpc_chttp2_transport *t) {
ref_transport(t); ref_transport(t);
grpc_chttp2_schedule_closure(&t->global, &t->writing_action, 1); grpc_chttp2_schedule_closure(&t->global, &t->writing_action, 1);
} }
unlock_check_cancellations(t); unlock_check_read_write_state(t);
unlock_check_reads(t);
/* unlock_check_parser(t); */ /* unlock_check_parser(t); */
unlock_check_channel_callbacks(t); unlock_check_channel_callbacks(t);
...@@ -668,17 +674,28 @@ static void send_ping(grpc_transport *gt, grpc_iomgr_closure *on_recv) { ...@@ -668,17 +674,28 @@ static void send_ping(grpc_transport *gt, grpc_iomgr_closure *on_recv) {
* INPUT PROCESSING * INPUT PROCESSING
*/ */
static void unlock_check_cancellations(grpc_chttp2_transport *t) { static grpc_stream_state compute_state(gpr_uint8 write_closed,
gpr_uint8 read_closed) {
if (write_closed && read_closed) return GRPC_STREAM_CLOSED;
if (write_closed) return GRPC_STREAM_SEND_CLOSED;
if (read_closed) return GRPC_STREAM_RECV_CLOSED;
return GRPC_STREAM_OPEN;
}
static void unlock_check_read_write_state(grpc_chttp2_transport *t) {
grpc_chttp2_transport_global *transport_global = &t->global; grpc_chttp2_transport_global *transport_global = &t->global;
grpc_chttp2_stream_global *stream_global; grpc_chttp2_stream_global *stream_global;
grpc_stream_state state;
/* if a stream is in the stream map, and gets cancelled, we need to ensure /* if a stream is in the stream map, and gets cancelled, we need to ensure
we are not parsing before continuing the cancellation to keep things in we are not parsing before continuing the cancellation to keep things in
a sane state */ a sane state */
if (!t->parsing_active) { if (!t->parsing_active) {
while (grpc_chttp2_list_pop_cancelled_waiting_for_parsing(transport_global, while (grpc_chttp2_list_pop_closed_waiting_for_parsing(transport_global,
&stream_global)) { &stream_global)) {
GPR_ASSERT(stream_global->in_stream_map); GPR_ASSERT(stream_global->in_stream_map);
GPR_ASSERT(stream_global->write_state != WRITE_STATE_OPEN);
GPR_ASSERT(stream_global->read_closed);
grpc_chttp2_stream_map_delete(&t->parsing_stream_map, stream_global->id); grpc_chttp2_stream_map_delete(&t->parsing_stream_map, stream_global->id);
stream_global->in_stream_map = 0; stream_global->in_stream_map = 0;
grpc_chttp2_list_add_read_write_state_changed(transport_global, grpc_chttp2_list_add_read_write_state_changed(transport_global,
...@@ -686,46 +703,27 @@ static void unlock_check_cancellations(grpc_chttp2_transport *t) { ...@@ -686,46 +703,27 @@ static void unlock_check_cancellations(grpc_chttp2_transport *t) {
} }
} }
#if 0 while (grpc_chttp2_list_pop_read_write_state_changed(transport_global, &stream_global)) {
grpc_chttp2_stream *s;
if (t->writing_active) {
return;
}
while ((s = stream_list_remove_head(t, CANCELLED))) {
s->global.read_closed = 1;
s->global.write_state = WRITE_STATE_SENT_CLOSE;
grpc_chttp2_list_add_read_write_state_changed(&t->global, &s->global);
}
#endif
}
static grpc_stream_state compute_state(gpr_uint8 write_closed,
gpr_uint8 read_closed) {
if (write_closed && read_closed) return GRPC_STREAM_CLOSED;
if (write_closed) return GRPC_STREAM_SEND_CLOSED;
if (read_closed) return GRPC_STREAM_RECV_CLOSED;
return GRPC_STREAM_OPEN;
}
static void unlock_check_reads(grpc_chttp2_transport *t) {
grpc_chttp2_stream_global *stream_global;
grpc_stream_state state;
while (grpc_chttp2_list_pop_read_write_state_changed(&t->global, &stream_global)) {
if (!stream_global->publish_sopb) { if (!stream_global->publish_sopb) {
continue; continue;
} }
if (stream_global->write_state != WRITE_STATE_OPEN && stream_global->read_closed && stream_global->in_stream_map) {
if (t->parsing_active) {
grpc_chttp2_list_add_closed_waiting_for_parsing(transport_global, stream_global);
} else {
grpc_chttp2_stream_map_delete(&t->parsing_stream_map, stream_global->id);
stream_global->in_stream_map = 0;
}
}
state = compute_state(stream_global->write_state == WRITE_STATE_SENT_CLOSE, stream_global->read_closed && !stream_global->in_stream_map); state = compute_state(stream_global->write_state == WRITE_STATE_SENT_CLOSE, stream_global->read_closed && !stream_global->in_stream_map);
gpr_log(GPR_DEBUG, "ws:%d rc:%d ism:%d => st:%d", stream_global->write_state, stream_global->read_closed, stream_global->in_stream_map, state); gpr_log(GPR_DEBUG, "cl:%d id:%d ws:%d rc:%d ism:%d => st:%d", t->global.is_client, stream_global->id, stream_global->write_state, stream_global->read_closed, stream_global->in_stream_map, state);
if (stream_global->incoming_sopb.nops == 0 && state == stream_global->published_state) { if (stream_global->incoming_sopb.nops == 0 && state == stream_global->published_state) {
continue; continue;
} }
grpc_chttp2_incoming_metadata_buffer_postprocess_sopb_and_begin_live_op(&stream_global->incoming_metadata, &stream_global->incoming_sopb, &stream_global->outstanding_metadata); grpc_chttp2_incoming_metadata_buffer_postprocess_sopb_and_begin_live_op(&stream_global->incoming_metadata, &stream_global->incoming_sopb, &stream_global->outstanding_metadata);
grpc_sopb_swap(stream_global->publish_sopb, &stream_global->incoming_sopb); grpc_sopb_swap(stream_global->publish_sopb, &stream_global->incoming_sopb);
stream_global->published_state = *stream_global->publish_state = state; stream_global->published_state = *stream_global->publish_state = state;
grpc_chttp2_schedule_closure(&t->global, stream_global->recv_done_closure, 1); grpc_chttp2_schedule_closure(transport_global, stream_global->recv_done_closure, 1);
stream_global->recv_done_closure = NULL; stream_global->recv_done_closure = NULL;
stream_global->publish_sopb = NULL; stream_global->publish_sopb = NULL;
stream_global->publish_state = NULL; stream_global->publish_state = NULL;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment