Skip to content
Snippets Groups Projects
Commit 0882a353 authored by Craig Tiller's avatar Craig Tiller
Browse files

Fix refcounting

parent c84d4aa6
No related branches found
No related tags found
No related merge requests found
...@@ -96,17 +96,44 @@ typedef struct grpc_tcp { ...@@ -96,17 +96,44 @@ typedef struct grpc_tcp {
char *peer_string; char *peer_string;
} grpc_tcp; } grpc_tcp;
static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); } static void tcp_free(grpc_tcp *tcp) {
grpc_winsocket_orphan(tcp->socket);
gpr_mu_destroy(&tcp->mu);
gpr_free(tcp->peer_string);
gpr_free(tcp);
}
#define GRPC_TCP_REFCOUNT_DEBUG
#ifdef GRPC_TCP_REFCOUNT_DEBUG
#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__)
#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__)
static void tcp_unref(grpc_tcp *tcp, const char *reason, const char *file,
int line) {
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %d -> %d", tcp,
reason, tcp->refcount.count, tcp->refcount.count - 1);
if (gpr_unref(&tcp->refcount)) {
tcp_free(tcp);
}
}
static void tcp_ref(grpc_tcp *tcp, const char *reason, const char *file,
int line) {
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP ref %p : %s %d -> %d", tcp,
reason, tcp->refcount.count, tcp->refcount.count + 1);
gpr_ref(&tcp->refcount);
}
#else
#define TCP_UNREF(tcp, reason) tcp_unref((tcp))
#define TCP_REF(tcp, reason) tcp_ref((tcp))
static void tcp_unref(grpc_tcp *tcp) { static void tcp_unref(grpc_tcp *tcp) {
if (gpr_unref(&tcp->refcount)) { if (gpr_unref(&tcp->refcount)) {
grpc_winsocket_orphan(tcp->socket); tcp_free(tcp);
gpr_mu_destroy(&tcp->mu);
gpr_free(tcp->peer_string);
gpr_free(tcp);
} }
} }
static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
#endif
/* Asynchronous callback from the IOCP, or the background thread. */ /* Asynchronous callback from the IOCP, or the background thread. */
static int on_read(grpc_tcp *tcp, int from_iocp) { static int on_read(grpc_tcp *tcp, int from_iocp) {
grpc_winsocket *socket = tcp->socket; grpc_winsocket *socket = tcp->socket;
...@@ -131,7 +158,6 @@ static int on_read(grpc_tcp *tcp, int from_iocp) { ...@@ -131,7 +158,6 @@ static int on_read(grpc_tcp *tcp, int from_iocp) {
tcp->socket->read_info.outstanding = 0; tcp->socket->read_info.outstanding = 0;
gpr_slice_unref(tcp->read_slice); gpr_slice_unref(tcp->read_slice);
} }
tcp_unref(tcp);
return 0; return 0;
} }
...@@ -166,8 +192,10 @@ static void on_read_cb(void *tcpp, int from_iocp) { ...@@ -166,8 +192,10 @@ static void on_read_cb(void *tcpp, int from_iocp) {
grpc_iomgr_closure *cb = tcp->read_cb; grpc_iomgr_closure *cb = tcp->read_cb;
int success = on_read(tcp, from_iocp); int success = on_read(tcp, from_iocp);
tcp->read_cb = NULL; tcp->read_cb = NULL;
tcp_unref(tcp); TCP_UNREF(tcp, "read");
cb->cb(cb->cb_arg, success); if (cb) {
cb->cb(cb->cb_arg, success);
}
} }
static grpc_endpoint_op_status win_read(grpc_endpoint *ep, static grpc_endpoint_op_status win_read(grpc_endpoint *ep,
...@@ -185,6 +213,9 @@ static grpc_endpoint_op_status win_read(grpc_endpoint *ep, ...@@ -185,6 +213,9 @@ static grpc_endpoint_op_status win_read(grpc_endpoint *ep,
if (tcp->shutting_down) { if (tcp->shutting_down) {
return GRPC_ENDPOINT_ERROR; return GRPC_ENDPOINT_ERROR;
} }
TCP_REF(tcp, "read");
tcp->socket->read_info.outstanding = 1; tcp->socket->read_info.outstanding = 1;
tcp->read_cb = cb; tcp->read_cb = cb;
tcp->read_slices = read_slices; tcp->read_slices = read_slices;
...@@ -201,8 +232,11 @@ static grpc_endpoint_op_status win_read(grpc_endpoint *ep, ...@@ -201,8 +232,11 @@ static grpc_endpoint_op_status win_read(grpc_endpoint *ep,
/* Did we get data immediately ? Yay. */ /* Did we get data immediately ? Yay. */
if (info->wsa_error != WSAEWOULDBLOCK) { if (info->wsa_error != WSAEWOULDBLOCK) {
int ok;
info->bytes_transfered = bytes_read; info->bytes_transfered = bytes_read;
return on_read(tcp, 1) ? GRPC_ENDPOINT_DONE : GRPC_ENDPOINT_ERROR; ok = on_read(tcp, 1);
TCP_UNREF(tcp, "read");
return ok ? GRPC_ENDPOINT_DONE : GRPC_ENDPOINT_ERROR;
} }
/* Otherwise, let's retry, by queuing a read. */ /* Otherwise, let's retry, by queuing a read. */
...@@ -213,12 +247,13 @@ static grpc_endpoint_op_status win_read(grpc_endpoint *ep, ...@@ -213,12 +247,13 @@ static grpc_endpoint_op_status win_read(grpc_endpoint *ep,
if (status != 0) { if (status != 0) {
int wsa_error = WSAGetLastError(); int wsa_error = WSAGetLastError();
if (wsa_error != WSA_IO_PENDING) { if (wsa_error != WSA_IO_PENDING) {
int ok;
info->wsa_error = wsa_error; info->wsa_error = wsa_error;
return on_read(tcp, 1) ? GRPC_ENDPOINT_DONE : GRPC_ENDPOINT_ERROR; ok = on_read(tcp, 1);
return ok ? GRPC_ENDPOINT_DONE : GRPC_ENDPOINT_ERROR;
} }
} }
tcp_ref(tcp);
grpc_socket_notify_on_read(tcp->socket, on_read_cb, tcp); grpc_socket_notify_on_read(tcp->socket, on_read_cb, tcp);
return GRPC_ENDPOINT_PENDING; return GRPC_ENDPOINT_PENDING;
} }
...@@ -247,7 +282,7 @@ static void on_write(void *tcpp, int from_iocp) { ...@@ -247,7 +282,7 @@ static void on_write(void *tcpp, int from_iocp) {
if (from_iocp) { if (from_iocp) {
tcp->socket->write_info.outstanding = 0; tcp->socket->write_info.outstanding = 0;
} }
tcp_unref(tcp); TCP_UNREF(tcp, "write");
if (cb) { if (cb) {
cb->cb(cb->cb_arg, 0); cb->cb(cb->cb_arg, 0);
} }
...@@ -270,7 +305,7 @@ static void on_write(void *tcpp, int from_iocp) { ...@@ -270,7 +305,7 @@ static void on_write(void *tcpp, int from_iocp) {
tcp->socket->write_info.outstanding = 0; tcp->socket->write_info.outstanding = 0;
tcp_unref(tcp); TCP_UNREF(tcp, "write");
cb->cb(cb->cb_arg, success); cb->cb(cb->cb_arg, success);
} }
...@@ -292,7 +327,7 @@ static grpc_endpoint_op_status win_write(grpc_endpoint *ep, ...@@ -292,7 +327,7 @@ static grpc_endpoint_op_status win_write(grpc_endpoint *ep,
if (tcp->shutting_down) { if (tcp->shutting_down) {
return GRPC_ENDPOINT_ERROR; return GRPC_ENDPOINT_ERROR;
} }
tcp_ref(tcp); TCP_REF(tcp, "write");
tcp->socket->write_info.outstanding = 1; tcp->socket->write_info.outstanding = 1;
tcp->write_cb = cb; tcp->write_cb = cb;
...@@ -330,7 +365,7 @@ static grpc_endpoint_op_status win_write(grpc_endpoint *ep, ...@@ -330,7 +365,7 @@ static grpc_endpoint_op_status win_write(grpc_endpoint *ep,
} }
if (allocated) gpr_free(allocated); if (allocated) gpr_free(allocated);
tcp->socket->write_info.outstanding = 0; tcp->socket->write_info.outstanding = 0;
tcp_unref(tcp); TCP_UNREF(tcp, "write");
return ret; return ret;
} }
...@@ -345,7 +380,7 @@ static grpc_endpoint_op_status win_write(grpc_endpoint *ep, ...@@ -345,7 +380,7 @@ static grpc_endpoint_op_status win_write(grpc_endpoint *ep,
int wsa_error = WSAGetLastError(); int wsa_error = WSAGetLastError();
if (wsa_error != WSA_IO_PENDING) { if (wsa_error != WSA_IO_PENDING) {
tcp->socket->write_info.outstanding = 0; tcp->socket->write_info.outstanding = 0;
tcp_unref(tcp); TCP_UNREF(tcp, "write");
return GRPC_ENDPOINT_ERROR; return GRPC_ENDPOINT_ERROR;
} }
} }
...@@ -378,19 +413,17 @@ static void win_add_to_pollset_set(grpc_endpoint *ep, grpc_pollset_set *pss) { ...@@ -378,19 +413,17 @@ static void win_add_to_pollset_set(grpc_endpoint *ep, grpc_pollset_set *pss) {
concurrent access of the data structure in that regard. */ concurrent access of the data structure in that regard. */
static void win_shutdown(grpc_endpoint *ep) { static void win_shutdown(grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep; grpc_tcp *tcp = (grpc_tcp *)ep;
int extra_refs = 0;
gpr_mu_lock(&tcp->mu); gpr_mu_lock(&tcp->mu);
/* At that point, what may happen is that we're already inside the IOCP /* At that point, what may happen is that we're already inside the IOCP
callback. See the comments in on_read and on_write. */ callback. See the comments in on_read and on_write. */
tcp->shutting_down = 1; tcp->shutting_down = 1;
extra_refs = grpc_winsocket_shutdown(tcp->socket); grpc_winsocket_shutdown(tcp->socket);
while (extra_refs--) tcp_ref(tcp);
gpr_mu_unlock(&tcp->mu); gpr_mu_unlock(&tcp->mu);
} }
static void win_destroy(grpc_endpoint *ep) { static void win_destroy(grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep; grpc_tcp *tcp = (grpc_tcp *)ep;
tcp_unref(tcp); TCP_UNREF(tcp, "destroy");
} }
static char *win_get_peer(grpc_endpoint *ep) { static char *win_get_peer(grpc_endpoint *ep) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment