Skip to content
Snippets Groups Projects
Commit e2672c92 authored by murgatroid99's avatar murgatroid99
Browse files

Some slice and resource quota updates to UV and Node code

parent 3abd5cc5
No related branches found
No related tags found
No related merge requests found
...@@ -38,14 +38,17 @@ ...@@ -38,14 +38,17 @@
#include <limits.h> #include <limits.h>
#include <string.h> #include <string.h>
#include <grpc/slice_buffer.h>
#include <grpc/support/alloc.h> #include <grpc/support/alloc.h>
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include <grpc/support/slice_buffer.h>
#include <grpc/support/string_util.h> #include <grpc/support/string_util.h>
#include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/network_status_tracker.h" #include "src/core/lib/iomgr/network_status_tracker.h"
#include "src/core/lib/iomgr/resource_quota.h"
#include "src/core/lib/iomgr/tcp_uv.h" #include "src/core/lib/iomgr/tcp_uv.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/support/string.h" #include "src/core/lib/support/string.h"
int grpc_tcp_trace = 0; int grpc_tcp_trace = 0;
...@@ -62,15 +65,14 @@ typedef struct { ...@@ -62,15 +65,14 @@ typedef struct {
grpc_closure *read_cb; grpc_closure *read_cb;
grpc_closure *write_cb; grpc_closure *write_cb;
GRPC_SLICE read_slice; grpc_slice read_slice;
GRPC_SLICE_buffer *read_slices; grpc_slice_buffer *read_slices;
GRPC_SLICE_buffer *write_slices; grpc_slice_buffer *write_slices;
uv_buf_t *write_buffers; uv_buf_t *write_buffers;
grpc_resource_user resource_user; grpc_resource_user *resource_user;
bool shutting_down; bool shutting_down;
bool resource_user_shutting_down;
char *peer_string; char *peer_string;
grpc_pollset *pollset; grpc_pollset *pollset;
...@@ -78,23 +80,21 @@ typedef struct { ...@@ -78,23 +80,21 @@ typedef struct {
static void uv_close_callback(uv_handle_t *handle) { gpr_free(handle); } static void uv_close_callback(uv_handle_t *handle) { gpr_free(handle); }
static void tcp_free(grpc_tcp *tcp) { static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_resource_user_unref(exec_ctx, tcp->resource_user);
grpc_resource_user_destroy(&exec_ctx, &tcp->resource_user);
gpr_free(tcp); gpr_free(tcp);
grpc_exec_ctx_finish(&exec_ctx);
} }
/*#define GRPC_TCP_REFCOUNT_DEBUG*/ /*#define GRPC_TCP_REFCOUNT_DEBUG*/
#ifdef GRPC_TCP_REFCOUNT_DEBUG #ifdef GRPC_TCP_REFCOUNT_DEBUG
#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__) #define TCP_UNREF(exec_ctx, tcp, reason) tcp_unref((exec_ctx), (tcp), (reason), __FILE__, __LINE__)
#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__) #define TCP_REF(tcp, reason) tcp_ref((exec_ctx), (tcp), (reason), __FILE__, __LINE__)
static void tcp_unref(grpc_tcp *tcp, const char *reason, const char *file, static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp, const char *reason, const char *file,
int line) { int line) {
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %d -> %d", tcp, gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %d -> %d", tcp,
reason, tcp->refcount.count, tcp->refcount.count - 1); reason, tcp->refcount.count, tcp->refcount.count - 1);
if (gpr_unref(&tcp->refcount)) { if (gpr_unref(&tcp->refcount)) {
tcp_free(tcp); tcp_free(exec_ctx, tcp);
} }
} }
...@@ -105,11 +105,11 @@ static void tcp_ref(grpc_tcp *tcp, const char *reason, const char *file, ...@@ -105,11 +105,11 @@ static void tcp_ref(grpc_tcp *tcp, const char *reason, const char *file,
gpr_ref(&tcp->refcount); gpr_ref(&tcp->refcount);
} }
#else #else
#define TCP_UNREF(tcp, reason) tcp_unref((tcp)) #define TCP_UNREF(exec_ctx, tcp, reason) tcp_unref((exec_ctx), (tcp))
#define TCP_REF(tcp, reason) tcp_ref((tcp)) #define TCP_REF(tcp, reason) tcp_ref((tcp))
static void tcp_unref(grpc_tcp *tcp) { static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
if (gpr_unref(&tcp->refcount)) { if (gpr_unref(&tcp->refcount)) {
tcp_free(tcp); tcp_free(exec_ctx, tcp);
} }
} }
...@@ -122,7 +122,7 @@ static void alloc_uv_buf(uv_handle_t *handle, size_t suggested_size, ...@@ -122,7 +122,7 @@ static void alloc_uv_buf(uv_handle_t *handle, size_t suggested_size,
grpc_tcp *tcp = handle->data; grpc_tcp *tcp = handle->data;
(void)suggested_size; (void)suggested_size;
tcp->read_slice = grpc_resource_user_slice_malloc( tcp->read_slice = grpc_resource_user_slice_malloc(
&exec_ctx, &tcp->resource_user, GRPC_TCP_DEFAULT_READ_SLICE_SIZE); &exec_ctx, tcp->resource_user, GRPC_TCP_DEFAULT_READ_SLICE_SIZE);
buf->base = (char *)GRPC_SLICE_START_PTR(tcp->read_slice); buf->base = (char *)GRPC_SLICE_START_PTR(tcp->read_slice);
buf->len = GRPC_SLICE_LENGTH(tcp->read_slice); buf->len = GRPC_SLICE_LENGTH(tcp->read_slice);
grpc_exec_ctx_finish(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx);
...@@ -130,7 +130,7 @@ static void alloc_uv_buf(uv_handle_t *handle, size_t suggested_size, ...@@ -130,7 +130,7 @@ static void alloc_uv_buf(uv_handle_t *handle, size_t suggested_size,
static void read_callback(uv_stream_t *stream, ssize_t nread, static void read_callback(uv_stream_t *stream, ssize_t nread,
const uv_buf_t *buf) { const uv_buf_t *buf) {
GRPC_SLICE sub; grpc_slice sub;
grpc_error *error; grpc_error *error;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_tcp *tcp = stream->data; grpc_tcp *tcp = stream->data;
...@@ -139,7 +139,7 @@ static void read_callback(uv_stream_t *stream, ssize_t nread, ...@@ -139,7 +139,7 @@ static void read_callback(uv_stream_t *stream, ssize_t nread,
// Nothing happened. Wait for the next callback // Nothing happened. Wait for the next callback
return; return;
} }
TCP_UNREF(tcp, "read"); TCP_UNREF(&exec_ctx, tcp, "read");
tcp->read_cb = NULL; tcp->read_cb = NULL;
// TODO(murgatroid99): figure out what the return value here means // TODO(murgatroid99): figure out what the return value here means
uv_read_stop(stream); uv_read_stop(stream);
...@@ -147,8 +147,8 @@ static void read_callback(uv_stream_t *stream, ssize_t nread, ...@@ -147,8 +147,8 @@ static void read_callback(uv_stream_t *stream, ssize_t nread,
error = GRPC_ERROR_CREATE("EOF"); error = GRPC_ERROR_CREATE("EOF");
} else if (nread > 0) { } else if (nread > 0) {
// Successful read // Successful read
sub = GRPC_SLICE_sub_no_ref(tcp->read_slice, 0, (size_t)nread); sub = grpc_slice_sub_no_ref(tcp->read_slice, 0, (size_t)nread);
GRPC_SLICE_buffer_add(tcp->read_slices, sub); grpc_slice_buffer_add(tcp->read_slices, sub);
error = GRPC_ERROR_NONE; error = GRPC_ERROR_NONE;
if (grpc_tcp_trace) { if (grpc_tcp_trace) {
size_t i; size_t i;
...@@ -156,7 +156,7 @@ static void read_callback(uv_stream_t *stream, ssize_t nread, ...@@ -156,7 +156,7 @@ static void read_callback(uv_stream_t *stream, ssize_t nread,
gpr_log(GPR_DEBUG, "read: error=%s", str); gpr_log(GPR_DEBUG, "read: error=%s", str);
grpc_error_free_string(str); grpc_error_free_string(str);
for (i = 0; i < tcp->read_slices->count; i++) { for (i = 0; i < tcp->read_slices->count; i++) {
char *dump = gpr_dump_slice(tcp->read_slices->slices[i], char *dump = grpc_dump_slice(tcp->read_slices->slices[i],
GPR_DUMP_HEX | GPR_DUMP_ASCII); GPR_DUMP_HEX | GPR_DUMP_ASCII);
gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", tcp, tcp->peer_string, gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", tcp, tcp->peer_string,
dump); dump);
...@@ -172,14 +172,14 @@ static void read_callback(uv_stream_t *stream, ssize_t nread, ...@@ -172,14 +172,14 @@ static void read_callback(uv_stream_t *stream, ssize_t nread,
} }
static void uv_endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, static void uv_endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
GRPC_SLICE_buffer *read_slices, grpc_closure *cb) { grpc_slice_buffer *read_slices, grpc_closure *cb) {
grpc_tcp *tcp = (grpc_tcp *)ep; grpc_tcp *tcp = (grpc_tcp *)ep;
int status; int status;
grpc_error *error = GRPC_ERROR_NONE; grpc_error *error = GRPC_ERROR_NONE;
GPR_ASSERT(tcp->read_cb == NULL); GPR_ASSERT(tcp->read_cb == NULL);
tcp->read_cb = cb; tcp->read_cb = cb;
tcp->read_slices = read_slices; tcp->read_slices = read_slices;
GRPC_SLICE_buffer_reset_and_unref(read_slices); grpc_slice_buffer_reset_and_unref(read_slices);
TCP_REF(tcp, "read"); TCP_REF(tcp, "read");
// TODO(murgatroid99): figure out what the return value here means // TODO(murgatroid99): figure out what the return value here means
status = status =
...@@ -202,7 +202,7 @@ static void write_callback(uv_write_t *req, int status) { ...@@ -202,7 +202,7 @@ static void write_callback(uv_write_t *req, int status) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_closure *cb = tcp->write_cb; grpc_closure *cb = tcp->write_cb;
tcp->write_cb = NULL; tcp->write_cb = NULL;
TCP_UNREF(tcp, "write"); TCP_UNREF(&exec_ctx, tcp, "write");
if (status == 0) { if (status == 0) {
error = GRPC_ERROR_NONE; error = GRPC_ERROR_NONE;
} else { } else {
...@@ -213,27 +213,27 @@ static void write_callback(uv_write_t *req, int status) { ...@@ -213,27 +213,27 @@ static void write_callback(uv_write_t *req, int status) {
gpr_log(GPR_DEBUG, "write complete on %p: error=%s", tcp, str); gpr_log(GPR_DEBUG, "write complete on %p: error=%s", tcp, str);
} }
gpr_free(tcp->write_buffers); gpr_free(tcp->write_buffers);
grpc_resource_user_free(&exec_ctx, &tcp->resource_user, grpc_resource_user_free(&exec_ctx, tcp->resource_user,
sizeof(uv_buf_t) * tcp->write_slices->count); sizeof(uv_buf_t) * tcp->write_slices->count);
grpc_exec_ctx_sched(&exec_ctx, cb, error, NULL); grpc_exec_ctx_sched(&exec_ctx, cb, error, NULL);
grpc_exec_ctx_finish(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx);
} }
static void uv_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, static void uv_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
GRPC_SLICE_buffer *write_slices, grpc_slice_buffer *write_slices,
grpc_closure *cb) { grpc_closure *cb) {
grpc_tcp *tcp = (grpc_tcp *)ep; grpc_tcp *tcp = (grpc_tcp *)ep;
uv_buf_t *buffers; uv_buf_t *buffers;
unsigned int buffer_count; unsigned int buffer_count;
unsigned int i; unsigned int i;
GRPC_SLICE *slice; grpc_slice *slice;
uv_write_t *write_req; uv_write_t *write_req;
if (grpc_tcp_trace) { if (grpc_tcp_trace) {
size_t j; size_t j;
for (j = 0; j < write_slices->count; j++) { for (j = 0; j < write_slices->count; j++) {
char *data = gpr_dump_slice(write_slices->slices[j], char *data = grpc_dump_slice(write_slices->slices[j],
GPR_DUMP_HEX | GPR_DUMP_ASCII); GPR_DUMP_HEX | GPR_DUMP_ASCII);
gpr_log(GPR_DEBUG, "WRITE %p (peer=%s): %s", tcp, tcp->peer_string, data); gpr_log(GPR_DEBUG, "WRITE %p (peer=%s): %s", tcp, tcp->peer_string, data);
gpr_free(data); gpr_free(data);
...@@ -259,7 +259,7 @@ static void uv_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, ...@@ -259,7 +259,7 @@ static void uv_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
tcp->write_cb = cb; tcp->write_cb = cb;
buffer_count = (unsigned int)tcp->write_slices->count; buffer_count = (unsigned int)tcp->write_slices->count;
buffers = gpr_malloc(sizeof(uv_buf_t) * buffer_count); buffers = gpr_malloc(sizeof(uv_buf_t) * buffer_count);
grpc_resource_user_alloc(exec_ctx, &tcp->resource_user, grpc_resource_user_alloc(exec_ctx, tcp->resource_user,
sizeof(uv_buf_t) * buffer_count, NULL); sizeof(uv_buf_t) * buffer_count, NULL);
for (i = 0; i < buffer_count; i++) { for (i = 0; i < buffer_count; i++) {
slice = &tcp->write_slices->slices[i]; slice = &tcp->write_slices->slices[i];
...@@ -295,22 +295,6 @@ static void uv_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, ...@@ -295,22 +295,6 @@ static void uv_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
static void shutdown_callback(uv_shutdown_t *req, int status) {} static void shutdown_callback(uv_shutdown_t *req, int status) {}
static void resource_user_shutdown_done(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
TCP_UNREF(arg, "resource_user");
}
static void uv_resource_user_maybe_shutdown(grpc_exec_ctx *exec_ctx,
grpc_tcp *tcp) {
if (!tcp->resource_user_shutting_down) {
tcp->resource_user_shutting_down = true;
TCP_REF(tcp, "resource_user");
grpc_resource_user_shutdown(
exec_ctx, &tcp->resource_user,
grpc_closure_create(resource_user_shutdown_done, tcp));
}
}
static void uv_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { static void uv_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep; grpc_tcp *tcp = (grpc_tcp *)ep;
if (!tcp->shutting_down) { if (!tcp->shutting_down) {
...@@ -324,8 +308,7 @@ static void uv_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { ...@@ -324,8 +308,7 @@ static void uv_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
grpc_network_status_unregister_endpoint(ep); grpc_network_status_unregister_endpoint(ep);
grpc_tcp *tcp = (grpc_tcp *)ep; grpc_tcp *tcp = (grpc_tcp *)ep;
uv_close((uv_handle_t *)tcp->handle, uv_close_callback); uv_close((uv_handle_t *)tcp->handle, uv_close_callback);
uv_resource_user_maybe_shutdown(exec_ctx, tcp); TCP_UNREF(exec_ctx, tcp, "destroy");
TCP_UNREF(tcp, "destroy");
} }
static char *uv_get_peer(grpc_endpoint *ep) { static char *uv_get_peer(grpc_endpoint *ep) {
...@@ -335,7 +318,7 @@ static char *uv_get_peer(grpc_endpoint *ep) { ...@@ -335,7 +318,7 @@ static char *uv_get_peer(grpc_endpoint *ep) {
static grpc_resource_user *uv_get_resource_user(grpc_endpoint *ep) { static grpc_resource_user *uv_get_resource_user(grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep; grpc_tcp *tcp = (grpc_tcp *)ep;
return &tcp->resource_user; return tcp->resource_user;
} }
static grpc_workqueue *uv_get_workqueue(grpc_endpoint *ep) { return NULL; } static grpc_workqueue *uv_get_workqueue(grpc_endpoint *ep) { return NULL; }
...@@ -364,8 +347,7 @@ grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, ...@@ -364,8 +347,7 @@ grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle,
gpr_ref_init(&tcp->refcount, 1); gpr_ref_init(&tcp->refcount, 1);
tcp->peer_string = gpr_strdup(peer_string); tcp->peer_string = gpr_strdup(peer_string);
tcp->shutting_down = false; tcp->shutting_down = false;
tcp->resource_user_shutting_down = false; tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string);
grpc_resource_user_init(&tcp->resource_user, resource_quota, peer_string);
/* Tell network status tracking code about the new endpoint */ /* Tell network status tracking code about the new endpoint */
grpc_network_status_register_endpoint(&tcp->base); grpc_network_status_register_endpoint(&tcp->base);
......
...@@ -37,7 +37,7 @@ ...@@ -37,7 +37,7 @@
#include <nan.h> #include <nan.h>
#include "grpc/grpc.h" #include "grpc/grpc.h"
#include "grpc/byte_buffer_reader.h" #include "grpc/byte_buffer_reader.h"
#include "grpc/support/slice.h" #include "grpc/slice.h"
#include "byte_buffer.h" #include "byte_buffer.h"
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment