Skip to content
Snippets Groups Projects
Commit 5352ff11 authored by Craig Tiller's avatar Craig Tiller
Browse files

Windows fixes for error handling

parent 32c71e0e
No related branches found
No related tags found
No related merge requests found
......@@ -104,7 +104,6 @@ grpc_iocp_work_status grpc_iocp_work(grpc_exec_ctx *exec_ctx,
} else if (overlapped == &socket->read_info.overlapped) {
info = &socket->read_info;
} else {
gpr_log(GPR_ERROR, "Unknown IOCP operation");
abort();
}
success = WSAGetOverlappedResult(socket->socket, &info->overlapped, &bytes,
......@@ -112,16 +111,7 @@ grpc_iocp_work_status grpc_iocp_work(grpc_exec_ctx *exec_ctx,
info->bytes_transfered = bytes;
info->wsa_error = success ? 0 : WSAGetLastError();
GPR_ASSERT(overlapped == &info->overlapped);
GPR_ASSERT(!info->has_pending_iocp);
gpr_mu_lock(&socket->state_mu);
if (info->closure) {
closure = info->closure;
info->closure = NULL;
} else {
info->has_pending_iocp = 1;
}
gpr_mu_unlock(&socket->state_mu);
grpc_exec_ctx_push(exec_ctx, closure, GRPC_ERROR_NONE, NULL);
grpc_socket_become_ready(exec_ctx, socket, info);
return GRPC_IOCP_WORK_WORK;
}
......@@ -176,33 +166,4 @@ void grpc_iocp_add_socket(grpc_winsocket *socket) {
GPR_ASSERT(ret == g_iocp);
}
/* Calling notify_on_read or write means either of two things:
-) The IOCP already completed in the background, and we need to call
the callback now.
-) The IOCP hasn't completed yet, and we're queuing it for later. */
static void socket_notify_on_iocp(grpc_exec_ctx *exec_ctx,
grpc_winsocket *socket, grpc_closure *closure,
grpc_winsocket_callback_info *info) {
GPR_ASSERT(info->closure == NULL);
gpr_mu_lock(&socket->state_mu);
if (info->has_pending_iocp) {
info->has_pending_iocp = 0;
grpc_exec_ctx_push(exec_ctx, closure, GRPC_ERROR_NONE, NULL);
} else {
info->closure = closure;
}
gpr_mu_unlock(&socket->state_mu);
}
void grpc_socket_notify_on_write(grpc_exec_ctx *exec_ctx,
grpc_winsocket *socket,
grpc_closure *closure) {
socket_notify_on_iocp(exec_ctx, socket, closure, &socket->write_info);
}
void grpc_socket_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket,
grpc_closure *closure) {
socket_notify_on_iocp(exec_ctx, socket, closure, &socket->read_info);
}
#endif /* GPR_WINSOCK_SOCKET */
......@@ -52,12 +52,4 @@ void grpc_iocp_flush(void);
void grpc_iocp_shutdown(void);
void grpc_iocp_add_socket(grpc_winsocket *);
void grpc_socket_notify_on_write(grpc_exec_ctx *exec_ctx,
grpc_winsocket *winsocket,
grpc_closure *closure);
void grpc_socket_notify_on_read(grpc_exec_ctx *exec_ctx,
grpc_winsocket *winsocket,
grpc_closure *closure);
#endif /* GRPC_CORE_LIB_IOMGR_IOCP_WINDOWS_H */
......@@ -91,10 +91,66 @@ void grpc_winsocket_shutdown(grpc_winsocket *winsocket) {
closesocket(winsocket->socket);
}
void grpc_winsocket_destroy(grpc_winsocket *winsocket) {
static void destroy(grpc_winsocket *winsocket) {
grpc_iomgr_unregister_object(&winsocket->iomgr_object);
gpr_mu_destroy(&winsocket->state_mu);
gpr_free(winsocket);
}
static bool check_destroyable(grpc_winsocket *winsocket) {
return winsocket->destroy_called == true && winsocket->write_info.closure == NULL && winsocket->read_info.closure == NULL;
}
void grpc_winsocket_destroy(grpc_winsocket *winsocket) {
gpr_mu_lock(&winsocket->state_mu);
GPR_ASSERT(!winsocket->destroy_called);
winsocket->destroy_called = true;
bool should_destroy = check_destroyable(winsocket);
gpr_mu_unlock(&winsocket->state_mu);
if (should_destroy) destroy(winsocket);
}
/* Calling notify_on_read or write means either of two things:
-) The IOCP already completed in the background, and we need to call
the callback now.
-) The IOCP hasn't completed yet, and we're queuing it for later. */
static void socket_notify_on_iocp(grpc_exec_ctx *exec_ctx,
grpc_winsocket *socket, grpc_closure *closure,
grpc_winsocket_callback_info *info) {
GPR_ASSERT(info->closure == NULL);
gpr_mu_lock(&socket->state_mu);
if (info->has_pending_iocp) {
info->has_pending_iocp = 0;
grpc_exec_ctx_push(exec_ctx, closure, GRPC_ERROR_NONE, NULL);
} else {
info->closure = closure;
}
gpr_mu_unlock(&socket->state_mu);
}
void grpc_socket_notify_on_write(grpc_exec_ctx *exec_ctx,
grpc_winsocket *socket,
grpc_closure *closure) {
socket_notify_on_iocp(exec_ctx, socket, closure, &socket->write_info);
}
void grpc_socket_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket,
grpc_closure *closure) {
socket_notify_on_iocp(exec_ctx, socket, closure, &socket->read_info);
}
void grpc_socket_become_ready(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket, grpc_winsocket_callback_info *info) {
GPR_ASSERT(!info->has_pending_iocp);
gpr_mu_lock(&socket->state_mu);
if (info->closure) {
grpc_exec_ctx_push(exec_ctx, info->closure, GRPC_ERROR_NONE, NULL);
info->closure = NULL;
} else {
info->has_pending_iocp = 1;
}
bool should_destroy = check_destroyable(socket);
gpr_mu_unlock(&socket->state_mu);
if (should_destroy) destroy(socket);
}
#endif /* GPR_WINSOCK_SOCKET */
......@@ -81,6 +81,7 @@ typedef struct grpc_winsocket_callback_info {
is closer to what happens in posix world. */
typedef struct grpc_winsocket {
SOCKET socket;
bool destroy_called;
grpc_winsocket_callback_info write_info;
grpc_winsocket_callback_info read_info;
......@@ -108,4 +109,14 @@ void grpc_winsocket_shutdown(grpc_winsocket *socket);
/* Destroy a socket. Should only be called if there's no pending operation. */
void grpc_winsocket_destroy(grpc_winsocket *socket);
void grpc_socket_notify_on_write(grpc_exec_ctx *exec_ctx,
grpc_winsocket *winsocket,
grpc_closure *closure);
void grpc_socket_notify_on_read(grpc_exec_ctx *exec_ctx,
grpc_winsocket *winsocket,
grpc_closure *closure);
void grpc_socket_become_ready(grpc_exec_ctx *exec_ctx, grpc_winsocket *winsocket, grpc_winsocket_callback_info *ci);
#endif /* GRPC_CORE_LIB_IOMGR_SOCKET_WINDOWS_H */
......@@ -78,18 +78,18 @@ static void async_connect_unlock_and_cleanup(async_connect *ac,
static void on_alarm(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
async_connect *ac = acp;
gpr_mu_lock(&ac->mu);
if (ac->socket != NULL) {
grpc_winsocket_shutdown(ac->socket);
grpc_winsocket *socket = ac->socket;
ac->socket = NULL;
if (socket != NULL) {
grpc_winsocket_shutdown(socket);
}
async_connect_unlock_and_cleanup(ac, ac->socket);
async_connect_unlock_and_cleanup(ac, socket);
}
static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
async_connect *ac = acp;
SOCKET sock = ac->socket->socket;
grpc_endpoint **ep = ac->endpoint;
GPR_ASSERT(*ep == NULL);
grpc_winsocket_callback_info *info = &ac->socket->write_info;
grpc_closure *on_done = ac->on_done;
GRPC_ERROR_REF(error);
......@@ -106,7 +106,7 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
if (error == GRPC_ERROR_NONE && socket != NULL) {
DWORD transfered_bytes = 0;
DWORD flags;
BOOL wsa_success = WSAGetOverlappedResult(sock, &info->overlapped,
BOOL wsa_success = WSAGetOverlappedResult(socket->socket, &socket->write_info.overlapped,
&transfered_bytes, FALSE, &flags);
GPR_ASSERT(transfered_bytes == 0);
if (!wsa_success) {
......
......@@ -449,7 +449,7 @@ static grpc_error *add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
size_t addr_len, int *port) {
grpc_tcp_listener *sp;
grpc_tcp_listener *sp = NULL;
SOCKET sock;
struct sockaddr_in6 addr6_v4mapped;
struct sockaddr_in6 wildcard;
......@@ -512,6 +512,10 @@ done:
"Failed to add port to server", &error, 1);
GRPC_ERROR_UNREF(error);
error = error_out;
*port = -1;
} else {
GPR_ASSERT(sp != NULL);
*port = sp->port;
}
return error;
}
......
......@@ -167,7 +167,6 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) {
if (error == GRPC_ERROR_NONE) {
if (info->wsa_error != 0 && !tcp->shutting_down) {
char *utf8_message = gpr_format_message(info->wsa_error);
gpr_log(GPR_ERROR, "ReadFile overlapped error: %s", utf8_message);
error = GRPC_ERROR_CREATE(utf8_message);
gpr_free(utf8_message);
gpr_slice_unref(tcp->read_slice);
......
......@@ -118,6 +118,7 @@ void bad_server_thread(void *vargs) {
addr.ss_family = AF_INET;
error =
grpc_tcp_server_add_port(s, (struct sockaddr *)&addr, addr_len, &port);
GPR_ASSERT(GRPC_LOG_IF_ERROR("grpc_tcp_server_add_port", error));
GPR_ASSERT(port > 0);
gpr_asprintf(&args->addr, "localhost:%d", port);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment