Skip to content
Snippets Groups Projects
Commit d1885e0b authored by Craig Tiller's avatar Craig Tiller
Browse files

Implement new pollset semantics for Windows

parent 06aeac64
No related branches found
No related tags found
No related merge requests found
......@@ -42,6 +42,38 @@
#include "src/core/iomgr/pollset.h"
#include "src/core/iomgr/pollset_windows.h"
static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
worker->prev->next = worker->next;
worker->next->prev = worker->prev;
}
static int has_workers(grpc_pollset *p) {
return p->root_worker.next != &p->root_worker;
}
static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
if (has_workers(p)) {
grpc_pollset_worker *w = p->root_worker.next;
remove_worker(p, w);
return w;
}
else {
return NULL;
}
}
static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
worker->next = &p->root_worker;
worker->prev = worker->next->prev;
worker->prev->next = worker->next->prev = worker;
}
static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
worker->prev = &p->root_worker;
worker->next = worker->prev->next;
worker->prev->next = worker->next->prev = worker;
}
/* There isn't really any such thing as a pollset under Windows, due to the
nature of the IO completion ports. We're still going to provide a minimal
set of features for the sake of the rest of grpc. But grpc_pollset_work
......@@ -50,7 +82,8 @@
void grpc_pollset_init(grpc_pollset *pollset) {
memset(pollset, 0, sizeof(*pollset));
gpr_mu_init(&pollset->mu);
gpr_cv_init(&pollset->cv);
pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
pollset->kicked_without_pollers = 0;
}
void grpc_pollset_shutdown(grpc_pollset *pollset,
......@@ -58,34 +91,66 @@ void grpc_pollset_shutdown(grpc_pollset *pollset,
void *shutdown_done_arg) {
gpr_mu_lock(&pollset->mu);
pollset->shutting_down = 1;
gpr_cv_broadcast(&pollset->cv);
grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
gpr_mu_unlock(&pollset->mu);
shutdown_done(shutdown_done_arg);
}
void grpc_pollset_destroy(grpc_pollset *pollset) {
gpr_mu_destroy(&pollset->mu);
gpr_cv_destroy(&pollset->cv);
}
int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) {
int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec deadline) {
gpr_timespec now;
int added_worker = 0;
now = gpr_now(GPR_CLOCK_MONOTONIC);
if (gpr_time_cmp(now, deadline) > 0) {
return 0 /* GPR_FALSE */;
}
worker->next = worker->prev = NULL;
gpr_cv_init(&worker->cv);
if (grpc_maybe_call_delayed_callbacks(&pollset->mu, 1 /* GPR_TRUE */)) {
return 1 /* GPR_TRUE */;
goto done;
}
if (grpc_alarm_check(&pollset->mu, now, &deadline)) {
return 1 /* GPR_TRUE */;
goto done;
}
if (!pollset->shutting_down) {
gpr_cv_wait(&pollset->cv, &pollset->mu, deadline);
if (!pollset->kicked_without_pollers && !pollset->shutting_down) {
push_front_worker(pollset, worker);
added_worker = 1;
gpr_cv_wait(&worker->cv, &pollset->mu, deadline);
} else {
pollset->kicked_without_pollers = 0;
}
done:
gpr_cv_destroy(&worker->cv);
if (added_worker) {
remove_worker(pollset, worker);
}
return 1 /* GPR_TRUE */;
}
void grpc_pollset_kick(grpc_pollset *p) { gpr_cv_signal(&p->cv); }
void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) {
if (specific_worker != NULL) {
if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
for (specific_worker = p->root_worker.next;
specific_worker != &p->root_worker;
specific_worker = specific_worker->next) {
gpr_cv_signal(&specific_worker->cv);
}
p->kicked_without_pollers = 1;
} else {
gpr_cv_signal(&specific_worker->cv);
}
} else {
specific_worker = pop_front_worker(p);
if (specific_worker != NULL) {
push_back_worker(p, specific_worker);
gpr_cv_signal(&specific_worker->cv);
} else {
p->kicked_without_pollers = 1;
}
}
}
#endif /* GPR_WINSOCK_SOCKET */
......@@ -42,10 +42,17 @@
nature of the IO completion ports. A Windows "pollset" is merely a mutex
and a condition variable, used to synchronize with the IOCP. */
typedef struct grpc_pollset_worker {
gpr_cv cv;
struct grpc_pollset_worker *next;
struct grpc_pollset_worker *prev;
} grpc_pollset_worker;
typedef struct grpc_pollset {
gpr_mu mu;
gpr_cv cv;
int shutting_down;
int kicked_without_pollers;
grpc_pollset_worker root_worker;
} grpc_pollset;
#define GRPC_POLLSET_MU(pollset) (&(pollset)->mu)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment