Skip to content
Snippets Groups Projects
Commit de090655 authored by Craig Tiller's avatar Craig Tiller
Browse files

Update windows

parent 9e0b79e0
No related branches found
No related tags found
No related merge requests found
...@@ -83,15 +83,16 @@ static void create_sockets(SOCKET sv[2]) { ...@@ -83,15 +83,16 @@ static void create_sockets(SOCKET sv[2]) {
} }
grpc_endpoint_pair grpc_iomgr_create_endpoint_pair( grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(
const char *name, grpc_resource_quota *resource_quota, const char *name, grpc_channel_args *channel_args) {
size_t read_slice_size) {
SOCKET sv[2]; SOCKET sv[2];
grpc_endpoint_pair p; grpc_endpoint_pair p;
create_sockets(sv); create_sockets(sv);
p.client = grpc_tcp_create(grpc_winsocket_create(sv[1], "endpoint:client"), grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
resource_quota, "endpoint:server"); p.client = grpc_tcp_create(&exec_ctx, grpc_winsocket_create(sv[1], "endpoint:client"),
p.server = grpc_tcp_create(grpc_winsocket_create(sv[0], "endpoint:server"), channel_args, "endpoint:server");
resource_quota, "endpoint:client"); p.server = grpc_tcp_create(&exec_ctx,grpc_winsocket_create(sv[0], "endpoint:server"),
channel_args, "endpoint:client");
grpc_exec_ctx_finish(&exec_ctx);
return p; return p;
} }
......
...@@ -63,7 +63,7 @@ typedef struct { ...@@ -63,7 +63,7 @@ typedef struct {
int refs; int refs;
grpc_closure on_connect; grpc_closure on_connect;
grpc_endpoint **endpoint; grpc_endpoint **endpoint;
grpc_resource_quota *resource_quota; grpc_channel_args *channel_args;
} async_connect; } async_connect;
static void async_connect_unlock_and_cleanup(grpc_exec_ctx *exec_ctx, static void async_connect_unlock_and_cleanup(grpc_exec_ctx *exec_ctx,
...@@ -72,7 +72,7 @@ static void async_connect_unlock_and_cleanup(grpc_exec_ctx *exec_ctx, ...@@ -72,7 +72,7 @@ static void async_connect_unlock_and_cleanup(grpc_exec_ctx *exec_ctx,
int done = (--ac->refs == 0); int done = (--ac->refs == 0);
gpr_mu_unlock(&ac->mu); gpr_mu_unlock(&ac->mu);
if (done) { if (done) {
grpc_resource_quota_unref_internal(exec_ctx, ac->resource_quota); grpc_channel_args_destroy(exec_ctx, ac->channel_args);
gpr_mu_destroy(&ac->mu); gpr_mu_destroy(&ac->mu);
gpr_free(ac->addr_name); gpr_free(ac->addr_name);
gpr_free(ac); gpr_free(ac);
...@@ -119,7 +119,7 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { ...@@ -119,7 +119,7 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
if (!wsa_success) { if (!wsa_success) {
error = GRPC_WSA_ERROR(WSAGetLastError(), "ConnectEx"); error = GRPC_WSA_ERROR(WSAGetLastError(), "ConnectEx");
} else { } else {
*ep = grpc_tcp_create(socket, ac->resource_quota, ac->addr_name); *ep = grpc_tcp_create(exec_ctx, socket, ac->channel_args, ac->addr_name);
socket = NULL; socket = NULL;
} }
} else { } else {
...@@ -152,17 +152,6 @@ static void tcp_client_connect_impl( ...@@ -152,17 +152,6 @@ static void tcp_client_connect_impl(
grpc_winsocket_callback_info *info; grpc_winsocket_callback_info *info;
grpc_error *error = GRPC_ERROR_NONE; grpc_error *error = GRPC_ERROR_NONE;
grpc_resource_quota *resource_quota = grpc_resource_quota_create(NULL);
if (channel_args != NULL) {
for (size_t i = 0; i < channel_args->num_args; i++) {
if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
resource_quota = grpc_resource_quota_ref_internal(
channel_args->args[i].value.pointer.p);
}
}
}
*endpoint = NULL; *endpoint = NULL;
/* Use dualstack sockets where available. */ /* Use dualstack sockets where available. */
...@@ -225,7 +214,7 @@ static void tcp_client_connect_impl( ...@@ -225,7 +214,7 @@ static void tcp_client_connect_impl(
ac->refs = 2; ac->refs = 2;
ac->addr_name = grpc_sockaddr_to_uri(addr); ac->addr_name = grpc_sockaddr_to_uri(addr);
ac->endpoint = endpoint; ac->endpoint = endpoint;
ac->resource_quota = resource_quota; ac->channel_args = grpc_channel_args_copy(channel_args);
grpc_closure_init(&ac->on_connect, on_connect, ac, grpc_schedule_on_exec_ctx); grpc_closure_init(&ac->on_connect, on_connect, ac, grpc_schedule_on_exec_ctx);
grpc_closure_init(&ac->on_alarm, on_alarm, ac, grpc_schedule_on_exec_ctx); grpc_closure_init(&ac->on_alarm, on_alarm, ac, grpc_schedule_on_exec_ctx);
...@@ -247,7 +236,6 @@ failure: ...@@ -247,7 +236,6 @@ failure:
} else if (sock != INVALID_SOCKET) { } else if (sock != INVALID_SOCKET) {
closesocket(sock); closesocket(sock);
} }
grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
grpc_closure_sched(exec_ctx, on_done, final_error); grpc_closure_sched(exec_ctx, on_done, final_error);
} }
......
...@@ -53,6 +53,7 @@ ...@@ -53,6 +53,7 @@
#include "src/core/lib/iomgr/socket_windows.h" #include "src/core/lib/iomgr/socket_windows.h"
#include "src/core/lib/iomgr/tcp_server.h" #include "src/core/lib/iomgr/tcp_server.h"
#include "src/core/lib/iomgr/tcp_windows.h" #include "src/core/lib/iomgr/tcp_windows.h"
#include "src/core/lib/channel/channel_args.h"
#define MIN_SAFE_ACCEPT_QUEUE_SIZE 100 #define MIN_SAFE_ACCEPT_QUEUE_SIZE 100
...@@ -102,7 +103,7 @@ struct grpc_tcp_server { ...@@ -102,7 +103,7 @@ struct grpc_tcp_server {
/* shutdown callback */ /* shutdown callback */
grpc_closure *shutdown_complete; grpc_closure *shutdown_complete;
grpc_resource_quota *resource_quota; grpc_channel_args *channel_args;
}; };
/* Public function. Allocates the proper data structures to hold a /* Public function. Allocates the proper data structures to hold a
...@@ -112,21 +113,7 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, ...@@ -112,21 +113,7 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx,
const grpc_channel_args *args, const grpc_channel_args *args,
grpc_tcp_server **server) { grpc_tcp_server **server) {
grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server)); grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server));
s->resource_quota = grpc_resource_quota_create(NULL); s->channel_args = grpc_channel_args_copy(args);
for (size_t i = 0; i < (args == NULL ? 0 : args->num_args); i++) {
if (0 == strcmp(GRPC_ARG_RESOURCE_QUOTA, args->args[i].key)) {
if (args->args[i].type == GRPC_ARG_POINTER) {
grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota);
s->resource_quota =
grpc_resource_quota_ref_internal(args->args[i].value.pointer.p);
} else {
grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota);
gpr_free(s);
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
GRPC_ARG_RESOURCE_QUOTA " must be a pointer to a buffer pool");
}
}
}
gpr_ref_init(&s->refs, 1); gpr_ref_init(&s->refs, 1);
gpr_mu_init(&s->mu); gpr_mu_init(&s->mu);
s->active_ports = 0; s->active_ports = 0;
...@@ -155,7 +142,7 @@ static void destroy_server(grpc_exec_ctx *exec_ctx, void *arg, ...@@ -155,7 +142,7 @@ static void destroy_server(grpc_exec_ctx *exec_ctx, void *arg,
grpc_winsocket_destroy(sp->socket); grpc_winsocket_destroy(sp->socket);
gpr_free(sp); gpr_free(sp);
} }
grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota); grpc_channel_args_destroy(exec_ctx, s->channel_args);
gpr_free(s); gpr_free(s);
} }
...@@ -383,8 +370,8 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { ...@@ -383,8 +370,8 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
gpr_free(utf8_message); gpr_free(utf8_message);
} }
gpr_asprintf(&fd_name, "tcp_server:%s", peer_name_string); gpr_asprintf(&fd_name, "tcp_server:%s", peer_name_string);
ep = grpc_tcp_create(grpc_winsocket_create(sock, fd_name), ep = grpc_tcp_create(exec_ctx, grpc_winsocket_create(sock, fd_name),
sp->server->resource_quota, peer_name_string); sp->server->channel_args, peer_name_string);
gpr_free(fd_name); gpr_free(fd_name);
gpr_free(peer_name_string); gpr_free(peer_name_string);
} else { } else {
......
...@@ -430,10 +430,21 @@ static grpc_endpoint_vtable vtable = {win_read, ...@@ -430,10 +430,21 @@ static grpc_endpoint_vtable vtable = {win_read,
win_get_peer, win_get_peer,
win_get_fd}; win_get_fd};
grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket,
grpc_resource_quota *resource_quota, grpc_channel_args *channel_args,
char *peer_string) { char *peer_string) {
grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp)); grpc_resource_quota *resource_quota = grpc_resource_quota_create(NULL);
if (channel_args != NULL) {
for (size_t i = 0; i < channel_args->num_args; i++) {
if (0 ==
strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
resource_quota = grpc_resource_quota_ref_internal(
channel_args->args[i].value.pointer.p);
}
}
}
grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));
memset(tcp, 0, sizeof(grpc_tcp)); memset(tcp, 0, sizeof(grpc_tcp));
tcp->base.vtable = &vtable; tcp->base.vtable = &vtable;
tcp->socket = socket; tcp->socket = socket;
......
...@@ -50,8 +50,8 @@ ...@@ -50,8 +50,8 @@
/* Create a tcp endpoint given a winsock handle. /* Create a tcp endpoint given a winsock handle.
* Takes ownership of the handle. * Takes ownership of the handle.
*/ */
grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket,
grpc_resource_quota *resource_quota, grpc_channel_args *channel_args,
char *peer_string); char *peer_string);
grpc_error *grpc_tcp_prepare_socket(SOCKET sock); grpc_error *grpc_tcp_prepare_socket(SOCKET sock);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment