From 533fbd362f96c573343403e85508390d0f81a2c5 Mon Sep 17 00:00:00 2001 From: yang-g <yangg@google.com> Date: Thu, 13 Jul 2017 11:39:44 -0700 Subject: [PATCH] Rebase with head and resolve conflicts --- src/core/lib/surface/completion_queue.c | 76 ++++++++++++------------- 1 file changed, 38 insertions(+), 38 deletions(-) diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index ef1dfb1eb0..98234e036a 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -196,7 +196,7 @@ typedef struct cq_vtable { void (*init)(void *data); void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq); void (*destroy)(void *data); - void (*begin_op)(grpc_completion_queue *cq, void *tag); + int (*begin_op)(grpc_completion_queue *cq, void *tag); void (*end_op)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq, void *tag, grpc_error *error, void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg, @@ -288,8 +288,8 @@ static void cq_shutdown_next(grpc_exec_ctx *exec_ctx, static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq); -static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag); -static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag); +static int cq_begin_op_for_next(grpc_completion_queue *cq, void *tag); +static int cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag); static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq, void *tag, @@ -522,28 +522,55 @@ void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, } } -static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) { +#ifndef NDEBUG +static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) { + int found = 0; + if (lock_cq) { + gpr_mu_lock(cq->mu); + } + + for (int i = 0; i < (int)cq->outstanding_tag_count; i++) { + if (cq->outstanding_tags[i] == tag) { + cq->outstanding_tag_count--; + GPR_SWAP(void *, cq->outstanding_tags[i], + cq->outstanding_tags[cq->outstanding_tag_count]); + found = 1; + break; + } + } + + if (lock_cq) { + gpr_mu_unlock(cq->mu); + } + + GPR_ASSERT(found); +} +#else +static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {} +#endif + +static int cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) { cq_next_data *cqd = DATA_FROM_CQ(cq); - GPR_ASSERT(!cqd->shutdown_called); while (true) { - gpr_atm count = gpr_atm_no_barrier_load(&cqd->pending_events.count); + gpr_atm count = gpr_atm_no_barrier_load(&cqd->pending_events); if (count == 0) { + cq_check_tag(cq, tag, true); /* Used in debug builds only */ return 1; - } else if (gpr_atm_no_barrier_cas(&cqd->pending_events.count, count, - count + 1)) { + } else if (gpr_atm_no_barrier_cas(&cqd->pending_events, count, count + 1)) { break; } } return 0; } -static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) { +static int cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) { cq_pluck_data *cqd = DATA_FROM_CQ(cq); GPR_ASSERT(!cqd->shutdown_called); gpr_ref(&cqd->pending_events); + return 0; } -void grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) { +int grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) { #ifndef NDEBUG gpr_mu_lock(cq->mu); if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) { @@ -555,35 +582,8 @@ void grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) { cq->outstanding_tags[cq->outstanding_tag_count++] = tag; gpr_mu_unlock(cq->mu); #endif - cq->vtable->begin_op(cq, tag); -} - -#ifndef NDEBUG -static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) { - int found = 0; - if (lock_cq) { - gpr_mu_lock(cq->mu); - } - - for (int i = 0; i < (int)cq->outstanding_tag_count; i++) { - if (cq->outstanding_tags[i] == tag) { - cq->outstanding_tag_count--; - GPR_SWAP(void *, cq->outstanding_tags[i], - cq->outstanding_tags[cq->outstanding_tag_count]); - found = 1; - break; - } - } - - if (lock_cq) { - gpr_mu_unlock(cq->mu); - } - - GPR_ASSERT(found); + return cq->vtable->begin_op(cq, tag); } -#else -static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {} -#endif /* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a * completion -- GitLab