diff --git a/src/core/ext/client_config/subchannel.c b/src/core/ext/client_config/subchannel.c index 2c4364b259ffdf5b46c224e3f7e901ceaefb24d7..e622862fc93c3803290bf5c020b21d756bdb767c 100644 --- a/src/core/ext/client_config/subchannel.c +++ b/src/core/ext/client_config/subchannel.c @@ -219,8 +219,8 @@ static gpr_atm ref_mutate(grpc_subchannel *c, gpr_atm delta, : gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta); #ifdef GRPC_STREAM_REFCOUNT_DEBUG gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, - "SUBCHANNEL: %p % 12s 0x%08x -> 0x%08x [%s]", c, purpose, old_val, - old_val + delta, reason); + "SUBCHANNEL: %p %s 0x%08" PRIxPTR " -> 0x%08" PRIxPTR " [%s]", c, + purpose, old_val, old_val + delta, reason); #endif return old_val; } diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 0be00a78d4ef552809571bfba7ee371c792b9388..82c040b186688a561a42ed65e8c434fd36b08107 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -501,7 +501,6 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, REF_TRANSPORT(t, "stream"); if (server_data) { - GPR_ASSERT(t->executor.parsing_active); s->global.id = (uint32_t)(uintptr_t)server_data; s->global.outgoing_window = t->global.settings[GRPC_PEER_SETTINGS] @@ -540,7 +539,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, exec_ctx, t, GRPC_ERROR_CREATE("Last stream closed after sending goaway")); } - if (!t->executor.parsing_active && s->global.id) { + if (s->global.id != 0) { GPR_ASSERT(grpc_chttp2_stream_map_find(&t->parsing_stream_map, s->global.id) == NULL); } @@ -1246,15 +1245,6 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t = op->transport_private.args[0]; grpc_error *close_transport = op->disconnect_with_error; - /* If there's a set_accept_stream ensure that we're not parsing - to avoid changing things out from underneath */ - if (t->executor.parsing_active && op->set_accept_stream) { - GPR_ASSERT(t->post_parsing_op == NULL); - t->post_parsing_op = gpr_malloc(sizeof(*op)); - memcpy(t->post_parsing_op, op, sizeof(*op)); - return; - } - if (op->on_connectivity_state_change != NULL) { grpc_connectivity_state_notify_on_state_change( exec_ctx, &t->channel_callback.state_tracker, op->connectivity_state, @@ -1627,18 +1617,12 @@ void grpc_chttp2_mark_stream_closed( } } if (stream_global->read_closed && stream_global->write_closed) { - if (stream_global->id != 0 && - TRANSPORT_FROM_GLOBAL(transport_global)->executor.parsing_active) { - grpc_chttp2_list_add_closed_waiting_for_parsing(transport_global, - stream_global); - } else { - if (stream_global->id != 0) { - remove_stream(exec_ctx, TRANSPORT_FROM_GLOBAL(transport_global), - stream_global->id, - removal_error(GRPC_ERROR_REF(error), stream_global)); - } - GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2"); + if (stream_global->id != 0) { + remove_stream(exec_ctx, TRANSPORT_FROM_GLOBAL(transport_global), + stream_global->id, + removal_error(GRPC_ERROR_REF(error), stream_global)); } + GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2"); } GRPC_ERROR_UNREF(error); } @@ -1874,9 +1858,7 @@ static void reading_action_locked(grpc_exec_ctx *exec_ctx, void *tp, GRPC_ERROR_REF(error); - GPR_ASSERT(!t->executor.parsing_active); if (!t->closed) { - t->executor.parsing_active = 1; /* merge stream lists */ grpc_chttp2_stream_map_move_into(&t->new_stream_map, &t->parsing_stream_map); @@ -1919,27 +1901,6 @@ static void reading_action_locked(grpc_exec_ctx *exec_ctx, void *tp, grpc_chttp2_initiate_write(exec_ctx, transport_global, false, "global incoming window"); } - t->executor.parsing_active = 0; - /* handle delayed transport ops (if there is one) */ - if (t->post_parsing_op) { - grpc_transport_op *op = t->post_parsing_op; - t->post_parsing_op = NULL; - perform_transport_op_locked(exec_ctx, op, GRPC_ERROR_NONE); - gpr_free(op); - } - /* if a stream is in the stream map, and gets cancelled, we need to - * ensure we are not parsing before continuing the cancellation to keep - * things in a sane state */ - grpc_chttp2_stream_global *stream_global; - while (grpc_chttp2_list_pop_closed_waiting_for_parsing(transport_global, - &stream_global)) { - GPR_ASSERT(stream_global->in_stream_map); - GPR_ASSERT(stream_global->write_closed); - GPR_ASSERT(stream_global->read_closed); - remove_stream(exec_ctx, t, stream_global->id, - removal_error(GRPC_ERROR_NONE, stream_global)); - GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2"); - } GPR_TIMER_END("post_parse_locked", 0); } diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 517b21c5d3ddfd55b37f0d5920aec6da233bafe9..da90464400514fda7ca3156d5f684306c601e5a1 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -64,7 +64,6 @@ typedef enum { GRPC_CHTTP2_LIST_WRITABLE, GRPC_CHTTP2_LIST_WRITING, GRPC_CHTTP2_LIST_WRITTEN, - GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_PARSING, GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_WRITING, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT, /* streams waiting for the outgoing window in the writing path, they will be @@ -308,10 +307,6 @@ struct grpc_chttp2_transport { struct { grpc_combiner *combiner; - /** is a thread currently in the global lock */ - bool global_active; - /** is a thread currently parsing */ - bool parsing_active; /** write execution state of the transport */ grpc_chttp2_write_state write_state; /** has a check_read_ops been scheduled */ @@ -374,9 +369,6 @@ struct grpc_chttp2_transport { /** connectivity tracking */ grpc_connectivity_state_tracker state_tracker; } channel_callback; - - /** Transport op to be applied post-parsing */ - grpc_transport_op *post_parsing_op; }; struct grpc_chttp2_stream_global { @@ -602,13 +594,6 @@ void grpc_chttp2_list_remove_stalled_by_transport( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global); -void grpc_chttp2_list_add_closed_waiting_for_parsing( - grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global *stream_global); -int grpc_chttp2_list_pop_closed_waiting_for_parsing( - grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global **stream_global); - void grpc_chttp2_list_add_closed_waiting_for_writing( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global); diff --git a/src/core/ext/transport/chttp2/transport/stream_lists.c b/src/core/ext/transport/chttp2/transport/stream_lists.c index 7c31466c800ae064618fbb535260dc6e664dea93..0805551b64232ada5dea2a7d3bda1b2eb4a668fb 100644 --- a/src/core/ext/transport/chttp2/transport/stream_lists.c +++ b/src/core/ext/transport/chttp2/transport/stream_lists.c @@ -334,26 +334,6 @@ void grpc_chttp2_list_remove_stalled_by_transport( GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT); } -void grpc_chttp2_list_add_closed_waiting_for_parsing( - grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global *stream_global) { - stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), - STREAM_FROM_GLOBAL(stream_global), - GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_PARSING); -} - -int grpc_chttp2_list_pop_closed_waiting_for_parsing( - grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global **stream_global) { - grpc_chttp2_stream *stream; - int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, - GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_PARSING); - if (r != 0) { - *stream_global = &stream->global; - } - return r; -} - void grpc_chttp2_list_add_closed_waiting_for_writing( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global) { diff --git a/src/core/lib/iomgr/error.c b/src/core/lib/iomgr/error.c index e366961936dc37dac6bd13670310247200e4b5c2..45ef75e04df50e2722d1b365dec2df570dfe839b 100644 --- a/src/core/lib/iomgr/error.c +++ b/src/core/lib/iomgr/error.c @@ -265,7 +265,7 @@ static grpc_error *copy_error_and_unref(grpc_error *in) { } else { out = gpr_malloc(sizeof(*out)); #ifdef GRPC_ERROR_REFCOUNT_DEBUG - gpr_log(GPR_DEBUG, "%p create copying", out); + gpr_log(GPR_DEBUG, "%p create copying %p", out, in); #endif out->ints = gpr_avl_ref(in->ints); out->strs = gpr_avl_ref(in->strs); diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c index 08f9d7e8d9e67cd0cd1211062d917ad9b60e90bc..a78ad4349a19c0a5d9d639ee08254b4f6fbae3ed 100644 --- a/src/core/lib/transport/transport.c +++ b/src/core/lib/transport/transport.c @@ -46,8 +46,9 @@ #ifdef GRPC_STREAM_REFCOUNT_DEBUG void grpc_stream_ref(grpc_stream_refcount *refcount, const char *reason) { gpr_atm val = gpr_atm_no_barrier_load(&refcount->refs.count); - gpr_log(GPR_DEBUG, "%s %p:%p REF %d->%d %s", refcount->object_type, - refcount, refcount->destroy.cb_arg, val, val + 1, reason); + gpr_log(GPR_DEBUG, "%s %p:%p REF %" PRIdPTR "->%" PRIdPTR " %s", + refcount->object_type, refcount, refcount->destroy.cb_arg, val, + val + 1, reason); #else void grpc_stream_ref(grpc_stream_refcount *refcount) { #endif @@ -58,8 +59,9 @@ void grpc_stream_ref(grpc_stream_refcount *refcount) { void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount, const char *reason) { gpr_atm val = gpr_atm_no_barrier_load(&refcount->refs.count); - gpr_log(GPR_DEBUG, "%s %p:%p UNREF %d->%d %s", refcount->object_type, - refcount, refcount->destroy.cb_arg, val, val - 1, reason); + gpr_log(GPR_DEBUG, "%s %p:%p UNREF %" PRIdPTR "->%" PRIdPTR " %s", + refcount->object_type, refcount, refcount->destroy.cb_arg, val, + val - 1, reason); #else void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount) { diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index d0d0c2a461d2aaec5cb92dd07d927e40124e407a..392a7ca422eb015b8dc59ef29911f494b4c070ab 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -55,7 +55,7 @@ typedef struct grpc_transport grpc_transport; for a stream. */ typedef struct grpc_stream grpc_stream; -//#define GRPC_STREAM_REFCOUNT_DEBUG +#define GRPC_STREAM_REFCOUNT_DEBUG typedef struct grpc_stream_refcount { gpr_refcount refs;