diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c index 4e59174c3b3d4990136c5d3273f54f791d2d4c18..449c91a0f98a2dcbe575ac6e4e1a706f849eaa13 100644 --- a/src/ruby/ext/grpc/rb_channel.c +++ b/src/ruby/ext/grpc/rb_channel.c @@ -34,9 +34,9 @@ #include <ruby/ruby.h> #include <ruby/thread.h> -#include "rb_grpc_imports.generated.h" #include "rb_byte_buffer.h" #include "rb_channel.h" +#include "rb_grpc_imports.generated.h" #include <grpc/grpc.h> #include <grpc/grpc_security.h> @@ -72,21 +72,19 @@ typedef struct bg_watched_channel { grpc_channel *channel; struct bg_watched_channel *next; int channel_destroyed; - int refcount; // must only be accessed under global_connection_polling_mu + int refcount; // must only be accessed under global_connection_polling_mu } bg_watched_channel; /* grpc_rb_channel wraps a grpc_channel. */ typedef struct grpc_rb_channel { VALUE credentials; - /* The actual channel (protected in a wrapper to tell when it's safe to destroy) */ + /* The actual channel (protected in a wrapper to tell when it's safe to + * destroy) */ bg_watched_channel *bg_wrapped; } grpc_rb_channel; -typedef enum { - CONTINUOUS_WATCH, - WATCH_STATE_API -} watch_state_op_type; +typedef enum { CONTINUOUS_WATCH, WATCH_STATE_API } watch_state_op_type; typedef struct watch_state_op { watch_state_op_type op_type; @@ -106,7 +104,7 @@ typedef struct watch_state_op { bg_watched_channel *bg_watched_channel_list_head = NULL; void grpc_rb_channel_try_register_connection_polling(bg_watched_channel *bg); -void *wait_until_channel_polling_thread_started_no_gil(void*); +void *wait_until_channel_polling_thread_started_no_gil(void *); void *channel_init_try_register_connection_polling_without_gil(void *arg); typedef struct channel_init_try_register_stack { @@ -121,12 +119,14 @@ int abort_channel_polling = 0; int channel_polling_thread_started = 0; int bg_watched_channel_list_lookup(bg_watched_channel *bg); -bg_watched_channel *bg_watched_channel_list_create_and_add(grpc_channel *channel); +bg_watched_channel *bg_watched_channel_list_create_and_add( + grpc_channel *channel); void bg_watched_channel_list_free_and_remove(bg_watched_channel *bg); -void run_poll_channels_loop_unblocking_func(void* arg); +void run_poll_channels_loop_unblocking_func(void *arg); // Needs to be called under global_connection_polling_mu -void grpc_rb_channel_watch_connection_state_op_complete(watch_state_op* op, int success) { +void grpc_rb_channel_watch_connection_state_op_complete(watch_state_op *op, + int success) { GPR_ASSERT(!op->op.api_callback_args.called_back); op->op.api_callback_args.called_back = 1; op->op.api_callback_args.success = success; @@ -150,7 +150,7 @@ void grpc_rb_channel_safe_destroy(bg_watched_channel *bg) { } void *channel_safe_destroy_without_gil(void *arg) { - grpc_rb_channel_safe_destroy((bg_watched_channel*)arg); + grpc_rb_channel_safe_destroy((bg_watched_channel *)arg); return NULL; } @@ -186,14 +186,14 @@ void grpc_rb_channel_mark(void *p) { } rb_data_type_t grpc_channel_data_type = {"grpc_channel", - {grpc_rb_channel_mark, - grpc_rb_channel_free, - GRPC_RB_MEMSIZE_UNAVAILABLE, - {NULL, NULL}}, - NULL, - NULL, + {grpc_rb_channel_mark, + grpc_rb_channel_free, + GRPC_RB_MEMSIZE_UNAVAILABLE, + {NULL, NULL}}, + NULL, + NULL, #ifdef RUBY_TYPED_FREE_IMMEDIATELY - RUBY_TYPED_FREE_IMMEDIATELY + RUBY_TYPED_FREE_IMMEDIATELY #endif }; @@ -226,8 +226,9 @@ VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) { MEMZERO(&args, grpc_channel_args, 1); grpc_ruby_once_init(); - rb_thread_call_without_gvl(wait_until_channel_polling_thread_started_no_gil, NULL, - run_poll_channels_loop_unblocking_func, NULL); + rb_thread_call_without_gvl(wait_until_channel_polling_thread_started_no_gil, + NULL, run_poll_channels_loop_unblocking_func, + NULL); /* "3" == 3 mandatory args */ rb_scan_args(argc, argv, "3", &target, &channel_args, &credentials); @@ -252,7 +253,8 @@ VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) { stack.channel = ch; stack.wrapper = wrapper; rb_thread_call_without_gvl( - channel_init_try_register_connection_polling_without_gil, &stack, NULL, NULL); + channel_init_try_register_connection_polling_without_gil, &stack, NULL, + NULL); if (args.args != NULL) { xfree(args.args); /* Allocated by grpc_rb_hash_convert_to_channel_args */ @@ -273,7 +275,7 @@ typedef struct get_state_stack { } get_state_stack; void *get_state_without_gil(void *arg) { - get_state_stack *stack = (get_state_stack*)arg; + get_state_stack *stack = (get_state_stack *)arg; gpr_mu_lock(&global_connection_polling_mu); GPR_ASSERT(abort_channel_polling || channel_polling_thread_started); @@ -282,7 +284,8 @@ void *get_state_without_gil(void *arg) { // failed to start just always shows shutdown state. stack->out = GRPC_CHANNEL_SHUTDOWN; } else { - stack->out = grpc_channel_check_connectivity_state(stack->channel, stack->try_to_connect); + stack->out = grpc_channel_check_connectivity_state(stack->channel, + stack->try_to_connect); } gpr_mu_unlock(&global_connection_polling_mu); @@ -299,7 +302,7 @@ void *get_state_without_gil(void *arg) { It also tries to connect if the chennel is idle in the second form. */ VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv, - VALUE self) { + VALUE self) { VALUE try_to_connect_param = Qfalse; int grpc_try_to_connect = 0; grpc_rb_channel *wrapper = NULL; @@ -329,30 +332,30 @@ typedef struct watch_state_stack { } watch_state_stack; void *wait_for_watch_state_op_complete_without_gvl(void *arg) { - watch_state_stack *stack = (watch_state_stack*)arg; + watch_state_stack *stack = (watch_state_stack *)arg; watch_state_op *op = NULL; - void *success = (void*)0; + void *success = (void *)0; gpr_mu_lock(&global_connection_polling_mu); // its unsafe to do a "watch" after "channel polling abort" because the cq has // been shut down. if (abort_channel_polling) { gpr_mu_unlock(&global_connection_polling_mu); - return (void*)0; + return (void *)0; } op = gpr_zalloc(sizeof(watch_state_op)); op->op_type = WATCH_STATE_API; // one ref for this thread and another for the callback-running thread - grpc_channel_watch_connectivity_state( - stack->channel, stack->last_state, stack->deadline, channel_polling_cq, op); + grpc_channel_watch_connectivity_state(stack->channel, stack->last_state, + stack->deadline, channel_polling_cq, + op); - while(!op->op.api_callback_args.called_back) { - gpr_cv_wait(&global_connection_polling_cv, - &global_connection_polling_mu, + while (!op->op.api_callback_args.called_back) { + gpr_cv_wait(&global_connection_polling_cv, &global_connection_polling_mu, gpr_inf_future(GPR_CLOCK_REALTIME)); } if (op->op.api_callback_args.success) { - success = (void*)1; + success = (void *)1; } gpr_free(op); gpr_mu_unlock(&global_connection_polling_mu); @@ -367,12 +370,11 @@ void *wait_for_watch_state_op_complete_without_gvl(void *arg) { * Returns false if "deadline" expires before the channel's connectivity * state changes from "last_state". * */ -VALUE grpc_rb_channel_watch_connectivity_state(VALUE self, - VALUE last_state, +VALUE grpc_rb_channel_watch_connectivity_state(VALUE self, VALUE last_state, VALUE deadline) { grpc_rb_channel *wrapper = NULL; watch_state_stack stack; - void* op_success = 0; + void *op_success = 0; TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); @@ -382,7 +384,9 @@ VALUE grpc_rb_channel_watch_connectivity_state(VALUE self, } if (!FIXNUM_P(last_state)) { - rb_raise(rb_eTypeError, "bad type for last_state. want a GRPC::Core::ChannelState constant"); + rb_raise( + rb_eTypeError, + "bad type for last_state. want a GRPC::Core::ChannelState constant"); return Qnil; } @@ -391,7 +395,8 @@ VALUE grpc_rb_channel_watch_connectivity_state(VALUE self, stack.last_state = NUM2LONG(last_state); op_success = rb_thread_call_without_gvl( - wait_for_watch_state_op_complete_without_gvl, &stack, run_poll_channels_loop_unblocking_func, NULL); + wait_for_watch_state_op_complete_without_gvl, &stack, + run_poll_channels_loop_unblocking_func, NULL); return op_success ? Qtrue : Qfalse; } @@ -399,8 +404,7 @@ VALUE grpc_rb_channel_watch_connectivity_state(VALUE self, /* Create a call given a grpc_channel, in order to call method. The request is not sent until grpc_call_invoke is called. */ VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, VALUE mask, - VALUE method, VALUE host, - VALUE deadline) { + VALUE method, VALUE host, VALUE deadline) { VALUE res = Qnil; grpc_rb_channel *wrapper = NULL; grpc_call *call = NULL; @@ -434,8 +438,8 @@ VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, VALUE mask, method_slice = grpc_slice_from_copied_buffer(RSTRING_PTR(method), RSTRING_LEN(method)); - call = grpc_channel_create_call(wrapper->bg_wrapped->channel, parent_call, flags, cq, method_slice, - host_slice_ptr, + call = grpc_channel_create_call(wrapper->bg_wrapped->channel, parent_call, + flags, cq, method_slice, host_slice_ptr, grpc_rb_time_timeval(deadline, /* absolute time */ 0), NULL); @@ -467,8 +471,8 @@ VALUE grpc_rb_channel_destroy(VALUE self) { TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); if (wrapper->bg_wrapped != NULL) { - rb_thread_call_without_gvl( - channel_safe_destroy_without_gil, wrapper->bg_wrapped, NULL, NULL); + rb_thread_call_without_gvl(channel_safe_destroy_without_gil, + wrapper->bg_wrapped, NULL, NULL); wrapper->bg_wrapped = NULL; } @@ -504,7 +508,8 @@ int bg_watched_channel_list_lookup(bg_watched_channel *target) { } /* Needs to be called under global_connection_polling_mu */ -bg_watched_channel *bg_watched_channel_list_create_and_add(grpc_channel *channel) { +bg_watched_channel *bg_watched_channel_list_create_and_add( + grpc_channel *channel) { bg_watched_channel *watched = gpr_zalloc(sizeof(bg_watched_channel)); watched->channel = channel; @@ -540,10 +545,12 @@ void bg_watched_channel_list_free_and_remove(bg_watched_channel *target) { /* Initialize a grpc_rb_channel's "protected grpc_channel" and try to push * it onto the background thread for constant watches. */ void *channel_init_try_register_connection_polling_without_gil(void *arg) { - channel_init_try_register_stack *stack = (channel_init_try_register_stack*)arg; + channel_init_try_register_stack *stack = + (channel_init_try_register_stack *)arg; gpr_mu_lock(&global_connection_polling_mu); - stack->wrapper->bg_wrapped = bg_watched_channel_list_create_and_add(stack->channel); + stack->wrapper->bg_wrapped = + bg_watched_channel_list_create_and_add(stack->channel); grpc_rb_channel_try_register_connection_polling(stack->wrapper->bg_wrapped); gpr_mu_unlock(&global_connection_polling_mu); return NULL; @@ -581,8 +588,9 @@ void grpc_rb_channel_try_register_connection_polling(bg_watched_channel *bg) { op = gpr_zalloc(sizeof(watch_state_op)); op->op_type = CONTINUOUS_WATCH; op->op.continuous_watch_callback_args.bg = bg; - grpc_channel_watch_connectivity_state( - bg->channel, conn_state, gpr_inf_future(GPR_CLOCK_REALTIME), channel_polling_cq, op); + grpc_channel_watch_connectivity_state(bg->channel, conn_state, + gpr_inf_future(GPR_CLOCK_REALTIME), + channel_polling_cq, op); } // Note this loop breaks out with a single call of @@ -612,14 +620,15 @@ void *run_poll_channels_loop_no_gil(void *arg) { } gpr_mu_lock(&global_connection_polling_mu); if (event.type == GRPC_OP_COMPLETE) { - op = (watch_state_op*)event.tag; + op = (watch_state_op *)event.tag; if (op->op_type == CONTINUOUS_WATCH) { - bg = (bg_watched_channel*)op->op.continuous_watch_callback_args.bg; + bg = (bg_watched_channel *)op->op.continuous_watch_callback_args.bg; bg->refcount--; grpc_rb_channel_try_register_connection_polling(bg); gpr_free(op); - } else if(op->op_type == WATCH_STATE_API) { - grpc_rb_channel_watch_connection_state_op_complete((watch_state_op*)event.tag, event.success); + } else if (op->op_type == WATCH_STATE_API) { + grpc_rb_channel_watch_connection_state_op_complete( + (watch_state_op *)event.tag, event.success); } else { GPR_ASSERT(0); } @@ -627,7 +636,9 @@ void *run_poll_channels_loop_no_gil(void *arg) { gpr_mu_unlock(&global_connection_polling_mu); } grpc_completion_queue_destroy(channel_polling_cq); - gpr_log(GPR_DEBUG, "GRPC_RUBY: run_poll_channels_loop_no_gil - exit connection polling loop"); + gpr_log(GPR_DEBUG, + "GRPC_RUBY: run_poll_channels_loop_no_gil - exit connection polling " + "loop"); return NULL; } @@ -637,7 +648,9 @@ void run_poll_channels_loop_unblocking_func(void *arg) { (void)arg; gpr_mu_lock(&global_connection_polling_mu); - gpr_log(GPR_DEBUG, "GRPC_RUBY: run_poll_channels_loop_unblocking_func - begin aborting connection polling"); + gpr_log(GPR_DEBUG, + "GRPC_RUBY: run_poll_channels_loop_unblocking_func - begin aborting " + "connection polling"); // early out after first time through if (abort_channel_polling) { gpr_mu_unlock(&global_connection_polling_mu); @@ -647,7 +660,7 @@ void run_poll_channels_loop_unblocking_func(void *arg) { // force pending watches to end by switching to shutdown state bg = bg_watched_channel_list_head; - while(bg != NULL) { + while (bg != NULL) { if (!bg->channel_destroyed) { grpc_channel_destroy(bg->channel); bg->channel_destroyed = 1; @@ -658,13 +671,17 @@ void run_poll_channels_loop_unblocking_func(void *arg) { grpc_completion_queue_shutdown(channel_polling_cq); gpr_cv_broadcast(&global_connection_polling_cv); gpr_mu_unlock(&global_connection_polling_mu); - gpr_log(GPR_DEBUG, "GRPC_RUBY: run_poll_channels_loop_unblocking_func - begin aborting connection polling 22222"); + gpr_log(GPR_DEBUG, + "GRPC_RUBY: run_poll_channels_loop_unblocking_func - end aborting " + "connection polling"); } // Poll channel connectivity states in background thread without the GIL. VALUE run_poll_channels_loop(VALUE arg) { (void)arg; - gpr_log(GPR_DEBUG, "GRPC_RUBY: run_poll_channels_loop - create connection polling thread"); + gpr_log( + GPR_DEBUG, + "GRPC_RUBY: run_poll_channels_loop - create connection polling thread"); rb_thread_call_without_gvl(run_poll_channels_loop_no_gil, NULL, run_poll_channels_loop_unblocking_func, NULL); @@ -718,7 +735,8 @@ void grpc_rb_channel_polling_thread_start() { if (!RTEST(background_thread)) { gpr_log(GPR_DEBUG, "GRPC_RUBY: failed to spawn channel polling thread"); - rb_thread_call_without_gvl(set_abort_channel_polling_without_gil, NULL, NULL, NULL); + rb_thread_call_without_gvl(set_abort_channel_polling_without_gil, NULL, + NULL, NULL); } }