Commit 69259d42 authored by murgatroid99

Add resource quota support to uv TCP code

parent a12e6d80
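
This change threads a grpc_resource_quota from the channel args (key GRPC_ARG_RESOURCE_QUOTA) into the libuv TCP client, server, and endpoint code: both the client connector and grpc_tcp_server_create look the quota up in the args, fall back to a freshly created one otherwise, and hand it to grpc_tcp_create, which initializes a per-endpoint grpc_resource_user used to account for read slices and write buffers. As a rough sketch of how a caller might supply a quota (assuming the public quota API and the channel-arg vtable helper are available in this tree):

/* Hedged sketch: hand a quota to the uv TCP code through channel args.
 * GRPC_ARG_RESOURCE_QUOTA is the key the new code looks up;
 * grpc_resource_quota_arg_vtable() is assumed to exist in this tree. */
grpc_resource_quota *rq = grpc_resource_quota_create("uv-tcp-demo");
grpc_resource_quota_resize(rq, 16 * 1024 * 1024); /* cap buffer memory at 16 MiB */

grpc_arg quota_arg;
quota_arg.type = GRPC_ARG_POINTER;
quota_arg.key = GRPC_ARG_RESOURCE_QUOTA;
quota_arg.value.pointer.p = rq;
quota_arg.value.pointer.vtable = grpc_resource_quota_arg_vtable();

grpc_channel_args quota_args = {1, &quota_arg};
/* pass &quota_args wherever channel args reach grpc_tcp_client_connect or
 * grpc_tcp_server_create; both take their own ref, so we can drop ours */
grpc_resource_quota_unref(rq);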
src/core/lib/iomgr/endpoint_pair_uv.c

@@ -41,8 +41,9 @@
 #include "src/core/lib/iomgr/endpoint_pair.h"
 
-grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name,
-                                                   size_t read_slice_size) {
+grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(
+    const char *name, grpc_resource_quota *resource_quota,
+    size_t read_slice_size) {
   grpc_endpoint_pair endpoint_pair;
   // TODO(mlumish): implement this properly under libuv
   GPR_ASSERT(false &&
src/core/lib/iomgr/resource_quota.c

@@ -712,3 +712,10 @@ void grpc_resource_user_alloc_slices(
   grpc_resource_user_alloc(exec_ctx, slice_allocator->resource_user,
                            count * length, &slice_allocator->on_allocated);
 }
+
+gpr_slice grpc_resource_user_slice_malloc(grpc_exec_ctx *exec_ctx,
+                                          grpc_resource_user *resource_user,
+                                          size_t size) {
+  grpc_resource_user_alloc(exec_ctx, resource_user, size, NULL);
+  return ru_slice_create(resource_user, size);
+}
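
The new helper charges size bytes to the resource user (passing a NULL completion closure) and immediately returns a slice created over that allocation; the uv read path below uses it for its read buffers. A small, hedged usage sketch (the resource_user pointer is a hypothetical stand-in for one that has already been initialized):

/* Hedged sketch: account a read buffer against a resource user and get a
 * slice back in one call, mirroring alloc_uv_buf further down. */
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
gpr_slice read_slice =
    grpc_resource_user_slice_malloc(&exec_ctx, resource_user, 8192);
/* ... write into GPR_SLICE_START_PTR(read_slice), at most
 * GPR_SLICE_LENGTH(read_slice) bytes ... */
gpr_slice_unref(read_slice); /* dropping the last ref returns the bytes */
grpc_exec_ctx_finish(&exec_ctx);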
src/core/lib/iomgr/resource_quota.h

@@ -221,4 +221,9 @@ void grpc_resource_user_alloc_slices(
     grpc_resource_user_slice_allocator *slice_allocator, size_t length,
     size_t count, gpr_slice_buffer *dest);
 
+/* Allocate one slice of length \a size synchronously. */
+gpr_slice grpc_resource_user_slice_malloc(grpc_exec_ctx *exec_ctx,
+                                          grpc_resource_user *resource_user,
+                                          size_t size);
+
 #endif /* GRPC_CORE_LIB_IOMGR_RESOURCE_QUOTA_H */
src/core/lib/iomgr/tcp_client_uv.c

@@ -54,9 +54,12 @@ typedef struct grpc_uv_tcp_connect {
   grpc_endpoint **endpoint;
   int refs;
   char *addr_name;
+  grpc_resource_quota *resource_quota;
 } grpc_uv_tcp_connect;
 
-static void uv_tcp_connect_cleanup(grpc_uv_tcp_connect *connect) {
+static void uv_tcp_connect_cleanup(grpc_exec_ctx *exec_ctx,
+                                   grpc_uv_tcp_connect *connect) {
+  grpc_resource_quota_internal_unref(exec_ctx, connect->resource_quota);
   gpr_free(connect);
 }
@@ -74,7 +77,7 @@ static void uv_tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp,
   }
   done = (--connect->refs == 0);
   if (done) {
-    uv_tcp_connect_cleanup(connect);
+    uv_tcp_connect_cleanup(exec_ctx, connect);
   }
 }
@@ -86,8 +89,8 @@ static void uv_tc_on_connect(uv_connect_t *req, int status) {
   grpc_closure *closure = connect->closure;
   grpc_timer_cancel(&exec_ctx, &connect->alarm);
   if (status == 0) {
-    *connect->endpoint =
-        grpc_tcp_create(connect->tcp_handle, connect->addr_name);
+    *connect->endpoint = grpc_tcp_create(
+        connect->tcp_handle, connect->resource_quota, connect->addr_name);
   } else {
     error = GRPC_ERROR_CREATE("Failed to connect to remote host");
     error = grpc_error_set_int(error, GRPC_ERROR_INT_ERRNO, -status);
@@ -105,7 +108,7 @@ static void uv_tc_on_connect(uv_connect_t *req, int status) {
   }
   done = (--connect->refs == 0);
   if (done) {
-    uv_tcp_connect_cleanup(connect);
+    uv_tcp_connect_cleanup(&exec_ctx, connect);
   }
   grpc_exec_ctx_sched(&exec_ctx, closure, error, NULL);
   grpc_exec_ctx_finish(&exec_ctx);
@@ -114,16 +117,31 @@ static void uv_tc_on_connect(uv_connect_t *req, int status) {
 static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
                                     grpc_closure *closure, grpc_endpoint **ep,
                                     grpc_pollset_set *interested_parties,
+                                    const grpc_channel_args *channel_args,
                                     const grpc_resolved_address *resolved_addr,
                                     gpr_timespec deadline) {
   grpc_uv_tcp_connect *connect;
+  grpc_resource_quota *resource_quota = grpc_resource_quota_create(NULL);
+  (void)channel_args;
   (void)interested_parties;
+  if (channel_args != NULL) {
+    for (size_t i = 0; i < channel_args->num_args; i++) {
+      if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
+        grpc_resource_quota_internal_unref(exec_ctx, resource_quota);
+        resource_quota = grpc_resource_quota_internal_ref(
+            channel_args->args[i].value.pointer.p);
+      }
+    }
+  }
   connect = gpr_malloc(sizeof(grpc_uv_tcp_connect));
   memset(connect, 0, sizeof(grpc_uv_tcp_connect));
   connect->closure = closure;
   connect->endpoint = ep;
   connect->tcp_handle = gpr_malloc(sizeof(uv_tcp_t));
   connect->addr_name = grpc_sockaddr_to_uri(resolved_addr);
+  connect->resource_quota = resource_quota;
   uv_tcp_init(uv_default_loop(), connect->tcp_handle);
   connect->connect_req.data = connect;
   // TODO(murgatroid99): figure out what the return value here means
@@ -138,16 +156,18 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
 // overridden by api_fuzzer.c
 void (*grpc_tcp_client_connect_impl)(
     grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep,
-    grpc_pollset_set *interested_parties, const grpc_resolved_address *addr,
+    grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args,
+    const grpc_resolved_address *addr,
     gpr_timespec deadline) = tcp_client_connect_impl;
 
 void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
                              grpc_endpoint **ep,
                              grpc_pollset_set *interested_parties,
+                             const grpc_channel_args *channel_args,
                              const grpc_resolved_address *addr,
                              gpr_timespec deadline) {
-  grpc_tcp_client_connect_impl(exec_ctx, closure, ep, interested_parties, addr,
-                               deadline);
+  grpc_tcp_client_connect_impl(exec_ctx, closure, ep, interested_parties,
+                               channel_args, addr, deadline);
 }
 
 #endif /* GRPC_UV */
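
Existing call sites of grpc_tcp_client_connect pick up one extra parameter; a hedged sketch of the new call shape, wrapped in a hypothetical helper so the types stay visible:

/* Hedged sketch of a post-change call site; names are hypothetical
 * stand-ins for whatever the caller already has in scope. */
static void connect_with_quota(grpc_exec_ctx *exec_ctx, grpc_closure *on_done,
                               grpc_endpoint **ep,
                               grpc_pollset_set *interested_parties,
                               const grpc_channel_args *channel_args,
                               const grpc_resolved_address *addr,
                               gpr_timespec deadline) {
  /* channel_args may carry GRPC_ARG_RESOURCE_QUOTA; the uv connector picks
   * it up, and otherwise creates a default quota of its own. */
  grpc_tcp_client_connect(exec_ctx, on_done, ep, interested_parties,
                          channel_args, addr, deadline);
}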
src/core/lib/iomgr/tcp_server_uv.c

@@ -76,13 +76,30 @@ struct grpc_tcp_server {
 
   /* shutdown callback */
   grpc_closure *shutdown_complete;
+
+  grpc_resource_quota *resource_quota;
 };
 
-grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete,
+grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx,
+                                   grpc_closure *shutdown_complete,
                                    const grpc_channel_args *args,
                                    grpc_tcp_server **server) {
   grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server));
-  (void)args;
+  s->resource_quota = grpc_resource_quota_create(NULL);
+  for (size_t i = 0; i < (args == NULL ? 0 : args->num_args); i++) {
+    if (0 == strcmp(GRPC_ARG_RESOURCE_QUOTA, args->args[i].key)) {
+      if (args->args[i].type == GRPC_ARG_POINTER) {
+        grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
+        s->resource_quota =
+            grpc_resource_quota_internal_ref(args->args[i].value.pointer.p);
+      } else {
+        grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
+        gpr_free(s);
+        return GRPC_ERROR_CREATE(GRPC_ARG_RESOURCE_QUOTA
+                                 " must be a pointer to a buffer pool");
+      }
+    }
+  }
   gpr_ref_init(&s->refs, 1);
   s->on_accept_cb = NULL;
   s->on_accept_cb_arg = NULL;
@@ -119,6 +136,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
     gpr_free(sp->handle);
     gpr_free(sp);
   }
+  grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
   gpr_free(s);
 }
@@ -201,7 +219,7 @@ static void on_connect(uv_stream_t *server, int status) {
   } else {
     gpr_log(GPR_INFO, "uv_tcp_getpeername error: %s", uv_strerror(status));
   }
-  ep = grpc_tcp_create(client, peer_name_string);
+  ep = grpc_tcp_create(client, sp->server->resource_quota, peer_name_string);
   sp->server->on_accept_cb(&exec_ctx, sp->server->on_accept_cb_arg, ep, NULL,
                            &acceptor);
   grpc_exec_ctx_finish(&exec_ctx);
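
On the server side, grpc_tcp_server_create now also takes the exec_ctx, and a GRPC_ARG_RESOURCE_QUOTA arg of the wrong type is reported as an error; a hedged sketch of the call shape (all locals here are hypothetical):

/* Hedged sketch: creating a uv TCP server with channel args that may carry
 * GRPC_ARG_RESOURCE_QUOTA; locals are hypothetical stand-ins. */
grpc_tcp_server *tcp_server = NULL;
grpc_error *err = grpc_tcp_server_create(&exec_ctx, shutdown_complete_closure,
                                         server_channel_args, &tcp_server);
if (err != GRPC_ERROR_NONE) {
  /* e.g. the quota arg was present but not a GRPC_ARG_POINTER */
}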
src/core/lib/iomgr/tcp_uv.c

/*
 *
 * Copyright 2016, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include "src/core/lib/iomgr/port.h"
@@ -54,6 +54,9 @@ typedef struct {
   grpc_endpoint base;
   gpr_refcount refcount;
 
+  uv_write_t write_req;
+  uv_shutdown_t shutdown_req;
+
   uv_tcp_t *handle;
 
   grpc_closure *read_cb;
@@ -64,14 +67,23 @@ typedef struct {
   gpr_slice_buffer *write_slices;
   uv_buf_t *write_buffers;
 
+  grpc_resource_user resource_user;
+
   bool shutting_down;
+  bool resource_user_shutting_down;
+
   char *peer_string;
   grpc_pollset *pollset;
 } grpc_tcp;
 
 static void uv_close_callback(uv_handle_t *handle) { gpr_free(handle); }
 
-static void tcp_free(grpc_tcp *tcp) { gpr_free(tcp); }
+static void tcp_free(grpc_tcp *tcp) {
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+  grpc_resource_user_destroy(&exec_ctx, &tcp->resource_user);
+  gpr_free(tcp);
+  grpc_exec_ctx_finish(&exec_ctx);
+}
 
 /*#define GRPC_TCP_REFCOUNT_DEBUG*/
 #ifdef GRPC_TCP_REFCOUNT_DEBUG
@@ -106,11 +118,14 @@ static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
 
 static void alloc_uv_buf(uv_handle_t *handle, size_t suggested_size,
                          uv_buf_t *buf) {
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
   grpc_tcp *tcp = handle->data;
   (void)suggested_size;
-  tcp->read_slice = gpr_slice_malloc(GRPC_TCP_DEFAULT_READ_SLICE_SIZE);
+  tcp->read_slice = grpc_resource_user_slice_malloc(
+      &exec_ctx, &tcp->resource_user, GRPC_TCP_DEFAULT_READ_SLICE_SIZE);
   buf->base = (char *)GPR_SLICE_START_PTR(tcp->read_slice);
   buf->len = GPR_SLICE_LENGTH(tcp->read_slice);
+  grpc_exec_ctx_finish(&exec_ctx);
 }
 
 static void read_callback(uv_stream_t *stream, ssize_t nread,
@@ -198,7 +213,8 @@ static void write_callback(uv_write_t *req, int status) {
     gpr_log(GPR_DEBUG, "write complete on %p: error=%s", tcp, str);
   }
   gpr_free(tcp->write_buffers);
-  gpr_free(req);
+  grpc_resource_user_free(&exec_ctx, &tcp->resource_user,
+                          sizeof(uv_buf_t) * tcp->write_slices->count);
   grpc_exec_ctx_sched(&exec_ctx, cb, error, NULL);
   grpc_exec_ctx_finish(&exec_ctx);
 }
@@ -243,12 +259,15 @@ static void uv_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
   tcp->write_cb = cb;
   buffer_count = (unsigned int)tcp->write_slices->count;
   buffers = gpr_malloc(sizeof(uv_buf_t) * buffer_count);
+  grpc_resource_user_alloc(exec_ctx, &tcp->resource_user,
+                           sizeof(uv_buf_t) * buffer_count, NULL);
   for (i = 0; i < buffer_count; i++) {
     slice = &tcp->write_slices->slices[i];
     buffers[i].base = (char *)GPR_SLICE_START_PTR(*slice);
     buffers[i].len = GPR_SLICE_LENGTH(*slice);
   }
-  write_req = gpr_malloc(sizeof(uv_write_t));
+  tcp->write_buffers = buffers;
+  write_req = &tcp->write_req;
   write_req->data = tcp;
   TCP_REF(tcp, "write");
   // TODO(murgatroid99): figure out what the return value here means
@@ -274,13 +293,29 @@ static void uv_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
   (void)pollset;
 }
 
-static void shutdown_callback(uv_shutdown_t *req, int status) { gpr_free(req); }
+static void shutdown_callback(uv_shutdown_t *req, int status) {}
+
+static void resource_user_shutdown_done(grpc_exec_ctx *exec_ctx, void *arg,
+                                        grpc_error *error) {
+  TCP_UNREF(arg, "resource_user");
+}
+
+static void uv_resource_user_maybe_shutdown(grpc_exec_ctx *exec_ctx,
+                                            grpc_tcp *tcp) {
+  if (!tcp->resource_user_shutting_down) {
+    tcp->resource_user_shutting_down = true;
+    TCP_REF(tcp, "resource_user");
+    grpc_resource_user_shutdown(
+        exec_ctx, &tcp->resource_user,
+        grpc_closure_create(resource_user_shutdown_done, tcp));
+  }
+}
 
 static void uv_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
   grpc_tcp *tcp = (grpc_tcp *)ep;
   if (!tcp->shutting_down) {
     tcp->shutting_down = true;
-    uv_shutdown_t *req = gpr_malloc(sizeof(uv_shutdown_t));
+    uv_shutdown_t *req = &tcp->shutdown_req;
     uv_shutdown(req, (uv_stream_t *)tcp->handle, shutdown_callback);
   }
 }
@@ -289,6 +324,7 @@ static void uv_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
   grpc_network_status_unregister_endpoint(ep);
   grpc_tcp *tcp = (grpc_tcp *)ep;
   uv_close((uv_handle_t *)tcp->handle, uv_close_callback);
+  uv_resource_user_maybe_shutdown(exec_ctx, tcp);
   TCP_UNREF(tcp, "destroy");
 }
@@ -297,18 +333,21 @@ static char *uv_get_peer(grpc_endpoint *ep) {
   return gpr_strdup(tcp->peer_string);
 }
 
+static grpc_resource_user *uv_get_resource_user(grpc_endpoint *ep) {
+  grpc_tcp *tcp = (grpc_tcp *)ep;
+  return &tcp->resource_user;
+}
+
 static grpc_workqueue *uv_get_workqueue(grpc_endpoint *ep) { return NULL; }
 
-static grpc_endpoint_vtable vtable = {uv_endpoint_read,
-                                      uv_endpoint_write,
-                                      uv_get_workqueue,
-                                      uv_add_to_pollset,
-                                      uv_add_to_pollset_set,
-                                      uv_endpoint_shutdown,
-                                      uv_destroy,
-                                      uv_get_peer};
+static grpc_endpoint_vtable vtable = {
+    uv_endpoint_read,  uv_endpoint_write,     uv_get_workqueue,
+    uv_add_to_pollset, uv_add_to_pollset_set, uv_endpoint_shutdown,
+    uv_destroy,        uv_get_resource_user,  uv_get_peer};
 
-grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, char *peer_string) {
+grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle,
+                               grpc_resource_quota *resource_quota,
+                               char *peer_string) {
   grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));
 
   if (grpc_tcp_trace) {
@@ -325,6 +364,8 @@ grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, char *peer_string) {
   gpr_ref_init(&tcp->refcount, 1);
   tcp->peer_string = gpr_strdup(peer_string);
   tcp->shutting_down = false;
+  tcp->resource_user_shutting_down = false;
+  grpc_resource_user_init(&tcp->resource_user, resource_quota, peer_string);
   /* Tell network status tracking code about the new endpoint */
   grpc_network_status_register_endpoint(&tcp->base);
src/core/lib/iomgr/tcp_uv.h

@@ -52,6 +52,8 @@ extern int grpc_tcp_trace;
 
 #define GRPC_TCP_DEFAULT_READ_SLICE_SIZE 8192
 
-grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, char *peer_string);
+grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle,
+                               grpc_resource_quota *resource_quota,
+                               char *peer_string);
 
 #endif /* GRPC_CORE_LIB_IOMGR_TCP_UV_H */