Skip to content
Snippets Groups Projects
Commit c037fc18 authored by Craig Tiller's avatar Craig Tiller
Browse files

Fix windows for new RU api

parent a947f1c3
No related branches found
No related tags found
No related merge requests found
......@@ -109,46 +109,34 @@ typedef struct grpc_tcp {
gpr_slice_buffer *write_slices;
gpr_slice_buffer *read_slices;
grpc_resource_user resource_user;
grpc_resource_user *resource_user;
/* The IO Completion Port runs from another thread. We need some mechanism
to protect ourselves when requesting a shutdown. */
gpr_mu mu;
int shutting_down;
gpr_atm resource_user_shutdown_count;
char *peer_string;
} grpc_tcp;
static void win_unref_closure(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
grpc_error *error);
static void win_maybe_shutdown_resource_user(grpc_exec_ctx *exec_ctx,
grpc_tcp *tcp) {
if (gpr_atm_full_fetch_add(&tcp->resource_user_shutdown_count, 1) == 0) {
grpc_resource_user_shutdown(exec_ctx, &tcp->resource_user,
grpc_closure_create(win_unref_closure, tcp));
}
}
static void tcp_free(grpc_tcp *tcp) {
static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
grpc_winsocket_destroy(tcp->socket);
gpr_mu_destroy(&tcp->mu);
gpr_free(tcp->peer_string);
grpc_resource_user_unref(exec_ctx, tcp->resource_user);
gpr_free(tcp);
}
/*#define GRPC_TCP_REFCOUNT_DEBUG*/
#ifdef GRPC_TCP_REFCOUNT_DEBUG
#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__)
#define TCP_UNREF(exec_ctx, tcp, reason) tcp_unref((exec_ctx), (tcp), (reason), __FILE__, __LINE__)
#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__)
static void tcp_unref(grpc_tcp *tcp, const char *reason, const char *file,
static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp, const char *reason, const char *file,
int line) {
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %d -> %d", tcp,
reason, tcp->refcount.count, tcp->refcount.count - 1);
if (gpr_unref(&tcp->refcount)) {
tcp_free(tcp);
tcp_free(exec_ctx, tcp);
}
}
......@@ -159,22 +147,17 @@ static void tcp_ref(grpc_tcp *tcp, const char *reason, const char *file,
gpr_ref(&tcp->refcount);
}
#else
#define TCP_UNREF(tcp, reason) tcp_unref((tcp))
#define TCP_UNREF(exec_ctx, tcp, reason) tcp_unref((exec_ctx), (tcp))
#define TCP_REF(tcp, reason) tcp_ref((tcp))
static void tcp_unref(grpc_tcp *tcp) {
static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
if (gpr_unref(&tcp->refcount)) {
tcp_free(tcp);
tcp_free(exec_ctx, tcp);
}
}
static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
#endif
static void win_unref_closure(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
TCP_UNREF(arg, "resource_user");
}
/* Asynchronous callback from the IOCP, or the background thread. */
static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) {
grpc_tcp *tcp = tcpp;
......@@ -203,7 +186,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) {
}
tcp->read_cb = NULL;
TCP_UNREF(tcp, "read");
TCP_UNREF(exec_ctx, tcp, "read");
grpc_exec_ctx_sched(exec_ctx, cb, error, NULL);
}
......@@ -287,7 +270,7 @@ static void on_write(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) {
}
}
TCP_UNREF(tcp, "write");
TCP_UNREF(exec_ctx, tcp, "write");
grpc_exec_ctx_sched(exec_ctx, cb, error, NULL);
}
......@@ -355,7 +338,7 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
if (status != 0) {
int wsa_error = WSAGetLastError();
if (wsa_error != WSA_IO_PENDING) {
TCP_UNREF(tcp, "write");
TCP_UNREF(exec_ctx, tcp, "write");
grpc_exec_ctx_sched(exec_ctx, cb, GRPC_WSA_ERROR(wsa_error, "WSASend"),
NULL);
return;
......@@ -396,15 +379,14 @@ static void win_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
callback. See the comments in on_read and on_write. */
tcp->shutting_down = 1;
grpc_winsocket_shutdown(tcp->socket);
win_maybe_shutdown_resource_user(exec_ctx, tcp);
gpr_mu_unlock(&tcp->mu);
grpc_resource_user_shutdown(exec_ctx, tcp->resource_user);
}
static void win_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
grpc_network_status_unregister_endpoint(ep);
grpc_tcp *tcp = (grpc_tcp *)ep;
win_maybe_shutdown_resource_user(exec_ctx, tcp);
TCP_UNREF(tcp, "destroy");
TCP_UNREF(exec_ctx, tcp, "destroy");
}
static char *win_get_peer(grpc_endpoint *ep) {
......@@ -416,7 +398,7 @@ static grpc_workqueue *win_get_workqueue(grpc_endpoint *ep) { return NULL; }
static grpc_resource_user *win_get_resource_user(grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
return &tcp->resource_user;
return tcp->resource_user;
}
static grpc_endpoint_vtable vtable = {win_read,
......@@ -441,7 +423,7 @@ grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket,
grpc_closure_init(&tcp->on_read, on_read, tcp);
grpc_closure_init(&tcp->on_write, on_write, tcp);
tcp->peer_string = gpr_strdup(peer_string);
grpc_resource_user_init(&tcp->resource_user, resource_quota, peer_string);
tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string);
/* Tell network status tracking code about the new endpoint */
grpc_network_status_register_endpoint(&tcp->base);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment