Skip to content
Snippets Groups Projects
Commit 2f40ff42 authored by Craig Tiller's avatar Craig Tiller
Browse files

Support making hybrid cqs in core

parent 4ef9ce79
No related branches found
No related tags found
No related merge requests found
...@@ -60,13 +60,145 @@ typedef struct { ...@@ -60,13 +60,145 @@ typedef struct {
void *tag; void *tag;
} plucker; } plucker;
typedef struct {
size_t (*size)(void);
void (*init)(grpc_pollset *pollset, gpr_mu **mu);
grpc_error *(*kick)(grpc_pollset *pollset,
grpc_pollset_worker *specific_worker);
grpc_error *(*work)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker **worker, gpr_timespec now,
gpr_timespec deadline);
void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_closure *closure);
void (*destroy)(grpc_pollset *pollset);
} cq_poller_vtable;
typedef struct non_polling_worker {
gpr_cv cv;
bool kicked;
struct non_polling_worker *next;
struct non_polling_worker *prev;
} non_polling_worker;
typedef struct {
gpr_mu mu;
non_polling_worker *root;
grpc_closure *shutdown;
} non_polling_poller;
static size_t non_polling_poller_size(void) {
return sizeof(non_polling_poller);
}
static void non_polling_poller_init(grpc_pollset *pollset, gpr_mu **mu) {
non_polling_poller *npp = (non_polling_poller *)pollset;
gpr_mu_init(&npp->mu);
*mu = &npp->mu;
}
static void non_polling_poller_destroy(grpc_pollset *pollset) {
non_polling_poller *npp = (non_polling_poller *)pollset;
gpr_mu_destroy(&npp->mu);
}
static grpc_error *non_polling_poller_work(grpc_exec_ctx *exec_ctx,
grpc_pollset *pollset,
grpc_pollset_worker **worker,
gpr_timespec now,
gpr_timespec deadline) {
non_polling_poller *npp = (non_polling_poller *)pollset;
non_polling_worker w;
gpr_cv_init(&w.cv);
*worker = (grpc_pollset_worker *)&w;
if (npp->root == NULL) {
npp->root = w.next = w.prev = &w;
} else {
w.next = npp->root;
w.prev = w.next->prev;
w.next->prev = w.prev->next = &w;
}
w.kicked = false;
while (!npp->shutdown && !w.kicked && !gpr_cv_wait(&w.cv, &npp->mu, deadline))
;
if (&w == npp->root) {
npp->root = w.next;
if (&w == npp->root) {
if (npp->shutdown) {
grpc_closure_sched(exec_ctx, npp->shutdown, GRPC_ERROR_NONE);
}
npp->root = NULL;
}
w.next->prev = w.prev;
w.prev->next = w.next;
}
gpr_cv_destroy(&w.cv);
*worker = NULL;
return GRPC_ERROR_NONE;
}
static grpc_error *non_polling_poller_kick(
grpc_pollset *pollset, grpc_pollset_worker *specific_worker) {
non_polling_poller *p = (non_polling_poller *)pollset;
if (specific_worker == NULL) specific_worker = (grpc_pollset_worker *)p->root;
if (specific_worker != NULL) {
non_polling_worker *w = (non_polling_worker *)specific_worker;
if (!w->kicked) {
w->kicked = true;
gpr_cv_signal(&w->cv);
}
}
return GRPC_ERROR_NONE;
}
static void non_polling_poller_shutdown(grpc_exec_ctx *exec_ctx,
grpc_pollset *pollset,
grpc_closure *closure) {
non_polling_poller *p = (non_polling_poller *)pollset;
GPR_ASSERT(closure != NULL);
p->shutdown = closure;
if (p->root == NULL) {
grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE);
} else {
non_polling_worker *w = p->root;
do {
gpr_cv_signal(&w->cv);
w = w->next;
} while (w != p->root);
}
}
static const cq_poller_vtable g_poller_vtable_by_poller_type[] = {
/* GRPC_CQ_DEFAULT_POLLING */
{.size = grpc_pollset_size,
.init = grpc_pollset_init,
.kick = grpc_pollset_kick,
.work = grpc_pollset_work,
.shutdown = grpc_pollset_shutdown,
.destroy = grpc_pollset_destroy},
/* GRPC_CQ_NON_LISTENING */
{.size = grpc_pollset_size,
.init = grpc_pollset_init,
.kick = grpc_pollset_kick,
.work = grpc_pollset_work,
.shutdown = grpc_pollset_shutdown,
.destroy = grpc_pollset_destroy},
/* GRPC_CQ_NON_POLLING */
{.size = non_polling_poller_size,
.init = non_polling_poller_init,
.kick = non_polling_poller_kick,
.work = non_polling_poller_work,
.shutdown = non_polling_poller_shutdown,
.destroy = non_polling_poller_destroy},
};
/* Completion queue structure */ /* Completion queue structure */
struct grpc_completion_queue { struct grpc_completion_queue {
/** owned by pollset */ /** owned by pollset */
gpr_mu *mu; gpr_mu *mu;
grpc_cq_completion_type completion_type; grpc_cq_completion_type completion_type;
grpc_cq_polling_type polling_type;
const cq_poller_vtable *poller_vtable;
/** completed events */ /** completed events */
grpc_cq_completion completed_head; grpc_cq_completion completed_head;
...@@ -127,15 +259,18 @@ grpc_completion_queue *grpc_completion_queue_create_internal( ...@@ -127,15 +259,18 @@ grpc_completion_queue *grpc_completion_queue_create_internal(
"polling_type=%d)", "polling_type=%d)",
2, (completion_type, polling_type)); 2, (completion_type, polling_type));
cc = gpr_zalloc(sizeof(grpc_completion_queue) + grpc_pollset_size()); const cq_poller_vtable *poller_vtable =
grpc_pollset_init(POLLSET_FROM_CQ(cc), &cc->mu); &g_poller_vtable_by_poller_type[polling_type];
cc = gpr_zalloc(sizeof(grpc_completion_queue) + poller_vtable->size());
poller_vtable->init(POLLSET_FROM_CQ(cc), &cc->mu);
#ifndef NDEBUG #ifndef NDEBUG
cc->outstanding_tags = NULL; cc->outstanding_tags = NULL;
cc->outstanding_tag_capacity = 0; cc->outstanding_tag_capacity = 0;
#endif #endif
cc->completion_type = completion_type; cc->completion_type = completion_type;
cc->polling_type = polling_type; cc->poller_vtable = poller_vtable;
/* Initial ref is dropped by grpc_completion_queue_shutdown */ /* Initial ref is dropped by grpc_completion_queue_shutdown */
gpr_ref_init(&cc->pending_events, 1); gpr_ref_init(&cc->pending_events, 1);
...@@ -164,10 +299,6 @@ grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc) { ...@@ -164,10 +299,6 @@ grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc) {
return cc->completion_type; return cc->completion_type;
} }
grpc_cq_polling_type grpc_get_cq_polling_type(grpc_completion_queue *cc) {
return cc->polling_type;
}
#ifdef GRPC_CQ_REF_COUNT_DEBUG #ifdef GRPC_CQ_REF_COUNT_DEBUG
void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason, void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason,
const char *file, int line) { const char *file, int line) {
...@@ -195,7 +326,7 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc) { ...@@ -195,7 +326,7 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc) {
#endif #endif
if (gpr_unref(&cc->owning_refs)) { if (gpr_unref(&cc->owning_refs)) {
GPR_ASSERT(cc->completed_head.next == (uintptr_t)&cc->completed_head); GPR_ASSERT(cc->completed_head.next == (uintptr_t)&cc->completed_head);
grpc_pollset_destroy(POLLSET_FROM_CQ(cc)); cc->poller_vtable->destroy(POLLSET_FROM_CQ(cc));
#ifndef NDEBUG #ifndef NDEBUG
gpr_free(cc->outstanding_tags); gpr_free(cc->outstanding_tags);
#endif #endif
...@@ -280,7 +411,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, ...@@ -280,7 +411,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
} }
} }
grpc_error *kick_error = grpc_error *kick_error =
grpc_pollset_kick(POLLSET_FROM_CQ(cc), pluck_worker); cc->poller_vtable->kick(POLLSET_FROM_CQ(cc), pluck_worker);
gpr_mu_unlock(cc->mu); gpr_mu_unlock(cc->mu);
if (kick_error != GRPC_ERROR_NONE) { if (kick_error != GRPC_ERROR_NONE) {
const char *msg = grpc_error_string(kick_error); const char *msg = grpc_error_string(kick_error);
...@@ -295,8 +426,8 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, ...@@ -295,8 +426,8 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
GPR_ASSERT(!cc->shutdown); GPR_ASSERT(!cc->shutdown);
GPR_ASSERT(cc->shutdown_called); GPR_ASSERT(cc->shutdown_called);
cc->shutdown = 1; cc->shutdown = 1;
grpc_pollset_shutdown(exec_ctx, POLLSET_FROM_CQ(cc), cc->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cc),
&cc->pollset_shutdown_done); &cc->pollset_shutdown_done);
gpr_mu_unlock(cc->mu); gpr_mu_unlock(cc->mu);
} }
...@@ -452,8 +583,8 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, ...@@ -452,8 +583,8 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
gpr_mu_lock(cc->mu); gpr_mu_lock(cc->mu);
continue; continue;
} else { } else {
grpc_error *err = grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), NULL, grpc_error *err = cc->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cc),
now, iteration_deadline); NULL, now, iteration_deadline);
if (err != GRPC_ERROR_NONE) { if (err != GRPC_ERROR_NONE) {
gpr_mu_unlock(cc->mu); gpr_mu_unlock(cc->mu);
const char *msg = grpc_error_string(err); const char *msg = grpc_error_string(err);
...@@ -644,8 +775,8 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, ...@@ -644,8 +775,8 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
grpc_exec_ctx_flush(&exec_ctx); grpc_exec_ctx_flush(&exec_ctx);
gpr_mu_lock(cc->mu); gpr_mu_lock(cc->mu);
} else { } else {
grpc_error *err = grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), grpc_error *err = cc->poller_vtable->work(
&worker, now, iteration_deadline); &exec_ctx, POLLSET_FROM_CQ(cc), &worker, now, iteration_deadline);
if (err != GRPC_ERROR_NONE) { if (err != GRPC_ERROR_NONE) {
del_plucker(cc, tag, &worker); del_plucker(cc, tag, &worker);
gpr_mu_unlock(cc->mu); gpr_mu_unlock(cc->mu);
...@@ -689,8 +820,8 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) { ...@@ -689,8 +820,8 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
if (gpr_unref(&cc->pending_events)) { if (gpr_unref(&cc->pending_events)) {
GPR_ASSERT(!cc->shutdown); GPR_ASSERT(!cc->shutdown);
cc->shutdown = 1; cc->shutdown = 1;
grpc_pollset_shutdown(&exec_ctx, POLLSET_FROM_CQ(cc), cc->poller_vtable->shutdown(&exec_ctx, POLLSET_FROM_CQ(cc),
&cc->pollset_shutdown_done); &cc->pollset_shutdown_done);
} }
gpr_mu_unlock(cc->mu); gpr_mu_unlock(cc->mu);
grpc_exec_ctx_finish(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx);
......
...@@ -100,7 +100,6 @@ void grpc_cq_mark_server_cq(grpc_completion_queue *cc); ...@@ -100,7 +100,6 @@ void grpc_cq_mark_server_cq(grpc_completion_queue *cc);
int grpc_cq_is_server_cq(grpc_completion_queue *cc); int grpc_cq_is_server_cq(grpc_completion_queue *cc);
grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc); grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc);
grpc_cq_polling_type grpc_get_cq_polling_type(grpc_completion_queue *cc);
grpc_completion_queue *grpc_completion_queue_create_internal( grpc_completion_queue *grpc_completion_queue_create_internal(
grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type); grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment