Skip to content
Snippets Groups Projects
Commit 8e1a55d4 authored by Nicolas "Pixel" Noble's avatar Nicolas "Pixel" Noble
Browse files

More win32 fixes + documentation.

Fixing a rare edge case where the tcp_client can crash due to a race condition when a connection is aborted.
parent 404fc6ae
No related branches found
No related tags found
No related merge requests found
......@@ -172,7 +172,9 @@ void grpc_iocp_add_socket(grpc_winsocket *socket) {
}
/* Flags a socket as orphaned: checks that it has not been orphaned before,
   adds 1 to the global g_orphans counter through gpr_atm_full_fetch_add,
   then sets the socket's orphan flag. */
void grpc_iocp_socket_orphan(grpc_winsocket *socket) {
/* A socket must not be orphaned twice. */
GPR_ASSERT(!socket->orphan);
gpr_atm_full_fetch_add(&g_orphans, 1);
socket->orphan = 1;
}
static void socket_notify_on_iocp(grpc_winsocket *socket,
......
......@@ -69,13 +69,12 @@ void grpc_winsocket_orphan(grpc_winsocket *winsocket) {
SOCKET socket = winsocket->socket;
if (!winsocket->closed_early) {
grpc_iocp_socket_orphan(winsocket);
winsocket->orphan = 1;
}
grpc_iomgr_unref();
if (winsocket->closed_early) {
grpc_winsocket_destroy(winsocket);
}
closesocket(socket);
grpc_iomgr_unref();
}
void grpc_winsocket_destroy(grpc_winsocket *winsocket) {
......
......@@ -59,6 +59,7 @@ typedef struct {
gpr_timespec deadline;
grpc_alarm alarm;
int refs;
int aborted;
} async_connect;
static void async_connect_cleanup(async_connect *ac) {
......@@ -70,26 +71,31 @@ static void async_connect_cleanup(async_connect *ac) {
}
}
static void on_alarm(void *acp, int success) {
static void on_alarm(void *acp, int occured) {
async_connect *ac = acp;
gpr_mu_lock(&ac->mu);
if (ac->socket != NULL && success) {
/* If the alarm didn't occur, it got cancelled. */
if (ac->socket != NULL && occured) {
grpc_winsocket_shutdown(ac->socket);
}
async_connect_cleanup(ac);
}
static void on_connect(void *acp, int success) {
static void on_connect(void *acp, int from_iocp) {
async_connect *ac = acp;
SOCKET sock = ac->socket->socket;
grpc_endpoint *ep = NULL;
grpc_winsocket_callback_info *info = &ac->socket->write_info;
void(*cb)(void *arg, grpc_endpoint *tcp) = ac->cb;
void *cb_arg = ac->cb_arg;
int aborted;
grpc_alarm_cancel(&ac->alarm);
if (success) {
gpr_mu_lock(&ac->mu);
aborted = ac->aborted;
if (from_iocp) {
DWORD transfered_bytes = 0;
DWORD flags;
BOOL wsa_success = WSAGetOverlappedResult(sock, &info->overlapped,
......@@ -107,23 +113,40 @@ static void on_connect(void *acp, int success) {
}
} else {
gpr_log(GPR_ERROR, "on_connect is shutting down");
goto finish;
/* If the connection times out, we will still get a notification from
the IOCP whatever happens. So we're just going to flag that connection
as being in the process of being aborted, and wait for the IOCP. We
can't just orphan the socket now, because the IOCP might already have
gotten a successful connection, which is our worst-case scenario.
We need to call our callback now to respect the deadline. */
ac->aborted = 1;
gpr_mu_unlock(&ac->mu);
cb(cb_arg, NULL);
return;
}
abort();
finish:
gpr_mu_lock(&ac->mu);
if (!ep) {
if (success) {
ac->socket->closed_early = 1;
}
/* If we don't have an endpoint, it means the connection failed,
so it doesn't matter if it aborted or failed. We need to orphan
that socket. */
if (!ep || aborted) {
/* If the connection failed, it means we won't get an IOCP notification,
so let's flag it as already closed. But if the connection was aborted,
while we still got an endpoint, we have to wait for the IOCP to collect
that socket. So let's properly flag that. */
ac->socket->closed_early = !ep;
grpc_winsocket_orphan(ac->socket);
}
async_connect_cleanup(ac);
cb(cb_arg, ep);
/* If the connection was aborted, the callback was already called when
the deadline was met. */
if (!aborted) cb(cb_arg, ep);
}
/* Tries to issue one async connection, then schedules both an IOCP
notification request for the connection, and one timeout alert. */
void grpc_tcp_client_connect(void(*cb)(void *arg, grpc_endpoint *tcp),
void *arg, const struct sockaddr *addr,
int addr_len, gpr_timespec deadline) {
......@@ -159,6 +182,8 @@ void grpc_tcp_client_connect(void(*cb)(void *arg, grpc_endpoint *tcp),
goto failure;
}
/* Grab the function pointer for ConnectEx for that specific socket.
It may change depending on the interface. */
status = WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER,
&guid, sizeof(guid), &ConnectEx, sizeof(ConnectEx),
&ioctl_num_bytes, NULL, NULL);
......@@ -181,6 +206,8 @@ void grpc_tcp_client_connect(void(*cb)(void *arg, grpc_endpoint *tcp),
info = &socket->write_info;
success = ConnectEx(sock, addr, addr_len, NULL, 0, NULL, &info->overlapped);
/* It wouldn't be unusual to get a success immediately. But we'll still get
an IOCP notification, so let's ignore it. */
if (!success) {
int error = WSAGetLastError();
if (error != ERROR_IO_PENDING) {
......@@ -195,6 +222,7 @@ void grpc_tcp_client_connect(void(*cb)(void *arg, grpc_endpoint *tcp),
ac->socket = socket;
gpr_mu_init(&ac->mu);
ac->refs = 2;
ac->aborted = 0;
grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac, gpr_now());
grpc_socket_notify_on_write(socket, on_connect, ac);
......
......@@ -55,10 +55,15 @@
/* one listening port */
typedef struct server_port {
gpr_uint8 addresses[sizeof(struct sockaddr_in6) * 2 + 32];
/* This seemingly magic number comes from AcceptEx's documentation. Each
address buffer needs to have at least 16 more bytes at its end. */
gpr_uint8 addresses[(sizeof(struct sockaddr_in6) + 16) * 2];
/* This will hold the socket for the next accept. */
SOCKET new_socket;
/* The listener winsocket. */
grpc_winsocket *socket;
grpc_tcp_server *server;
/* The cached AcceptEx for that port. */
LPFN_ACCEPTEX AcceptEx;
int shutting_down;
} server_port;
......@@ -80,6 +85,8 @@ struct grpc_tcp_server {
size_t port_capacity;
};
/* Public function. Allocates the proper data structures to hold a
grpc_tcp_server. */
grpc_tcp_server *grpc_tcp_server_create(void) {
grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server));
gpr_mu_init(&s->mu);
......@@ -93,22 +100,26 @@ grpc_tcp_server *grpc_tcp_server_create(void) {
return s;
}
/* Public function. Stops and destroys a grpc_tcp_server. */
void grpc_tcp_server_destroy(grpc_tcp_server *s,
void (*shutdown_done)(void *shutdown_done_arg),
void *shutdown_done_arg) {
size_t i;
gpr_mu_lock(&s->mu);
/* shutdown all fd's */
/* First, shutdown all fd's. This will queue abortion calls for all
of the pending accepts. */
for (i = 0; i < s->nports; i++) {
grpc_winsocket_shutdown(s->ports[i].socket);
}
/* wait while that happens */
/* This happens asynchronously. Wait while that happens. */
while (s->active_ports) {
gpr_cv_wait(&s->cv, &s->mu, gpr_inf_future);
}
gpr_mu_unlock(&s->mu);
/* delete ALL the things */
/* Now that the accepts have been aborted, we can destroy the sockets.
The IOCP won't get notified on these, so we can flag them as already
closed by the system. */
for (i = 0; i < s->nports; i++) {
server_port *sp = &s->ports[i];
sp->socket->closed_early = 1;
......@@ -122,7 +133,7 @@ void grpc_tcp_server_destroy(grpc_tcp_server *s,
}
}
/* Prepare a recently-created socket for listening. */
/* Prepare (bind) a recently-created socket for listening. */
static int prepare_socket(SOCKET sock, const struct sockaddr *addr,
int addr_len) {
struct sockaddr_storage sockname_temp;
......@@ -170,8 +181,11 @@ error:
return -1;
}
static void on_accept(void *arg, int success);
/* start_accept will reference that for the IOCP notification request. */
static void on_accept(void *arg, int from_iocp);
/* In order to do an async accept, we need to create a socket first which
will be the one assigned to the new incoming connection. */
static void start_accept(server_port *port) {
SOCKET sock = INVALID_SOCKET;
char *message;
......@@ -193,10 +207,13 @@ static void start_accept(server_port *port) {
goto failure;
}
/* Start the "accept" asynchronously. */
success = port->AcceptEx(port->socket->socket, sock, port->addresses, 0,
addrlen, addrlen, &bytes_received,
&port->socket->read_info.overlapped);
/* It is possible to get an accept immediately without delay. However, we
will still get an IOCP notification for it. So let's just ignore it. */
if (!success) {
int error = WSAGetLastError();
if (error != ERROR_IO_PENDING) {
......@@ -205,6 +222,8 @@ static void start_accept(server_port *port) {
}
}
/* We're ready to do the accept. Calling grpc_socket_notify_on_read may
immediately process an accept that happened in the meantime. */
port->new_socket = sock;
grpc_socket_notify_on_read(port->socket, on_accept, port);
return;
......@@ -216,14 +235,18 @@ failure:
if (sock != INVALID_SOCKET) closesocket(sock);
}
/* event manager callback when reads are ready */
static void on_accept(void *arg, int success) {
/* Event manager callback when reads are ready. */
static void on_accept(void *arg, int from_iocp) {
server_port *sp = arg;
SOCKET sock = sp->new_socket;
grpc_winsocket_callback_info *info = &sp->socket->read_info;
grpc_endpoint *ep = NULL;
/* The shutdown sequence is done in two parts. This is the second
part here, acknowledging the IOCP notification, and doing nothing
else, especially not queuing a new accept. */
if (sp->shutting_down) {
GPR_ASSERT(from_iocp);
sp->shutting_down = 0;
gpr_mu_lock(&sp->server->mu);
if (0 == --sp->server->active_ports) {
......@@ -233,7 +256,9 @@ static void on_accept(void *arg, int success) {
return;
}
if (success) {
if (from_iocp) {
/* The IOCP notified us of a completed operation. Let's grab the results,
and act accordingly. */
DWORD transfered_bytes = 0;
DWORD flags;
BOOL wsa_success = WSAGetOverlappedResult(sock, &info->overlapped,
......@@ -247,13 +272,23 @@ static void on_accept(void *arg, int success) {
ep = grpc_tcp_create(grpc_winsocket_create(sock));
}
} else {
/* If we're not notified from the IOCP, it means we are asked to shutdown.
This will initiate that shutdown. Calling closesocket will trigger an
IOCP notification, that will call this function a second time, from
the IOCP thread. */
sp->shutting_down = 1;
sp->new_socket = INVALID_SOCKET;
closesocket(sock);
}
/* The only time we should call our callback, is where we successfully
managed to accept a connection, and created an endpoint. */
if (ep) sp->server->cb(sp->server->cb_arg, ep);
if (success) {
if (from_iocp) {
/* As we were notified from the IOCP of one and exactly one accept,
the former socket we created has now either been destroyed or assigned
to the new connection. We need to create a new one for the next
connection. */
start_accept(sp);
}
}
......@@ -269,6 +304,8 @@ static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
if (sock == INVALID_SOCKET) return -1;
/* We need to grab the AcceptEx pointer for that port, as it may be
interface-dependent. We'll cache it to avoid doing that again. */
status =
WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
&AcceptEx, sizeof(AcceptEx), &ioctl_num_bytes, NULL, NULL);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment