diff --git a/src/core/eventmanager/em.c b/src/core/eventmanager/em.c index 36f3720e0dadecfe844990aecda88165dbc6bca5..0dc6c6a6d0d7d391a2e38dfbec50896d447e0d87 100644 --- a/src/core/eventmanager/em.c +++ b/src/core/eventmanager/em.c @@ -46,12 +46,33 @@ int evthread_use_threads(void); +static void grpc_em_fd_impl_destroy(struct grpc_em_fd_impl *impl); + #define ALARM_TRIGGER_INIT ((gpr_atm)0) #define ALARM_TRIGGER_INCREMENT ((gpr_atm)1) #define DONE_SHUTDOWN ((void *)1) #define POLLER_ID_INVALID ((gpr_atm)-1) +typedef struct grpc_em_fd_impl { + grpc_em_task task; /* Base class, callbacks, queues, etc */ + int fd; /* File descriptor */ + + /* Note that the shutdown event is only needed as a workaround for libevent + not properly handling event_active on an in flight event. */ + struct event *shutdown_ev; /* activated to trigger shutdown */ + + /* protect shutdown_started|read_state|write_state and ensure barriers + between notify_on_[read|write] and read|write callbacks */ + gpr_mu mu; + int shutdown_started; /* 0 -> shutdown not started, 1 -> started */ + grpc_em_fd_state read_state; + grpc_em_fd_state write_state; + + /* descriptor delete list. These are destroyed during polling. */ + struct grpc_em_fd_impl *next; +} grpc_em_fd_impl; + /* ================== grpc_em implementation ===================== */ /* If anything is in the work queue, process one item and return 1. @@ -83,7 +104,18 @@ static void timer_callback(int fd, short events, void *context) { event_base_loopbreak((struct event_base *)context); } -/* Spend some time polling if no other thread is. +static void free_fd_list(grpc_em_fd_impl *impl) { + while (impl != NULL) { + grpc_em_fd_impl *current = impl; + impl = impl->next; + grpc_em_fd_impl_destroy(current); + gpr_free(current); + } +} + +/* Spend some time doing polling and libevent maintenance work if no other + thread is. This includes both polling for events and destroying/closing file + descriptor objects. Returns 1 if polling was performed, 0 otherwise. Requires em->mu locked, may unlock and relock during the call. */ static int maybe_do_polling_work(grpc_em *em, struct timeval delay) { @@ -92,6 +124,10 @@ static int maybe_do_polling_work(grpc_em *em, struct timeval delay) { if (em->num_pollers) return 0; em->num_pollers = 1; + + free_fd_list(em->fds_to_free); + em->fds_to_free = NULL; + gpr_mu_unlock(&em->mu); event_add(em->timeout_ev, &delay); @@ -102,6 +138,11 @@ static int maybe_do_polling_work(grpc_em *em, struct timeval delay) { event_del(em->timeout_ev); gpr_mu_lock(&em->mu); + if (em->fds_to_free) { + free_fd_list(em->fds_to_free); + em->fds_to_free = NULL; + } + em->num_pollers = 0; gpr_cv_broadcast(&em->cv); return 1; @@ -191,6 +232,7 @@ grpc_em_error grpc_em_init(grpc_em *em) { em->num_fds = 0; em->last_poll_completed = gpr_now(); em->shutdown_backup_poller = 0; + em->fds_to_free = NULL; gpr_event_init(&em->backup_poller_done); @@ -247,6 +289,8 @@ grpc_em_error grpc_em_destroy(grpc_em *em) { ; gpr_mu_unlock(&em->mu); + free_fd_list(em->fds_to_free); + /* complete shutdown */ gpr_mu_destroy(&em->mu); gpr_cv_destroy(&em->cv); @@ -284,6 +328,7 @@ static void add_task(grpc_em *em, grpc_em_activation_data *adata) { static void alarm_ev_destroy(grpc_em_alarm *alarm) { grpc_em_activation_data *adata = &alarm->task.activation[GRPC_EM_TA_ONLY]; if (adata->ev != NULL) { + /* TODO(klempner): Is this safe to do when we're cancelling? */ event_free(adata->ev); adata->ev = NULL; } @@ -368,16 +413,14 @@ grpc_em_error grpc_em_alarm_cancel(grpc_em_alarm *alarm) { /* ==================== grpc_em_fd implementation =================== */ /* Proxy callback to call a gRPC read/write callback */ -static void em_fd_cb(int fd, short what, void *arg /*=em_fd*/) { - grpc_em_fd *em_fd = arg; +static void em_fd_cb(int fd, short what, void *arg /*=em_fd_impl*/) { + grpc_em_fd_impl *em_fd = arg; grpc_em_cb_status status = GRPC_CALLBACK_SUCCESS; int run_read_cb = 0; int run_write_cb = 0; grpc_em_activation_data *rdata, *wdata; gpr_mu_lock(&em_fd->mu); - /* TODO(klempner): We need to delete the event here too so we avoid spurious - shutdowns. */ if (em_fd->shutdown_started) { status = GRPC_CALLBACK_CANCELLED; } else if (status == GRPC_CALLBACK_SUCCESS && (what & EV_TIMEOUT)) { @@ -428,28 +471,32 @@ static void em_fd_shutdown_cb(int fd, short what, void *arg /*=em_fd*/) { that libevent's handling of event_active() on an event which is already in flight on a different thread is racy and easily triggers TSAN. */ - grpc_em_fd *em_fd = arg; - gpr_mu_lock(&em_fd->mu); - em_fd->shutdown_started = 1; - if (em_fd->read_state == GRPC_EM_FD_WAITING) { - event_active(em_fd->task.activation[GRPC_EM_TA_READ].ev, EV_READ, 1); + grpc_em_fd_impl *impl = arg; + gpr_mu_lock(&impl->mu); + impl->shutdown_started = 1; + if (impl->read_state == GRPC_EM_FD_WAITING) { + event_active(impl->task.activation[GRPC_EM_TA_READ].ev, EV_READ, 1); } - if (em_fd->write_state == GRPC_EM_FD_WAITING) { - event_active(em_fd->task.activation[GRPC_EM_TA_WRITE].ev, EV_WRITE, 1); + if (impl->write_state == GRPC_EM_FD_WAITING) { + event_active(impl->task.activation[GRPC_EM_TA_WRITE].ev, EV_WRITE, 1); } - gpr_mu_unlock(&em_fd->mu); + gpr_mu_unlock(&impl->mu); } grpc_em_error grpc_em_fd_init(grpc_em_fd *em_fd, grpc_em *em, int fd) { int flags; grpc_em_activation_data *rdata, *wdata; + grpc_em_fd_impl *impl = gpr_malloc(sizeof(grpc_em_fd_impl)); gpr_mu_lock(&em->mu); em->num_fds++; + gpr_mu_unlock(&em->mu); - em_fd->shutdown_ev = NULL; - gpr_mu_init(&em_fd->mu); + em_fd->impl = impl; + + impl->shutdown_ev = NULL; + gpr_mu_init(&impl->mu); flags = fcntl(fd, F_GETFL, 0); if ((flags & O_NONBLOCK) == 0) { @@ -457,11 +504,11 @@ grpc_em_error grpc_em_fd_init(grpc_em_fd *em_fd, grpc_em *em, int fd) { return GRPC_EM_INVALID_ARGUMENTS; } - em_fd->task.type = GRPC_EM_TASK_FD; - em_fd->task.em = em; - em_fd->fd = fd; + impl->task.type = GRPC_EM_TASK_FD; + impl->task.em = em; + impl->fd = fd; - rdata = &(em_fd->task.activation[GRPC_EM_TA_READ]); + rdata = &(impl->task.activation[GRPC_EM_TA_READ]); rdata->ev = NULL; rdata->cb = NULL; rdata->arg = NULL; @@ -469,7 +516,7 @@ grpc_em_error grpc_em_fd_init(grpc_em_fd *em_fd, grpc_em *em, int fd) { rdata->prev = NULL; rdata->next = NULL; - wdata = &(em_fd->task.activation[GRPC_EM_TA_WRITE]); + wdata = &(impl->task.activation[GRPC_EM_TA_WRITE]); wdata->ev = NULL; wdata->cb = NULL; wdata->arg = NULL; @@ -477,49 +524,45 @@ grpc_em_error grpc_em_fd_init(grpc_em_fd *em_fd, grpc_em *em, int fd) { wdata->prev = NULL; wdata->next = NULL; - em_fd->read_state = GRPC_EM_FD_IDLE; - em_fd->write_state = GRPC_EM_FD_IDLE; + impl->read_state = GRPC_EM_FD_IDLE; + impl->write_state = GRPC_EM_FD_IDLE; + + impl->shutdown_started = 0; + impl->next = NULL; /* TODO(chenw): detect platforms where only level trigger is supported, and set the event to non-persist. */ - rdata->ev = event_new(em->event_base, em_fd->fd, EV_ET | EV_PERSIST | EV_READ, - em_fd_cb, em_fd); + rdata->ev = event_new(em->event_base, impl->fd, EV_ET | EV_PERSIST | EV_READ, + em_fd_cb, impl); if (!rdata->ev) { gpr_log(GPR_ERROR, "Failed to create read event"); return GRPC_EM_ERROR; } - wdata->ev = event_new(em->event_base, em_fd->fd, - EV_ET | EV_PERSIST | EV_WRITE, em_fd_cb, em_fd); + wdata->ev = event_new(em->event_base, impl->fd, EV_ET | EV_PERSIST | EV_WRITE, + em_fd_cb, impl); if (!wdata->ev) { gpr_log(GPR_ERROR, "Failed to create write event"); return GRPC_EM_ERROR; } - em_fd->shutdown_ev = - event_new(em->event_base, -1, EV_READ, em_fd_shutdown_cb, em_fd); + impl->shutdown_ev = + event_new(em->event_base, -1, EV_READ, em_fd_shutdown_cb, impl); - if (!em_fd->shutdown_ev) { + if (!impl->shutdown_ev) { gpr_log(GPR_ERROR, "Failed to create shutdown event"); return GRPC_EM_ERROR; } - em_fd->shutdown_started = 0; return GRPC_EM_OK; } -void grpc_em_fd_destroy(grpc_em_fd *em_fd) { +static void grpc_em_fd_impl_destroy(grpc_em_fd_impl *impl) { grpc_em_task_activity_type type; grpc_em_activation_data *adata; - grpc_em *em = em_fd->task.em; - - /* ensure anyone holding the lock has left - it's the callers responsibility - to ensure that no new users enter */ - gpr_mu_lock(&em_fd->mu); - gpr_mu_unlock(&em_fd->mu); for (type = GRPC_EM_TA_READ; type < GRPC_EM_TA_COUNT; type++) { - adata = &(em_fd->task.activation[type]); + adata = &(impl->task.activation[type]); GPR_ASSERT(adata->next == NULL); if (adata->ev != NULL) { event_free(adata->ev); @@ -527,24 +570,43 @@ void grpc_em_fd_destroy(grpc_em_fd *em_fd) { } } - if (em_fd->shutdown_ev != NULL) { - event_free(em_fd->shutdown_ev); - em_fd->shutdown_ev = NULL; + if (impl->shutdown_ev != NULL) { + event_free(impl->shutdown_ev); + impl->shutdown_ev = NULL; } - gpr_mu_destroy(&em_fd->mu); + gpr_mu_destroy(&impl->mu); + close(impl->fd); +} + +void grpc_em_fd_destroy(grpc_em_fd *em_fd) { + grpc_em_fd_impl *impl = em_fd->impl; + grpc_em *em = impl->task.em; gpr_mu_lock(&em->mu); + + if (em->num_pollers == 0) { + /* it is safe to simply free it */ + grpc_em_fd_impl_destroy(impl); + gpr_free(impl); + } else { + /* Put the impl on the list to be destroyed by the poller. */ + impl->next = em->fds_to_free; + em->fds_to_free = impl; + /* Kick the poller so it closes the fd promptly. + * TODO(klempner): maybe this should be a different event. + */ + event_active(em_fd->impl->shutdown_ev, EV_READ, 1); + } + em->num_fds--; gpr_cv_broadcast(&em->cv); gpr_mu_unlock(&em->mu); - - close(em_fd->fd); } -int grpc_em_fd_get(struct grpc_em_fd *em_fd) { return em_fd->fd; } +int grpc_em_fd_get(struct grpc_em_fd *em_fd) { return em_fd->impl->fd; } /* Returns the event manager associated with *em_fd. */ -grpc_em *grpc_em_fd_get_em(grpc_em_fd *em_fd) { return em_fd->task.em; } +grpc_em *grpc_em_fd_get_em(grpc_em_fd *em_fd) { return em_fd->impl->task.em; } /* TODO(chenw): should we enforce the contract that notify_on_read cannot be called when the previously registered callback has not been called yet. */ @@ -552,6 +614,7 @@ grpc_em_error grpc_em_fd_notify_on_read(grpc_em_fd *em_fd, grpc_em_cb_func read_cb, void *read_cb_arg, gpr_timespec deadline) { + grpc_em_fd_impl *impl = em_fd->impl; int force_event = 0; grpc_em_activation_data *rdata; grpc_em_error result = GRPC_EM_OK; @@ -560,22 +623,22 @@ grpc_em_error grpc_em_fd_notify_on_read(grpc_em_fd *em_fd, struct timeval *delayp = gpr_time_cmp(deadline, gpr_inf_future) ? &delay : NULL; - rdata = &em_fd->task.activation[GRPC_EM_TA_READ]; + rdata = &impl->task.activation[GRPC_EM_TA_READ]; - gpr_mu_lock(&em_fd->mu); + gpr_mu_lock(&impl->mu); rdata->cb = read_cb; rdata->arg = read_cb_arg; force_event = - (em_fd->shutdown_started || em_fd->read_state == GRPC_EM_FD_CACHED); - em_fd->read_state = GRPC_EM_FD_WAITING; + (impl->shutdown_started || impl->read_state == GRPC_EM_FD_CACHED); + impl->read_state = GRPC_EM_FD_WAITING; if (force_event) { event_active(rdata->ev, EV_READ, 1); } else if (event_add(rdata->ev, delayp) == -1) { result = GRPC_EM_ERROR; } - gpr_mu_unlock(&em_fd->mu); + gpr_mu_unlock(&impl->mu); return result; } @@ -583,6 +646,7 @@ grpc_em_error grpc_em_fd_notify_on_write(grpc_em_fd *em_fd, grpc_em_cb_func write_cb, void *write_cb_arg, gpr_timespec deadline) { + grpc_em_fd_impl *impl = em_fd->impl; int force_event = 0; grpc_em_activation_data *wdata; grpc_em_error result = GRPC_EM_OK; @@ -591,27 +655,27 @@ grpc_em_error grpc_em_fd_notify_on_write(grpc_em_fd *em_fd, struct timeval *delayp = gpr_time_cmp(deadline, gpr_inf_future) ? &delay : NULL; - wdata = &em_fd->task.activation[GRPC_EM_TA_WRITE]; + wdata = &impl->task.activation[GRPC_EM_TA_WRITE]; - gpr_mu_lock(&em_fd->mu); + gpr_mu_lock(&impl->mu); wdata->cb = write_cb; wdata->arg = write_cb_arg; force_event = - (em_fd->shutdown_started || em_fd->write_state == GRPC_EM_FD_CACHED); - em_fd->write_state = GRPC_EM_FD_WAITING; + (impl->shutdown_started || impl->write_state == GRPC_EM_FD_CACHED); + impl->write_state = GRPC_EM_FD_WAITING; if (force_event) { event_active(wdata->ev, EV_WRITE, 1); } else if (event_add(wdata->ev, delayp) == -1) { result = GRPC_EM_ERROR; } - gpr_mu_unlock(&em_fd->mu); + gpr_mu_unlock(&impl->mu); return result; } void grpc_em_fd_shutdown(grpc_em_fd *em_fd) { - event_active(em_fd->shutdown_ev, EV_READ, 1); + event_active(em_fd->impl->shutdown_ev, EV_READ, 1); } /*====================== Other callback functions ======================*/ diff --git a/src/core/eventmanager/em.h b/src/core/eventmanager/em.h index aa439d1112adc8719f774ee6ffe318f51d7a50d9..f190bc8743e559652510ba6f27645bc57222d78a 100644 --- a/src/core/eventmanager/em.h +++ b/src/core/eventmanager/em.h @@ -204,6 +204,7 @@ grpc_em_error grpc_em_add_callback(grpc_em *em, grpc_em_cb_func cb, /* Forward declarations */ struct grpc_em_activation_data; +struct grpc_em_fd_impl; /* ================== Actual structure definitions ========================= */ /* gRPC event manager handle. @@ -223,6 +224,8 @@ struct grpc_em { int shutdown_backup_poller; gpr_event backup_poller_done; + struct grpc_em_fd_impl *fds_to_free; + struct event *timeout_ev; /* activated to break out of the event loop early */ }; @@ -330,23 +333,12 @@ typedef enum grpc_em_fd_state { GRPC_EM_FD_CACHED = 2 } grpc_em_fd_state; +struct grpc_em_fd_impl; + /* gRPC file descriptor handle. The handle is used to register read/write callbacks to a file descriptor */ struct grpc_em_fd { - grpc_em_task task; /* Base class, callbacks, queues, etc */ - int fd; /* File descriptor */ - - /* Note that the shutdown event is only needed as a workaround for libevent - not properly handling event_active on an in flight event. */ - struct event *shutdown_ev; /* activated to trigger shutdown */ - - /* protect shutdown_started|read_state|write_state and ensure barriers - between notify_on_[read|write] and read|write callbacks */ - gpr_mu mu; - int shutdown_started; /* 0 -> shutdown not started, 1 -> started */ - grpc_em_fd_state read_state; - grpc_em_fd_state write_state; - /* activated after some timeout to activate shutdown_ev */ + struct grpc_em_fd_impl *impl; }; #endif /* __GRPC_INTERNAL_EVENTMANAGER_EM_H__ */ diff --git a/test/core/end2end/dualstack_socket_test.c b/test/core/end2end/dualstack_socket_test.c index fd679661a1cca83d43d20d39c582fe603d19d75e..4813672104a3371bba818f57699c87df3c9370f3 100644 --- a/test/core/end2end/dualstack_socket_test.c +++ b/test/core/end2end/dualstack_socket_test.c @@ -165,6 +165,11 @@ void test_connect(const char *server_host, const char *client_host, int port, grpc_completion_queue_shutdown(server_cq); drain_cq(server_cq); grpc_completion_queue_destroy(server_cq); + /* TODO(klempner): We need to give the EM time to actually close the listening + socket, or later tests will fail to bind to this port. We should fix this + by adding an API to EM to get notified when this happens and having it + prevent listener teardown. */ + gpr_sleep_until(gpr_time_add(gpr_now(), gpr_time_from_millis(250))); } int main(int argc, char **argv) {