diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c index 7f7fbf420fb32c8003693e8374038482079dc66d..53433a09230fe978beee55701e74b10a29670f3a 100644 --- a/src/core/channel/channel_stack.c +++ b/src/core/channel/channel_stack.c @@ -101,9 +101,10 @@ grpc_call_element *grpc_call_stack_element(grpc_call_stack *call_stack, return CALL_ELEMS_FROM_STACK(call_stack) + index; } -void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, +void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx,int initial_refs, + grpc_iomgr_cb_func destroy, void *destroy_arg, const grpc_channel_filter **filters, - size_t filter_count, grpc_channel *master, + size_t filter_count, const grpc_channel_args *channel_args, grpc_channel_stack *stack) { size_t call_size = @@ -115,6 +116,8 @@ void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, size_t i; stack->count = filter_count; + gpr_ref_init(&stack->refcount.refs, initial_refs); + grpc_closure_init(&stack->refcount.destroy, destroy, destroy_arg); elems = CHANNEL_ELEMS_FROM_STACK(stack); user_data = ((char *)elems) + @@ -122,7 +125,7 @@ void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, /* init per-filter data */ for (i = 0; i < filter_count; i++) { - args.master = master; + args.channel_stack = stack; args.channel_args = channel_args; args.is_first = i == 0; args.is_last = i == (filter_count - 1); @@ -174,7 +177,7 @@ void grpc_call_stack_init(grpc_exec_ctx *exec_ctx, /* init per-filter data */ for (i = 0; i < count; i++) { - args.refcount = &call_stack->refcount; + args.call_stack = call_stack; args.server_transport_data = transport_server_data; args.context = context; call_elems[i].filter = channel_elems[i].filter; diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h index 1db12ead7ec3558c8821ed9391e29757341bfcff..593adcd7b56cd8dd32965ce33d9e19d07021c6f6 100644 --- a/src/core/channel/channel_stack.h +++ b/src/core/channel/channel_stack.h @@ -51,15 +51,18 @@ typedef struct grpc_channel_element grpc_channel_element; typedef struct grpc_call_element grpc_call_element; +typedef struct grpc_channel_stack grpc_channel_stack; +typedef struct grpc_call_stack grpc_call_stack; + typedef struct { - grpc_channel *master; + grpc_channel_stack *channel_stack; const grpc_channel_args *channel_args; int is_first; int is_last; } grpc_channel_element_args; typedef struct { - grpc_stream_refcount *refcount; + grpc_call_stack *call_stack; const void *server_transport_data; grpc_call_context_element *context; } grpc_call_element_args; @@ -144,23 +147,24 @@ struct grpc_call_element { /* A channel stack tracks a set of related filters for one channel, and guarantees they live within a single malloc() allocation */ -typedef struct { +struct grpc_channel_stack { + grpc_stream_refcount refcount; size_t count; /* Memory required for a call stack (computed at channel stack initialization) */ size_t call_stack_size; -} grpc_channel_stack; +}; /* A call stack tracks a set of related filters for one call, and guarantees they live within a single malloc() allocation */ -typedef struct { +struct grpc_call_stack { /* shared refcount for this channel stack. MUST be the first element: the underlying code calls destroy with the address of the refcount, but higher layers prefer to think about the address of the call stack itself. */ grpc_stream_refcount refcount; size_t count; -} grpc_call_stack; +}; /* Get a channel element given a channel stack and its index */ grpc_channel_element *grpc_channel_stack_element(grpc_channel_stack *stack, @@ -175,9 +179,10 @@ grpc_call_element *grpc_call_stack_element(grpc_call_stack *stack, size_t i); size_t grpc_channel_stack_size(const grpc_channel_filter **filters, size_t filter_count); /* Initialize a channel stack given some filters */ -void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, +void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx,int initial_refs, + grpc_iomgr_cb_func destroy, void *destroy_arg, const grpc_channel_filter **filters, - size_t filter_count, grpc_channel *master, + size_t filter_count, const grpc_channel_args *args, grpc_channel_stack *stack); /* Destroy a channel stack */ @@ -199,14 +204,21 @@ void grpc_call_stack_set_pollset(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset); #ifdef GRPC_STREAM_REFCOUNT_DEBUG -#define grpc_call_stack_ref(call_stack, reason) \ +#define GRPC_CALL_STACK_REF(call_stack, reason) \ grpc_stream_ref(&(call_stack)->refcount, reason) -#define grpc_call_stack_unref(exec_ctx, call_stack, reason) \ +#define GRPC_CALL_STACK_UNREF(exec_ctx, call_stack, reason) \ grpc_stream_unref(exec_ctx, &(call_stack)->refcount, reason) +#define GRPC_CHANNEL_STACK_REF(channel_stack, reason) \ + grpc_stream_ref(&(channel_stack)->refcount, reason) +#define GRPC_CHANNEL_STACK_UNREF(exec_ctx, channel_stack, reason) \ + grpc_stream_unref(exec_ctx, &(channel_stack)->refcount, reason) #else -#define grpc_call_stack_ref(call_stack) grpc_stream_ref(&(call_stack)->refcount) -#define grpc_call_stack_unref(exec_ctx, call_stack) \ +#define GRPC_CALL_STACK_REF(call_stack, reason) grpc_stream_ref(&(call_stack)->refcount) +#define GRPC_CALL_STACK_UNREF(exec_ctx, call_stack, reason) \ grpc_stream_unref(exec_ctx, &(call_stack)->refcount) +#define GRPC_CHANNEL_STACK_REF(channel_stack, reason) grpc_stream_ref(&(channel_stack)->refcount) +#define GRPC_CHANNEL_STACK_UNREF(exec_ctx, channel_stack, reason) \ + grpc_stream_unref(exec_ctx, &(channel_stack)->refcount) #endif /* Destroy a call stack */ diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index f026d32265653be1699dbdbf6d033261f978b5e9..fe43c50018ec0ee2e2c7dd1e1b9c2f73f9afa8ea 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -59,11 +59,6 @@ typedef struct client_channel_channel_data { grpc_resolver *resolver; /** have we started resolving this channel */ int started_resolving; - /** master channel - the grpc_channel instance that ultimately owns - this channel_data via its channel stack. - We occasionally use this to bump the refcount on the master channel - to keep ourselves alive through an asynchronous operation. */ - grpc_channel *master; /** mutex protecting client configuration, including all variables below in this data structure */ @@ -81,8 +76,8 @@ typedef struct client_channel_channel_data { grpc_connectivity_state_tracker state_tracker; /** when an lb_policy arrives, should we try to exit idle */ int exit_idle_when_lb_policy_arrives; - /** pollset_set of interested parties in a new connection */ - grpc_pollset_set pollset_set; + /** owning stack */ + grpc_channel_stack *owning_stack; } channel_data; /** We create one watcher for each new lb_policy that is returned from a @@ -103,9 +98,7 @@ typedef struct { } waiting_call; static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { - channel_data *chand = elem->channel_data; - return grpc_subchannel_call_holder_get_peer(exec_ctx, elem->call_data, - chand->master); + return grpc_subchannel_call_holder_get_peer(exec_ctx, elem->call_data); } static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx, @@ -139,7 +132,7 @@ static void on_lb_policy_state_changed(grpc_exec_ctx *exec_ctx, void *arg, on_lb_policy_state_changed_locked(exec_ctx, w); gpr_mu_unlock(&w->chand->mu_config); - GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, w->chand->master, "watch_lb_policy"); + GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "watch_lb_policy"); gpr_free(w); } @@ -147,7 +140,7 @@ static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand, grpc_lb_policy *lb_policy, grpc_connectivity_state current_state) { lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w)); - GRPC_CHANNEL_INTERNAL_REF(chand->master, "watch_lb_policy"); + GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy"); w->chand = chand; grpc_closure_init(&w->on_changed, on_lb_policy_state_changed, w); @@ -200,7 +193,7 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, watch_lb_policy(exec_ctx, chand, lb_policy, state); } gpr_mu_unlock(&chand->mu_config); - GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); + GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); grpc_resolver_next(exec_ctx, resolver, &chand->incoming_configuration, &chand->on_config_changed); GRPC_RESOLVER_UNREF(exec_ctx, resolver, "channel-next"); @@ -230,7 +223,7 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "config_change"); } - GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, chand->master, "resolver"); + GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "resolver"); } static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, @@ -347,7 +340,7 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp, } if (chand->resolver != NULL && !chand->started_resolving) { chand->started_resolving = 1; - GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); + GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); grpc_resolver_next(exec_ctx, chand->resolver, &chand->incoming_configuration, &chand->on_config_changed); @@ -387,8 +380,6 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx, GPR_ASSERT(elem->filter == &grpc_client_channel_filter); gpr_mu_init(&chand->mu_config); - chand->master = args->master; - grpc_pollset_set_init(&chand->pollset_set); grpc_closure_init(&chand->on_config_changed, cc_on_config_changed, chand); grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, @@ -408,7 +399,6 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); } grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker); - grpc_pollset_set_destroy(&chand->pollset_set); gpr_mu_destroy(&chand->mu_config); } @@ -437,7 +427,7 @@ void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx, if (!grpc_closure_list_empty(chand->waiting_for_config_closures) || chand->exit_idle_when_lb_policy_arrives) { chand->started_resolving = 1; - GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); + GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); grpc_resolver_next(exec_ctx, resolver, &chand->incoming_configuration, &chand->on_config_changed); } @@ -456,7 +446,7 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state( } else { chand->exit_idle_when_lb_policy_arrives = 1; if (!chand->started_resolving && chand->resolver != NULL) { - GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); + GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); chand->started_resolving = 1; grpc_resolver_next(exec_ctx, chand->resolver, &chand->incoming_configuration, @@ -469,7 +459,7 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state( } void grpc_client_channel_watch_connectivity_state( - grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, + grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset, grpc_connectivity_state *state, grpc_closure *on_complete) { channel_data *chand = elem->channel_data; gpr_mu_lock(&chand->mu_config); @@ -477,23 +467,3 @@ void grpc_client_channel_watch_connectivity_state( exec_ctx, &chand->state_tracker, state, on_complete); gpr_mu_unlock(&chand->mu_config); } - -grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set( - grpc_channel_element *elem) { - channel_data *chand = elem->channel_data; - return &chand->pollset_set; -} - -void grpc_client_channel_add_interested_party(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, - grpc_pollset *pollset) { - channel_data *chand = elem->channel_data; - grpc_pollset_set_add_pollset(exec_ctx, &chand->pollset_set, pollset); -} - -void grpc_client_channel_del_interested_party(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, - grpc_pollset *pollset) { - channel_data *chand = elem->channel_data; - grpc_pollset_set_del_pollset(exec_ctx, &chand->pollset_set, pollset); -} diff --git a/src/core/channel/client_channel.h b/src/core/channel/client_channel.h index 5103f07a436707f1d124058ab52d373038a9f456..d9bc4971f1ae51837b858186982e62d3f58815a3 100644 --- a/src/core/channel/client_channel.h +++ b/src/core/channel/client_channel.h @@ -57,17 +57,7 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state( grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect); void grpc_client_channel_watch_connectivity_state( - grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, + grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset, grpc_connectivity_state *state, grpc_closure *on_complete); -grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set( - grpc_channel_element *elem); - -void grpc_client_channel_add_interested_party(grpc_exec_ctx *exec_ctx, - grpc_channel_element *channel, - grpc_pollset *pollset); -void grpc_client_channel_del_interested_party(grpc_exec_ctx *exec_ctx, - grpc_channel_element *channel, - grpc_pollset *pollset); - #endif /* GRPC_INTERNAL_CORE_CHANNEL_CLIENT_CHANNEL_H */ diff --git a/src/core/channel/client_uchannel.c b/src/core/channel/client_uchannel.c index 926bbde8389f48484d42e3233cf94bffcd7a5f01..b30d0301978eb5a7954e4f03e1b01a8a29c35b3a 100644 --- a/src/core/channel/client_uchannel.c +++ b/src/core/channel/client_uchannel.c @@ -58,7 +58,7 @@ typedef struct client_uchannel_channel_data { this channel_data via its channel stack. We occasionally use this to bump the refcount on the master channel to keep ourselves alive through an asynchronous operation. */ - grpc_channel *master; + grpc_channel_stack *owning_stack; /** connectivity state being tracked */ grpc_connectivity_state_tracker state_tracker; @@ -90,9 +90,7 @@ static void monitor_subchannel(grpc_exec_ctx *exec_ctx, void *arg, } static char *cuc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { - channel_data *chand = elem->channel_data; - return grpc_subchannel_call_holder_get_peer(exec_ctx, elem->call_data, - chand->master); + return grpc_subchannel_call_holder_get_peer(exec_ctx, elem->call_data); } static void cuc_start_transport_stream_op(grpc_exec_ctx *exec_ctx, @@ -158,7 +156,7 @@ static void cuc_init_channel_elem(grpc_exec_ctx *exec_ctx, grpc_closure_init(&chand->connectivity_cb, monitor_subchannel, chand); GPR_ASSERT(args->is_last); GPR_ASSERT(elem->filter == &grpc_client_uchannel_filter); - chand->master = args->master; + chand->owning_stack = args->channel_stack; grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_uchannel"); gpr_mu_init(&chand->mu_state); @@ -216,33 +214,6 @@ void grpc_client_uchannel_watch_connectivity_state( gpr_mu_unlock(&chand->mu_state); } -grpc_pollset_set *grpc_client_uchannel_get_connecting_pollset_set( - grpc_channel_element *elem) { - channel_data *chand = elem->channel_data; - grpc_channel_element *parent_elem; - gpr_mu_lock(&chand->mu_state); - parent_elem = grpc_channel_stack_last_element( - grpc_channel_get_channel_stack(chand->master)); - gpr_mu_unlock(&chand->mu_state); - return grpc_client_channel_get_connecting_pollset_set(parent_elem); -} - -void grpc_client_uchannel_add_interested_party(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, - grpc_pollset *pollset) { - grpc_pollset_set *master_pollset_set = - grpc_client_uchannel_get_connecting_pollset_set(elem); - grpc_pollset_set_add_pollset(exec_ctx, master_pollset_set, pollset); -} - -void grpc_client_uchannel_del_interested_party(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, - grpc_pollset *pollset) { - grpc_pollset_set *master_pollset_set = - grpc_client_uchannel_get_connecting_pollset_set(elem); - grpc_pollset_set_del_pollset(exec_ctx, master_pollset_set, pollset); -} - grpc_channel *grpc_client_uchannel_create(grpc_subchannel *subchannel, grpc_channel_args *args) { grpc_channel *channel = NULL; diff --git a/src/core/channel/client_uchannel.h b/src/core/channel/client_uchannel.h index 120a3daf3dde7413f395d20e5feaf8114be2ed75..a5cf271042c9a572429fef3be8ffab603577b62e 100644 --- a/src/core/channel/client_uchannel.h +++ b/src/core/channel/client_uchannel.h @@ -51,16 +51,6 @@ void grpc_client_uchannel_watch_connectivity_state( grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_connectivity_state *state, grpc_closure *on_complete); -grpc_pollset_set *grpc_client_uchannel_get_connecting_pollset_set( - grpc_channel_element *elem); - -void grpc_client_uchannel_add_interested_party(grpc_exec_ctx *exec_ctx, - grpc_channel_element *channel, - grpc_pollset *pollset); -void grpc_client_uchannel_del_interested_party(grpc_exec_ctx *exec_ctx, - grpc_channel_element *channel, - grpc_pollset *pollset); - grpc_channel *grpc_client_uchannel_create(grpc_subchannel *subchannel, grpc_channel_args *args); diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c index 0e1efd965ab0e25af1620eb21950641d002df237..73a7dcc81ff5b9d43dbbedf4631e0cb9e8894c7e 100644 --- a/src/core/channel/connected_channel.c +++ b/src/core/channel/connected_channel.c @@ -91,7 +91,7 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); r = grpc_transport_init_stream(exec_ctx, chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), - args->refcount, args->server_transport_data); + &args->call_stack->refcount, args->server_transport_data); GPR_ASSERT(r == 0); } diff --git a/src/core/channel/subchannel_call_holder.c b/src/core/channel/subchannel_call_holder.c index 2e3d49e806e39373af2a77161992f0790be3f502..63069baa848312f0eca28b9c39d0058202a94105 100644 --- a/src/core/channel/subchannel_call_holder.c +++ b/src/core/channel/subchannel_call_holder.c @@ -244,13 +244,12 @@ static void fail_locked(grpc_exec_ctx *exec_ctx, } char *grpc_subchannel_call_holder_get_peer(grpc_exec_ctx *exec_ctx, - grpc_subchannel_call_holder *holder, - grpc_channel *master) { + grpc_subchannel_call_holder *holder) { grpc_subchannel_call *subchannel_call = GET_CALL(holder); if (subchannel_call) { return grpc_subchannel_call_get_peer(exec_ctx, subchannel_call); } else { - return grpc_channel_get_target(master); + return NULL; } } diff --git a/src/core/channel/subchannel_call_holder.h b/src/core/channel/subchannel_call_holder.h index 6328f353443774af657aed15e32dde1fcf021027..692c5e5316f7318154f5cf4fa468ae1d02d4a738 100644 --- a/src/core/channel/subchannel_call_holder.h +++ b/src/core/channel/subchannel_call_holder.h @@ -91,7 +91,6 @@ void grpc_subchannel_call_holder_perform_op(grpc_exec_ctx *exec_ctx, grpc_subchannel_call_holder *holder, grpc_transport_stream_op *op); char *grpc_subchannel_call_holder_get_peer(grpc_exec_ctx *exec_ctx, - grpc_subchannel_call_holder *holder, - grpc_channel *master); + grpc_subchannel_call_holder *holder); #endif diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 07a74e250f0d8e0d1228f171f236f38e3f775957..6cc2364eaa54e84a68a5d3d11749695cf6a135b7 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -56,11 +56,6 @@ ((grpc_connected_subchannel *)(gpr_atm_##barrier##_load( \ &(subchannel)->connected_subchannel))) -struct grpc_connected_subchannel { - /** refcount */ - gpr_refcount refs; -}; - typedef struct { grpc_closure closure; union { @@ -84,11 +79,6 @@ struct grpc_subchannel { /** address to connect to */ struct sockaddr *addr; size_t addr_len; - /** master channel - the grpc_channel instance that ultimately owns - this channel_data via its channel stack. - We occasionally use this to bump the refcount on the master channel - to keep ourselves alive through an asynchronous operation. */ - grpc_channel *master; /** set during connection */ grpc_connect_out_args connecting_result; @@ -97,10 +87,8 @@ struct grpc_subchannel { grpc_closure connected; /** pollset_set tracking who's interested in a connection - being setup - owned by the master channel (in particular the - client_channel - filter there-in) */ - grpc_pollset_set *pollset_set; + being setup */ + grpc_pollset_set pollset_set; /** active connection, or null; of type grpc_connected_subchannel */ gpr_atm connected_subchannel; @@ -132,7 +120,7 @@ struct grpc_subchannel_call { }; #define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1)) -#define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack *)((con) + 1)) +#define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack *)(con)) #define CALLSTACK_TO_SUBCHANNEL_CALL(callstack) \ (((grpc_subchannel_call *)(callstack)) - 1) @@ -151,6 +139,7 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel, connection_unref_locked((cl), (p), __FILE__, __LINE__, (r)) #define REF_PASS_ARGS , file, line, reason #define REF_PASS_REASON , reason +#define REF_REASON reason #define REF_LOG(name, p) \ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p ref %d -> %d %s", \ (name), (p), (p)->refs.count, (p)->refs.count + 1, reason) @@ -164,6 +153,7 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel, #define CONNECTION_UNREF_LOCKED(cl, p, r) connection_unref_locked((cl), (p)) #define REF_PASS_ARGS #define REF_PASS_REASON +#define REF_REASON "" #define REF_LOG(name, p) \ do { \ } while (0) @@ -185,18 +175,13 @@ static void connection_destroy(grpc_exec_ctx *exec_ctx, void *arg, void grpc_connected_subchannel_ref( grpc_connected_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - REF_LOG("CONNECTION", c); - gpr_ref(&c->refs); + GRPC_CHANNEL_STACK_REF(CHANNEL_STACK_FROM_CONNECTION(c), REF_REASON); } void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - UNREF_LOG("CONNECTION", c); - if (gpr_unref(&c->refs)) { - grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(connection_destroy, c), - 1); - } + GRPC_CHANNEL_STACK_UNREF(exec_ctx, CHANNEL_STACK_FROM_CONNECTION(c), REF_REASON); } /* @@ -215,6 +200,7 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, gpr_free(c->addr); grpc_connectivity_state_destroy(exec_ctx, &c->state_tracker); grpc_connector_unref(exec_ctx, c->connector); + grpc_pollset_set_destroy(&c->pollset_set); gpr_free(c); } @@ -235,13 +221,13 @@ void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx, void grpc_subchannel_add_interested_party(grpc_exec_ctx *exec_ctx, grpc_subchannel *c, grpc_pollset *pollset) { - grpc_pollset_set_add_pollset(exec_ctx, c->pollset_set, pollset); + grpc_pollset_set_add_pollset(exec_ctx, &c->pollset_set, pollset); } void grpc_subchannel_del_interested_party(grpc_exec_ctx *exec_ctx, grpc_subchannel *c, grpc_pollset *pollset) { - grpc_pollset_set_del_pollset(exec_ctx, c->pollset_set, pollset); + grpc_pollset_set_del_pollset(exec_ctx, &c->pollset_set, pollset); } static gpr_uint32 random_seed() { @@ -251,8 +237,6 @@ static gpr_uint32 random_seed() { grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, grpc_subchannel_args *args) { grpc_subchannel *c = gpr_malloc(sizeof(*c)); - grpc_channel_element *parent_elem = grpc_channel_stack_last_element( - grpc_channel_get_channel_stack(args->master)); memset(c, 0, sizeof(*c)); gpr_ref_init(&c->refs, 1); c->connector = connector; @@ -263,10 +247,9 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, sizeof(grpc_channel_filter *) * c->num_filters); c->addr = gpr_malloc(args->addr_len); memcpy(c->addr, args->addr, args->addr_len); + grpc_pollset_set_init(&c->pollset_set); c->addr_len = args->addr_len; c->args = grpc_channel_args_copy(args->args); - c->master = args->master; - c->pollset_set = grpc_client_channel_get_connecting_pollset_set(parent_elem); c->random = random_seed(); grpc_closure_init(&c->connected, subchannel_connected, c); grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE, @@ -278,7 +261,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { grpc_connect_in_args args; - args.interested_parties = c->pollset_set; + args.interested_parties = &c->pollset_set; args.addr = c->addr; args.addr_len = c->addr_len; args.deadline = compute_connect_deadline(c); @@ -318,7 +301,6 @@ void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx, c->connecting = 1; /* released by connection */ GRPC_SUBCHANNEL_REF(c, "connecting"); - GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting"); } gpr_mu_unlock(&c->mu); @@ -448,10 +430,9 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { /* construct channel stack */ channel_stack_size = grpc_channel_stack_size(filters, num_filters); - con = gpr_malloc(sizeof(grpc_connected_subchannel) + channel_stack_size); - stk = (grpc_channel_stack *)(con + 1); - gpr_ref_init(&con->refs, 1); - grpc_channel_stack_init(exec_ctx, filters, num_filters, c->master, c->args, + con = gpr_malloc(channel_stack_size); + stk = CHANNEL_STACK_FROM_CONNECTION(con); + grpc_channel_stack_init(exec_ctx, 1, connection_destroy, con, filters, num_filters, c->args, stk); grpc_connected_channel_bind_transport(stk, c->connecting_result.transport); gpr_free((void *)c->connecting_result.filters); @@ -471,7 +452,6 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { gpr_free(sw_subchannel); gpr_free((void *)filters); grpc_channel_stack_destroy(exec_ctx, stk); - GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->master, "connecting"); GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting"); return; } @@ -495,7 +475,6 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { gpr_mu_unlock(&c->mu); gpr_free((void *)filters); - GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->master, "connecting"); } /* Generate a random number between 0 and 1. */ @@ -554,7 +533,6 @@ static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) { update_reconnect_parameters(c); continue_connect(exec_ctx, c); } else { - GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->master, "connecting"); GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting"); } } @@ -605,21 +583,13 @@ static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call, void grpc_subchannel_call_ref(grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { -#ifdef GRPC_STREAM_REFCOUNT_DEBUG - grpc_call_stack_ref(SUBCHANNEL_CALL_TO_CALL_STACK(c), reason); -#else - grpc_call_stack_ref(SUBCHANNEL_CALL_TO_CALL_STACK(c)); -#endif + GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON); } void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx, grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { -#ifdef GRPC_STREAM_REFCOUNT_DEBUG - grpc_call_stack_unref(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), reason); -#else - grpc_call_stack_unref(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c)); -#endif + GRPC_CALL_STACK_UNREF(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON); } char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx, diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index 14eb4baa1fde43c03a1868f7733edd8e41131292..b7db3638666939f45a31549d05f6580c7c87918d 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -161,15 +161,10 @@ struct grpc_subchannel_args { /** Address to connect to */ struct sockaddr *addr; size_t addr_len; - /** master channel */ - grpc_channel *master; }; /** create a subchannel given a connector */ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, grpc_subchannel_args *args); -/** Return the master channel associated with the subchannel */ -grpc_channel *grpc_subchannel_get_master(grpc_subchannel *subchannel); - #endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_H */ diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index 859197412bb4ee9a95fde563bd804c1d743346bd..1632b79d0e737f6a5b326c31dc803139ac579db7 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -63,7 +63,6 @@ typedef struct registered_call { struct grpc_channel { int is_client; - gpr_refcount refs; gpr_uint32 max_message_length; grpc_mdelem *default_authority; diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c index 471b5a71e73b355cc3c69731e177cffb470a87a0..344b4d79f7e396967229590b4b1b3b9dd48b7250 100644 --- a/src/core/surface/secure_channel_create.c +++ b/src/core/surface/secure_channel_create.c @@ -207,7 +207,6 @@ static grpc_subchannel *subchannel_factory_create_subchannel( gpr_mu_init(&c->mu); gpr_ref_init(&c->refs, 1); args->args = final_args; - args->master = f->master; s = grpc_subchannel_create(&c->base, args); grpc_connector_unref(exec_ctx, &c->base); grpc_channel_args_destroy(final_args); diff --git a/test/core/end2end/fixtures/h2_uchannel.c b/test/core/end2end/fixtures/h2_uchannel.c index 3add8e800794413b50602e2959a2a8adda0052bb..56d7d295fb26ef8e42b51138a16d9a2aa965d2f8 100644 --- a/test/core/end2end/fixtures/h2_uchannel.c +++ b/test/core/end2end/fixtures/h2_uchannel.c @@ -159,7 +159,6 @@ static grpc_subchannel *subchannel_factory_create_subchannel( c->base.vtable = &connector_vtable; gpr_ref_init(&c->refs, 1); args->args = final_args; - args->master = f->master; s = grpc_subchannel_create(&c->base, args); grpc_connector_unref(exec_ctx, &c->base); grpc_channel_args_destroy(final_args);