Skip to content
Snippets Groups Projects
Commit aef3a79a authored by Craig Tiller's avatar Craig Tiller
Browse files

Remove extraneous locks on cq checks

parent dfd3a8f7
No related branches found
No related tags found
No related merge requests found
...@@ -71,6 +71,9 @@ struct grpc_completion_queue { ...@@ -71,6 +71,9 @@ struct grpc_completion_queue {
gpr_refcount pending_events; gpr_refcount pending_events;
/** Once owning_refs drops to zero, we will destroy the cq */ /** Once owning_refs drops to zero, we will destroy the cq */
gpr_refcount owning_refs; gpr_refcount owning_refs;
/** counter of how many things have ever been queued on this completion queue
useful for avoiding locks to check the queue */
gpr_atm things_queued_ever;
/** 0 initially, 1 once we've begun shutting down */ /** 0 initially, 1 once we've begun shutting down */
int shutdown; int shutdown;
int shutdown_called; int shutdown_called;
...@@ -125,15 +128,6 @@ void grpc_cq_global_shutdown(void) { ...@@ -125,15 +128,6 @@ void grpc_cq_global_shutdown(void) {
} }
} }
struct grpc_cq_alarm {
grpc_timer alarm;
grpc_cq_completion completion;
/** completion queue where events about this alarm will be posted */
grpc_completion_queue *cq;
/** user supplied tag */
void *tag;
};
grpc_completion_queue *grpc_completion_queue_create(void *reserved) { grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
grpc_completion_queue *cc; grpc_completion_queue *cc;
GPR_ASSERT(!reserved); GPR_ASSERT(!reserved);
...@@ -170,6 +164,7 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) { ...@@ -170,6 +164,7 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
cc->is_server_cq = 0; cc->is_server_cq = 0;
cc->is_non_listening_server_cq = 0; cc->is_non_listening_server_cq = 0;
cc->num_pluckers = 0; cc->num_pluckers = 0;
gpr_atm_no_barrier_store(&cc->things_queued_ever, 0);
#ifndef NDEBUG #ifndef NDEBUG
cc->outstanding_tag_count = 0; cc->outstanding_tag_count = 0;
#endif #endif
...@@ -280,6 +275,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, ...@@ -280,6 +275,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
GPR_ASSERT(found); GPR_ASSERT(found);
#endif #endif
shutdown = gpr_unref(&cc->pending_events); shutdown = gpr_unref(&cc->pending_events);
gpr_atm_no_barrier_fetch_add(&cc->things_queued_ever, 1);
if (!shutdown) { if (!shutdown) {
cc->completed_tail->next = cc->completed_tail->next =
((uintptr_t)storage) | (1u & (uintptr_t)cc->completed_tail->next); ((uintptr_t)storage) | (1u & (uintptr_t)cc->completed_tail->next);
...@@ -318,6 +314,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, ...@@ -318,6 +314,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
} }
typedef struct { typedef struct {
gpr_atm last_seen_things_queued_ever;
grpc_completion_queue *cq; grpc_completion_queue *cq;
gpr_timespec deadline; gpr_timespec deadline;
grpc_cq_completion *stolen_completion; grpc_cq_completion *stolen_completion;
...@@ -328,17 +325,23 @@ static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) { ...@@ -328,17 +325,23 @@ static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) {
cq_is_finished_arg *a = arg; cq_is_finished_arg *a = arg;
grpc_completion_queue *cq = a->cq; grpc_completion_queue *cq = a->cq;
GPR_ASSERT(a->stolen_completion == NULL); GPR_ASSERT(a->stolen_completion == NULL);
gpr_mu_lock(cq->mu); gpr_atm current_last_seen_things_queued_ever =
if (cq->completed_tail != &cq->completed_head) { gpr_atm_no_barrier_load(&cq->things_queued_ever);
a->stolen_completion = (grpc_cq_completion *)cq->completed_head.next; if (current_last_seen_things_queued_ever != a->last_seen_things_queued_ever) {
cq->completed_head.next = a->stolen_completion->next & ~(uintptr_t)1; gpr_mu_lock(cq->mu);
if (a->stolen_completion == cq->completed_tail) { a->last_seen_things_queued_ever =
cq->completed_tail = &cq->completed_head; gpr_atm_no_barrier_load(&cq->things_queued_ever);
if (cq->completed_tail != &cq->completed_head) {
a->stolen_completion = (grpc_cq_completion *)cq->completed_head.next;
cq->completed_head.next = a->stolen_completion->next & ~(uintptr_t)1;
if (a->stolen_completion == cq->completed_tail) {
cq->completed_tail = &cq->completed_head;
}
gpr_mu_unlock(cq->mu);
return true;
} }
gpr_mu_unlock(cq->mu); gpr_mu_unlock(cq->mu);
return true;
} }
gpr_mu_unlock(cq->mu);
return gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0; return gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0;
} }
...@@ -386,12 +389,13 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, ...@@ -386,12 +389,13 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
cq_is_finished_arg is_finished_arg = {cc, deadline, NULL, NULL};
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(
cq_is_next_finished, &is_finished_arg);
GRPC_CQ_INTERNAL_REF(cc, "next"); GRPC_CQ_INTERNAL_REF(cc, "next");
gpr_mu_lock(cc->mu); gpr_mu_lock(cc->mu);
cq_is_finished_arg is_finished_arg = {
gpr_atm_no_barrier_load(&cc->things_queued_ever), cc, deadline, NULL,
NULL};
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(
cq_is_next_finished, &is_finished_arg);
for (;;) { for (;;) {
if (is_finished_arg.stolen_completion != NULL) { if (is_finished_arg.stolen_completion != NULL) {
gpr_mu_unlock(cc->mu); gpr_mu_unlock(cc->mu);
...@@ -496,23 +500,29 @@ static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) { ...@@ -496,23 +500,29 @@ static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) {
cq_is_finished_arg *a = arg; cq_is_finished_arg *a = arg;
grpc_completion_queue *cq = a->cq; grpc_completion_queue *cq = a->cq;
GPR_ASSERT(a->stolen_completion == NULL); GPR_ASSERT(a->stolen_completion == NULL);
gpr_mu_lock(cq->mu); gpr_atm current_last_seen_things_queued_ever =
grpc_cq_completion *c; gpr_atm_no_barrier_load(&cq->things_queued_ever);
grpc_cq_completion *prev = &cq->completed_head; if (current_last_seen_things_queued_ever != a->last_seen_things_queued_ever) {
while ((c = (grpc_cq_completion *)(prev->next & ~(uintptr_t)1)) != gpr_mu_lock(cq->mu);
&cq->completed_head) { a->last_seen_things_queued_ever =
if (c->tag == a->tag) { gpr_atm_no_barrier_load(&cq->things_queued_ever);
prev->next = (prev->next & (uintptr_t)1) | (c->next & ~(uintptr_t)1); grpc_cq_completion *c;
if (c == cq->completed_tail) { grpc_cq_completion *prev = &cq->completed_head;
cq->completed_tail = prev; while ((c = (grpc_cq_completion *)(prev->next & ~(uintptr_t)1)) !=
&cq->completed_head) {
if (c->tag == a->tag) {
prev->next = (prev->next & (uintptr_t)1) | (c->next & ~(uintptr_t)1);
if (c == cq->completed_tail) {
cq->completed_tail = prev;
}
gpr_mu_unlock(cq->mu);
a->stolen_completion = c;
return true;
} }
gpr_mu_unlock(cq->mu); prev = c;
a->stolen_completion = c;
return true;
} }
prev = c; gpr_mu_unlock(cq->mu);
} }
gpr_mu_unlock(cq->mu);
return gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0; return gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0;
} }
...@@ -543,12 +553,13 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, ...@@ -543,12 +553,13 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
cq_is_finished_arg is_finished_arg = {cc, deadline, NULL, tag};
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(
cq_is_pluck_finished, &is_finished_arg);
GRPC_CQ_INTERNAL_REF(cc, "pluck"); GRPC_CQ_INTERNAL_REF(cc, "pluck");
gpr_mu_lock(cc->mu); gpr_mu_lock(cc->mu);
cq_is_finished_arg is_finished_arg = {
gpr_atm_no_barrier_load(&cc->things_queued_ever), cc, deadline, NULL,
tag};
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(
cq_is_pluck_finished, &is_finished_arg);
for (;;) { for (;;) {
if (is_finished_arg.stolen_completion != NULL) { if (is_finished_arg.stolen_completion != NULL) {
gpr_mu_unlock(cc->mu); gpr_mu_unlock(cc->mu);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment