Skip to content
Snippets Groups Projects
Commit 032f3985 authored by Alexander Polcyn's avatar Alexander Polcyn
Browse files

cleanup

parent 7e444806
No related branches found
No related tags found
No related merge requests found
...@@ -103,7 +103,7 @@ static void destroy_call(grpc_rb_call *call) { ...@@ -103,7 +103,7 @@ static void destroy_call(grpc_rb_call *call) {
if (call->wrapped != NULL) { if (call->wrapped != NULL) {
grpc_call_destroy(call->wrapped); grpc_call_destroy(call->wrapped);
call->wrapped = NULL; call->wrapped = NULL;
grpc_rb_completion_queue_safe_destroy(call->queue); grpc_rb_completion_queue_destroy(call->queue);
call->queue = NULL; call->queue = NULL;
} }
} }
......
...@@ -52,27 +52,28 @@ ...@@ -52,27 +52,28 @@
/* id_channel is the name of the hidden ivar that preserves a reference to the /* id_channel is the name of the hidden ivar that preserves a reference to the
* channel on a call, so that calls are not GCed before their channel. */ * channel on a call, so that calls are not GCed before their channel. */
ID id_channel; static ID id_channel;
/* id_target is the name of the hidden ivar that preserves a reference to the /* id_target is the name of the hidden ivar that preserves a reference to the
* target string used to create the call, preserved so that it does not get * target string used to create the call, preserved so that it does not get
* GCed before the channel */ * GCed before the channel */
ID id_target; static ID id_target;
/* id_insecure_channel is used to indicate that a channel is insecure */ /* id_insecure_channel is used to indicate that a channel is insecure */
VALUE id_insecure_channel; static VALUE id_insecure_channel;
/* grpc_rb_cChannel is the ruby class that proxies grpc_channel. */ /* grpc_rb_cChannel is the ruby class that proxies grpc_channel. */
VALUE grpc_rb_cChannel = Qnil; static VALUE grpc_rb_cChannel = Qnil;
/* Used during the conversion of a hash to channel args during channel setup */ /* Used during the conversion of a hash to channel args during channel setup */
VALUE grpc_rb_cChannelArgs; static VALUE grpc_rb_cChannelArgs;
typedef struct bg_watched_channel { typedef struct bg_watched_channel {
grpc_channel *channel; grpc_channel *channel;
// these fields must only be accessed under global_connection_polling_mu
struct bg_watched_channel *next; struct bg_watched_channel *next;
int channel_destroyed; int channel_destroyed;
int refcount; // must only be accessed under global_connection_polling_mu int refcount;
} bg_watched_channel; } bg_watched_channel;
/* grpc_rb_channel wraps a grpc_channel. */ /* grpc_rb_channel wraps a grpc_channel. */
...@@ -101,32 +102,34 @@ typedef struct watch_state_op { ...@@ -101,32 +102,34 @@ typedef struct watch_state_op {
} op; } op;
} watch_state_op; } watch_state_op;
bg_watched_channel *bg_watched_channel_list_head = NULL; static bg_watched_channel *bg_watched_channel_list_head = NULL;
void grpc_rb_channel_try_register_connection_polling(bg_watched_channel *bg); static void grpc_rb_channel_try_register_connection_polling(
void *wait_until_channel_polling_thread_started_no_gil(void *); bg_watched_channel *bg);
void *channel_init_try_register_connection_polling_without_gil(void *arg); static void *wait_until_channel_polling_thread_started_no_gil(void *);
static void *channel_init_try_register_connection_polling_without_gil(
void *arg);
typedef struct channel_init_try_register_stack { typedef struct channel_init_try_register_stack {
grpc_channel *channel; grpc_channel *channel;
grpc_rb_channel *wrapper; grpc_rb_channel *wrapper;
} channel_init_try_register_stack; } channel_init_try_register_stack;
grpc_completion_queue *channel_polling_cq; static grpc_completion_queue *channel_polling_cq;
gpr_mu global_connection_polling_mu; static gpr_mu global_connection_polling_mu;
gpr_cv global_connection_polling_cv; static gpr_cv global_connection_polling_cv;
int abort_channel_polling = 0; static int abort_channel_polling = 0;
int channel_polling_thread_started = 0; static int channel_polling_thread_started = 0;
int bg_watched_channel_list_lookup(bg_watched_channel *bg); static int bg_watched_channel_list_lookup(bg_watched_channel *bg);
bg_watched_channel *bg_watched_channel_list_create_and_add( static bg_watched_channel *bg_watched_channel_list_create_and_add(
grpc_channel *channel); grpc_channel *channel);
void bg_watched_channel_list_free_and_remove(bg_watched_channel *bg); static void bg_watched_channel_list_free_and_remove(bg_watched_channel *bg);
void run_poll_channels_loop_unblocking_func(void *arg); static void run_poll_channels_loop_unblocking_func(void *arg);
// Needs to be called under global_connection_polling_mu // Needs to be called under global_connection_polling_mu
void grpc_rb_channel_watch_connection_state_op_complete(watch_state_op *op, static void grpc_rb_channel_watch_connection_state_op_complete(
int success) { watch_state_op *op, int success) {
GPR_ASSERT(!op->op.api_callback_args.called_back); GPR_ASSERT(!op->op.api_callback_args.called_back);
op->op.api_callback_args.called_back = 1; op->op.api_callback_args.called_back = 1;
op->op.api_callback_args.success = success; op->op.api_callback_args.success = success;
...@@ -135,7 +138,7 @@ void grpc_rb_channel_watch_connection_state_op_complete(watch_state_op *op, ...@@ -135,7 +138,7 @@ void grpc_rb_channel_watch_connection_state_op_complete(watch_state_op *op,
} }
/* Avoids destroying a channel twice. */ /* Avoids destroying a channel twice. */
void grpc_rb_channel_safe_destroy(bg_watched_channel *bg) { static void grpc_rb_channel_safe_destroy(bg_watched_channel *bg) {
gpr_mu_lock(&global_connection_polling_mu); gpr_mu_lock(&global_connection_polling_mu);
GPR_ASSERT(bg_watched_channel_list_lookup(bg)); GPR_ASSERT(bg_watched_channel_list_lookup(bg));
if (!bg->channel_destroyed) { if (!bg->channel_destroyed) {
...@@ -149,13 +152,13 @@ void grpc_rb_channel_safe_destroy(bg_watched_channel *bg) { ...@@ -149,13 +152,13 @@ void grpc_rb_channel_safe_destroy(bg_watched_channel *bg) {
gpr_mu_unlock(&global_connection_polling_mu); gpr_mu_unlock(&global_connection_polling_mu);
} }
void *channel_safe_destroy_without_gil(void *arg) { static void *channel_safe_destroy_without_gil(void *arg) {
grpc_rb_channel_safe_destroy((bg_watched_channel *)arg); grpc_rb_channel_safe_destroy((bg_watched_channel *)arg);
return NULL; return NULL;
} }
/* Destroys Channel instances. */ /* Destroys Channel instances. */
void grpc_rb_channel_free(void *p) { static void grpc_rb_channel_free(void *p) {
grpc_rb_channel *ch = NULL; grpc_rb_channel *ch = NULL;
if (p == NULL) { if (p == NULL) {
return; return;
...@@ -165,7 +168,8 @@ void grpc_rb_channel_free(void *p) { ...@@ -165,7 +168,8 @@ void grpc_rb_channel_free(void *p) {
if (ch->bg_wrapped != NULL) { if (ch->bg_wrapped != NULL) {
/* assumption made here: it's ok to directly gpr_mu_lock the global /* assumption made here: it's ok to directly gpr_mu_lock the global
* connection polling mutex becuse we're in a finalizer, * connection polling mutex becuse we're in a finalizer,
* and we can count on this thread to not be interrupted. */ * and we can count on this thread to not be interrupted or
* yield the gil. */
grpc_rb_channel_safe_destroy(ch->bg_wrapped); grpc_rb_channel_safe_destroy(ch->bg_wrapped);
ch->bg_wrapped = NULL; ch->bg_wrapped = NULL;
} }
...@@ -174,7 +178,7 @@ void grpc_rb_channel_free(void *p) { ...@@ -174,7 +178,7 @@ void grpc_rb_channel_free(void *p) {
} }
/* Protects the mark object from GC */ /* Protects the mark object from GC */
void grpc_rb_channel_mark(void *p) { static void grpc_rb_channel_mark(void *p) {
grpc_rb_channel *channel = NULL; grpc_rb_channel *channel = NULL;
if (p == NULL) { if (p == NULL) {
return; return;
...@@ -185,20 +189,20 @@ void grpc_rb_channel_mark(void *p) { ...@@ -185,20 +189,20 @@ void grpc_rb_channel_mark(void *p) {
} }
} }
rb_data_type_t grpc_channel_data_type = {"grpc_channel", static rb_data_type_t grpc_channel_data_type = {"grpc_channel",
{grpc_rb_channel_mark, {grpc_rb_channel_mark,
grpc_rb_channel_free, grpc_rb_channel_free,
GRPC_RB_MEMSIZE_UNAVAILABLE, GRPC_RB_MEMSIZE_UNAVAILABLE,
{NULL, NULL}}, {NULL, NULL}},
NULL, NULL,
NULL, NULL,
#ifdef RUBY_TYPED_FREE_IMMEDIATELY #ifdef RUBY_TYPED_FREE_IMMEDIATELY
RUBY_TYPED_FREE_IMMEDIATELY RUBY_TYPED_FREE_IMMEDIATELY
#endif #endif
}; };
/* Allocates grpc_rb_channel instances. */ /* Allocates grpc_rb_channel instances. */
VALUE grpc_rb_channel_alloc(VALUE cls) { static VALUE grpc_rb_channel_alloc(VALUE cls) {
grpc_rb_channel *wrapper = ALLOC(grpc_rb_channel); grpc_rb_channel *wrapper = ALLOC(grpc_rb_channel);
wrapper->bg_wrapped = NULL; wrapper->bg_wrapped = NULL;
wrapper->credentials = Qnil; wrapper->credentials = Qnil;
...@@ -213,7 +217,7 @@ VALUE grpc_rb_channel_alloc(VALUE cls) { ...@@ -213,7 +217,7 @@ VALUE grpc_rb_channel_alloc(VALUE cls) {
secure_channel = Channel:new("myhost:443", {'arg1': 'value1'}, creds) secure_channel = Channel:new("myhost:443", {'arg1': 'value1'}, creds)
Creates channel instances. */ Creates channel instances. */
VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) { static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
VALUE channel_args = Qnil; VALUE channel_args = Qnil;
VALUE credentials = Qnil; VALUE credentials = Qnil;
VALUE target = Qnil; VALUE target = Qnil;
...@@ -274,13 +278,15 @@ typedef struct get_state_stack { ...@@ -274,13 +278,15 @@ typedef struct get_state_stack {
int out; int out;
} get_state_stack; } get_state_stack;
void *get_state_without_gil(void *arg) { static void *get_state_without_gil(void *arg) {
get_state_stack *stack = (get_state_stack *)arg; get_state_stack *stack = (get_state_stack *)arg;
gpr_mu_lock(&global_connection_polling_mu); gpr_mu_lock(&global_connection_polling_mu);
GPR_ASSERT(abort_channel_polling || channel_polling_thread_started); GPR_ASSERT(abort_channel_polling || channel_polling_thread_started);
if (abort_channel_polling) { if (abort_channel_polling) {
// the case in which the channel polling thread // Assume that this channel has been destroyed by the
// background thread.
// The case in which the channel polling thread
// failed to start just always shows shutdown state. // failed to start just always shows shutdown state.
stack->out = GRPC_CHANNEL_SHUTDOWN; stack->out = GRPC_CHANNEL_SHUTDOWN;
} else { } else {
...@@ -301,16 +307,14 @@ void *get_state_without_gil(void *arg) { ...@@ -301,16 +307,14 @@ void *get_state_without_gil(void *arg) {
constants defined in GRPC::Core::ConnectivityStates. constants defined in GRPC::Core::ConnectivityStates.
It also tries to connect if the chennel is idle in the second form. */ It also tries to connect if the chennel is idle in the second form. */
VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv, static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv,
VALUE self) { VALUE self) {
VALUE try_to_connect_param = Qfalse; VALUE try_to_connect_param = Qfalse;
int grpc_try_to_connect = 0;
grpc_rb_channel *wrapper = NULL; grpc_rb_channel *wrapper = NULL;
get_state_stack stack; get_state_stack stack;
/* "01" == 0 mandatory args, 1 (try_to_connect) is optional */ /* "01" == 0 mandatory args, 1 (try_to_connect) is optional */
rb_scan_args(argc, argv, "01", &try_to_connect_param); rb_scan_args(argc, argv, "01", &try_to_connect_param);
grpc_try_to_connect = RTEST(try_to_connect_param) ? 1 : 0;
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
if (wrapper->bg_wrapped == NULL) { if (wrapper->bg_wrapped == NULL) {
...@@ -319,7 +323,7 @@ VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv, ...@@ -319,7 +323,7 @@ VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv,
} }
stack.channel = wrapper->bg_wrapped->channel; stack.channel = wrapper->bg_wrapped->channel;
stack.try_to_connect = grpc_try_to_connect; stack.try_to_connect = RTEST(try_to_connect_param) ? 1 : 0;
rb_thread_call_without_gvl(get_state_without_gil, &stack, NULL, NULL); rb_thread_call_without_gvl(get_state_without_gil, &stack, NULL, NULL);
return LONG2NUM(stack.out); return LONG2NUM(stack.out);
...@@ -331,7 +335,7 @@ typedef struct watch_state_stack { ...@@ -331,7 +335,7 @@ typedef struct watch_state_stack {
int last_state; int last_state;
} watch_state_stack; } watch_state_stack;
void *wait_for_watch_state_op_complete_without_gvl(void *arg) { static void *wait_for_watch_state_op_complete_without_gvl(void *arg) {
watch_state_stack *stack = (watch_state_stack *)arg; watch_state_stack *stack = (watch_state_stack *)arg;
watch_state_op *op = NULL; watch_state_op *op = NULL;
void *success = (void *)0; void *success = (void *)0;
...@@ -345,7 +349,6 @@ void *wait_for_watch_state_op_complete_without_gvl(void *arg) { ...@@ -345,7 +349,6 @@ void *wait_for_watch_state_op_complete_without_gvl(void *arg) {
} }
op = gpr_zalloc(sizeof(watch_state_op)); op = gpr_zalloc(sizeof(watch_state_op));
op->op_type = WATCH_STATE_API; op->op_type = WATCH_STATE_API;
// one ref for this thread and another for the callback-running thread
grpc_channel_watch_connectivity_state(stack->channel, stack->last_state, grpc_channel_watch_connectivity_state(stack->channel, stack->last_state,
stack->deadline, channel_polling_cq, stack->deadline, channel_polling_cq,
op); op);
...@@ -370,8 +373,9 @@ void *wait_for_watch_state_op_complete_without_gvl(void *arg) { ...@@ -370,8 +373,9 @@ void *wait_for_watch_state_op_complete_without_gvl(void *arg) {
* Returns false if "deadline" expires before the channel's connectivity * Returns false if "deadline" expires before the channel's connectivity
* state changes from "last_state". * state changes from "last_state".
* */ * */
VALUE grpc_rb_channel_watch_connectivity_state(VALUE self, VALUE last_state, static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self,
VALUE deadline) { VALUE last_state,
VALUE deadline) {
grpc_rb_channel *wrapper = NULL; grpc_rb_channel *wrapper = NULL;
watch_state_stack stack; watch_state_stack stack;
void *op_success = 0; void *op_success = 0;
...@@ -403,8 +407,9 @@ VALUE grpc_rb_channel_watch_connectivity_state(VALUE self, VALUE last_state, ...@@ -403,8 +407,9 @@ VALUE grpc_rb_channel_watch_connectivity_state(VALUE self, VALUE last_state,
/* Create a call given a grpc_channel, in order to call method. The request /* Create a call given a grpc_channel, in order to call method. The request
is not sent until grpc_call_invoke is called. */ is not sent until grpc_call_invoke is called. */
VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, VALUE mask, static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, VALUE mask,
VALUE method, VALUE host, VALUE deadline) { VALUE method, VALUE host,
VALUE deadline) {
VALUE res = Qnil; VALUE res = Qnil;
grpc_rb_channel *wrapper = NULL; grpc_rb_channel *wrapper = NULL;
grpc_call *call = NULL; grpc_call *call = NULL;
...@@ -466,7 +471,7 @@ VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, VALUE mask, ...@@ -466,7 +471,7 @@ VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, VALUE mask,
/* Closes the channel, calling it's destroy method */ /* Closes the channel, calling it's destroy method */
/* Note this is an API-level call; a wrapped channel's finalizer doesn't call /* Note this is an API-level call; a wrapped channel's finalizer doesn't call
* this */ * this */
VALUE grpc_rb_channel_destroy(VALUE self) { static VALUE grpc_rb_channel_destroy(VALUE self) {
grpc_rb_channel *wrapper = NULL; grpc_rb_channel *wrapper = NULL;
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
...@@ -480,7 +485,7 @@ VALUE grpc_rb_channel_destroy(VALUE self) { ...@@ -480,7 +485,7 @@ VALUE grpc_rb_channel_destroy(VALUE self) {
} }
/* Called to obtain the target that this channel accesses. */ /* Called to obtain the target that this channel accesses. */
VALUE grpc_rb_channel_get_target(VALUE self) { static VALUE grpc_rb_channel_get_target(VALUE self) {
grpc_rb_channel *wrapper = NULL; grpc_rb_channel *wrapper = NULL;
VALUE res = Qnil; VALUE res = Qnil;
char *target = NULL; char *target = NULL;
...@@ -494,7 +499,7 @@ VALUE grpc_rb_channel_get_target(VALUE self) { ...@@ -494,7 +499,7 @@ VALUE grpc_rb_channel_get_target(VALUE self) {
} }
/* Needs to be called under global_connection_polling_mu */ /* Needs to be called under global_connection_polling_mu */
int bg_watched_channel_list_lookup(bg_watched_channel *target) { static int bg_watched_channel_list_lookup(bg_watched_channel *target) {
bg_watched_channel *cur = bg_watched_channel_list_head; bg_watched_channel *cur = bg_watched_channel_list_head;
while (cur != NULL) { while (cur != NULL) {
...@@ -508,7 +513,7 @@ int bg_watched_channel_list_lookup(bg_watched_channel *target) { ...@@ -508,7 +513,7 @@ int bg_watched_channel_list_lookup(bg_watched_channel *target) {
} }
/* Needs to be called under global_connection_polling_mu */ /* Needs to be called under global_connection_polling_mu */
bg_watched_channel *bg_watched_channel_list_create_and_add( static bg_watched_channel *bg_watched_channel_list_create_and_add(
grpc_channel *channel) { grpc_channel *channel) {
bg_watched_channel *watched = gpr_zalloc(sizeof(bg_watched_channel)); bg_watched_channel *watched = gpr_zalloc(sizeof(bg_watched_channel));
...@@ -520,7 +525,8 @@ bg_watched_channel *bg_watched_channel_list_create_and_add( ...@@ -520,7 +525,8 @@ bg_watched_channel *bg_watched_channel_list_create_and_add(
} }
/* Needs to be called under global_connection_polling_mu */ /* Needs to be called under global_connection_polling_mu */
void bg_watched_channel_list_free_and_remove(bg_watched_channel *target) { static void bg_watched_channel_list_free_and_remove(
bg_watched_channel *target) {
bg_watched_channel *bg = NULL; bg_watched_channel *bg = NULL;
GPR_ASSERT(bg_watched_channel_list_lookup(target)); GPR_ASSERT(bg_watched_channel_list_lookup(target));
...@@ -544,7 +550,8 @@ void bg_watched_channel_list_free_and_remove(bg_watched_channel *target) { ...@@ -544,7 +550,8 @@ void bg_watched_channel_list_free_and_remove(bg_watched_channel *target) {
/* Initialize a grpc_rb_channel's "protected grpc_channel" and try to push /* Initialize a grpc_rb_channel's "protected grpc_channel" and try to push
* it onto the background thread for constant watches. */ * it onto the background thread for constant watches. */
void *channel_init_try_register_connection_polling_without_gil(void *arg) { static void *channel_init_try_register_connection_polling_without_gil(
void *arg) {
channel_init_try_register_stack *stack = channel_init_try_register_stack *stack =
(channel_init_try_register_stack *)arg; (channel_init_try_register_stack *)arg;
...@@ -557,7 +564,8 @@ void *channel_init_try_register_connection_polling_without_gil(void *arg) { ...@@ -557,7 +564,8 @@ void *channel_init_try_register_connection_polling_without_gil(void *arg) {
} }
// Needs to be called under global_connection_poolling_mu // Needs to be called under global_connection_poolling_mu
void grpc_rb_channel_try_register_connection_polling(bg_watched_channel *bg) { static void grpc_rb_channel_try_register_connection_polling(
bg_watched_channel *bg) {
grpc_connectivity_state conn_state; grpc_connectivity_state conn_state;
watch_state_op *op = NULL; watch_state_op *op = NULL;
...@@ -599,7 +607,7 @@ void grpc_rb_channel_try_register_connection_polling(bg_watched_channel *bg) { ...@@ -599,7 +607,7 @@ void grpc_rb_channel_try_register_connection_polling(bg_watched_channel *bg) {
// indicates process shutdown. // indicates process shutdown.
// In the worst case, this stops polling channel connectivity // In the worst case, this stops polling channel connectivity
// early and falls back to current behavior. // early and falls back to current behavior.
void *run_poll_channels_loop_no_gil(void *arg) { static void *run_poll_channels_loop_no_gil(void *arg) {
grpc_event event; grpc_event event;
watch_state_op *op = NULL; watch_state_op *op = NULL;
bg_watched_channel *bg = NULL; bg_watched_channel *bg = NULL;
...@@ -643,7 +651,7 @@ void *run_poll_channels_loop_no_gil(void *arg) { ...@@ -643,7 +651,7 @@ void *run_poll_channels_loop_no_gil(void *arg) {
} }
// Notify the channel polling loop to cleanup and shutdown. // Notify the channel polling loop to cleanup and shutdown.
void run_poll_channels_loop_unblocking_func(void *arg) { static void run_poll_channels_loop_unblocking_func(void *arg) {
bg_watched_channel *bg = NULL; bg_watched_channel *bg = NULL;
(void)arg; (void)arg;
...@@ -677,7 +685,7 @@ void run_poll_channels_loop_unblocking_func(void *arg) { ...@@ -677,7 +685,7 @@ void run_poll_channels_loop_unblocking_func(void *arg) {
} }
// Poll channel connectivity states in background thread without the GIL. // Poll channel connectivity states in background thread without the GIL.
VALUE run_poll_channels_loop(VALUE arg) { static VALUE run_poll_channels_loop(VALUE arg) {
(void)arg; (void)arg;
gpr_log( gpr_log(
GPR_DEBUG, GPR_DEBUG,
...@@ -688,7 +696,7 @@ VALUE run_poll_channels_loop(VALUE arg) { ...@@ -688,7 +696,7 @@ VALUE run_poll_channels_loop(VALUE arg) {
return Qnil; return Qnil;
} }
void *wait_until_channel_polling_thread_started_no_gil(void *arg) { static void *wait_until_channel_polling_thread_started_no_gil(void *arg) {
(void)arg; (void)arg;
gpr_log(GPR_DEBUG, "GRPC_RUBY: wait for channel polling thread to start"); gpr_log(GPR_DEBUG, "GRPC_RUBY: wait for channel polling thread to start");
gpr_mu_lock(&global_connection_polling_mu); gpr_mu_lock(&global_connection_polling_mu);
...@@ -740,7 +748,7 @@ void grpc_rb_channel_polling_thread_start() { ...@@ -740,7 +748,7 @@ void grpc_rb_channel_polling_thread_start() {
} }
} }
void Init_grpc_propagate_masks() { static void Init_grpc_propagate_masks() {
/* Constants representing call propagation masks in grpc.h */ /* Constants representing call propagation masks in grpc.h */
VALUE grpc_rb_mPropagateMasks = VALUE grpc_rb_mPropagateMasks =
rb_define_module_under(grpc_rb_mGrpcCore, "PropagateMasks"); rb_define_module_under(grpc_rb_mGrpcCore, "PropagateMasks");
...@@ -756,7 +764,7 @@ void Init_grpc_propagate_masks() { ...@@ -756,7 +764,7 @@ void Init_grpc_propagate_masks() {
UINT2NUM(GRPC_PROPAGATE_DEFAULTS)); UINT2NUM(GRPC_PROPAGATE_DEFAULTS));
} }
void Init_grpc_connectivity_states() { static void Init_grpc_connectivity_states() {
/* Constants representing call propagation masks in grpc.h */ /* Constants representing call propagation masks in grpc.h */
VALUE grpc_rb_mConnectivityStates = VALUE grpc_rb_mConnectivityStates =
rb_define_module_under(grpc_rb_mGrpcCore, "ConnectivityStates"); rb_define_module_under(grpc_rb_mGrpcCore, "ConnectivityStates");
......
...@@ -71,16 +71,12 @@ static void *grpc_rb_completion_queue_pluck_no_gil(void *param) { ...@@ -71,16 +71,12 @@ static void *grpc_rb_completion_queue_pluck_no_gil(void *param) {
} }
/* Helper function to free a completion queue. */ /* Helper function to free a completion queue. */
void grpc_rb_completion_queue_safe_destroy(grpc_completion_queue *cq) { void grpc_rb_completion_queue_destroy(grpc_completion_queue *cq) {
grpc_event ev; /* Every function that adds an event to a queue also synchronously plucks
that event from the queue, and holds a reference to the Ruby object that
holds the queue, so we only get to this point if all of those functions
have completed, and the queue is empty */
grpc_completion_queue_shutdown(cq); grpc_completion_queue_shutdown(cq);
for(;;) {
ev = grpc_completion_queue_pluck(cq, NULL, gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
if (ev.type == GRPC_QUEUE_SHUTDOWN) {
break;
}
}
grpc_completion_queue_destroy(cq); grpc_completion_queue_destroy(cq);
} }
......
...@@ -38,7 +38,7 @@ ...@@ -38,7 +38,7 @@
#include <grpc/grpc.h> #include <grpc/grpc.h>
void grpc_rb_completion_queue_safe_destroy(grpc_completion_queue *cq); void grpc_rb_completion_queue_destroy(grpc_completion_queue *cq);
/** /**
* Makes the implementation of CompletionQueue#pluck available in other files * Makes the implementation of CompletionQueue#pluck available in other files
......
...@@ -42,7 +42,6 @@ ...@@ -42,7 +42,6 @@
#include <grpc/grpc.h> #include <grpc/grpc.h>
#include <grpc/support/time.h> #include <grpc/support/time.h>
#include <grpc/support/log.h>
#include "rb_call.h" #include "rb_call.h"
#include "rb_call_credentials.h" #include "rb_call_credentials.h"
#include "rb_channel.h" #include "rb_channel.h"
......
...@@ -77,7 +77,7 @@ static void destroy_server(grpc_rb_server *server, gpr_timespec deadline) { ...@@ -77,7 +77,7 @@ static void destroy_server(grpc_rb_server *server, gpr_timespec deadline) {
gpr_inf_future(GPR_CLOCK_REALTIME), NULL); gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
} }
grpc_server_destroy(server->wrapped); grpc_server_destroy(server->wrapped);
grpc_rb_completion_queue_safe_destroy(server->queue); grpc_rb_completion_queue_destroy(server->queue);
server->wrapped = NULL; server->wrapped = NULL;
server->queue = NULL; server->queue = NULL;
} }
......
...@@ -143,8 +143,7 @@ describe 'channel connection behavior' do ...@@ -143,8 +143,7 @@ describe 'channel connection behavior' do
stop_server stop_server
end end
it 'observably connects and reconnects to transient server' \ it 'concurrent watches on the same channel' do
' when using the channel state API' do
timeout(180) do timeout(180) do
port = start_server port = start_server
ch = GRPC::Core::Channel.new("localhost:#{port}", {}, ch = GRPC::Core::Channel.new("localhost:#{port}", {},
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment