From 9dc01dd44d169811141853b47d6331e4c3a4245b Mon Sep 17 00:00:00 2001 From: Craig Tiller <ctiller@google.com> Date: Mon, 11 Jul 2016 16:26:34 -0700 Subject: [PATCH] Fix some refcounting bugs --- src/core/ext/client_config/subchannel.c | 1 - .../chttp2/transport/chttp2_transport.c | 2 +- src/core/lib/iomgr/combiner.c | 35 ++++++------------- src/core/lib/iomgr/workqueue_posix.c | 11 ++++-- 4 files changed, 21 insertions(+), 28 deletions(-) diff --git a/src/core/ext/client_config/subchannel.c b/src/core/ext/client_config/subchannel.c index 2318bd1023..43b88d2ce4 100644 --- a/src/core/ext/client_config/subchannel.c +++ b/src/core/ext/client_config/subchannel.c @@ -506,7 +506,6 @@ static void connected_subchannel_state_op(grpc_exec_ctx *exec_ctx, grpc_closure *closure) { grpc_transport_op *op = grpc_make_transport_op(NULL); grpc_channel_element *elem; - memset(&op, 0, sizeof(op)); op->connectivity_state = state; op->on_connectivity_state_change = closure; op->bind_pollset_set = interested_parties; diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 4dc642e37a..28f27c271d 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -490,10 +490,10 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, const void *server_data) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; - s->t = t; memset(s, 0, sizeof(*s)); + s->t = t; s->refcount = refcount; /* We reserve one 'active stream' that's dropped when the stream is read-closed. The others are for incoming_byte_streams that are actively diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c index 99e112308b..6b326facc3 100644 --- a/src/core/lib/iomgr/combiner.c +++ b/src/core/lib/iomgr/combiner.c @@ -53,9 +53,7 @@ struct grpc_combiner { grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) { grpc_combiner *lock = gpr_malloc(sizeof(*lock)); - lock->optional_workqueue = - optional_workqueue ? GRPC_WORKQUEUE_REF(optional_workqueue, "combiner") - : NULL; + lock->optional_workqueue = GRPC_WORKQUEUE_REF(optional_workqueue, "combiner"); gpr_atm_no_barrier_store(&lock->state, 1); gpr_mpscq_init(&lock->queue); lock->take_async_break_before_final_list = false; @@ -66,9 +64,7 @@ grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) { static void really_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { GPR_ASSERT(gpr_atm_no_barrier_load(&lock->state) == 0); gpr_mpscq_destroy(&lock->queue); - if (lock->optional_workqueue != NULL) { - GRPC_WORKQUEUE_UNREF(exec_ctx, lock->optional_workqueue, "combiner"); - } + GRPC_WORKQUEUE_UNREF(exec_ctx, lock->optional_workqueue, "combiner"); gpr_free(lock); } @@ -127,7 +123,7 @@ static bool start_execute_final(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { grpc_closure_init(&lock->continue_finishing, continue_executing_final, lock); grpc_exec_ctx_sched(exec_ctx, &lock->continue_finishing, GRPC_ERROR_NONE, - lock->optional_workqueue); + GRPC_WORKQUEUE_REF(lock->optional_workqueue, "sched")); return false; } else { execute_final(exec_ctx, lock); @@ -144,7 +140,7 @@ static bool maybe_finish_one(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { grpc_closure_init(&lock->continue_finishing, continue_finishing_mainline, lock); grpc_exec_ctx_sched(exec_ctx, &lock->continue_finishing, GRPC_ERROR_NONE, - lock->optional_workqueue); + GRPC_WORKQUEUE_REF(lock->optional_workqueue, "sched")); return false; } grpc_closure *cl = (grpc_closure *)n; @@ -183,25 +179,16 @@ void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, grpc_closure *cl, grpc_error *error) { gpr_atm last = gpr_atm_full_fetch_add(&lock->state, 2); GPR_ASSERT(last & 1); // ensure lock has not been destroyed - if (exec_ctx->active_combiner == NULL) { - if (last == 1) { - exec_ctx->active_combiner = lock; - cl->cb(exec_ctx, cl->cb_arg, error); - GRPC_ERROR_UNREF(error); - finish(exec_ctx, lock); - GPR_ASSERT(exec_ctx->active_combiner == lock); - exec_ctx->active_combiner = NULL; - } else { - cl->error = error; - gpr_mpscq_push(&lock->queue, &cl->next_data.atm_next); - } + if (last == 1) { + exec_ctx->active_combiner = lock; + cl->cb(exec_ctx, cl->cb_arg, error); + GRPC_ERROR_UNREF(error); + finish(exec_ctx, lock); + GPR_ASSERT(exec_ctx->active_combiner == lock); + exec_ctx->active_combiner = NULL; } else { cl->error = error; gpr_mpscq_push(&lock->queue, &cl->next_data.atm_next); - grpc_closure_init(&lock->continue_finishing, continue_finishing_mainline, - lock); - grpc_exec_ctx_sched(exec_ctx, &lock->continue_finishing, GRPC_ERROR_NONE, - lock->optional_workqueue); } } diff --git a/src/core/lib/iomgr/workqueue_posix.c b/src/core/lib/iomgr/workqueue_posix.c index 1d52a91694..5accd06744 100644 --- a/src/core/lib/iomgr/workqueue_posix.c +++ b/src/core/lib/iomgr/workqueue_posix.c @@ -86,11 +86,13 @@ void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line, gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "WORKQUEUE:%p ref %d -> %d %s", workqueue, (int)workqueue->refs.count, (int)workqueue->refs.count + 1, reason); + gpr_ref(&workqueue->refs); +} #else void grpc_workqueue_ref(grpc_workqueue *workqueue) { -#endif gpr_ref(&workqueue->refs); } +#endif #ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, @@ -98,13 +100,17 @@ void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "WORKQUEUE:%p unref %d -> %d %s", workqueue, (int)workqueue->refs.count, (int)workqueue->refs.count - 1, reason); + if (gpr_unref(&workqueue->refs)) { + workqueue_orphan(exec_ctx, workqueue); + } +} #else void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { -#endif if (gpr_unref(&workqueue->refs)) { workqueue_orphan(exec_ctx, workqueue); } } +#endif static void drain(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { abort(); @@ -168,6 +174,7 @@ void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, grpc_closure *closure, grpc_error *error) { gpr_atm last = gpr_atm_full_fetch_add(&workqueue->state, 2); GPR_ASSERT(last & 1); + closure->error = error; gpr_mpscq_push(&workqueue->queue, &closure->next_data.atm_next); if (last == 1) { wakeup(exec_ctx, workqueue); -- GitLab