Skip to content
Snippets Groups Projects
Commit 81bcc4ca authored by Craig Tiller's avatar Craig Tiller
Browse files

Make endpoint peer API work on Windows

parent 1b22b9db
No related branches found
No related tags found
No related merge requests found
...@@ -37,7 +37,9 @@ ...@@ -37,7 +37,9 @@
#include <stdio.h> #include <stdio.h>
#include <string.h> #include <string.h>
#ifdef GPR_POSIX_SOCKET
#include <sys/un.h> #include <sys/un.h>
#endif
#include <grpc/support/alloc.h> #include <grpc/support/alloc.h>
#include <grpc/support/host_port.h> #include <grpc/support/host_port.h>
...@@ -172,7 +174,7 @@ static int parse_ipv4(grpc_uri *uri, struct sockaddr_storage *addr, int *len) { ...@@ -172,7 +174,7 @@ static int parse_ipv4(grpc_uri *uri, struct sockaddr_storage *addr, int *len) {
memset(in, 0, sizeof(*in)); memset(in, 0, sizeof(*in));
*len = sizeof(*in); *len = sizeof(*in);
in->sin_family = AF_INET; in->sin_family = AF_INET;
if (inet_aton(host, &in->sin_addr) == 0) { if (inet_pton(AF_INET, host, &in->sin_addr) == 0) {
gpr_log(GPR_ERROR, "invalid ipv4 address: '%s'", host); gpr_log(GPR_ERROR, "invalid ipv4 address: '%s'", host);
goto done; goto done;
} }
......
...@@ -81,8 +81,8 @@ grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, size_t read ...@@ -81,8 +81,8 @@ grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, size_t read
SOCKET sv[2]; SOCKET sv[2];
grpc_endpoint_pair p; grpc_endpoint_pair p;
create_sockets(sv); create_sockets(sv);
p.client = grpc_tcp_create(grpc_winsocket_create(sv[1], "endpoint:client")); p.client = grpc_tcp_create(grpc_winsocket_create(sv[1], "endpoint:client"), "endpoint:server");
p.server = grpc_tcp_create(grpc_winsocket_create(sv[0], "endpoint:server")); p.server = grpc_tcp_create(grpc_winsocket_create(sv[0], "endpoint:server"), "endpoint:client");
return p; return p;
} }
......
...@@ -58,6 +58,7 @@ typedef struct { ...@@ -58,6 +58,7 @@ typedef struct {
grpc_winsocket *socket; grpc_winsocket *socket;
gpr_timespec deadline; gpr_timespec deadline;
grpc_alarm alarm; grpc_alarm alarm;
char *addr_name;
int refs; int refs;
int aborted; int aborted;
} async_connect; } async_connect;
...@@ -67,6 +68,7 @@ static void async_connect_cleanup(async_connect *ac) { ...@@ -67,6 +68,7 @@ static void async_connect_cleanup(async_connect *ac) {
gpr_mu_unlock(&ac->mu); gpr_mu_unlock(&ac->mu);
if (done) { if (done) {
gpr_mu_destroy(&ac->mu); gpr_mu_destroy(&ac->mu);
gpr_free(ac->addr_name);
gpr_free(ac); gpr_free(ac);
} }
} }
...@@ -107,7 +109,7 @@ static void on_connect(void *acp, int from_iocp) { ...@@ -107,7 +109,7 @@ static void on_connect(void *acp, int from_iocp) {
gpr_log(GPR_ERROR, "on_connect error: %s", utf8_message); gpr_log(GPR_ERROR, "on_connect error: %s", utf8_message);
gpr_free(utf8_message); gpr_free(utf8_message);
} else if (!aborted) { } else if (!aborted) {
ep = grpc_tcp_create(ac->socket); ep = grpc_tcp_create(ac->socket, ac->addr_name);
} }
} else { } else {
gpr_log(GPR_ERROR, "on_connect is shutting down"); gpr_log(GPR_ERROR, "on_connect is shutting down");
...@@ -213,6 +215,7 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp), ...@@ -213,6 +215,7 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp),
ac->socket = socket; ac->socket = socket;
gpr_mu_init(&ac->mu); gpr_mu_init(&ac->mu);
ac->refs = 2; ac->refs = 2;
ac->addr_name = grpc_sockaddr_to_uri(addr);
ac->aborted = 0; ac->aborted = 0;
grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac, grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac,
......
...@@ -243,6 +243,10 @@ static void on_accept(void *arg, int from_iocp) { ...@@ -243,6 +243,10 @@ static void on_accept(void *arg, int from_iocp) {
SOCKET sock = sp->new_socket; SOCKET sock = sp->new_socket;
grpc_winsocket_callback_info *info = &sp->socket->read_info; grpc_winsocket_callback_info *info = &sp->socket->read_info;
grpc_endpoint *ep = NULL; grpc_endpoint *ep = NULL;
struct sockaddr_storage peer_name;
char *peer_name_string;
char *fd_name;
int peer_name_len = sizeof(peer_name);
DWORD transfered_bytes; DWORD transfered_bytes;
DWORD flags; DWORD flags;
BOOL wsa_success; BOOL wsa_success;
...@@ -277,8 +281,12 @@ static void on_accept(void *arg, int from_iocp) { ...@@ -277,8 +281,12 @@ static void on_accept(void *arg, int from_iocp) {
} }
} else { } else {
if (!sp->shutting_down) { if (!sp->shutting_down) {
/* TODO(ctiller): add sockaddr address to label */ getpeername(sock, (struct sockaddr*)&peer_name, &peer_name_len);
ep = grpc_tcp_create(grpc_winsocket_create(sock, "server")); peer_name_string = grpc_sockaddr_to_uri((struct sockaddr*)&peer_name);
gpr_asprintf(&fd_name, "tcp_server:%s", peer_name_string);
ep = grpc_tcp_create(grpc_winsocket_create(sock, fd_name), peer_name_string);
gpr_free(fd_name);
gpr_free(peer_name_string);
} }
} }
......
...@@ -96,6 +96,8 @@ typedef struct grpc_tcp { ...@@ -96,6 +96,8 @@ typedef struct grpc_tcp {
to protect ourselves when requesting a shutdown. */ to protect ourselves when requesting a shutdown. */
gpr_mu mu; gpr_mu mu;
int shutting_down; int shutting_down;
char *peer_string;
} grpc_tcp; } grpc_tcp;
static void tcp_ref(grpc_tcp *tcp) { static void tcp_ref(grpc_tcp *tcp) {
...@@ -107,6 +109,7 @@ static void tcp_unref(grpc_tcp *tcp) { ...@@ -107,6 +109,7 @@ static void tcp_unref(grpc_tcp *tcp) {
gpr_slice_buffer_destroy(&tcp->write_slices); gpr_slice_buffer_destroy(&tcp->write_slices);
grpc_winsocket_orphan(tcp->socket); grpc_winsocket_orphan(tcp->socket);
gpr_mu_destroy(&tcp->mu); gpr_mu_destroy(&tcp->mu);
gpr_free(tcp->peer_string);
gpr_free(tcp); gpr_free(tcp);
} }
} }
...@@ -393,11 +396,16 @@ static void win_destroy(grpc_endpoint *ep) { ...@@ -393,11 +396,16 @@ static void win_destroy(grpc_endpoint *ep) {
tcp_unref(tcp); tcp_unref(tcp);
} }
static char *win_get_peer(grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
return gpr_strdup(tcp->peer_string);
}
static grpc_endpoint_vtable vtable = { static grpc_endpoint_vtable vtable = {
win_notify_on_read, win_write, win_add_to_pollset, win_shutdown, win_destroy win_notify_on_read, win_write, win_add_to_pollset, win_shutdown, win_destroy, win_get_peer
}; };
grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket) { grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string) {
grpc_tcp *tcp = (grpc_tcp *) gpr_malloc(sizeof(grpc_tcp)); grpc_tcp *tcp = (grpc_tcp *) gpr_malloc(sizeof(grpc_tcp));
memset(tcp, 0, sizeof(grpc_tcp)); memset(tcp, 0, sizeof(grpc_tcp));
tcp->base.vtable = &vtable; tcp->base.vtable = &vtable;
...@@ -405,6 +413,7 @@ grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket) { ...@@ -405,6 +413,7 @@ grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket) {
gpr_mu_init(&tcp->mu); gpr_mu_init(&tcp->mu);
gpr_slice_buffer_init(&tcp->write_slices); gpr_slice_buffer_init(&tcp->write_slices);
gpr_ref_init(&tcp->refcount, 1); gpr_ref_init(&tcp->refcount, 1);
tcp->peer_string = gpr_strdup(peer_string);
return &tcp->base; return &tcp->base;
} }
......
...@@ -50,7 +50,7 @@ ...@@ -50,7 +50,7 @@
/* Create a tcp endpoint given a winsock handle. /* Create a tcp endpoint given a winsock handle.
* Takes ownership of the handle. * Takes ownership of the handle.
*/ */
grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket); grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string);
int grpc_tcp_prepare_socket(SOCKET sock); int grpc_tcp_prepare_socket(SOCKET sock);
......
...@@ -45,6 +45,6 @@ std::shared_ptr<ChannelInterface> CreateChannel( ...@@ -45,6 +45,6 @@ std::shared_ptr<ChannelInterface> CreateChannel(
const ChannelArguments& args) { const ChannelArguments& args) {
return creds ? creds->CreateChannel(target, args) return creds ? creds->CreateChannel(target, args)
: std::shared_ptr<ChannelInterface>( : std::shared_ptr<ChannelInterface>(
new Channel(target, grpc_lame_client_channel_create())); new Channel(target, grpc_lame_client_channel_create(NULL)));
} }
} // namespace grpc } // namespace grpc
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment