diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index 4f52339bc14d719a4486ae83d38866acb6105fb6..41fd24e05a853f6045923d21d0145b4aa9c36c56 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -45,7 +45,10 @@ #include <grpc/support/log.h> #include <grpc/support/useful.h> -enum descriptor_state { NOT_READY, READY, WAITING }; +enum descriptor_state { + NOT_READY = 0, + READY = 1 +}; /* or a pointer to a closure to call */ /* We need to keep a freelist not because of any concerns of malloc performance * but instead so that implementations with multiple threads in (for example) @@ -88,8 +91,8 @@ static grpc_fd *alloc_fd(int fd) { gpr_mu_init(&r->watcher_mu); } gpr_atm_rel_store(&r->refst, 1); - gpr_atm_rel_store(&r->readst.state, NOT_READY); - gpr_atm_rel_store(&r->writest.state, NOT_READY); + gpr_atm_rel_store(&r->readst, NOT_READY); + gpr_atm_rel_store(&r->writest, NOT_READY); gpr_atm_rel_store(&r->shutdown, 0); r->fd = fd; r->watcher_root.next = r->watcher_root.prev = &r->watcher_root; @@ -166,11 +169,6 @@ void grpc_fd_ref(grpc_fd *fd) { ref_by(fd, 2); } void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); } -typedef struct { - grpc_iomgr_cb_func cb; - void *arg; -} callback; - static void make_callback(grpc_iomgr_cb_func cb, void *arg, int success, int allow_synchronous_callback) { if (allow_synchronous_callback) { @@ -180,18 +178,18 @@ static void make_callback(grpc_iomgr_cb_func cb, void *arg, int success, } } -static void make_callbacks(callback *callbacks, size_t n, int success, +static void make_callbacks(grpc_iomgr_closure *callbacks, size_t n, int success, int allow_synchronous_callback) { size_t i; for (i = 0; i < n; i++) { - make_callback(callbacks[i].cb, callbacks[i].arg, success, + make_callback(callbacks[i].cb, callbacks[i].cb_arg, success, allow_synchronous_callback); } } -static void notify_on(grpc_fd *fd, grpc_fd_state *st, grpc_iomgr_cb_func cb, - void *arg, int allow_synchronous_callback) { - switch ((enum descriptor_state)gpr_atm_acq_load(&st->state)) { +static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_iomgr_closure *closure, + int allow_synchronous_callback) { + switch (gpr_atm_acq_load(st)) { case NOT_READY: /* There is no race if the descriptor is already ready, so we skip the interlocked op in that case. As long as the app doesn't @@ -199,9 +197,7 @@ static void notify_on(grpc_fd *fd, grpc_fd_state *st, grpc_iomgr_cb_func cb, oldval should never be anything other than READY or NOT_READY. We don't check for user error on the fast path. */ - st->cb = cb; - st->cb_arg = arg; - if (gpr_atm_rel_cas(&st->state, NOT_READY, WAITING)) { + if (gpr_atm_rel_cas(st, NOT_READY, (gpr_intptr)closure)) { /* swap was successful -- the closure will run after the next set_ready call. NOTE: we don't have an ABA problem here, since we should never have concurrent calls to the same @@ -212,12 +208,13 @@ static void notify_on(grpc_fd *fd, grpc_fd_state *st, grpc_iomgr_cb_func cb, /* swap was unsuccessful due to an intervening set_ready call. Fall through to the READY code below */ case READY: - assert(gpr_atm_acq_load(&st->state) == READY); - gpr_atm_rel_store(&st->state, NOT_READY); - make_callback(cb, arg, !gpr_atm_acq_load(&fd->shutdown), + assert(gpr_atm_acq_load(st) == READY); + gpr_atm_rel_store(st, NOT_READY); + make_callback(closure->cb, closure->cb_arg, + !gpr_atm_acq_load(&fd->shutdown), allow_synchronous_callback); return; - case WAITING: + default: /* WAITING */ /* upcallptr was set to a different closure. This is an error! */ gpr_log(GPR_ERROR, "User called a notify_on function with a previous callback still " @@ -228,38 +225,38 @@ static void notify_on(grpc_fd *fd, grpc_fd_state *st, grpc_iomgr_cb_func cb, abort(); } -static void set_ready_locked(grpc_fd_state *st, callback *callbacks, +static void set_ready_locked(gpr_atm *st, grpc_iomgr_closure *callbacks, size_t *ncallbacks) { - callback *c; + gpr_intptr state = gpr_atm_acq_load(st); - switch ((enum descriptor_state)gpr_atm_acq_load(&st->state)) { + switch (state) { + case READY: + /* duplicate ready, ignore */ + return; case NOT_READY: - if (gpr_atm_rel_cas(&st->state, NOT_READY, READY)) { + if (gpr_atm_rel_cas(st, NOT_READY, READY)) { /* swap was successful -- the closure will run after the next notify_on call. */ return; } - /* swap was unsuccessful due to an intervening set_ready call. - Fall through to the WAITING code below */ - case WAITING: - assert(gpr_atm_acq_load(&st->state) == WAITING); - c = &callbacks[(*ncallbacks)++]; - c->cb = st->cb; - c->arg = st->cb_arg; - gpr_atm_rel_store(&st->state, NOT_READY); - return; - case READY: - /* duplicate ready, ignore */ + /* swap was unsuccessful due to an intervening set_ready call. + Fall through to the WAITING code below */ + state = gpr_atm_acq_load(st); + default: /* waiting */ + assert(gpr_atm_acq_load(st) != READY && + gpr_atm_acq_load(st) != NOT_READY); + callbacks[(*ncallbacks)++] = *(grpc_iomgr_closure *)state; + gpr_atm_rel_store(st, NOT_READY); return; } } -static void set_ready(grpc_fd *fd, grpc_fd_state *st, +static void set_ready(grpc_fd *fd, gpr_atm *st, int allow_synchronous_callback) { /* only one set_ready can be active at once (but there may be a racing notify_on) */ int success; - callback cb; + grpc_iomgr_closure cb; size_t ncb = 0; gpr_mu_lock(&fd->set_state_mu); set_ready_locked(st, &cb, &ncb); @@ -269,7 +266,7 @@ static void set_ready(grpc_fd *fd, grpc_fd_state *st, } void grpc_fd_shutdown(grpc_fd *fd) { - callback cb[2]; + grpc_iomgr_closure cb[2]; size_t ncb = 0; gpr_mu_lock(&fd->set_state_mu); GPR_ASSERT(!gpr_atm_acq_load(&fd->shutdown)); @@ -280,14 +277,12 @@ void grpc_fd_shutdown(grpc_fd *fd) { make_callbacks(cb, ncb, 0, 0); } -void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_cb_func read_cb, - void *read_cb_arg) { - notify_on(fd, &fd->readst, read_cb, read_cb_arg, 0); +void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_closure *closure) { + notify_on(fd, &fd->readst, closure, 0); } -void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_cb_func write_cb, - void *write_cb_arg) { - notify_on(fd, &fd->writest, write_cb, write_cb_arg, 0); +void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_closure *closure) { + notify_on(fd, &fd->writest, closure, 0); } gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, @@ -305,8 +300,8 @@ gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, watcher->fd = fd; gpr_mu_unlock(&fd->watcher_mu); - return (gpr_atm_acq_load(&fd->readst.state) != READY ? read_mask : 0) | - (gpr_atm_acq_load(&fd->writest.state) != READY ? write_mask : 0); + return (gpr_atm_acq_load(&fd->readst) != READY ? read_mask : 0) | + (gpr_atm_acq_load(&fd->writest) != READY ? write_mask : 0); } void grpc_fd_end_poll(grpc_fd_watcher *watcher) { diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h index 370ab1345a04e10ee2c2dfed8f06073aa0cf6e98..2a308c8ae20190363a5f906b86ceb577d40f2526 100644 --- a/src/core/iomgr/fd_posix.h +++ b/src/core/iomgr/fd_posix.h @@ -43,9 +43,7 @@ typedef struct { grpc_iomgr_cb_func cb; void *cb_arg; - int success; - gpr_atm state; -} grpc_fd_state; +} grpc_iomgr_closure; typedef struct grpc_fd grpc_fd; @@ -71,8 +69,8 @@ struct grpc_fd { gpr_mu watcher_mu; grpc_fd_watcher watcher_root; - grpc_fd_state readst; - grpc_fd_state writest; + gpr_atm readst; + gpr_atm writest; grpc_iomgr_cb_func on_done; void *on_done_user_data; @@ -126,12 +124,10 @@ void grpc_fd_shutdown(grpc_fd *fd); underlying platform. This means that users must drain fd in read_cb before calling notify_on_read again. Users are also expected to handle spurious events, i.e read_cb is called while nothing can be readable from fd */ -void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_cb_func read_cb, - void *read_cb_arg); +void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_closure *closure); /* Exactly the same semantics as above, except based on writable events. */ -void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_cb_func write_cb, - void *write_cb_arg); +void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_closure *closure); /* Notification from the poller to an fd that it has become readable or writable. diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c index 137aa99c7b101f9d165b712b7e9544e40483386e..e20cc3d1b2ed7e829def445d79db1b4e02b5171f 100644 --- a/src/core/iomgr/tcp_client_posix.c +++ b/src/core/iomgr/tcp_client_posix.c @@ -60,6 +60,7 @@ typedef struct { gpr_timespec deadline; grpc_alarm alarm; int refs; + grpc_iomgr_closure write_closure; } async_connect; static int prepare_socket(const struct sockaddr *addr, int fd) { @@ -136,7 +137,7 @@ static void on_writable(void *acp, int success) { opened too many network connections. The "easy" fix: don't do that! */ gpr_log(GPR_ERROR, "kernel out of buffers"); - grpc_fd_notify_on_write(ac->fd, on_writable, ac); + grpc_fd_notify_on_write(ac->fd, &ac->write_closure); return; } else { switch (so_error) { @@ -229,9 +230,11 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), ac->fd = grpc_fd_create(fd); gpr_mu_init(&ac->mu); ac->refs = 2; + ac->write_closure.cb = on_writable; + ac->write_closure.cb_arg = ac; grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac, gpr_now()); - grpc_fd_notify_on_write(ac->fd, on_writable, ac); + grpc_fd_notify_on_write(ac->fd, &ac->write_closure); } #endif diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index 150a907cb105fe9b13b4b523ce96a3fc4837a749..34eefc126bf893c30fd3b083d3a5c8eb2bae9af8 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -263,6 +263,9 @@ typedef struct { void *write_user_data; grpc_tcp_slice_state write_state; + + grpc_iomgr_closure read_closure; + grpc_iomgr_closure write_closure; } grpc_tcp; static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success); @@ -370,7 +373,7 @@ static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) { } else { /* Spurious read event, consume it here */ slice_state_destroy(&read_state); - grpc_fd_notify_on_read(tcp->em_fd, grpc_tcp_handle_read, tcp); + grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure); } } else { /* TODO(klempner): Log interesting errors */ @@ -405,7 +408,7 @@ static void grpc_tcp_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, tcp->read_cb = cb; tcp->read_user_data = user_data; gpr_ref(&tcp->refcount); - grpc_fd_notify_on_read(tcp->em_fd, grpc_tcp_handle_read, tcp); + grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure); } #define MAX_WRITE_IOVEC 16 @@ -468,7 +471,7 @@ static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, int success) { write_status = grpc_tcp_flush(tcp); if (write_status == GRPC_ENDPOINT_WRITE_PENDING) { - grpc_fd_notify_on_write(tcp->em_fd, grpc_tcp_handle_write, tcp); + grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure); } else { slice_state_destroy(&tcp->write_state); if (write_status == GRPC_ENDPOINT_WRITE_DONE) { @@ -513,7 +516,7 @@ static grpc_endpoint_write_status grpc_tcp_write(grpc_endpoint *ep, gpr_ref(&tcp->refcount); tcp->write_cb = cb; tcp->write_user_data = user_data; - grpc_fd_notify_on_write(tcp->em_fd, grpc_tcp_handle_write, tcp); + grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure); } return status; @@ -541,6 +544,10 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size) { /* paired with unref in grpc_tcp_destroy */ gpr_ref_init(&tcp->refcount, 1); tcp->em_fd = em_fd; + tcp->read_closure.cb = grpc_tcp_handle_read; + tcp->read_closure.cb_arg = tcp; + tcp->write_closure.cb = grpc_tcp_handle_write; + tcp->write_closure.cb_arg = tcp; return &tcp->base; } diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c index b7a06259497f20780f9a05e03022c881c96bd1d8..90b7eb451d4159cbdac680af6558e61d51d8d457 100644 --- a/src/core/iomgr/tcp_server_posix.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -82,6 +82,7 @@ typedef struct { struct sockaddr_un un; } addr; int addr_len; + grpc_iomgr_closure read_closure; } server_port; static void unlink_if_unix_domain_socket(const struct sockaddr_un *un) { @@ -244,7 +245,7 @@ static void on_read(void *arg, int success) { case EINTR: continue; case EAGAIN: - grpc_fd_notify_on_read(sp->emfd, on_read, sp); + grpc_fd_notify_on_read(sp->emfd, &sp->read_closure); return; default: gpr_log(GPR_ERROR, "Failed accept4: %s", strerror(errno)); @@ -393,7 +394,9 @@ void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset **pollsets, for (j = 0; j < pollset_count; j++) { grpc_pollset_add_fd(pollsets[j], s->ports[i].emfd); } - grpc_fd_notify_on_read(s->ports[i].emfd, on_read, &s->ports[i]); + s->ports[i].read_closure.cb = on_read; + s->ports[i].read_closure.cb_arg = &s->ports[i]; + grpc_fd_notify_on_read(s->ports[i].emfd, &s->ports[i].read_closure); s->active_ports++; } gpr_mu_unlock(&s->mu); diff --git a/test/core/iomgr/fd_posix_test.c b/test/core/iomgr/fd_posix_test.c index 22090ead0ac275bf02137755efdf3ca7b2e2494a..57e2c6fc177ad16ecd2d90a3ce1c4316b26f3059 100644 --- a/test/core/iomgr/fd_posix_test.c +++ b/test/core/iomgr/fd_posix_test.c @@ -97,6 +97,7 @@ typedef struct { gpr_mu mu; /* protect done and done_cv */ gpr_cv done_cv; /* signaled when a server finishes serving */ int done; /* set to 1 when a server finishes serving */ + grpc_iomgr_closure listen_closure; } server; static void server_init(server *sv) { @@ -112,6 +113,7 @@ typedef struct { server *sv; /* not owned by a single session */ grpc_fd *em_fd; /* fd to read upload bytes */ char read_buf[BUF_SIZE]; /* buffer to store upload bytes */ + grpc_iomgr_closure session_read_closure; } session; /* Called when an upload session can be safely shutdown. @@ -162,7 +164,7 @@ static void session_read_cb(void *arg, /*session*/ TODO(chenw): in multi-threaded version, callback and polling can be run in different threads. polling may catch a persist read edge event before notify_on_read is called. */ - grpc_fd_notify_on_read(se->em_fd, session_read_cb, se); + grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure); } else { gpr_log(GPR_ERROR, "Unhandled read error %s", strerror(errno)); abort(); @@ -207,9 +209,11 @@ static void listen_cb(void *arg, /*=sv_arg*/ se = gpr_malloc(sizeof(*se)); se->sv = sv; se->em_fd = grpc_fd_create(fd); - grpc_fd_notify_on_read(se->em_fd, session_read_cb, se); + se->session_read_closure.cb = session_read_cb; + se->session_read_closure.cb_arg = se; + grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure); - grpc_fd_notify_on_read(listen_em_fd, listen_cb, sv); + grpc_fd_notify_on_read(listen_em_fd, &sv->listen_closure); } /* Max number of connections pending to be accepted by listen(). */ @@ -234,7 +238,9 @@ static int server_start(server *sv) { sv->em_fd = grpc_fd_create(fd); /* Register to be interested in reading from listen_fd. */ - grpc_fd_notify_on_read(sv->em_fd, listen_cb, sv); + sv->listen_closure.cb = listen_cb; + sv->listen_closure.cb_arg = sv; + grpc_fd_notify_on_read(sv->em_fd, &sv->listen_closure); return port; } @@ -268,6 +274,7 @@ typedef struct { gpr_mu mu; /* protect done and done_cv */ gpr_cv done_cv; /* signaled when a client finishes sending */ int done; /* set to 1 when a client finishes sending */ + grpc_iomgr_closure write_closure; } client; static void client_init(client *cl) { @@ -309,7 +316,9 @@ static void client_session_write(void *arg, /*client*/ if (errno == EAGAIN) { gpr_mu_lock(&cl->mu); if (cl->client_write_cnt < CLIENT_TOTAL_WRITE_CNT) { - grpc_fd_notify_on_write(cl->em_fd, client_session_write, cl); + cl->write_closure.cb = client_session_write; + cl->write_closure.cb_arg = cl; + grpc_fd_notify_on_write(cl->em_fd, &cl->write_closure); cl->client_write_cnt++; } else { client_session_shutdown_cb(arg, 1); @@ -421,6 +430,13 @@ static void test_grpc_fd_change(void) { int sv[2]; char data; int result; + grpc_iomgr_closure first_closure; + grpc_iomgr_closure second_closure; + + first_closure.cb = first_read_callback; + first_closure.cb_arg = &a; + second_closure.cb = second_read_callback; + second_closure.cb_arg = &b; init_change_data(&a); init_change_data(&b); @@ -434,7 +450,7 @@ static void test_grpc_fd_change(void) { em_fd = grpc_fd_create(sv[0]); /* Register the first callback, then make its FD readable */ - grpc_fd_notify_on_read(em_fd, first_read_callback, &a); + grpc_fd_notify_on_read(em_fd, &first_closure); data = 0; result = write(sv[1], &data, 1); GPR_ASSERT(result == 1); @@ -453,7 +469,7 @@ static void test_grpc_fd_change(void) { /* Now register a second callback with distinct change data, and do the same thing again. */ - grpc_fd_notify_on_read(em_fd, second_read_callback, &b); + grpc_fd_notify_on_read(em_fd, &second_closure); data = 0; result = write(sv[1], &data, 1); GPR_ASSERT(result == 1);