diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 86e18f853e0b91cac678424181b28b5e76ca7edb..dae1b1e1b794f6158e3ed1fbf017c66c277405d3 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -61,6 +61,8 @@ #define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024) #define MAX_WINDOW 0x7fffffffu +#define MAX_CLIENT_STREAM_ID 0x7fffffffu + #define CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" #define CLIENT_CONNECT_STRLEN 24 @@ -1043,16 +1045,36 @@ static void perform_write(transport *t, grpc_endpoint *ep) { } } +static void add_goaway(transport *t, gpr_uint32 goaway_error, gpr_slice goaway_text) { + if (t->num_pending_goaways == t->cap_pending_goaways) { + t->cap_pending_goaways = GPR_MAX(1, t->cap_pending_goaways * 2); + t->pending_goaways = + gpr_realloc(t->pending_goaways, + sizeof(pending_goaway) * t->cap_pending_goaways); + } + t->pending_goaways[t->num_pending_goaways].status = + grpc_chttp2_http2_error_to_grpc_status(goaway_error); + t->pending_goaways[t->num_pending_goaways].debug = goaway_text; + t->num_pending_goaways++; +} + + static void maybe_start_some_streams(transport *t) { + /* start streams where we have free stream ids and free concurrency */ while ( + t->next_stream_id <= MAX_CLIENT_STREAM_ID && grpc_chttp2_stream_map_size(&t->stream_map) < t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS]) { stream *s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY); - if (!s) break; + if (!s) return; IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Allocating new stream %p to id %d", t->is_client ? "CLI" : "SVR", s, t->next_stream_id)); + if (t->next_stream_id == MAX_CLIENT_STREAM_ID) { + add_goaway(t, GRPC_CHTTP2_NO_ERROR, gpr_slice_from_copied_string("Exceeded sequence number limit")); + } + GPR_ASSERT(s->id == 0); s->id = t->next_stream_id; t->next_stream_id += 2; @@ -1063,6 +1085,13 @@ static void maybe_start_some_streams(transport *t) { grpc_chttp2_stream_map_add(&t->stream_map, s->id, s); stream_list_join(t, s, WRITABLE); } + /* cancel out streams that will never be started */ + while (t->next_stream_id > MAX_CLIENT_STREAM_ID) { + stream *s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY); + if (!s) return; + + cancel_stream(t, s, GRPC_STATUS_UNAVAILABLE, grpc_chttp2_grpc_status_to_http2_error(GRPC_STATUS_UNAVAILABLE), NULL, 0); + } } static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) { @@ -1620,16 +1649,7 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) { grpc_chttp2_ping_create(1, t->simple_parsers.ping.opaque_8bytes)); } if (st.goaway) { - if (t->num_pending_goaways == t->cap_pending_goaways) { - t->cap_pending_goaways = GPR_MAX(1, t->cap_pending_goaways * 2); - t->pending_goaways = - gpr_realloc(t->pending_goaways, - sizeof(pending_goaway) * t->cap_pending_goaways); - } - t->pending_goaways[t->num_pending_goaways].status = - grpc_chttp2_http2_error_to_grpc_status(st.goaway_error); - t->pending_goaways[t->num_pending_goaways].debug = st.goaway_text; - t->num_pending_goaways++; + add_goaway(t, st.goaway_error, st.goaway_text); } if (st.process_ping_reply) { for (i = 0; i < t->ping_count; i++) { diff --git a/test/core/end2end/tests/simple_request_with_high_initial_sequence_number.c b/test/core/end2end/tests/simple_request_with_high_initial_sequence_number.c index 39ab184a424d2a5068c81ffd2fdd5c60fa17686a..538291a5f245c88c038238a746ed5df5f6ef74f9 100644 --- a/test/core/end2end/tests/simple_request_with_high_initial_sequence_number.c +++ b/test/core/end2end/tests/simple_request_with_high_initial_sequence_number.c @@ -217,4 +217,7 @@ static void test_invoke_10_simple_requests(grpc_end2end_test_config config, int void grpc_end2end_tests(grpc_end2end_test_config config) { test_invoke_10_simple_requests(config, 16777213); + if (config.feature_mask & FEATURE_MASK_SUPPORTS_DELAYED_CONNECTION) { + test_invoke_10_simple_requests(config, 2147483645); + } }