Skip to content
Snippets Groups Projects
Commit 12ad5d6c authored by Craig Tiller's avatar Craig Tiller
Browse files

Strong/weak refcounting for subchannel

parent 81b64e3c
No related branches found
No related tags found
No related merge requests found
...@@ -47,6 +47,9 @@ ...@@ -47,6 +47,9 @@
#include "src/core/transport/connectivity_state.h" #include "src/core/transport/connectivity_state.h"
#include "src/core/transport/connectivity_state.h" #include "src/core/transport/connectivity_state.h"
#define INTERNAL_REF_BITS 16
#define STRONG_REF_MASK (~(gpr_atm)((1 << INTERNAL_REF_BITS) - 1))
#define GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS 20 #define GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS 20
#define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1 #define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER 1.6 #define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER 1.6
...@@ -69,8 +72,12 @@ typedef struct { ...@@ -69,8 +72,12 @@ typedef struct {
struct grpc_subchannel { struct grpc_subchannel {
grpc_connector *connector; grpc_connector *connector;
/** refcount */ /** refcount
gpr_refcount refs; - lower INTERNAL_REF_BITS bits are for internal references:
these do not keep the subchannel open.
- upper remaining bits are for public references: these do
keep the subchannel open */
gpr_atm ref_pair;
/** non-transport related channel filters */ /** non-transport related channel filters */
const grpc_channel_filter **filters; const grpc_channel_filter **filters;
...@@ -133,16 +140,6 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel, ...@@ -133,16 +140,6 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel,
int iomgr_success); int iomgr_success);
#ifdef GRPC_STREAM_REFCOUNT_DEBUG #ifdef GRPC_STREAM_REFCOUNT_DEBUG
#define SUBCHANNEL_REF_LOCKED(p, r) \
subchannel_ref_locked((p), __FILE__, __LINE__, (r))
#define SUBCHANNEL_UNREF_LOCKED(p, r) \
subchannel_unref_locked((p), __FILE__, __LINE__, (r))
#define CONNECTION_REF_LOCKED(p, r) \
connection_ref_locked((p), __FILE__, __LINE__, (r))
#define CONNECTION_UNREF_LOCKED(cl, p, r) \
connection_unref_locked((cl), (p), __FILE__, __LINE__, (r))
#define REF_PASS_ARGS , file, line, reason
#define REF_PASS_REASON , reason
#define REF_REASON reason #define REF_REASON reason
#define REF_LOG(name, p) \ #define REF_LOG(name, p) \
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p ref %d -> %d %s", \ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p ref %d -> %d %s", \
...@@ -150,13 +147,9 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel, ...@@ -150,13 +147,9 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel,
#define UNREF_LOG(name, p) \ #define UNREF_LOG(name, p) \
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p unref %d -> %d %s", \ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p unref %d -> %d %s", \
(name), (p), (p)->refs.count, (p)->refs.count - 1, reason) (name), (p), (p)->refs.count, (p)->refs.count - 1, reason)
#define REF_MUTATE_EXTRA_ARGS GRPC_SUBCHANNEL_REF_EXTRA_ARGS, const char *purpose
#define REF_MUTATE_PURPOSE(x) , file, line, reason, x
#else #else
#define SUBCHANNEL_REF_LOCKED(p, r) subchannel_ref_locked((p))
#define SUBCHANNEL_UNREF_LOCKED(p, r) subchannel_unref_locked((p))
#define CONNECTION_REF_LOCKED(p, r) connection_ref_locked((p))
#define CONNECTION_UNREF_LOCKED(cl, p, r) connection_unref_locked((cl), (p))
#define REF_PASS_ARGS
#define REF_PASS_REASON
#define REF_REASON "" #define REF_REASON ""
#define REF_LOG(name, p) \ #define REF_LOG(name, p) \
do { \ do { \
...@@ -164,6 +157,8 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel, ...@@ -164,6 +157,8 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel,
#define UNREF_LOG(name, p) \ #define UNREF_LOG(name, p) \
do { \ do { \
} while (0) } while (0)
#define REF_MUTATE_EXTRA_ARGS
#define REF_MUTATE_PURPOSE(x)
#endif #endif
/* /*
...@@ -196,10 +191,6 @@ void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx, ...@@ -196,10 +191,6 @@ void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx,
static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg,
int success) { int success) {
grpc_subchannel *c = arg; grpc_subchannel *c = arg;
grpc_connected_subchannel *con = GET_CONNECTED_SUBCHANNEL(c, no_barrier);
if (con != NULL) {
GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, con, "connection");
}
gpr_free((void *)c->filters); gpr_free((void *)c->filters);
grpc_channel_args_destroy(c->args); grpc_channel_args_destroy(c->args);
gpr_free(c->addr); gpr_free(c->addr);
...@@ -210,15 +201,54 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, ...@@ -210,15 +201,54 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg,
gpr_free(c); gpr_free(c);
} }
gpr_atm ref_mutate(grpc_subchannel *c, gpr_atm delta, int barrier REF_MUTATE_EXTRA_ARGS) {
gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&c->ref_pair, delta) : gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta);
#ifdef GRPC_STREAM_REFCOUNT_DEBUG
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SUBCHANNEL: %p % 12s 0x%08x -> 0x%08x [%s]", c, purpose, old_val, old_val + delta, reason);
#endif
return old_val;
}
void grpc_subchannel_ref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { void grpc_subchannel_ref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
REF_LOG("SUBCHANNEL", c); gpr_atm old_refs;
gpr_ref(&c->refs); old_refs = ref_mutate(c, (1 << INTERNAL_REF_BITS), 0 REF_MUTATE_PURPOSE("STRONG_REF"));
GPR_ASSERT((old_refs & STRONG_REF_MASK) != 0);
}
void grpc_subchannel_weak_ref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
gpr_atm old_refs;
old_refs = ref_mutate(c, 1, 0 REF_MUTATE_PURPOSE("WEAK_REF"));
GPR_ASSERT(old_refs != 0);
}
static void disconnect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
grpc_connected_subchannel *con;
gpr_mu_lock(&c->mu);
GPR_ASSERT(!c->disconnected);
c->disconnected = 1;
con = GET_CONNECTED_SUBCHANNEL(c, no_barrier);
if (con != NULL) {
GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, con, "connection");
gpr_atm_no_barrier_store(&c->connected_subchannel, 0xdeadbeef);
}
gpr_mu_unlock(&c->mu);
} }
void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx, void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx,
grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
UNREF_LOG("SUBCHANNEL", c); gpr_atm old_refs;
if (gpr_unref(&c->refs)) { old_refs = ref_mutate(c, (gpr_atm)1 - (gpr_atm)(1 << INTERNAL_REF_BITS), 1 REF_MUTATE_PURPOSE("STRONG_UNREF"));
if ((old_refs & STRONG_REF_MASK) == 0) {
disconnect(exec_ctx, c);
}
GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "strong-unref");
}
void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx,
grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
gpr_atm old_refs;
old_refs = ref_mutate(c, -(gpr_atm)1, 1 REF_MUTATE_PURPOSE("WEAK_UNREF"));
if (old_refs == 0) {
grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(subchannel_destroy, c), grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(subchannel_destroy, c),
1); 1);
} }
...@@ -232,7 +262,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, ...@@ -232,7 +262,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
grpc_subchannel_args *args) { grpc_subchannel_args *args) {
grpc_subchannel *c = gpr_malloc(sizeof(*c)); grpc_subchannel *c = gpr_malloc(sizeof(*c));
memset(c, 0, sizeof(*c)); memset(c, 0, sizeof(*c));
gpr_ref_init(&c->refs, 1); gpr_atm_no_barrier_store(&c->ref_pair, 1 << INTERNAL_REF_BITS);
c->connector = connector; c->connector = connector;
grpc_connector_ref(c->connector); grpc_connector_ref(c->connector);
c->num_filters = args->filter_count; c->num_filters = args->filter_count;
...@@ -297,7 +327,7 @@ static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg, i ...@@ -297,7 +327,7 @@ static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg, i
external_state_watcher *w = arg; external_state_watcher *w = arg;
grpc_closure *follow_up = w->notify; grpc_closure *follow_up = w->notify;
grpc_pollset_set_del_pollset_set(exec_ctx, &w->subchannel->pollset_set, w->pollset_set); grpc_pollset_set_del_pollset_set(exec_ctx, &w->subchannel->pollset_set, w->pollset_set);
GRPC_SUBCHANNEL_UNREF(exec_ctx, w->subchannel, "external_state_watcher"); GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, w->subchannel, "external_state_watcher");
gpr_free(w); gpr_free(w);
follow_up->cb(exec_ctx, follow_up->cb_arg, success); follow_up->cb(exec_ctx, follow_up->cb_arg, success);
} }
...@@ -314,7 +344,7 @@ void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx, ...@@ -314,7 +344,7 @@ void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx,
w->notify = notify; w->notify = notify;
grpc_closure_init(&w->closure, on_external_state_watcher_done, w); grpc_closure_init(&w->closure, on_external_state_watcher_done, w);
grpc_pollset_set_add_pollset_set(exec_ctx, &c->pollset_set, interested_parties); grpc_pollset_set_add_pollset_set(exec_ctx, &c->pollset_set, interested_parties);
GRPC_SUBCHANNEL_REF(c, "external_state_watcher"); GRPC_SUBCHANNEL_WEAK_REF(c, "external_state_watcher");
gpr_mu_lock(&c->mu); gpr_mu_lock(&c->mu);
if (grpc_connectivity_state_notify_on_state_change( if (grpc_connectivity_state_notify_on_state_change(
exec_ctx, &c->state_tracker, state, &w->closure)) { exec_ctx, &c->state_tracker, state, &w->closure)) {
...@@ -401,13 +431,13 @@ static void subchannel_on_child_state_changed(grpc_exec_ctx *exec_ctx, void *p, ...@@ -401,13 +431,13 @@ static void subchannel_on_child_state_changed(grpc_exec_ctx *exec_ctx, void *p,
grpc_connected_subchannel_notify_on_state_change( grpc_connected_subchannel_notify_on_state_change(
exec_ctx, GET_CONNECTED_SUBCHANNEL(c, no_barrier), exec_ctx, GET_CONNECTED_SUBCHANNEL(c, no_barrier),
&sw->connectivity_state, &sw->closure); &sw->connectivity_state, &sw->closure);
GRPC_SUBCHANNEL_REF(c, "state_watcher"); GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher");
sw = NULL; sw = NULL;
} }
} }
gpr_mu_unlock(mu); gpr_mu_unlock(mu);
GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "state_watcher"); GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "state_watcher");
gpr_free(sw); gpr_free(sw);
} }
...@@ -488,7 +518,7 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { ...@@ -488,7 +518,7 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
/* setup subchannel watching connected subchannel for changes; subchannel ref /* setup subchannel watching connected subchannel for changes; subchannel ref
for connecting is donated for connecting is donated
to the state watcher */ to the state watcher */
GRPC_SUBCHANNEL_REF(c, "state_watcher"); GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher");
GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting"); GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting");
grpc_connected_subchannel_notify_on_state_change( grpc_connected_subchannel_notify_on_state_change(
exec_ctx, con, &sw_subchannel->connectivity_state, exec_ctx, con, &sw_subchannel->connectivity_state,
......
...@@ -50,6 +50,10 @@ typedef struct grpc_subchannel_args grpc_subchannel_args; ...@@ -50,6 +50,10 @@ typedef struct grpc_subchannel_args grpc_subchannel_args;
grpc_subchannel_ref((p), __FILE__, __LINE__, (r)) grpc_subchannel_ref((p), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_UNREF(cl, p, r) \ #define GRPC_SUBCHANNEL_UNREF(cl, p, r) \
grpc_subchannel_unref((cl), (p), __FILE__, __LINE__, (r)) grpc_subchannel_unref((cl), (p), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_WEAK_REF(p, r) \
grpc_subchannel_weak_ref((p), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_WEAK_UNREF(cl, p, r) \
grpc_subchannel_weak_unref((cl), (p), __FILE__, __LINE__, (r))
#define GRPC_CONNECTED_SUBCHANNEL_REF(p, r) \ #define GRPC_CONNECTED_SUBCHANNEL_REF(p, r) \
grpc_connected_subchannel_ref((p), __FILE__, __LINE__, (r)) grpc_connected_subchannel_ref((p), __FILE__, __LINE__, (r))
#define GRPC_CONNECTED_SUBCHANNEL_UNREF(cl, p, r) \ #define GRPC_CONNECTED_SUBCHANNEL_UNREF(cl, p, r) \
...@@ -63,6 +67,8 @@ typedef struct grpc_subchannel_args grpc_subchannel_args; ...@@ -63,6 +67,8 @@ typedef struct grpc_subchannel_args grpc_subchannel_args;
#else #else
#define GRPC_SUBCHANNEL_REF(p, r) grpc_subchannel_ref((p)) #define GRPC_SUBCHANNEL_REF(p, r) grpc_subchannel_ref((p))
#define GRPC_SUBCHANNEL_UNREF(cl, p, r) grpc_subchannel_unref((cl), (p)) #define GRPC_SUBCHANNEL_UNREF(cl, p, r) grpc_subchannel_unref((cl), (p))
#define GRPC_SUBCHANNEL_WEAK_REF(p, r) grpc_subchannel_weak_ref((p))
#define GRPC_SUBCHANNEL_WEAK_UNREF(cl, p, r) grpc_subchannel_weak_unref((cl), (p))
#define GRPC_CONNECTED_SUBCHANNEL_REF(p, r) grpc_connected_subchannel_ref((p)) #define GRPC_CONNECTED_SUBCHANNEL_REF(p, r) grpc_connected_subchannel_ref((p))
#define GRPC_CONNECTED_SUBCHANNEL_UNREF(cl, p, r) \ #define GRPC_CONNECTED_SUBCHANNEL_UNREF(cl, p, r) \
grpc_connected_subchannel_unref((cl), (p)) grpc_connected_subchannel_unref((cl), (p))
...@@ -77,6 +83,11 @@ void grpc_subchannel_ref(grpc_subchannel *channel ...@@ -77,6 +83,11 @@ void grpc_subchannel_ref(grpc_subchannel *channel
void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx, void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx,
grpc_subchannel *channel grpc_subchannel *channel
GRPC_SUBCHANNEL_REF_EXTRA_ARGS); GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_subchannel_weak_ref(grpc_subchannel *channel
GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx,
grpc_subchannel *channel
GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_connected_subchannel_ref(grpc_connected_subchannel *channel void grpc_connected_subchannel_ref(grpc_connected_subchannel *channel
GRPC_SUBCHANNEL_REF_EXTRA_ARGS); GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx, void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment