Skip to content
Snippets Groups Projects
Commit ab630735 authored by Craig Tiller's avatar Craig Tiller
Browse files

Addressing comments

parent 48f0a13f
No related branches found
No related tags found
No related merge requests found
...@@ -72,56 +72,56 @@ typedef enum { ...@@ -72,56 +72,56 @@ typedef enum {
/* deframer state for the overall http2 stream of bytes */ /* deframer state for the overall http2 stream of bytes */
typedef enum { typedef enum {
/* prefix: one entry per http2 connection prefix byte */ /* prefix: one entry per http2 connection prefix byte */
DTS_CLIENT_PREFIX_0 = 0, GRPC_DTS_CLIENT_PREFIX_0 = 0,
DTS_CLIENT_PREFIX_1, GRPC_DTS_CLIENT_PREFIX_1,
DTS_CLIENT_PREFIX_2, GRPC_DTS_CLIENT_PREFIX_2,
DTS_CLIENT_PREFIX_3, GRPC_DTS_CLIENT_PREFIX_3,
DTS_CLIENT_PREFIX_4, GRPC_DTS_CLIENT_PREFIX_4,
DTS_CLIENT_PREFIX_5, GRPC_DTS_CLIENT_PREFIX_5,
DTS_CLIENT_PREFIX_6, GRPC_DTS_CLIENT_PREFIX_6,
DTS_CLIENT_PREFIX_7, GRPC_DTS_CLIENT_PREFIX_7,
DTS_CLIENT_PREFIX_8, GRPC_DTS_CLIENT_PREFIX_8,
DTS_CLIENT_PREFIX_9, GRPC_DTS_CLIENT_PREFIX_9,
DTS_CLIENT_PREFIX_10, GRPC_DTS_CLIENT_PREFIX_10,
DTS_CLIENT_PREFIX_11, GRPC_DTS_CLIENT_PREFIX_11,
DTS_CLIENT_PREFIX_12, GRPC_DTS_CLIENT_PREFIX_12,
DTS_CLIENT_PREFIX_13, GRPC_DTS_CLIENT_PREFIX_13,
DTS_CLIENT_PREFIX_14, GRPC_DTS_CLIENT_PREFIX_14,
DTS_CLIENT_PREFIX_15, GRPC_DTS_CLIENT_PREFIX_15,
DTS_CLIENT_PREFIX_16, GRPC_DTS_CLIENT_PREFIX_16,
DTS_CLIENT_PREFIX_17, GRPC_DTS_CLIENT_PREFIX_17,
DTS_CLIENT_PREFIX_18, GRPC_DTS_CLIENT_PREFIX_18,
DTS_CLIENT_PREFIX_19, GRPC_DTS_CLIENT_PREFIX_19,
DTS_CLIENT_PREFIX_20, GRPC_DTS_CLIENT_PREFIX_20,
DTS_CLIENT_PREFIX_21, GRPC_DTS_CLIENT_PREFIX_21,
DTS_CLIENT_PREFIX_22, GRPC_DTS_CLIENT_PREFIX_22,
DTS_CLIENT_PREFIX_23, GRPC_DTS_CLIENT_PREFIX_23,
/* frame header byte 0... */ /* frame header byte 0... */
/* must follow from the prefix states */ /* must follow from the prefix states */
DTS_FH_0, GRPC_DTS_FH_0,
DTS_FH_1, GRPC_DTS_FH_1,
DTS_FH_2, GRPC_DTS_FH_2,
DTS_FH_3, GRPC_DTS_FH_3,
DTS_FH_4, GRPC_DTS_FH_4,
DTS_FH_5, GRPC_DTS_FH_5,
DTS_FH_6, GRPC_DTS_FH_6,
DTS_FH_7, GRPC_DTS_FH_7,
/* ... frame header byte 8 */ /* ... frame header byte 8 */
DTS_FH_8, GRPC_DTS_FH_8,
/* inside a http2 frame */ /* inside a http2 frame */
DTS_FRAME GRPC_DTS_FRAME
} grpc_chttp2_deframe_transport_state; } grpc_chttp2_deframe_transport_state;
typedef enum { typedef enum {
WRITE_STATE_OPEN, GRPC_WRITE_STATE_OPEN,
WRITE_STATE_QUEUED_CLOSE, GRPC_WRITE_STATE_QUEUED_CLOSE,
WRITE_STATE_SENT_CLOSE GRPC_WRITE_STATE_SENT_CLOSE
} grpc_chttp2_write_state; } grpc_chttp2_write_state;
typedef enum { typedef enum {
DONT_SEND_CLOSED = 0, GRPC_DONT_SEND_CLOSED = 0,
SEND_CLOSED, GRPC_SEND_CLOSED,
SEND_CLOSED_WITH_RST_STREAM GRPC_SEND_CLOSED_WITH_RST_STREAM
} grpc_chttp2_send_closed; } grpc_chttp2_send_closed;
typedef struct { typedef struct {
...@@ -143,14 +143,14 @@ typedef enum { ...@@ -143,14 +143,14 @@ typedef enum {
/* We keep several sets of connection wide parameters */ /* We keep several sets of connection wide parameters */
typedef enum { typedef enum {
/* The settings our peer has asked for (and we have acked) */ /* The settings our peer has asked for (and we have acked) */
PEER_SETTINGS = 0, GRPC_PEER_SETTINGS = 0,
/* The settings we'd like to have */ /* The settings we'd like to have */
LOCAL_SETTINGS, GRPC_LOCAL_SETTINGS,
/* The settings we've published to our peer */ /* The settings we've published to our peer */
SENT_SETTINGS, GRPC_SENT_SETTINGS,
/* The settings the peer has acked */ /* The settings the peer has acked */
ACKED_SETTINGS, GRPC_ACKED_SETTINGS,
NUM_SETTING_SETS GRPC_NUM_SETTING_SETS
} grpc_chttp2_setting_set; } grpc_chttp2_setting_set;
/* Outstanding ping request data */ /* Outstanding ping request data */
...@@ -183,7 +183,7 @@ typedef struct { ...@@ -183,7 +183,7 @@ typedef struct {
/** bitmask of setting indexes to send out */ /** bitmask of setting indexes to send out */
gpr_uint32 force_send_settings; gpr_uint32 force_send_settings;
/** settings values */ /** settings values */
gpr_uint32 settings[NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS]; gpr_uint32 settings[GRPC_NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS];
/** has there been a connection level error, and have we notified /** has there been a connection level error, and have we notified
anyone about it? */ anyone about it? */
...@@ -352,34 +352,6 @@ struct grpc_chttp2_transport { ...@@ -352,34 +352,6 @@ struct grpc_chttp2_transport {
/** closure for notifying transport closure */ /** closure for notifying transport closure */
grpc_iomgr_closure notify_closed; grpc_iomgr_closure notify_closed;
} channel_callback; } channel_callback;
#if 0
/* basic state management - what are we doing at the moment? */
gpr_uint8 reading;
/** are we calling back any grpc_transport_op completion events */
gpr_uint8 calling_back_ops;
gpr_uint8 destroying;
gpr_uint8 closed;
/* stream indexing */
gpr_uint32 next_stream_id;
/* window management */
gpr_uint32 outgoing_window_update;
/* state for a stream that's not yet been created */
grpc_stream_op_buffer new_stream_sopb;
/* stream ops that need to be destroyed, but outside of the lock */
grpc_stream_op_buffer nuke_later_sopb;
/* pings */
gpr_int64 ping_counter;
grpc_chttp2_stream **accepting_stream;
#endif
}; };
typedef struct { typedef struct {
...@@ -451,14 +423,6 @@ struct grpc_chttp2_stream_parsing { ...@@ -451,14 +423,6 @@ struct grpc_chttp2_stream_parsing {
/** incoming metadata */ /** incoming metadata */
grpc_chttp2_incoming_metadata_buffer incoming_metadata; grpc_chttp2_incoming_metadata_buffer incoming_metadata;
/*
grpc_linked_mdelem *incoming_metadata;
size_t incoming_metadata_count;
size_t incoming_metadata_capacity;
grpc_linked_mdelem *old_incoming_metadata;
gpr_timespec incoming_deadline;
*/
}; };
struct grpc_chttp2_stream { struct grpc_chttp2_stream {
...@@ -468,14 +432,6 @@ struct grpc_chttp2_stream { ...@@ -468,14 +432,6 @@ struct grpc_chttp2_stream {
grpc_chttp2_stream_link links[STREAM_LIST_COUNT]; grpc_chttp2_stream_link links[STREAM_LIST_COUNT];
gpr_uint8 included[STREAM_LIST_COUNT]; gpr_uint8 included[STREAM_LIST_COUNT];
#if 0
gpr_uint32 outgoing_window_update;
gpr_uint8 cancelled;
grpc_stream_state callback_state;
grpc_stream_op_buffer callback_sopb;
#endif
}; };
/** Transport writing call flow: /** Transport writing call flow:
...@@ -502,14 +458,15 @@ void grpc_chttp2_cleanup_writing(grpc_chttp2_transport_global *global, ...@@ -502,14 +458,15 @@ void grpc_chttp2_cleanup_writing(grpc_chttp2_transport_global *global,
void grpc_chttp2_prepare_to_read(grpc_chttp2_transport_global *global, void grpc_chttp2_prepare_to_read(grpc_chttp2_transport_global *global,
grpc_chttp2_transport_parsing *parsing); grpc_chttp2_transport_parsing *parsing);
/** Process one slice of incoming data */ /** Process one slice of incoming data; return 1 if the connection is still
viable after reading, or 0 if the connection should be torn down */
int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing,
gpr_slice slice); gpr_slice slice);
void grpc_chttp2_publish_reads(grpc_chttp2_transport_global *global, void grpc_chttp2_publish_reads(grpc_chttp2_transport_global *global,
grpc_chttp2_transport_parsing *parsing); grpc_chttp2_transport_parsing *parsing);
/** Get a writable stream /** Get a writable stream
\return non-zero if there was a stream available */ returns non-zero if there was a stream available */
void grpc_chttp2_list_add_writable_stream( void grpc_chttp2_list_add_writable_stream(
grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global); grpc_chttp2_stream_global *stream_global);
...@@ -622,10 +579,10 @@ void grpc_chttp2_parsing_become_skip_parser( ...@@ -622,10 +579,10 @@ void grpc_chttp2_parsing_become_skip_parser(
extern int grpc_http_trace; extern int grpc_http_trace;
extern int grpc_flowctl_trace; extern int grpc_flowctl_trace;
#define IF_TRACING(stmt) \ #define GRPC_CHTTP2_IF_TRACING(stmt) \
if (!(grpc_http_trace)) \ if (!(grpc_http_trace)) \
; \ ; \
else \ else \
stmt stmt
#define GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(reason, transport, context, var, \ #define GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(reason, transport, context, var, \
......
...@@ -117,15 +117,15 @@ void grpc_chttp2_publish_reads( ...@@ -117,15 +117,15 @@ void grpc_chttp2_publish_reads(
/* update global settings */ /* update global settings */
if (transport_parsing->settings_updated) { if (transport_parsing->settings_updated) {
memcpy(transport_global->settings[PEER_SETTINGS], memcpy(transport_global->settings[GRPC_PEER_SETTINGS],
transport_parsing->settings, sizeof(transport_parsing->settings)); transport_parsing->settings, sizeof(transport_parsing->settings));
transport_parsing->settings_updated = 0; transport_parsing->settings_updated = 0;
} }
/* update settings based on ack if received */ /* update settings based on ack if received */
if (transport_parsing->settings_ack_received) { if (transport_parsing->settings_ack_received) {
memcpy(transport_global->settings[ACKED_SETTINGS], memcpy(transport_global->settings[GRPC_ACKED_SETTINGS],
transport_global->settings[SENT_SETTINGS], transport_global->settings[GRPC_SENT_SETTINGS],
GRPC_CHTTP2_NUM_SETTINGS * sizeof(gpr_uint32)); GRPC_CHTTP2_NUM_SETTINGS * sizeof(gpr_uint32));
transport_parsing->settings_ack_received = 0; transport_parsing->settings_ack_received = 0;
} }
...@@ -238,34 +238,34 @@ int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, ...@@ -238,34 +238,34 @@ int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing,
if (cur == end) return 1; if (cur == end) return 1;
switch (transport_parsing->deframe_state) { switch (transport_parsing->deframe_state) {
case DTS_CLIENT_PREFIX_0: case GRPC_DTS_CLIENT_PREFIX_0:
case DTS_CLIENT_PREFIX_1: case GRPC_DTS_CLIENT_PREFIX_1:
case DTS_CLIENT_PREFIX_2: case GRPC_DTS_CLIENT_PREFIX_2:
case DTS_CLIENT_PREFIX_3: case GRPC_DTS_CLIENT_PREFIX_3:
case DTS_CLIENT_PREFIX_4: case GRPC_DTS_CLIENT_PREFIX_4:
case DTS_CLIENT_PREFIX_5: case GRPC_DTS_CLIENT_PREFIX_5:
case DTS_CLIENT_PREFIX_6: case GRPC_DTS_CLIENT_PREFIX_6:
case DTS_CLIENT_PREFIX_7: case GRPC_DTS_CLIENT_PREFIX_7:
case DTS_CLIENT_PREFIX_8: case GRPC_DTS_CLIENT_PREFIX_8:
case DTS_CLIENT_PREFIX_9: case GRPC_DTS_CLIENT_PREFIX_9:
case DTS_CLIENT_PREFIX_10: case GRPC_DTS_CLIENT_PREFIX_10:
case DTS_CLIENT_PREFIX_11: case GRPC_DTS_CLIENT_PREFIX_11:
case DTS_CLIENT_PREFIX_12: case GRPC_DTS_CLIENT_PREFIX_12:
case DTS_CLIENT_PREFIX_13: case GRPC_DTS_CLIENT_PREFIX_13:
case DTS_CLIENT_PREFIX_14: case GRPC_DTS_CLIENT_PREFIX_14:
case DTS_CLIENT_PREFIX_15: case GRPC_DTS_CLIENT_PREFIX_15:
case DTS_CLIENT_PREFIX_16: case GRPC_DTS_CLIENT_PREFIX_16:
case DTS_CLIENT_PREFIX_17: case GRPC_DTS_CLIENT_PREFIX_17:
case DTS_CLIENT_PREFIX_18: case GRPC_DTS_CLIENT_PREFIX_18:
case DTS_CLIENT_PREFIX_19: case GRPC_DTS_CLIENT_PREFIX_19:
case DTS_CLIENT_PREFIX_20: case GRPC_DTS_CLIENT_PREFIX_20:
case DTS_CLIENT_PREFIX_21: case GRPC_DTS_CLIENT_PREFIX_21:
case DTS_CLIENT_PREFIX_22: case GRPC_DTS_CLIENT_PREFIX_22:
case DTS_CLIENT_PREFIX_23: case GRPC_DTS_CLIENT_PREFIX_23:
while (cur != end && transport_parsing->deframe_state != DTS_FH_0) { while (cur != end && transport_parsing->deframe_state != GRPC_DTS_FH_0) {
if (*cur != GRPC_CHTTP2_CLIENT_CONNECT_STRING[transport_parsing if (*cur != GRPC_CHTTP2_CLIENT_CONNECT_STRING[transport_parsing
->deframe_state]) { ->deframe_state]) {
gpr_log(GPR_ERROR, gpr_log(GPR_INFO,
"Connect string mismatch: expected '%c' (%d) got '%c' (%d) " "Connect string mismatch: expected '%c' (%d) got '%c' (%d) "
"at byte %d", "at byte %d",
GRPC_CHTTP2_CLIENT_CONNECT_STRING[transport_parsing GRPC_CHTTP2_CLIENT_CONNECT_STRING[transport_parsing
...@@ -283,74 +283,74 @@ int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, ...@@ -283,74 +283,74 @@ int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing,
} }
/* fallthrough */ /* fallthrough */
dts_fh_0: dts_fh_0:
case DTS_FH_0: case GRPC_DTS_FH_0:
GPR_ASSERT(cur < end); GPR_ASSERT(cur < end);
transport_parsing->incoming_frame_size = ((gpr_uint32)*cur) << 16; transport_parsing->incoming_frame_size = ((gpr_uint32)*cur) << 16;
if (++cur == end) { if (++cur == end) {
transport_parsing->deframe_state = DTS_FH_1; transport_parsing->deframe_state = GRPC_DTS_FH_1;
return 1; return 1;
} }
/* fallthrough */ /* fallthrough */
case DTS_FH_1: case GRPC_DTS_FH_1:
GPR_ASSERT(cur < end); GPR_ASSERT(cur < end);
transport_parsing->incoming_frame_size |= ((gpr_uint32)*cur) << 8; transport_parsing->incoming_frame_size |= ((gpr_uint32)*cur) << 8;
if (++cur == end) { if (++cur == end) {
transport_parsing->deframe_state = DTS_FH_2; transport_parsing->deframe_state = GRPC_DTS_FH_2;
return 1; return 1;
} }
/* fallthrough */ /* fallthrough */
case DTS_FH_2: case GRPC_DTS_FH_2:
GPR_ASSERT(cur < end); GPR_ASSERT(cur < end);
transport_parsing->incoming_frame_size |= *cur; transport_parsing->incoming_frame_size |= *cur;
if (++cur == end) { if (++cur == end) {
transport_parsing->deframe_state = DTS_FH_3; transport_parsing->deframe_state = GRPC_DTS_FH_3;
return 1; return 1;
} }
/* fallthrough */ /* fallthrough */
case DTS_FH_3: case GRPC_DTS_FH_3:
GPR_ASSERT(cur < end); GPR_ASSERT(cur < end);
transport_parsing->incoming_frame_type = *cur; transport_parsing->incoming_frame_type = *cur;
if (++cur == end) { if (++cur == end) {
transport_parsing->deframe_state = DTS_FH_4; transport_parsing->deframe_state = GRPC_DTS_FH_4;
return 1; return 1;
} }
/* fallthrough */ /* fallthrough */
case DTS_FH_4: case GRPC_DTS_FH_4:
GPR_ASSERT(cur < end); GPR_ASSERT(cur < end);
transport_parsing->incoming_frame_flags = *cur; transport_parsing->incoming_frame_flags = *cur;
if (++cur == end) { if (++cur == end) {
transport_parsing->deframe_state = DTS_FH_5; transport_parsing->deframe_state = GRPC_DTS_FH_5;
return 1; return 1;
} }
/* fallthrough */ /* fallthrough */
case DTS_FH_5: case GRPC_DTS_FH_5:
GPR_ASSERT(cur < end); GPR_ASSERT(cur < end);
transport_parsing->incoming_stream_id = (((gpr_uint32)*cur) & 0x7f) << 24; transport_parsing->incoming_stream_id = (((gpr_uint32)*cur) & 0x7f) << 24;
if (++cur == end) { if (++cur == end) {
transport_parsing->deframe_state = DTS_FH_6; transport_parsing->deframe_state = GRPC_DTS_FH_6;
return 1; return 1;
} }
/* fallthrough */ /* fallthrough */
case DTS_FH_6: case GRPC_DTS_FH_6:
GPR_ASSERT(cur < end); GPR_ASSERT(cur < end);
transport_parsing->incoming_stream_id |= ((gpr_uint32)*cur) << 16; transport_parsing->incoming_stream_id |= ((gpr_uint32)*cur) << 16;
if (++cur == end) { if (++cur == end) {
transport_parsing->deframe_state = DTS_FH_7; transport_parsing->deframe_state = GRPC_DTS_FH_7;
return 1; return 1;
} }
/* fallthrough */ /* fallthrough */
case DTS_FH_7: case GRPC_DTS_FH_7:
GPR_ASSERT(cur < end); GPR_ASSERT(cur < end);
transport_parsing->incoming_stream_id |= ((gpr_uint32)*cur) << 8; transport_parsing->incoming_stream_id |= ((gpr_uint32)*cur) << 8;
if (++cur == end) { if (++cur == end) {
transport_parsing->deframe_state = DTS_FH_8; transport_parsing->deframe_state = GRPC_DTS_FH_8;
return 1; return 1;
} }
/* fallthrough */ /* fallthrough */
case DTS_FH_8: case GRPC_DTS_FH_8:
GPR_ASSERT(cur < end); GPR_ASSERT(cur < end);
transport_parsing->incoming_stream_id |= ((gpr_uint32)*cur); transport_parsing->incoming_stream_id |= ((gpr_uint32)*cur);
transport_parsing->deframe_state = DTS_FRAME; transport_parsing->deframe_state = GRPC_DTS_FRAME;
if (!init_frame_parser(transport_parsing)) { if (!init_frame_parser(transport_parsing)) {
return 0; return 0;
} }
...@@ -364,7 +364,7 @@ int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, ...@@ -364,7 +364,7 @@ int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing,
} }
transport_parsing->incoming_stream = NULL; transport_parsing->incoming_stream = NULL;
if (++cur == end) { if (++cur == end) {
transport_parsing->deframe_state = DTS_FH_0; transport_parsing->deframe_state = GRPC_DTS_FH_0;
return 1; return 1;
} }
goto dts_fh_0; /* loop */ goto dts_fh_0; /* loop */
...@@ -373,7 +373,7 @@ int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, ...@@ -373,7 +373,7 @@ int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing,
return 1; return 1;
} }
/* fallthrough */ /* fallthrough */
case DTS_FRAME: case GRPC_DTS_FRAME:
GPR_ASSERT(cur < end); GPR_ASSERT(cur < end);
if ((gpr_uint32)(end - cur) == transport_parsing->incoming_frame_size) { if ((gpr_uint32)(end - cur) == transport_parsing->incoming_frame_size) {
if (!parse_frame_slice( if (!parse_frame_slice(
...@@ -381,7 +381,7 @@ int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, ...@@ -381,7 +381,7 @@ int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing,
gpr_slice_sub_no_ref(slice, cur - beg, end - beg), 1)) { gpr_slice_sub_no_ref(slice, cur - beg, end - beg), 1)) {
return 0; return 0;
} }
transport_parsing->deframe_state = DTS_FH_0; transport_parsing->deframe_state = GRPC_DTS_FH_0;
transport_parsing->incoming_stream = NULL; transport_parsing->incoming_stream = NULL;
return 1; return 1;
} else if ((gpr_uint32)(end - cur) > } else if ((gpr_uint32)(end - cur) >
...@@ -582,10 +582,10 @@ static void on_header(void *tp, grpc_mdelem *md) { ...@@ -582,10 +582,10 @@ static void on_header(void *tp, grpc_mdelem *md) {
GPR_ASSERT(stream_parsing); GPR_ASSERT(stream_parsing);
IF_TRACING(gpr_log(GPR_INFO, "HTTP:%d:HDR: %s: %s", stream_parsing->id, GRPC_CHTTP2_IF_TRACING(gpr_log(
transport_parsing->is_client ? "CLI" : "SVR", GPR_INFO, "HTTP:%d:HDR: %s: %s", stream_parsing->id,
grpc_mdstr_as_c_string(md->key), transport_parsing->is_client ? "CLI" : "SVR",
grpc_mdstr_as_c_string(md->value))); grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value)));
if (md->key == transport_parsing->str_grpc_timeout) { if (md->key == transport_parsing->str_grpc_timeout) {
gpr_timespec *cached_timeout = grpc_mdelem_get_user_data(md, free_timeout); gpr_timespec *cached_timeout = grpc_mdelem_get_user_data(md, free_timeout);
......
...@@ -344,10 +344,9 @@ void grpc_chttp2_for_all_streams( ...@@ -344,10 +344,9 @@ void grpc_chttp2_for_all_streams(
void (*cb)(grpc_chttp2_transport_global *transport_global, void *user_data, void (*cb)(grpc_chttp2_transport_global *transport_global, void *user_data,
grpc_chttp2_stream_global *stream_global)) { grpc_chttp2_stream_global *stream_global)) {
grpc_chttp2_stream *s; grpc_chttp2_stream *s;
for (s = TRANSPORT_FROM_GLOBAL(transport_global) grpc_chttp2_transport *t = TRANSPORT_FROM_GLOBAL(transport_global);
->lists[GRPC_CHTTP2_LIST_ALL_STREAMS] for (s = t->lists[GRPC_CHTTP2_LIST_ALL_STREAMS].head; s != NULL;
.head; s = s->links[GRPC_CHTTP2_LIST_ALL_STREAMS].next) {
s; s = s->links[GRPC_CHTTP2_LIST_ALL_STREAMS].next) {
cb(transport_global, user_data, &s->global); cb(transport_global, user_data, &s->global);
} }
} }
...@@ -54,10 +54,10 @@ int grpc_chttp2_unlocking_check_writes( ...@@ -54,10 +54,10 @@ int grpc_chttp2_unlocking_check_writes(
!transport_global->sent_local_settings) { !transport_global->sent_local_settings) {
gpr_slice_buffer_add( gpr_slice_buffer_add(
&transport_writing->outbuf, &transport_writing->outbuf,
grpc_chttp2_settings_create(transport_global->settings[SENT_SETTINGS], grpc_chttp2_settings_create(
transport_global->settings[LOCAL_SETTINGS], transport_global->settings[GRPC_SENT_SETTINGS],
transport_global->force_send_settings, transport_global->settings[GRPC_LOCAL_SETTINGS],
GRPC_CHTTP2_NUM_SETTINGS)); transport_global->force_send_settings, GRPC_CHTTP2_NUM_SETTINGS));
transport_global->force_send_settings = 0; transport_global->force_send_settings = 0;
transport_global->dirtied_local_settings = 0; transport_global->dirtied_local_settings = 0;
transport_global->sent_local_settings = 1; transport_global->sent_local_settings = 1;
...@@ -84,16 +84,16 @@ int grpc_chttp2_unlocking_check_writes( ...@@ -84,16 +84,16 @@ int grpc_chttp2_unlocking_check_writes(
transport_global->outgoing_window -= window_delta; transport_global->outgoing_window -= window_delta;
stream_global->outgoing_window -= window_delta; stream_global->outgoing_window -= window_delta;
if (stream_global->write_state == WRITE_STATE_QUEUED_CLOSE && if (stream_global->write_state == GRPC_WRITE_STATE_QUEUED_CLOSE &&
stream_global->outgoing_sopb->nops == 0) { stream_global->outgoing_sopb->nops == 0) {
if (!transport_global->is_client && !stream_global->read_closed) { if (!transport_global->is_client && !stream_global->read_closed) {
stream_writing->send_closed = SEND_CLOSED_WITH_RST_STREAM; stream_writing->send_closed = GRPC_SEND_CLOSED_WITH_RST_STREAM;
} else { } else {
stream_writing->send_closed = SEND_CLOSED; stream_writing->send_closed = GRPC_SEND_CLOSED;
} }
} }
if (stream_writing->sopb.nops > 0 || if (stream_writing->sopb.nops > 0 ||
stream_writing->send_closed != DONT_SEND_CLOSED) { stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) {
grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing); grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing);
} }
...@@ -112,7 +112,7 @@ int grpc_chttp2_unlocking_check_writes( ...@@ -112,7 +112,7 @@ int grpc_chttp2_unlocking_check_writes(
while (grpc_chttp2_list_pop_writable_window_update_stream(transport_global, while (grpc_chttp2_list_pop_writable_window_update_stream(transport_global,
&stream_global)) { &stream_global)) {
window_delta = window_delta =
transport_global->settings[LOCAL_SETTINGS] transport_global->settings[GRPC_LOCAL_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] -
stream_global->incoming_window; stream_global->incoming_window;
if (!stream_global->read_closed && window_delta > 0) { if (!stream_global->read_closed && window_delta > 0) {
...@@ -128,7 +128,7 @@ int grpc_chttp2_unlocking_check_writes( ...@@ -128,7 +128,7 @@ int grpc_chttp2_unlocking_check_writes(
} }
/* if the grpc_chttp2_transport is ready to send a window update, do so here /* if the grpc_chttp2_transport is ready to send a window update, do so here
* also */ also; 3/4 is a magic number that will likely get tuned soon */
if (transport_global->incoming_window < if (transport_global->incoming_window <
transport_global->connection_window_target * 3 / 4) { transport_global->connection_window_target * 3 / 4) {
window_delta = transport_global->connection_window_target - window_delta = transport_global->connection_window_target -
...@@ -174,11 +174,11 @@ static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing) { ...@@ -174,11 +174,11 @@ static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing) {
while ( while (
grpc_chttp2_list_pop_writing_stream(transport_writing, &stream_writing)) { grpc_chttp2_list_pop_writing_stream(transport_writing, &stream_writing)) {
grpc_chttp2_encode(stream_writing->sopb.ops, stream_writing->sopb.nops, grpc_chttp2_encode(stream_writing->sopb.ops, stream_writing->sopb.nops,
stream_writing->send_closed != DONT_SEND_CLOSED, stream_writing->send_closed != GRPC_DONT_SEND_CLOSED,
stream_writing->id, &transport_writing->hpack_compressor, stream_writing->id, &transport_writing->hpack_compressor,
&transport_writing->outbuf); &transport_writing->outbuf);
stream_writing->sopb.nops = 0; stream_writing->sopb.nops = 0;
if (stream_writing->send_closed == SEND_CLOSED_WITH_RST_STREAM) { if (stream_writing->send_closed == GRPC_SEND_CLOSED_WITH_RST_STREAM) {
gpr_slice_buffer_add(&transport_writing->outbuf, gpr_slice_buffer_add(&transport_writing->outbuf,
grpc_chttp2_rst_stream_create(stream_writing->id, grpc_chttp2_rst_stream_create(stream_writing->id,
GRPC_CHTTP2_NO_ERROR)); GRPC_CHTTP2_NO_ERROR));
...@@ -201,8 +201,8 @@ void grpc_chttp2_cleanup_writing( ...@@ -201,8 +201,8 @@ void grpc_chttp2_cleanup_writing(
while (grpc_chttp2_list_pop_written_stream( while (grpc_chttp2_list_pop_written_stream(
transport_global, transport_writing, &stream_global, &stream_writing)) { transport_global, transport_writing, &stream_global, &stream_writing)) {
if (stream_writing->send_closed != DONT_SEND_CLOSED) { if (stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) {
stream_global->write_state = WRITE_STATE_SENT_CLOSE; stream_global->write_state = GRPC_WRITE_STATE_SENT_CLOSE;
if (!transport_global->is_client) { if (!transport_global->is_client) {
stream_global->read_closed = 1; stream_global->read_closed = 1;
} }
......
...@@ -230,7 +230,8 @@ static void init_transport(grpc_chttp2_transport *t, ...@@ -230,7 +230,8 @@ static void init_transport(grpc_chttp2_transport *t,
t->parsing.is_client = is_client; t->parsing.is_client = is_client;
t->parsing.str_grpc_timeout = t->parsing.str_grpc_timeout =
grpc_mdstr_from_string(t->metadata_context, "grpc-timeout"); grpc_mdstr_from_string(t->metadata_context, "grpc-timeout");
t->parsing.deframe_state = is_client ? DTS_FH_0 : DTS_CLIENT_PREFIX_0; t->parsing.deframe_state =
is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0;
t->writing.is_client = is_client; t->writing.is_client = is_client;
gpr_slice_buffer_init(&t->global.qbuf); gpr_slice_buffer_init(&t->global.qbuf);
...@@ -261,7 +262,7 @@ static void init_transport(grpc_chttp2_transport *t, ...@@ -261,7 +262,7 @@ static void init_transport(grpc_chttp2_transport *t,
/* copy in initial settings to all setting sets */ /* copy in initial settings to all setting sets */
for (i = 0; i < GRPC_CHTTP2_NUM_SETTINGS; i++) { for (i = 0; i < GRPC_CHTTP2_NUM_SETTINGS; i++) {
t->parsing.settings[i] = grpc_chttp2_settings_parameters[i].default_value; t->parsing.settings[i] = grpc_chttp2_settings_parameters[i].default_value;
for (j = 0; j < NUM_SETTING_SETS; j++) { for (j = 0; j < GRPC_NUM_SETTING_SETS; j++) {
t->global.settings[j][i] = t->global.settings[j][i] =
grpc_chttp2_settings_parameters[i].default_value; grpc_chttp2_settings_parameters[i].default_value;
} }
...@@ -388,11 +389,11 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs, ...@@ -388,11 +389,11 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
GPR_ASSERT(t->parsing_active); GPR_ASSERT(t->parsing_active);
s->global.id = (gpr_uint32)(gpr_uintptr)server_data; s->global.id = (gpr_uint32)(gpr_uintptr)server_data;
s->global.outgoing_window = s->global.outgoing_window =
t->global t->global.settings[GRPC_PEER_SETTINGS]
.settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
s->parsing.incoming_window = s->global.incoming_window = s->parsing.incoming_window = s->global.incoming_window =
t->global t->global.settings[GRPC_SENT_SETTINGS]
.settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
*t->accepting_stream = s; *t->accepting_stream = s;
grpc_chttp2_stream_map_add(&t->parsing_stream_map, s->global.id, s); grpc_chttp2_stream_map_add(&t->parsing_stream_map, s->global.id, s);
s->global.in_stream_map = 1; s->global.in_stream_map = 1;
...@@ -509,8 +510,8 @@ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, ...@@ -509,8 +510,8 @@ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
gpr_log(GPR_INFO, "Requested parameter %s clamped from %d to %d", sp->name, gpr_log(GPR_INFO, "Requested parameter %s clamped from %d to %d", sp->name,
value, use_value); value, use_value);
} }
if (use_value != t->global.settings[LOCAL_SETTINGS][id]) { if (use_value != t->global.settings[GRPC_LOCAL_SETTINGS][id]) {
t->global.settings[LOCAL_SETTINGS][id] = use_value; t->global.settings[GRPC_LOCAL_SETTINGS][id] = use_value;
t->global.dirtied_local_settings = 1; t->global.dirtied_local_settings = 1;
} }
} }
...@@ -569,14 +570,15 @@ static void maybe_start_some_streams( ...@@ -569,14 +570,15 @@ static void maybe_start_some_streams(
* concurrency */ * concurrency */
while (transport_global->next_stream_id <= MAX_CLIENT_STREAM_ID && while (transport_global->next_stream_id <= MAX_CLIENT_STREAM_ID &&
transport_global->concurrent_stream_count < transport_global->concurrent_stream_count <
transport_global->settings transport_global
[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] && ->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] &&
grpc_chttp2_list_pop_waiting_for_concurrency(transport_global, grpc_chttp2_list_pop_waiting_for_concurrency(transport_global,
&stream_global)) { &stream_global)) {
IF_TRACING(gpr_log(GPR_DEBUG, GRPC_CHTTP2_IF_TRACING(gpr_log(
"HTTP:%s: Allocating new grpc_chttp2_stream %p to id %d", GPR_DEBUG, "HTTP:%s: Allocating new grpc_chttp2_stream %p to id %d",
transport_global->is_client ? "CLI" : "SVR", transport_global->is_client ? "CLI" : "SVR", stream_global,
stream_global, transport_global->next_stream_id)); transport_global->next_stream_id));
GPR_ASSERT(stream_global->id == 0); GPR_ASSERT(stream_global->id == 0);
stream_global->id = transport_global->next_stream_id; stream_global->id = transport_global->next_stream_id;
...@@ -589,11 +591,11 @@ static void maybe_start_some_streams( ...@@ -589,11 +591,11 @@ static void maybe_start_some_streams(
} }
stream_global->outgoing_window = stream_global->outgoing_window =
transport_global transport_global->settings[GRPC_PEER_SETTINGS]
->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
stream_global->incoming_window = stream_global->incoming_window =
transport_global transport_global->settings[GRPC_SENT_SETTINGS]
->settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
grpc_chttp2_stream_map_add( grpc_chttp2_stream_map_add(
&TRANSPORT_FROM_GLOBAL(transport_global)->new_stream_map, &TRANSPORT_FROM_GLOBAL(transport_global)->new_stream_map,
stream_global->id, STREAM_FROM_GLOBAL(stream_global)); stream_global->id, STREAM_FROM_GLOBAL(stream_global));
...@@ -623,11 +625,12 @@ static void perform_op_locked(grpc_chttp2_transport_global *transport_global, ...@@ -623,11 +625,12 @@ static void perform_op_locked(grpc_chttp2_transport_global *transport_global,
stream_global->send_done_closure = op->on_done_send; stream_global->send_done_closure = op->on_done_send;
if (!stream_global->cancelled) { if (!stream_global->cancelled) {
stream_global->outgoing_sopb = op->send_ops; stream_global->outgoing_sopb = op->send_ops;
if (op->is_last_send && stream_global->write_state == WRITE_STATE_OPEN) { if (op->is_last_send &&
stream_global->write_state = WRITE_STATE_QUEUED_CLOSE; stream_global->write_state == GRPC_WRITE_STATE_OPEN) {
stream_global->write_state = GRPC_WRITE_STATE_QUEUED_CLOSE;
} }
if (stream_global->id == 0) { if (stream_global->id == 0) {
IF_TRACING(gpr_log( GRPC_CHTTP2_IF_TRACING(gpr_log(
GPR_DEBUG, GPR_DEBUG,
"HTTP:%s: New grpc_chttp2_stream %p waiting for concurrency", "HTTP:%s: New grpc_chttp2_stream %p waiting for concurrency",
transport_global->is_client ? "CLI" : "SVR", stream_global)); transport_global->is_client ? "CLI" : "SVR", stream_global));
...@@ -747,7 +750,7 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) { ...@@ -747,7 +750,7 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) {
while (grpc_chttp2_list_pop_closed_waiting_for_parsing(transport_global, while (grpc_chttp2_list_pop_closed_waiting_for_parsing(transport_global,
&stream_global)) { &stream_global)) {
GPR_ASSERT(stream_global->in_stream_map); GPR_ASSERT(stream_global->in_stream_map);
GPR_ASSERT(stream_global->write_state != WRITE_STATE_OPEN); GPR_ASSERT(stream_global->write_state != GRPC_WRITE_STATE_OPEN);
GPR_ASSERT(stream_global->read_closed); GPR_ASSERT(stream_global->read_closed);
remove_stream(t, stream_global->id); remove_stream(t, stream_global->id);
grpc_chttp2_list_add_read_write_state_changed(transport_global, grpc_chttp2_list_add_read_write_state_changed(transport_global,
...@@ -758,7 +761,7 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) { ...@@ -758,7 +761,7 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) {
while (grpc_chttp2_list_pop_read_write_state_changed(transport_global, while (grpc_chttp2_list_pop_read_write_state_changed(transport_global,
&stream_global)) { &stream_global)) {
if (stream_global->cancelled) { if (stream_global->cancelled) {
stream_global->write_state = WRITE_STATE_SENT_CLOSE; stream_global->write_state = GRPC_WRITE_STATE_SENT_CLOSE;
stream_global->read_closed = 1; stream_global->read_closed = 1;
if (!stream_global->published_cancelled) { if (!stream_global->published_cancelled) {
char buffer[GPR_LTOA_MIN_BUFSIZE]; char buffer[GPR_LTOA_MIN_BUFSIZE];
...@@ -771,7 +774,7 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) { ...@@ -771,7 +774,7 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) {
stream_global->published_cancelled = 1; stream_global->published_cancelled = 1;
} }
} }
if (stream_global->write_state == WRITE_STATE_SENT_CLOSE && if (stream_global->write_state == GRPC_WRITE_STATE_SENT_CLOSE &&
stream_global->read_closed && stream_global->in_stream_map) { stream_global->read_closed && stream_global->in_stream_map) {
if (t->parsing_active) { if (t->parsing_active) {
grpc_chttp2_list_add_closed_waiting_for_parsing(transport_global, grpc_chttp2_list_add_closed_waiting_for_parsing(transport_global,
...@@ -790,7 +793,8 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) { ...@@ -790,7 +793,8 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) {
To fix this will require having an edge after stream-closed To fix this will require having an edge after stream-closed
indicating that the stream is closed AND safe to delete. */ indicating that the stream is closed AND safe to delete. */
state = compute_state( state = compute_state(
stream_global->write_state == WRITE_STATE_SENT_CLOSE && !stream_global->in_stream_map, stream_global->write_state == GRPC_WRITE_STATE_SENT_CLOSE &&
!stream_global->in_stream_map,
stream_global->read_closed); stream_global->read_closed);
if (stream_global->incoming_sopb.nops == 0 && if (stream_global->incoming_sopb.nops == 0 &&
state == stream_global->published_state) { state == stream_global->published_state) {
...@@ -842,6 +846,19 @@ static void drop_connection(grpc_chttp2_transport *t) { ...@@ -842,6 +846,19 @@ static void drop_connection(grpc_chttp2_transport *t) {
end_all_the_calls(t); end_all_the_calls(t);
} }
/** update window from a settings change */
static void update_global_window(void *args, gpr_uint32 id, void *stream) {
grpc_chttp2_transport *t = args;
grpc_chttp2_stream *s = stream;
grpc_chttp2_transport_global *transport_global = &t->global;
grpc_chttp2_stream_global *stream_global = &s->global;
GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("settings", transport_global, stream_global,
outgoing_window,
t->parsing.initial_window_update);
stream_global->outgoing_window += t->parsing.initial_window_update;
}
/* tcp read callback */ /* tcp read callback */
static void recv_data(void *tp, gpr_slice *slices, size_t nslices, static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
grpc_endpoint_cb_status error) { grpc_endpoint_cb_status error) {
...@@ -888,6 +905,10 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices, ...@@ -888,6 +905,10 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
grpc_chttp2_stream_map_move_into(&t->new_stream_map, grpc_chttp2_stream_map_move_into(&t->new_stream_map,
&t->parsing_stream_map); &t->parsing_stream_map);
t->global.concurrent_stream_count = grpc_chttp2_stream_map_size(&t->parsing_stream_map); t->global.concurrent_stream_count = grpc_chttp2_stream_map_size(&t->parsing_stream_map);
if (t->parsing.initial_window_update != 0) {
grpc_chttp2_stream_map_for_each(&t->parsing_stream_map,
update_global_window, t);
}
/* handle higher level things */ /* handle higher level things */
grpc_chttp2_publish_reads(&t->global, &t->parsing); grpc_chttp2_publish_reads(&t->global, &t->parsing);
t->parsing_active = 0; t->parsing_active = 0;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment