Skip to content
Snippets Groups Projects
Commit 6e9fab5e authored by Adam Michalik's avatar Adam Michalik
Browse files

Add support for establishing client channel using existing FD

parent d30d4e27
No related branches found
No related tags found
No related merge requests found
...@@ -240,6 +240,12 @@ GRPCAPI char *grpc_channel_get_target(grpc_channel *channel); ...@@ -240,6 +240,12 @@ GRPCAPI char *grpc_channel_get_target(grpc_channel *channel);
GRPCAPI grpc_channel *grpc_insecure_channel_create( GRPCAPI grpc_channel *grpc_insecure_channel_create(
const char *target, const grpc_channel_args *args, void *reserved); const char *target, const grpc_channel_args *args, void *reserved);
/** Create a client channel to 'target' using file descriptor 'fd'. The 'target'
argument will be used to indicate the name for this channel. See the comment
for grpc_insecure_channel_create for description of 'args' argument. */
GRPCAPI grpc_channel *grpc_insecure_channel_create_from_fd(
const char *target, int fd, const grpc_channel_args *args);
/** Create a lame client: this client fails every operation attempted on it. */ /** Create a lame client: this client fails every operation attempted on it. */
GRPCAPI grpc_channel *grpc_lame_client_channel_create( GRPCAPI grpc_channel *grpc_lame_client_channel_create(
const char *target, grpc_status_code error_code, const char *error_message); const char *target, grpc_status_code error_code, const char *error_message);
......
...@@ -33,6 +33,7 @@ ...@@ -33,6 +33,7 @@
#include <grpc/grpc.h> #include <grpc/grpc.h>
#include <fcntl.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
...@@ -47,6 +48,7 @@ ...@@ -47,6 +48,7 @@
#include "src/core/lib/channel/compress_filter.h" #include "src/core/lib/channel/compress_filter.h"
#include "src/core/lib/channel/http_client_filter.h" #include "src/core/lib/channel/http_client_filter.h"
#include "src/core/lib/iomgr/tcp_client.h" #include "src/core/lib/iomgr/tcp_client.h"
#include "src/core/lib/iomgr/tcp_posix.h"
#include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/channel.h" #include "src/core/lib/surface/channel.h"
...@@ -139,31 +141,31 @@ typedef struct { ...@@ -139,31 +141,31 @@ typedef struct {
gpr_refcount refs; gpr_refcount refs;
grpc_channel_args *merge_args; grpc_channel_args *merge_args;
grpc_channel *master; grpc_channel *master;
} client_channel_factory; } client_tcp_channel_factory;
static void client_channel_factory_ref( static void client_tcp_channel_factory_ref(
grpc_client_channel_factory *cc_factory) { grpc_client_channel_factory *cc_factory) {
client_channel_factory *f = (client_channel_factory *)cc_factory; client_tcp_channel_factory *f = (client_tcp_channel_factory *)cc_factory;
gpr_ref(&f->refs); gpr_ref(&f->refs);
} }
static void client_channel_factory_unref( static void client_tcp_channel_factory_unref(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory) { grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory) {
client_channel_factory *f = (client_channel_factory *)cc_factory; client_tcp_channel_factory *f = (client_tcp_channel_factory *)cc_factory;
if (gpr_unref(&f->refs)) { if (gpr_unref(&f->refs)) {
if (f->master != NULL) { if (f->master != NULL) {
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, f->master, GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, f->master,
"client_channel_factory"); "client_tcp_channel_factory");
} }
grpc_channel_args_destroy(f->merge_args); grpc_channel_args_destroy(f->merge_args);
gpr_free(f); gpr_free(f);
} }
} }
static grpc_subchannel *client_channel_factory_create_subchannel( static grpc_subchannel *client_tcp_channel_factory_create_subchannel(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory, grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
grpc_subchannel_args *args) { grpc_subchannel_args *args) {
client_channel_factory *f = (client_channel_factory *)cc_factory; client_tcp_channel_factory *f = (client_tcp_channel_factory *)cc_factory;
connector *c = gpr_malloc(sizeof(*c)); connector *c = gpr_malloc(sizeof(*c));
grpc_channel_args *final_args = grpc_channel_args *final_args =
grpc_channel_args_merge(args->args, f->merge_args); grpc_channel_args_merge(args->args, f->merge_args);
...@@ -178,11 +180,11 @@ static grpc_subchannel *client_channel_factory_create_subchannel( ...@@ -178,11 +180,11 @@ static grpc_subchannel *client_channel_factory_create_subchannel(
return s; return s;
} }
static grpc_channel *client_channel_factory_create_channel( static grpc_channel *client_tcp_channel_factory_create_channel(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory, grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
const char *target, grpc_client_channel_type type, const char *target, grpc_client_channel_type type,
grpc_channel_args *args) { grpc_channel_args *args) {
client_channel_factory *f = (client_channel_factory *)cc_factory; client_tcp_channel_factory *f = (client_tcp_channel_factory *)cc_factory;
grpc_channel_args *final_args = grpc_channel_args_merge(args, f->merge_args); grpc_channel_args *final_args = grpc_channel_args_merge(args, f->merge_args);
grpc_channel *channel = grpc_channel_create(exec_ctx, target, final_args, grpc_channel *channel = grpc_channel_create(exec_ctx, target, final_args,
GRPC_CLIENT_CHANNEL, NULL); GRPC_CLIENT_CHANNEL, NULL);
...@@ -190,7 +192,7 @@ static grpc_channel *client_channel_factory_create_channel( ...@@ -190,7 +192,7 @@ static grpc_channel *client_channel_factory_create_channel(
grpc_resolver *resolver = grpc_resolver_create(target, &f->base); grpc_resolver *resolver = grpc_resolver_create(target, &f->base);
if (!resolver) { if (!resolver) {
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel,
"client_channel_factory_create_channel"); "client_tcp_channel_factory_create_channel");
return NULL; return NULL;
} }
...@@ -201,10 +203,11 @@ static grpc_channel *client_channel_factory_create_channel( ...@@ -201,10 +203,11 @@ static grpc_channel *client_channel_factory_create_channel(
return channel; return channel;
} }
static const grpc_client_channel_factory_vtable client_channel_factory_vtable = static const grpc_client_channel_factory_vtable
{client_channel_factory_ref, client_channel_factory_unref, client_tcp_channel_factory_vtable =
client_channel_factory_create_subchannel, {client_tcp_channel_factory_ref, client_tcp_channel_factory_unref,
client_channel_factory_create_channel}; client_tcp_channel_factory_create_subchannel,
client_tcp_channel_factory_create_channel};
/* Create a client channel: /* Create a client channel:
Asynchronously: - resolve target Asynchronously: - resolve target
...@@ -219,13 +222,115 @@ grpc_channel *grpc_insecure_channel_create(const char *target, ...@@ -219,13 +222,115 @@ grpc_channel *grpc_insecure_channel_create(const char *target,
(target, args, reserved)); (target, args, reserved));
GPR_ASSERT(!reserved); GPR_ASSERT(!reserved);
client_channel_factory *f = gpr_malloc(sizeof(*f)); client_tcp_channel_factory *f = gpr_malloc(sizeof(*f));
memset(f, 0, sizeof(*f)); memset(f, 0, sizeof(*f));
f->base.vtable = &client_channel_factory_vtable; f->base.vtable = &client_tcp_channel_factory_vtable;
gpr_ref_init(&f->refs, 1); gpr_ref_init(&f->refs, 1);
f->merge_args = grpc_channel_args_copy(args); f->merge_args = grpc_channel_args_copy(args);
grpc_channel *channel = client_channel_factory_create_channel( grpc_channel *channel = client_tcp_channel_factory_create_channel(
&exec_ctx, &f->base, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, NULL);
if (channel != NULL) {
f->master = channel;
GRPC_CHANNEL_INTERNAL_REF(f->master, "grpc_insecure_channel_create");
}
grpc_client_channel_factory_unref(&exec_ctx, &f->base);
grpc_exec_ctx_finish(&exec_ctx);
return channel; /* may be NULL */
}
typedef struct {
grpc_client_channel_factory base;
int fd;
gpr_refcount refs;
grpc_channel_args *merge_args;
grpc_channel *master;
} client_fd_channel_factory;
static void client_fd_channel_factory_ref(
grpc_client_channel_factory *cc_factory) {
client_fd_channel_factory *f = (client_fd_channel_factory *)cc_factory;
gpr_ref(&f->refs);
}
static void client_fd_channel_factory_unref(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory) {
client_fd_channel_factory *f = (client_fd_channel_factory *)cc_factory;
if (gpr_unref(&f->refs)) {
if (f->master != NULL) {
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, f->master,
"client_fd_channel_factory");
}
grpc_channel_args_destroy(f->merge_args);
gpr_free(f);
}
}
static grpc_subchannel *client_fd_channel_factory_create_subchannel(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
grpc_subchannel_args *args) {
return NULL;
}
static grpc_channel *client_fd_channel_factory_create_channel(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
const char *target, grpc_client_channel_type type,
grpc_channel_args *args) {
client_fd_channel_factory *f = (client_fd_channel_factory *)cc_factory;
// FIXME(xyzzyz): hack to get the authority sent.
grpc_arg default_authority_arg;
default_authority_arg.type = GRPC_ARG_STRING;
default_authority_arg.key = GRPC_ARG_DEFAULT_AUTHORITY;
default_authority_arg.value.string = "test.authority";
grpc_channel_args *merged_args = grpc_channel_args_merge(args, f->merge_args);
grpc_channel_args *final_args = grpc_channel_args_copy_and_add(
merged_args, &default_authority_arg, 1);
grpc_channel_args_destroy(merged_args);
int flags = fcntl(f->fd, F_GETFL, 0);
GPR_ASSERT(fcntl(f->fd, F_SETFL, flags | O_NONBLOCK) == 0);
grpc_endpoint *client = grpc_tcp_create(
grpc_fd_create(f->fd, "client"), GRPC_TCP_DEFAULT_READ_SLICE_SIZE,
"fd-client");
grpc_transport *transport =
grpc_create_chttp2_transport(exec_ctx, final_args, client, 1);
GPR_ASSERT(transport);
grpc_channel *channel = grpc_channel_create(
exec_ctx, target, final_args, GRPC_CLIENT_DIRECT_CHANNEL, transport);
grpc_channel_args_destroy(final_args);
grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL, 0);
return channel;
}
static const grpc_client_channel_factory_vtable
client_fd_channel_factory_vtable =
{client_fd_channel_factory_ref, client_fd_channel_factory_unref,
client_fd_channel_factory_create_subchannel,
client_fd_channel_factory_create_channel};
grpc_channel *grpc_insecure_channel_create_from_fd(
const char *target, int fd, const grpc_channel_args *args) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GRPC_API_TRACE(
"grpc_insecure_channel_create(target=%p, fd=%d, args=%p)", 3,
(target, fd, args));
client_fd_channel_factory *f = gpr_malloc(sizeof(*f));
memset(f, 0, sizeof(*f));
f->base.vtable = &client_fd_channel_factory_vtable;
gpr_ref_init(&f->refs, 1);
f->merge_args = grpc_channel_args_copy(args);
f->fd = fd;
grpc_channel *channel = client_fd_channel_factory_create_channel(
&exec_ctx, &f->base, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, NULL); &exec_ctx, &f->base, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, NULL);
if (channel != NULL) { if (channel != NULL) {
f->master = channel; f->master = channel;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment