Skip to content
Snippets Groups Projects
Commit 5f84c847 authored by Craig Tiller's avatar Craig Tiller
Browse files

Connector progress

parent 0c5cf25d
No related branches found
No related tags found
No related merge requests found
......@@ -249,7 +249,6 @@ static void started_call(void *arg, int iomgr_success) {
static void picked_target(void *arg, int iomgr_success) {
call_data *calld = arg;
channel_data *chand = calld->elem->channel_data;
grpc_transport_stream_op op;
if (calld->picked_channel == NULL) {
......@@ -268,7 +267,9 @@ static void picked_target(void *arg, int iomgr_success) {
memset(&calld->waiting_op, 0, sizeof(calld->waiting_op));
gpr_mu_unlock(&calld->mu_state);
grpc_iomgr_closure_init(&calld->async_setup_task, started_call, calld);
grpc_subchannel_create_call(calld->picked_channel, chand->mdctx, &op, &calld->subchannel_call, &calld->async_setup_task);
grpc_subchannel_create_call(calld->picked_channel, &op,
&calld->subchannel_call,
&calld->async_setup_task);
}
}
}
......
......@@ -41,11 +41,11 @@ void grpc_connector_unref(grpc_connector *connector) {
connector->vtable->unref(connector);
}
void grpc_connector_connect(grpc_connector *connector,
const grpc_channel_args *channel_args,
grpc_mdctx *metadata_context,
grpc_transport **transport,
grpc_iomgr_closure *notify) {
connector->vtable->connect(connector, channel_args, metadata_context,
transport, notify);
void grpc_connector_connect(
grpc_connector *connector, grpc_pollset_set *pollset_set,
const struct sockaddr *addr, int addr_len, gpr_timespec deadline,
const grpc_channel_args *channel_args, grpc_mdctx *metadata_context,
grpc_transport **transport, grpc_iomgr_closure *notify) {
connector->vtable->connect(connector, pollset_set, addr, addr_len, deadline,
channel_args, metadata_context, transport, notify);
}
......@@ -34,6 +34,7 @@
#ifndef GRPC_INTERNAL_CORE_CLIENT_CONFIG_CONNECTOR_H
#define GRPC_INTERNAL_CORE_CLIENT_CONFIG_CONNECTOR_H
#include "src/core/iomgr/sockaddr.h"
#include "src/core/transport/transport.h"
typedef struct grpc_connector grpc_connector;
......@@ -46,18 +47,19 @@ struct grpc_connector {
struct grpc_connector_vtable {
void (*ref)(grpc_connector *connector);
void (*unref)(grpc_connector *connector);
void (*connect)(grpc_connector *connector,
const grpc_channel_args *channel_args,
void (*connect)(grpc_connector *connector, grpc_pollset_set *pollset_set,
const struct sockaddr *addr, int addr_len,
gpr_timespec deadline, const grpc_channel_args *channel_args,
grpc_mdctx *metadata_context, grpc_transport **transport,
grpc_iomgr_closure *notify);
};
void grpc_connector_ref(grpc_connector *connector);
void grpc_connector_unref(grpc_connector *connector);
void grpc_connector_connect(grpc_connector *connector,
const grpc_channel_args *channel_args,
grpc_mdctx *metadata_context,
grpc_transport **transport,
grpc_iomgr_closure *notify);
void grpc_connector_connect(
grpc_connector *connector, grpc_pollset_set *pollset_set,
const struct sockaddr *addr, int addr_len, gpr_timespec deadline,
const grpc_channel_args *channel_args, grpc_mdctx *metadata_context,
grpc_transport **transport, grpc_iomgr_closure *notify);
#endif
......@@ -44,6 +44,19 @@ typedef struct {
grpc_subchannel *subchannel;
} connection;
typedef struct waiting_for_connect {
struct waiting_for_connect *next;
grpc_iomgr_closure *notify;
grpc_transport_stream_op *initial_op;
grpc_subchannel_call **target;
} waiting_for_connect;
typedef struct connectivity_state_watcher {
struct connectivity_state_watcher *next;
grpc_iomgr_closure *notify;
grpc_connectivity_state *current;
} connectivity_state_watcher;
struct grpc_subchannel {
gpr_refcount refs;
grpc_connector *connector;
......@@ -56,6 +69,8 @@ struct grpc_subchannel {
/** address to connect to */
struct sockaddr *addr;
size_t addr_len;
/** metadata context */
grpc_mdctx *mdctx;
/** set during connection */
grpc_transport *connecting_transport;
......@@ -63,6 +78,10 @@ struct grpc_subchannel {
/** callback for connection finishing */
grpc_iomgr_closure connected;
/** pollset_set tracking who's interested in a connection
being setup */
grpc_pollset_set pollset_set;
/** mutex protecting remaining elements */
gpr_mu mu;
......@@ -70,8 +89,10 @@ struct grpc_subchannel {
connection *active;
/** are we connecting */
int connecting;
/** closures waiting for a connection */
grpc_iomgr_closure *waiting;
/** things waiting for a connection */
waiting_for_connect *waiting;
/** things watching the connectivity state */
connectivity_state_watcher *watchers;
};
struct grpc_subchannel_call {
......@@ -82,6 +103,9 @@ struct grpc_subchannel_call {
#define SUBCHANNEL_CALL_TO_CALL_STACK(call) (((grpc_call_stack *)(call)) + 1)
static grpc_subchannel_call *create_call(connection *con, grpc_transport_stream_op *initial_op);
static void connectivity_state_changed_locked(grpc_subchannel *c);
static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c);
static gpr_timespec compute_connect_deadline(grpc_subchannel *c);
/*
* grpc_subchannel implementation
......@@ -94,10 +118,21 @@ void grpc_subchannel_unref(grpc_subchannel *c) {
gpr_free(c->filters);
grpc_channel_args_destroy(c->args);
gpr_free(c->addr);
grpc_mdctx_unref(c->mdctx);
gpr_free(c);
}
}
void grpc_subchannel_add_interested_party(grpc_subchannel *c,
grpc_pollset *pollset) {
grpc_pollset_set_add_pollset(&c->pollset_set, pollset);
}
void grpc_subchannel_del_interested_party(grpc_subchannel *c,
grpc_pollset *pollset) {
grpc_pollset_set_del_pollset(&c->pollset_set, pollset);
}
grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
grpc_subchannel_args *args) {
grpc_subchannel *c = gpr_malloc(sizeof(*c));
......@@ -113,12 +148,13 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
memcpy(c->addr, args->addr, args->addr_len);
c->addr_len = args->addr_len;
c->args = grpc_channel_args_copy(args->args);
c->mdctx = args->mdctx;
grpc_mdctx_ref(c->mdctx);
gpr_mu_init(&c->mu);
return c;
}
void grpc_subchannel_create_call(grpc_subchannel *c,
grpc_mdctx *mdctx,
grpc_transport_stream_op *initial_op,
grpc_subchannel_call **target,
grpc_iomgr_closure *notify) {
......@@ -132,19 +168,101 @@ void grpc_subchannel_create_call(grpc_subchannel *c,
*target = create_call(con, initial_op);
notify->cb(notify->cb_arg, 1);
} else {
notify->next = c->waiting;
c->waiting = notify;
waiting_for_connect *w4c = gpr_malloc(sizeof(*w4c));
w4c->next = c->waiting;
w4c->notify = notify;
w4c->initial_op = initial_op;
w4c->target = target;
c->waiting = w4c;
grpc_subchannel_add_interested_party(c, initial_op->bind_pollset);
if (!c->connecting) {
c->connecting = 1;
connectivity_state_changed_locked(c);
gpr_mu_unlock(&c->mu);
grpc_connector_connect(c->connector, c->args, mdctx, &c->connecting_transport, &c->connected);
grpc_connector_connect(c->connector, &c->pollset_set, c->addr,
c->addr_len, compute_connect_deadline(c), c->args,
c->mdctx, &c->connecting_transport, &c->connected);
} else {
gpr_mu_unlock(&c->mu);
}
}
}
grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) {
grpc_connectivity_state state;
gpr_mu_lock(&c->mu);
state = compute_connectivity_locked(c);
gpr_mu_unlock(&c->mu);
return state;
}
void grpc_subchannel_notify_on_state_change(grpc_subchannel *c,
grpc_connectivity_state *state,
grpc_iomgr_closure *notify) {
grpc_connectivity_state current;
int do_connect = 0;
connectivity_state_watcher *w = gpr_malloc(sizeof(*w));
w->current = state;
w->notify = notify;
gpr_mu_lock(&c->mu);
current = compute_connectivity_locked(c);
if (current == GRPC_CHANNEL_IDLE) {
current = GRPC_CHANNEL_CONNECTING;
c->connecting = 1;
do_connect = 1;
connectivity_state_changed_locked(c);
}
if (current != *state) {
gpr_mu_unlock(&c->mu);
*state = current;
grpc_iomgr_add_callback(notify);
gpr_free(w);
} else {
w->next = c->watchers;
c->watchers = w;
gpr_mu_unlock(&c->mu);
}
if (do_connect) {
grpc_connector_connect(c->connector, &c->pollset_set, c->addr, c->addr_len,
compute_connect_deadline(c), c->args, c->mdctx,
&c->connecting_transport, &c->connected);
}
}
static gpr_timespec compute_connect_deadline(grpc_subchannel *c) {
return gpr_time_add(gpr_now(), gpr_time_from_seconds(60));
}
static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c) {
if (c->connecting) {
return GRPC_CHANNEL_CONNECTING;
}
if (c->active) {
return GRPC_CHANNEL_READY;
}
return GRPC_CHANNEL_IDLE;
}
static void connectivity_state_changed_locked(grpc_subchannel *c) {
grpc_connectivity_state current = compute_connectivity_locked(c);
connectivity_state_watcher *new = NULL;
connectivity_state_watcher *w;
while ((w = c->watchers)) {
c->watchers = w->next;
if (current != *w->current) {
*w->current = current;
grpc_iomgr_add_callback(w->notify);
gpr_free(w);
} else {
w->next = new;
new = w;
}
}
c->watchers = new;
}
/*
* grpc_subchannel_call implementation
*/
......
......@@ -64,7 +64,6 @@ void grpc_subchannel_del_interested_party(grpc_subchannel *channel, grpc_pollset
/** construct a call (possibly asynchronously) */
void grpc_subchannel_create_call(grpc_subchannel *subchannel,
grpc_mdctx *mdctx,
grpc_transport_stream_op *initial_op,
grpc_subchannel_call **target,
grpc_iomgr_closure *notify);
......@@ -84,6 +83,8 @@ struct grpc_subchannel_args {
/** Address to connect to */
struct sockaddr *addr;
size_t addr_len;
/** metadata context to use */
grpc_mdctx *mdctx;
};
/** create a subchannel given a connector */
......
......@@ -41,11 +41,18 @@
#include "src/core/channel/channel_args.h"
#include "src/core/channel/client_channel.h"
#include "src/core/client_config/resolver_registry.h"
#include "src/core/iomgr/tcp_client.h"
#include "src/core/surface/channel.h"
#include "src/core/transport/chttp2_transport.h"
typedef struct {
grpc_connector base;
gpr_refcount refs;
grpc_transport **transport;
grpc_iomgr_closure *notify;
const grpc_channel_args *args;
grpc_mdctx *mdctx;
} connector;
static void connector_ref(grpc_connector *con) {
......@@ -60,28 +67,68 @@ static void connector_unref(grpc_connector *con) {
}
}
static void connector_connect(grpc_connector *connector, const grpc_channel_args *channel_args, grpc_mdctx *metadata_context, grpc_transport **transport, grpc_iomgr_closure *notify) {
abort();
static void connected(void *arg, grpc_endpoint *tcp) {
connector *c = arg;
grpc_iomgr_closure *notify;
if (tcp != NULL) {
*c->transport =
grpc_create_chttp2_transport(c->args, tcp, NULL, 0, c->mdctx, 1);
} else {
*c->transport = NULL;
}
notify = c->notify;
c->notify = NULL;
grpc_iomgr_add_callback(notify);
}
static void connector_connect(
grpc_connector *con, grpc_pollset_set *pollset_set,
const struct sockaddr *addr, int addr_len, gpr_timespec deadline,
const grpc_channel_args *channel_args, grpc_mdctx *metadata_context,
grpc_transport **transport, grpc_iomgr_closure *notify) {
connector *c = (connector *)con;
GPR_ASSERT(c->notify == NULL);
c->notify = notify;
c->args = channel_args;
c->mdctx = metadata_context;
grpc_tcp_client_connect(connected, c, pollset_set, addr, addr_len, deadline);
}
static const grpc_connector_vtable connector_vtable = {connector_ref, connector_unref, connector_connect};
static void subchannel_factory_ref(grpc_subchannel_factory *scf) {}
typedef struct {
grpc_subchannel_factory base;
gpr_refcount refs;
grpc_mdctx *mdctx;
} subchannel_factory;
static void subchannel_factory_ref(grpc_subchannel_factory *scf) {
subchannel_factory *f = (subchannel_factory *)scf;
gpr_ref(&f->refs);
}
static void subchannel_factory_unref(grpc_subchannel_factory *scf) {}
static void subchannel_factory_unref(grpc_subchannel_factory *scf) {
subchannel_factory *f = (subchannel_factory *)scf;
if (gpr_unref(&f->refs)) {
grpc_mdctx_unref(f->mdctx);
gpr_free(f);
}
}
static grpc_subchannel *subchannel_factory_create_subchannel(grpc_subchannel_factory *scf, grpc_subchannel_args *args) {
subchannel_factory *f = (subchannel_factory *)scf;
connector *c = gpr_malloc(sizeof(*c));
grpc_subchannel *s;
memset(c, 0, sizeof(*c));
c->base.vtable = &connector_vtable;
gpr_ref_init(&c->refs, 1);
args->mdctx = f->mdctx;
s = grpc_subchannel_create(&c->base, args);
grpc_connector_unref(&c->base);
return s;
}
static const grpc_subchannel_factory_vtable subchannel_factory_vtable = {subchannel_factory_ref, subchannel_factory_unref, subchannel_factory_create_subchannel};
static grpc_subchannel_factory subchannel_factory = {&subchannel_factory_vtable};
/* Create a client channel:
Asynchronously: - resolve target
......@@ -93,6 +140,8 @@ grpc_channel *grpc_channel_create(const char *target,
#define MAX_FILTERS 3
const grpc_channel_filter *filters[MAX_FILTERS];
grpc_resolver *resolver;
subchannel_factory *f;
grpc_mdctx *mdctx = grpc_mdctx_create();
int n = 0;
/* TODO(census)
if (grpc_channel_args_is_census_enabled(args)) {
......@@ -101,12 +150,16 @@ grpc_channel *grpc_channel_create(const char *target,
filters[n++] = &grpc_client_channel_filter;
GPR_ASSERT(n <= MAX_FILTERS);
resolver = grpc_resolver_create(target, &subchannel_factory);
f = gpr_malloc(sizeof(*f));
f->base.vtable = &subchannel_factory_vtable;
gpr_ref_init(&f->refs, 1);
f->mdctx = mdctx;
resolver = grpc_resolver_create(target, &f->base);
if (!resolver) {
return NULL;
}
channel = grpc_channel_create_from_filters(filters, n, args, grpc_mdctx_create(), 1);
channel = grpc_channel_create_from_filters(filters, n, args, mdctx, 1);
grpc_client_channel_set_resolver(grpc_channel_get_channel_stack(channel), resolver);
return channel;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment