Skip to content
Snippets Groups Projects
Commit 404fc6ae authored by Nicolas "Pixel" Noble's avatar Nicolas "Pixel" Noble
Browse files

Wave of Win32 fixes.

-) tcp client and server should no longer starve waiting on orphans
-) proper server shutdown sequence to prevent use-after-free.
parent 99076fe5
No related branches found
No related tags found
No related merge requests found
...@@ -32,11 +32,12 @@ ...@@ -32,11 +32,12 @@
*/ */
#include <grpc/support/port_platform.h> #include <grpc/support/port_platform.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#ifdef GPR_WINSOCK_SOCKET #ifdef GPR_WINSOCK_SOCKET
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include "src/core/iomgr/iocp_windows.h" #include "src/core/iomgr/iocp_windows.h"
#include "src/core/iomgr/iomgr.h" #include "src/core/iomgr/iomgr.h"
#include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/iomgr_internal.h"
...@@ -64,16 +65,22 @@ void grpc_winsocket_shutdown(grpc_winsocket *socket) { ...@@ -64,16 +65,22 @@ void grpc_winsocket_shutdown(grpc_winsocket *socket) {
shutdown_op(&socket->write_info); shutdown_op(&socket->write_info);
} }
void grpc_winsocket_orphan(grpc_winsocket *socket) { void grpc_winsocket_orphan(grpc_winsocket *winsocket) {
grpc_iocp_socket_orphan(socket); SOCKET socket = winsocket->socket;
socket->orphan = 1; if (!winsocket->closed_early) {
grpc_iocp_socket_orphan(winsocket);
winsocket->orphan = 1;
}
grpc_iomgr_unref(); grpc_iomgr_unref();
closesocket(socket->socket); if (winsocket->closed_early) {
grpc_winsocket_destroy(winsocket);
}
closesocket(socket);
} }
void grpc_winsocket_destroy(grpc_winsocket *socket) { void grpc_winsocket_destroy(grpc_winsocket *winsocket) {
gpr_mu_destroy(&socket->state_mu); gpr_mu_destroy(&winsocket->state_mu);
gpr_free(socket); gpr_free(winsocket);
} }
#endif /* GPR_WINSOCK_SOCKET */ #endif /* GPR_WINSOCK_SOCKET */
...@@ -64,6 +64,7 @@ typedef struct grpc_winsocket { ...@@ -64,6 +64,7 @@ typedef struct grpc_winsocket {
int added_to_iocp; int added_to_iocp;
int orphan; int orphan;
int closed_early;
} grpc_winsocket; } grpc_winsocket;
/* Create a wrapped windows handle. /* Create a wrapped windows handle.
......
...@@ -115,6 +115,9 @@ static void on_connect(void *acp, int success) { ...@@ -115,6 +115,9 @@ static void on_connect(void *acp, int success) {
finish: finish:
gpr_mu_lock(&ac->mu); gpr_mu_lock(&ac->mu);
if (!ep) { if (!ep) {
if (success) {
ac->socket->closed_early = 1;
}
grpc_winsocket_orphan(ac->socket); grpc_winsocket_orphan(ac->socket);
} }
async_connect_cleanup(ac); async_connect_cleanup(ac);
...@@ -202,6 +205,7 @@ failure: ...@@ -202,6 +205,7 @@ failure:
gpr_log(GPR_ERROR, message, utf8_message); gpr_log(GPR_ERROR, message, utf8_message);
gpr_free(utf8_message); gpr_free(utf8_message);
if (socket) { if (socket) {
socket->closed_early = 1;
grpc_winsocket_orphan(socket); grpc_winsocket_orphan(socket);
} else if (sock != INVALID_SOCKET) { } else if (sock != INVALID_SOCKET) {
closesocket(sock); closesocket(sock);
......
...@@ -60,6 +60,7 @@ typedef struct server_port { ...@@ -60,6 +60,7 @@ typedef struct server_port {
grpc_winsocket *socket; grpc_winsocket *socket;
grpc_tcp_server *server; grpc_tcp_server *server;
LPFN_ACCEPTEX AcceptEx; LPFN_ACCEPTEX AcceptEx;
int shutting_down;
} server_port; } server_port;
/* the overall server */ /* the overall server */
...@@ -110,6 +111,7 @@ void grpc_tcp_server_destroy(grpc_tcp_server *s, ...@@ -110,6 +111,7 @@ void grpc_tcp_server_destroy(grpc_tcp_server *s,
/* delete ALL the things */ /* delete ALL the things */
for (i = 0; i < s->nports; i++) { for (i = 0; i < s->nports; i++) {
server_port *sp = &s->ports[i]; server_port *sp = &s->ports[i];
sp->socket->closed_early = 1;
grpc_winsocket_orphan(sp->socket); grpc_winsocket_orphan(sp->socket);
} }
gpr_free(s->ports); gpr_free(s->ports);
...@@ -191,8 +193,6 @@ static void start_accept(server_port *port) { ...@@ -191,8 +193,6 @@ static void start_accept(server_port *port) {
goto failure; goto failure;
} }
/* TODO(jtattermusch): probably a race here, we regularly get use-after-free on server shutdown */
GPR_ASSERT(port->socket != (grpc_winsocket*)0xfeeefeee);
success = port->AcceptEx(port->socket->socket, sock, port->addresses, 0, success = port->AcceptEx(port->socket->socket, sock, port->addresses, 0,
addrlen, addrlen, &bytes_received, addrlen, addrlen, &bytes_received,
&port->socket->read_info.overlapped); &port->socket->read_info.overlapped);
...@@ -223,6 +223,16 @@ static void on_accept(void *arg, int success) { ...@@ -223,6 +223,16 @@ static void on_accept(void *arg, int success) {
grpc_winsocket_callback_info *info = &sp->socket->read_info; grpc_winsocket_callback_info *info = &sp->socket->read_info;
grpc_endpoint *ep = NULL; grpc_endpoint *ep = NULL;
if (sp->shutting_down) {
sp->shutting_down = 0;
gpr_mu_lock(&sp->server->mu);
if (0 == --sp->server->active_ports) {
gpr_cv_broadcast(&sp->server->cv);
}
gpr_mu_unlock(&sp->server->mu);
return;
}
if (success) { if (success) {
DWORD transfered_bytes = 0; DWORD transfered_bytes = 0;
DWORD flags; DWORD flags;
...@@ -237,12 +247,9 @@ static void on_accept(void *arg, int success) { ...@@ -237,12 +247,9 @@ static void on_accept(void *arg, int success) {
ep = grpc_tcp_create(grpc_winsocket_create(sock)); ep = grpc_tcp_create(grpc_winsocket_create(sock));
} }
} else { } else {
sp->shutting_down = 1;
sp->new_socket = INVALID_SOCKET;
closesocket(sock); closesocket(sock);
gpr_mu_lock(&sp->server->mu);
if (0 == --sp->server->active_ports) {
gpr_cv_broadcast(&sp->server->cv);
}
gpr_mu_unlock(&sp->server->mu);
} }
if (ep) sp->server->cb(sp->server->cb_arg, ep); if (ep) sp->server->cb(sp->server->cb_arg, ep);
...@@ -286,6 +293,7 @@ static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock, ...@@ -286,6 +293,7 @@ static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
sp = &s->ports[s->nports++]; sp = &s->ports[s->nports++];
sp->server = s; sp->server = s;
sp->socket = grpc_winsocket_create(sock); sp->socket = grpc_winsocket_create(sock);
sp->shutting_down = 0;
sp->AcceptEx = AcceptEx; sp->AcceptEx = AcceptEx;
GPR_ASSERT(sp->socket); GPR_ASSERT(sp->socket);
gpr_mu_unlock(&s->mu); gpr_mu_unlock(&s->mu);
......
...@@ -130,6 +130,7 @@ static void on_read(void *tcpp, int success) { ...@@ -130,6 +130,7 @@ static void on_read(void *tcpp, int success) {
gpr_log(GPR_ERROR, "ReadFile overlapped error: %s", utf8_message); gpr_log(GPR_ERROR, "ReadFile overlapped error: %s", utf8_message);
gpr_free(utf8_message); gpr_free(utf8_message);
status = GRPC_ENDPOINT_CB_ERROR; status = GRPC_ENDPOINT_CB_ERROR;
socket->closed_early = 1;
} else { } else {
if (info->bytes_transfered != 0) { if (info->bytes_transfered != 0) {
sub = gpr_slice_sub(tcp->read_slice, 0, info->bytes_transfered); sub = gpr_slice_sub(tcp->read_slice, 0, info->bytes_transfered);
...@@ -225,6 +226,7 @@ static void on_write(void *tcpp, int success) { ...@@ -225,6 +226,7 @@ static void on_write(void *tcpp, int success) {
gpr_log(GPR_ERROR, "WSASend overlapped error: %s", utf8_message); gpr_log(GPR_ERROR, "WSASend overlapped error: %s", utf8_message);
gpr_free(utf8_message); gpr_free(utf8_message);
status = GRPC_ENDPOINT_CB_ERROR; status = GRPC_ENDPOINT_CB_ERROR;
tcp->socket->closed_early = 1;
} else { } else {
GPR_ASSERT(info->bytes_transfered == tcp->write_slices.length); GPR_ASSERT(info->bytes_transfered == tcp->write_slices.length);
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment