Skip to content
Snippets Groups Projects
Commit e273b033 authored by Mark D. Roth's avatar Mark D. Roth
Browse files

Plumb server name down into the subchannel.

parent 77613b20
No related branches found
No related tags found
No related merge requests found
Showing with 32 additions and 16 deletions
...@@ -46,7 +46,9 @@ typedef struct http_connect_handshaker { ...@@ -46,7 +46,9 @@ typedef struct http_connect_handshaker {
// Base class. Must be first. // Base class. Must be first.
grpc_handshaker base; grpc_handshaker base;
// These pointers are borrowed, we don't own them.
char* proxy_server; char* proxy_server;
char* server_name;
// State saved while performing the handshake. // State saved while performing the handshake.
grpc_endpoint* endpoint; grpc_endpoint* endpoint;
...@@ -67,7 +69,6 @@ typedef struct http_connect_handshaker { ...@@ -67,7 +69,6 @@ typedef struct http_connect_handshaker {
static void on_write_done(grpc_exec_ctx* exec_ctx, void* arg, static void on_write_done(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) { grpc_error* error) {
http_connect_handshaker* h = arg; http_connect_handshaker* h = arg;
// Read HTTP CONNECT response.
grpc_endpoint_read(exec_ctx, h->endpoint, &h->response_buffer, grpc_endpoint_read(exec_ctx, h->endpoint, &h->response_buffer,
&h->response_read_closure); &h->response_read_closure);
} }
...@@ -114,7 +115,6 @@ static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg, ...@@ -114,7 +115,6 @@ static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg,
static void http_connect_handshaker_destroy(grpc_exec_ctx* exec_ctx, static void http_connect_handshaker_destroy(grpc_exec_ctx* exec_ctx,
grpc_handshaker* handshaker) { grpc_handshaker* handshaker) {
http_connect_handshaker* h = (http_connect_handshaker*)handshaker; http_connect_handshaker* h = (http_connect_handshaker*)handshaker;
gpr_free(h->proxy_server);
gpr_slice_buffer_destroy(&h->request_buffer); gpr_slice_buffer_destroy(&h->request_buffer);
gpr_slice_buffer_destroy(&h->response_buffer); gpr_slice_buffer_destroy(&h->response_buffer);
grpc_http_parser_destroy(&h->http_parser); grpc_http_parser_destroy(&h->http_parser);
...@@ -149,8 +149,7 @@ static void http_connect_handshaker_do_handshake( ...@@ -149,8 +149,7 @@ static void http_connect_handshaker_do_handshake(
memset(&request, 0, sizeof(request)); memset(&request, 0, sizeof(request));
request.host = gpr_strdup(h->proxy_server); request.host = gpr_strdup(h->proxy_server);
request.http.method = gpr_strdup("CONNECT"); request.http.method = gpr_strdup("CONNECT");
// FIXME: get server name from somewhere... request.http.path = gpr_strdup(h->server_name);
request.http.path = gpr_strdup("");
request.handshaker = &grpc_httpcli_plaintext; request.handshaker = &grpc_httpcli_plaintext;
gpr_slice request_slice = grpc_httpcli_format_connect_request(&request); gpr_slice request_slice = grpc_httpcli_format_connect_request(&request);
gpr_slice_buffer_add(&h->request_buffer, request_slice); gpr_slice_buffer_add(&h->request_buffer, request_slice);
...@@ -170,17 +169,19 @@ char* grpc_get_http_connect_proxy_server_from_args(grpc_channel_args* args) { ...@@ -170,17 +169,19 @@ char* grpc_get_http_connect_proxy_server_from_args(grpc_channel_args* args) {
GRPC_ARG_HTTP_CONNECT_PROXY_SERVER); GRPC_ARG_HTTP_CONNECT_PROXY_SERVER);
break; break;
} }
return gpr_strdup(args->args[i].value.string); return args->args[i].value.string;
} }
} }
return NULL; return NULL;
} }
grpc_handshaker* grpc_http_connect_handshaker_create(char* proxy_server) { grpc_handshaker* grpc_http_connect_handshaker_create(char* proxy_server,
char* server_name) {
http_connect_handshaker* handshaker = http_connect_handshaker* handshaker =
gpr_malloc(sizeof(http_connect_handshaker)); gpr_malloc(sizeof(http_connect_handshaker));
memset(handshaker, 0, sizeof(*handshaker)); memset(handshaker, 0, sizeof(*handshaker));
grpc_handshaker_init(&http_connect_handshaker_vtable, &handshaker->base); grpc_handshaker_init(&http_connect_handshaker_vtable, &handshaker->base);
handshaker->proxy_server = proxy_server; handshaker->proxy_server = proxy_server;
handshaker->server_name = server_name;
return (grpc_handshaker*)handshaker; return (grpc_handshaker*)handshaker;
} }
...@@ -36,10 +36,13 @@ ...@@ -36,10 +36,13 @@
#include "src/core/lib/channel/handshaker.h" #include "src/core/lib/channel/handshaker.h"
/// Caller takes ownership of returned string. /// Caller does NOT take ownership of returned string.
char* grpc_get_http_connect_proxy_server_from_args(grpc_channel_args *args); char* grpc_get_http_connect_proxy_server_from_args(grpc_channel_args *args);
/// Takes ownership of \a proxy_server. /// Borrows references to \a proxy_server or \a server_name.
grpc_handshaker* grpc_http_connect_handshaker_create(char* proxy_server); /// The caller must ensure that they remain alive until handshaking is
/// complete.
grpc_handshaker* grpc_http_connect_handshaker_create(char* proxy_server,
char* server_name);
#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_HTTP_CONNECT_HANDSHAKER_H */ #endif /* GRPC_CORE_EXT_CLIENT_CONFIG_HTTP_CONNECT_HANDSHAKER_H */
...@@ -50,6 +50,7 @@ struct grpc_lb_policy_factory { ...@@ -50,6 +50,7 @@ struct grpc_lb_policy_factory {
}; };
typedef struct grpc_lb_policy_args { typedef struct grpc_lb_policy_args {
char *server_name; // Does not own.
grpc_resolved_addresses *addresses; grpc_resolved_addresses *addresses;
grpc_client_channel_factory *client_channel_factory; grpc_client_channel_factory *client_channel_factory;
} grpc_lb_policy_args; } grpc_lb_policy_args;
......
...@@ -162,6 +162,8 @@ struct grpc_subchannel_args { ...@@ -162,6 +162,8 @@ struct grpc_subchannel_args {
size_t filter_count; size_t filter_count;
/** Channel arguments to be supplied to the newly created channel */ /** Channel arguments to be supplied to the newly created channel */
const grpc_channel_args *args; const grpc_channel_args *args;
/** Server name */
char *server_name; // Does not own.
/** Address to connect to */ /** Address to connect to */
struct sockaddr *addr; struct sockaddr *addr;
size_t addr_len; size_t addr_len;
......
...@@ -85,6 +85,7 @@ static grpc_subchannel_key *create_key( ...@@ -85,6 +85,7 @@ static grpc_subchannel_key *create_key(
} else { } else {
k->args.filters = NULL; k->args.filters = NULL;
} }
k->args.server_name = args->server_name;
k->args.addr_len = args->addr_len; k->args.addr_len = args->addr_len;
k->args.addr = gpr_malloc(args->addr_len); k->args.addr = gpr_malloc(args->addr_len);
if (k->args.addr_len > 0) { if (k->args.addr_len > 0) {
...@@ -111,6 +112,8 @@ static int subchannel_key_compare(grpc_subchannel_key *a, ...@@ -111,6 +112,8 @@ static int subchannel_key_compare(grpc_subchannel_key *a,
if (c != 0) return c; if (c != 0) return c;
c = GPR_ICMP(a->args.filter_count, b->args.filter_count); c = GPR_ICMP(a->args.filter_count, b->args.filter_count);
if (c != 0) return c; if (c != 0) return c;
c = strcmp(a->args.server_name, b->args.server_name);
if (c != 0) return c;
if (a->args.addr_len) { if (a->args.addr_len) {
c = memcmp(a->args.addr, b->args.addr, a->args.addr_len); c = memcmp(a->args.addr, b->args.addr, a->args.addr_len);
if (c != 0) return c; if (c != 0) return c;
...@@ -126,9 +129,9 @@ static int subchannel_key_compare(grpc_subchannel_key *a, ...@@ -126,9 +129,9 @@ static int subchannel_key_compare(grpc_subchannel_key *a,
void grpc_subchannel_key_destroy(grpc_exec_ctx *exec_ctx, void grpc_subchannel_key_destroy(grpc_exec_ctx *exec_ctx,
grpc_subchannel_key *k) { grpc_subchannel_key *k) {
grpc_connector_unref(exec_ctx, k->connector); grpc_connector_unref(exec_ctx, k->connector);
gpr_free(k->args.addr);
gpr_free((grpc_channel_args *)k->args.filters); gpr_free((grpc_channel_args *)k->args.filters);
grpc_channel_args_destroy((grpc_channel_args *)k->args.args); grpc_channel_args_destroy((grpc_channel_args *)k->args.args);
gpr_free(k->args.addr);
gpr_free(k); gpr_free(k);
} }
......
...@@ -455,6 +455,7 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx, ...@@ -455,6 +455,7 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
size_t subchannel_idx = 0; size_t subchannel_idx = 0;
for (size_t i = 0; i < args->addresses->naddrs; i++) { for (size_t i = 0; i < args->addresses->naddrs; i++) {
memset(&sc_args, 0, sizeof(grpc_subchannel_args)); memset(&sc_args, 0, sizeof(grpc_subchannel_args));
sc_args.server_name = args->server_name;
sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr); sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr);
sc_args.addr_len = (size_t)args->addresses->addrs[i].len; sc_args.addr_len = (size_t)args->addresses->addrs[i].len;
......
...@@ -582,6 +582,7 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, ...@@ -582,6 +582,7 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
size_t subchannel_idx = 0; size_t subchannel_idx = 0;
for (size_t i = 0; i < args->addresses->naddrs; i++) { for (size_t i = 0; i < args->addresses->naddrs; i++) {
memset(&sc_args, 0, sizeof(grpc_subchannel_args)); memset(&sc_args, 0, sizeof(grpc_subchannel_args));
sc_args.server_name = args->server_name;
sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr); sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr);
sc_args.addr_len = (size_t)args->addresses->addrs[i].len; sc_args.addr_len = (size_t)args->addresses->addrs[i].len;
......
...@@ -66,7 +66,7 @@ typedef struct { ...@@ -66,7 +66,7 @@ typedef struct {
/** mutex guarding the rest of the state */ /** mutex guarding the rest of the state */
gpr_mu mu; gpr_mu mu;
/** are we currently resolving? */ /** are we currently resolving? */
int resolving; bool resolving;
/** which version of resolved_config have we published? */ /** which version of resolved_config have we published? */
int published_version; int published_version;
/** which version of resolved_config is current? */ /** which version of resolved_config is current? */
...@@ -169,16 +169,17 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, ...@@ -169,16 +169,17 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
grpc_lb_policy *lb_policy; grpc_lb_policy *lb_policy;
gpr_mu_lock(&r->mu); gpr_mu_lock(&r->mu);
GPR_ASSERT(r->resolving); GPR_ASSERT(r->resolving);
r->resolving = 0; r->resolving = false;
grpc_resolved_addresses *addresses = r->addresses; grpc_resolved_addresses *addresses = r->addresses;
if (addresses != NULL) { if (addresses != NULL) {
grpc_lb_policy_args lb_policy_args; grpc_lb_policy_args lb_policy_args;
config = grpc_client_config_create();
memset(&lb_policy_args, 0, sizeof(lb_policy_args)); memset(&lb_policy_args, 0, sizeof(lb_policy_args));
lb_policy_args.server_name = r->name;
lb_policy_args.addresses = addresses; lb_policy_args.addresses = addresses;
lb_policy_args.client_channel_factory = r->client_channel_factory; lb_policy_args.client_channel_factory = r->client_channel_factory;
lb_policy = lb_policy =
grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args); grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args);
config = grpc_client_config_create();
if (lb_policy != NULL) { if (lb_policy != NULL) {
grpc_client_config_set_lb_policy(config, lb_policy); grpc_client_config_set_lb_policy(config, lb_policy);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "construction"); GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "construction");
...@@ -218,7 +219,7 @@ static void dns_start_resolving_locked(grpc_exec_ctx *exec_ctx, ...@@ -218,7 +219,7 @@ static void dns_start_resolving_locked(grpc_exec_ctx *exec_ctx,
dns_resolver *r) { dns_resolver *r) {
GRPC_RESOLVER_REF(&r->base, "dns-resolving"); GRPC_RESOLVER_REF(&r->base, "dns-resolving");
GPR_ASSERT(!r->resolving); GPR_ASSERT(!r->resolving);
r->resolving = 1; r->resolving = true;
r->addresses = NULL; r->addresses = NULL;
grpc_resolve_address(exec_ctx, r->name, r->default_port, grpc_resolve_address(exec_ctx, r->name, r->default_port,
grpc_closure_create(dns_on_resolved, r), &r->addresses); grpc_closure_create(dns_on_resolved, r), &r->addresses);
......
...@@ -125,6 +125,7 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, ...@@ -125,6 +125,7 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
grpc_client_config *cfg = grpc_client_config_create(); grpc_client_config *cfg = grpc_client_config_create();
grpc_lb_policy_args lb_policy_args; grpc_lb_policy_args lb_policy_args;
memset(&lb_policy_args, 0, sizeof(lb_policy_args)); memset(&lb_policy_args, 0, sizeof(lb_policy_args));
lb_policy_args.server_name = "";
lb_policy_args.addresses = r->addresses; lb_policy_args.addresses = r->addresses;
lb_policy_args.client_channel_factory = r->client_channel_factory; lb_policy_args.client_channel_factory = r->client_channel_factory;
grpc_lb_policy *lb_policy = grpc_lb_policy *lb_policy =
......
...@@ -191,7 +191,8 @@ static grpc_subchannel *client_channel_factory_create_subchannel( ...@@ -191,7 +191,8 @@ static grpc_subchannel *client_channel_factory_create_subchannel(
char *proxy_server = grpc_get_http_connect_proxy_server_from_args(final_args); char *proxy_server = grpc_get_http_connect_proxy_server_from_args(final_args);
if (proxy_server != NULL) { if (proxy_server != NULL) {
grpc_handshake_manager_add( grpc_handshake_manager_add(
grpc_http_connect_handshaker_create(proxy_server), c->handshake_mgr); grpc_http_connect_handshaker_create(proxy_server, args->server_name),
c->handshake_mgr);
} }
args->args = final_args; args->args = final_args;
s = grpc_subchannel_create(exec_ctx, &c->base, args); s = grpc_subchannel_create(exec_ctx, &c->base, args);
......
...@@ -260,7 +260,8 @@ static grpc_subchannel *client_channel_factory_create_subchannel( ...@@ -260,7 +260,8 @@ static grpc_subchannel *client_channel_factory_create_subchannel(
char *proxy_server = grpc_get_http_connect_proxy_server_from_args(final_args); char *proxy_server = grpc_get_http_connect_proxy_server_from_args(final_args);
if (proxy_server != NULL) { if (proxy_server != NULL) {
grpc_handshake_manager_add( grpc_handshake_manager_add(
grpc_http_connect_handshaker_create(proxy_server), c->handshake_mgr); grpc_http_connect_handshaker_create(proxy_server, args->server_name),
c->handshake_mgr);
} }
gpr_mu_init(&c->mu); gpr_mu_init(&c->mu);
gpr_ref_init(&c->refs, 1); gpr_ref_init(&c->refs, 1);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment