diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c index 872f8e35eac6e7bba214f92e8e27da381f4a5a94..bf292fac757204515ef7dc5ea6d91f86431e2a6e 100644 --- a/src/ruby/ext/grpc/rb_call.c +++ b/src/ruby/ext/grpc/rb_call.c @@ -38,7 +38,6 @@ #include <grpc/grpc.h> #include "rb_byte_buffer.h" #include "rb_completion_queue.h" -#include "rb_event.h" #include "rb_metadata.h" #include "rb_grpc.h" diff --git a/src/ruby/ext/grpc/rb_completion_queue.c b/src/ruby/ext/grpc/rb_completion_queue.c index dfde44218b940268c75392a9ab0bc25448fa73a4..dc95838ef58c3fe75efd62a5cab9d126332abcd2 100644 --- a/src/ruby/ext/grpc/rb_completion_queue.c +++ b/src/ruby/ext/grpc/rb_completion_queue.c @@ -141,8 +141,7 @@ static VALUE grpc_rb_completion_queue_next(VALUE self, VALUE timeout) { if (next_call.event == NULL) { return Qnil; } - return Data_Wrap_Struct(rb_cEvent, GC_NOT_MARKED, grpc_rb_event_finish, - next_call.event); + return grpc_rb_new_event(next_call.event); } /* Blocks until the next event for given tag is available, and returns the @@ -160,8 +159,7 @@ static VALUE grpc_rb_completion_queue_pluck(VALUE self, VALUE tag, if (next_call.event == NULL) { return Qnil; } - return Data_Wrap_Struct(rb_cEvent, GC_NOT_MARKED, grpc_rb_event_finish, - next_call.event); + return grpc_rb_new_event(next_call.event); } /* rb_cCompletionQueue is the ruby class that proxies grpc_completion_queue. */ diff --git a/src/ruby/ext/grpc/rb_event.c b/src/ruby/ext/grpc/rb_event.c index 76ea6ad10337ffd2b6b57eb23b8188a6238c9231..9200f923cc77f85576a1c497eb3b7f8b3031e17d 100644 --- a/src/ruby/ext/grpc/rb_event.c +++ b/src/ruby/ext/grpc/rb_event.c @@ -41,12 +41,49 @@ #include "rb_call.h" #include "rb_metadata.h" +/* grpc_rb_event wraps a grpc_event. It provides a peer ruby object, + * 'mark' to minimize copying when an event is created from ruby. */ +typedef struct grpc_rb_event { + /* Holder of ruby objects involved in constructing the channel */ + VALUE mark; + /* The actual event */ + grpc_event *wrapped; +} grpc_rb_event; + + /* rb_mCompletionType is a ruby module that holds the completion type values */ VALUE rb_mCompletionType = Qnil; -/* Helper function to free an event. */ -void grpc_rb_event_finish(void *p) { - grpc_event_finish(p); +/* Destroys Event instances. */ +static void grpc_rb_event_free(void *p) { + grpc_rb_event *ev = NULL; + if (p == NULL) { + return; + }; + ev = (grpc_rb_event *)p; + + /* Deletes the wrapped object if the mark object is Qnil, which indicates + * that no other object is the actual owner. */ + if (ev->wrapped != NULL && ev->mark == Qnil) { + grpc_event_finish(ev->wrapped); + rb_warning("event gc: destroyed the c event"); + } else { + rb_warning("event gc: did not destroy the c event"); + } + + xfree(p); +} + +/* Protects the mark object from GC */ +static void grpc_rb_event_mark(void *p) { + grpc_rb_event *event = NULL; + if (p == NULL) { + return; + } + event = (grpc_rb_event *)p; + if (event->mark != Qnil) { + rb_gc_mark(event->mark); + } } static VALUE grpc_rb_event_result(VALUE self); @@ -54,7 +91,14 @@ static VALUE grpc_rb_event_result(VALUE self); /* Obtains the type of an event. */ static VALUE grpc_rb_event_type(VALUE self) { grpc_event *event = NULL; - Data_Get_Struct(self, grpc_event, event); + grpc_rb_event *wrapper = NULL; + Data_Get_Struct(self, grpc_rb_event, wrapper); + if (wrapper->wrapped == NULL) { + rb_raise(rb_eRuntimeError, "finished!"); + return Qnil; + } + + event = wrapper->wrapped; switch (event->type) { case GRPC_QUEUE_SHUTDOWN: return rb_const_get(rb_mCompletionType, rb_intern("QUEUE_SHUTDOWN")); @@ -94,7 +138,14 @@ static VALUE grpc_rb_event_type(VALUE self) { /* Obtains the tag associated with an event. */ static VALUE grpc_rb_event_tag(VALUE self) { grpc_event *event = NULL; - Data_Get_Struct(self, grpc_event, event); + grpc_rb_event *wrapper = NULL; + Data_Get_Struct(self, grpc_rb_event, wrapper); + if (wrapper->wrapped == NULL) { + rb_raise(rb_eRuntimeError, "finished!"); + return Qnil; + } + + event = wrapper->wrapped; if (event->tag == NULL) { return Qnil; } @@ -103,10 +154,17 @@ static VALUE grpc_rb_event_tag(VALUE self) { /* Obtains the call associated with an event. */ static VALUE grpc_rb_event_call(VALUE self) { - grpc_event *ev = NULL; - Data_Get_Struct(self, grpc_event, ev); - if (ev->call != NULL) { - return grpc_rb_wrap_call(ev->call); + grpc_event *event = NULL; + grpc_rb_event *wrapper = NULL; + Data_Get_Struct(self, grpc_rb_event, wrapper); + if (wrapper->wrapped == NULL) { + rb_raise(rb_eRuntimeError, "finished!"); + return Qnil; + } + + event = wrapper->wrapped; + if (event->call != NULL) { + return grpc_rb_wrap_call(event->call); } return Qnil; } @@ -114,6 +172,7 @@ static VALUE grpc_rb_event_call(VALUE self) { /* Obtains the metadata associated with an event. */ static VALUE grpc_rb_event_metadata(VALUE self) { grpc_event *event = NULL; + grpc_rb_event *wrapper = NULL; grpc_metadata *metadata = NULL; VALUE key = Qnil; VALUE new_ary = Qnil; @@ -121,9 +180,14 @@ static VALUE grpc_rb_event_metadata(VALUE self) { VALUE value = Qnil; size_t count = 0; size_t i = 0; + Data_Get_Struct(self, grpc_rb_event, wrapper); + if (wrapper->wrapped == NULL) { + rb_raise(rb_eRuntimeError, "finished!"); + return Qnil; + } /* Figure out which metadata to read. */ - Data_Get_Struct(self, grpc_event, event); + event = wrapper->wrapped; switch (event->type) { case GRPC_CLIENT_METADATA_READ: @@ -179,7 +243,13 @@ static VALUE grpc_rb_event_metadata(VALUE self) { /* Obtains the data associated with an event. */ static VALUE grpc_rb_event_result(VALUE self) { grpc_event *event = NULL; - Data_Get_Struct(self, grpc_event, event); + grpc_rb_event *wrapper = NULL; + Data_Get_Struct(self, grpc_rb_event, wrapper); + if (wrapper->wrapped == NULL) { + rb_raise(rb_eRuntimeError, "finished!"); + return Qnil; + } + event = wrapper->wrapped; switch (event->type) { @@ -245,11 +315,19 @@ static VALUE grpc_rb_event_result(VALUE self) { return Qfalse; } -/* rb_sNewServerRpc is the struct that holds new server rpc details. */ -VALUE rb_sNewServerRpc = Qnil; - -/* rb_sStatus is the struct that holds status details. */ -VALUE rb_sStatus = Qnil; +static VALUE grpc_rb_event_finish(VALUE self) { + grpc_event *event = NULL; + grpc_rb_event *wrapper = NULL; + Data_Get_Struct(self, grpc_rb_event, wrapper); + if (wrapper->wrapped == NULL) { /* already closed */ + return Qnil; + } + event = wrapper->wrapped; + grpc_event_finish(event); + wrapper->wrapped = NULL; + wrapper->mark = Qnil; + return Qnil; +} /* rb_cEvent is the Event class whose instances proxy grpc_event */ VALUE rb_cEvent = Qnil; @@ -262,9 +340,6 @@ void Init_google_rpc_event() { rb_eEventError = rb_define_class_under(rb_mGoogleRpcCore, "EventError", rb_eStandardError); rb_cEvent = rb_define_class_under(rb_mGoogleRpcCore, "Event", rb_cObject); - rb_sNewServerRpc = rb_struct_define("NewServerRpc", "method", "host", - "deadline", "metadata", NULL); - rb_sStatus = rb_struct_define("Status", "code", "details", "metadata", NULL); /* Prevent allocation or inialization from ruby. */ rb_define_alloc_func(rb_cEvent, grpc_rb_cannot_alloc); @@ -276,6 +351,8 @@ void Init_google_rpc_event() { rb_define_method(rb_cEvent, "result", grpc_rb_event_result, 0); rb_define_method(rb_cEvent, "tag", grpc_rb_event_tag, 0); rb_define_method(rb_cEvent, "type", grpc_rb_event_type, 0); + rb_define_method(rb_cEvent, "finish", grpc_rb_event_finish, 0); + rb_define_alias(rb_cEvent, "close", "finish"); /* Constants representing the completion types */ rb_mCompletionType = rb_define_module_under(rb_mGoogleRpcCore, @@ -298,3 +375,11 @@ void Init_google_rpc_event() { rb_define_const(rb_mCompletionType, "RESERVED", INT2NUM(GRPC_COMPLETION_DO_NOT_USE)); } + +VALUE grpc_rb_new_event(grpc_event *ev) { + grpc_rb_event *wrapper = ALLOC(grpc_rb_event); + wrapper->wrapped = ev; + wrapper->mark = Qnil; + return Data_Wrap_Struct(rb_cEvent, grpc_rb_event_mark, grpc_rb_event_free, + wrapper); +} diff --git a/src/ruby/ext/grpc/rb_event.h b/src/ruby/ext/grpc/rb_event.h index 459502c64796644981a218df33be69eac3105b66..e4e4a796c22376fed34f55a6417b66ce74d1a968 100644 --- a/src/ruby/ext/grpc/rb_event.h +++ b/src/ruby/ext/grpc/rb_event.h @@ -35,12 +35,7 @@ #define GRPC_RB_EVENT_H_ #include <ruby.h> - -/* rb_sNewServerRpc is the struct that holds new server rpc details. */ -extern VALUE rb_sNewServerRpc; - -/* rb_sStruct is the struct that holds status details. */ -extern VALUE rb_sStatus; +#include <grpc/grpc.h> /* rb_cEvent is the Event class whose instances proxy grpc_event. */ extern VALUE rb_cEvent; @@ -49,8 +44,8 @@ extern VALUE rb_cEvent; event processing. */ extern VALUE rb_eEventError; -/* Helper function to free an event. */ -void grpc_rb_event_finish(void *p); +/* Used to create new ruby event objects */ +VALUE grpc_rb_new_event(grpc_event *ev); /* Initializes the Event and EventError classes. */ void Init_google_rpc_event(); diff --git a/src/ruby/ext/grpc/rb_grpc.c b/src/ruby/ext/grpc/rb_grpc.c index f0e432a6bcad3de9981898af272b4337e91982e7..eae011d33bdf7835ec090e257dde011f299ef6d6 100644 --- a/src/ruby/ext/grpc/rb_grpc.c +++ b/src/ruby/ext/grpc/rb_grpc.c @@ -245,16 +245,27 @@ void grpc_rb_shutdown(void *vm) { grpc_shutdown(); } +/* Initialize the Google RPC module structs */ + +/* rb_sNewServerRpc is the struct that holds new server rpc details. */ +VALUE rb_sNewServerRpc = Qnil; +/* rb_sStatus is the struct that holds status details. */ +VALUE rb_sStatus = Qnil; + /* Initialize the Google RPC module. */ VALUE rb_mGoogle = Qnil; VALUE rb_mGoogleRPC = Qnil; VALUE rb_mGoogleRpcCore = Qnil; + void Init_grpc() { grpc_init(); ruby_vm_at_exit(grpc_rb_shutdown); rb_mGoogle = rb_define_module("Google"); rb_mGoogleRPC = rb_define_module_under(rb_mGoogle, "RPC"); rb_mGoogleRpcCore = rb_define_module_under(rb_mGoogleRPC, "Core"); + rb_sNewServerRpc = rb_struct_define("NewServerRpc", "method", "host", + "deadline", "metadata", NULL); + rb_sStatus = rb_struct_define("Status", "code", "details", "metadata", NULL); Init_google_rpc_byte_buffer(); Init_google_rpc_event(); diff --git a/src/ruby/ext/grpc/rb_grpc.h b/src/ruby/ext/grpc/rb_grpc.h index 68f8a06c30dbed159d731053d14e055bc0bca350..c2c894244fe2ffd413d6761acc3932c256d0cf77 100644 --- a/src/ruby/ext/grpc/rb_grpc.h +++ b/src/ruby/ext/grpc/rb_grpc.h @@ -47,6 +47,12 @@ extern VALUE rb_mGoogleRpcCore; /* Class used to wrap timeval structs. */ extern VALUE rb_cTimeVal; +/* rb_sNewServerRpc is the struct that holds new server rpc details. */ +extern VALUE rb_sNewServerRpc; + +/* rb_sStruct is the struct that holds status details. */ +extern VALUE rb_sStatus; + /* GC_NOT_MARKED is used in calls to Data_Wrap_Struct to indicate that the wrapped struct does not need to participate in ruby gc. */ extern const RUBY_DATA_FUNC GC_NOT_MARKED; diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index b16c8f85632fd5e1674c77d86533c05dc9b38081..288ea083e6999ccfd20e79d13c24767b079f281e 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -73,6 +73,7 @@ module Google::RPC # wait for the invocation to be accepted ev = q.pluck(invoke_accepted, INFINITE_FUTURE) raise OutOfTime if ev.nil? + ev.close [finished_tag, client_metadata_read] end @@ -191,11 +192,17 @@ module Google::RPC def writes_done(assert_finished=true) @call.writes_done(self) ev = @cq.pluck(self, INFINITE_FUTURE) - assert_event_type(ev, FINISH_ACCEPTED) - logger.debug("Writes done: waiting for finish? #{assert_finished}") + begin + assert_event_type(ev, FINISH_ACCEPTED) + logger.debug("Writes done: waiting for finish? #{assert_finished}") + ensure + ev.close + end + if assert_finished ev = @cq.pluck(@finished_tag, INFINITE_FUTURE) raise "unexpected event: #{ev.inspect}" if ev.nil? + ev.close return @call.status end end @@ -206,22 +213,21 @@ module Google::RPC # event. def finished ev = @cq.pluck(@finished_tag, INFINITE_FUTURE) - raise "unexpected event: #{ev.inspect}" unless ev.type == FINISHED - if @call.metadata.nil? - @call.metadata = ev.result.metadata - else - @call.metadata.merge!(ev.result.metadata) - end + begin + raise "unexpected event: #{ev.inspect}" unless ev.type == FINISHED + if @call.metadata.nil? + @call.metadata = ev.result.metadata + else + @call.metadata.merge!(ev.result.metadata) + end - if ev.result.code != Core::StatusCodes::OK - raise BadStatus.new(ev.result.code, ev.result.details) + if ev.result.code != Core::StatusCodes::OK + raise BadStatus.new(ev.result.code, ev.result.details) + end + res = ev.result + ensure + ev.close end - res = ev.result - - # NOTE(temiola): This is necessary to allow the C call struct wrapped - # within the active_call to be GCed; this is necessary so that other - # C-level destructors get called in the required order. - ev = nil # allow the event to be GCed res end @@ -246,8 +252,11 @@ module Google::RPC # call queue#pluck, and wait for WRITE_ACCEPTED, so as not to return # until the flow control allows another send on this call. ev = @cq.pluck(self, INFINITE_FUTURE) - assert_event_type(ev, WRITE_ACCEPTED) - ev = nil + begin + assert_event_type(ev, WRITE_ACCEPTED) + ensure + ev.close + end end # send_status sends a status to the remote endpoint @@ -260,7 +269,11 @@ module Google::RPC assert_queue_is_ready @call.start_write_status(code, details, self) ev = @cq.pluck(self, INFINITE_FUTURE) - assert_event_type(ev, FINISH_ACCEPTED) + begin + assert_event_type(ev, FINISH_ACCEPTED) + ensure + ev.close + end logger.debug("Status sent: #{code}:'#{details}'") if assert_finished return finished @@ -283,13 +296,17 @@ module Google::RPC @call.start_read(self) ev = @cq.pluck(self, INFINITE_FUTURE) - assert_event_type(ev, READ) - logger.debug("received req: #{ev.result.inspect}") - if !ev.result.nil? - logger.debug("received req.to_s: #{ev.result.to_s}") - res = @unmarshal.call(ev.result.to_s) - logger.debug("received_req (unmarshalled): #{res.inspect}") - return res + begin + assert_event_type(ev, READ) + logger.debug("received req: #{ev.result.inspect}") + if !ev.result.nil? + logger.debug("received req.to_s: #{ev.result.to_s}") + res = @unmarshal.call(ev.result.to_s) + logger.debug("received_req (unmarshalled): #{res.inspect}") + return res + end + ensure + ev.close end logger.debug('found nil; the final response has been sent') nil @@ -515,12 +532,15 @@ module Google::RPC # confirms that no events are enqueued, and that the queue is not # shutdown. def assert_queue_is_ready + ev = nil begin ev = @cq.pluck(self, ZERO) raise "unexpected event #{ev.inspect}" unless ev.nil? rescue OutOfTime # expected, nothing should be on the queue and the deadline was ZERO, # except things using another tag + ensure + ev.close unless ev.nil? end end diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb index fc9bb851aa958cd1ca1f77b41ee47b4e50c81673..066ec851acd4ec00d203e59baccf7fdb5b83e982 100644 --- a/src/ruby/lib/grpc/generic/bidi_call.rb +++ b/src/ruby/lib/grpc/generic/bidi_call.rb @@ -149,15 +149,27 @@ module Google::RPC payload = @marshal.call(req) @call.start_write(Core::ByteBuffer.new(payload), write_tag) ev = @cq.pluck(write_tag, INFINITE_FUTURE) - assert_event_type(ev, WRITE_ACCEPTED) + begin + assert_event_type(ev, WRITE_ACCEPTED) + ensure + ev.close + end end if is_client @call.writes_done(write_tag) ev = @cq.pluck(write_tag, INFINITE_FUTURE) - assert_event_type(ev, FINISH_ACCEPTED) + begin + assert_event_type(ev, FINISH_ACCEPTED) + ensure + ev.close + end logger.debug("bidi-client: sent #{count} reqs, waiting to finish") ev = @cq.pluck(@finished_tag, INFINITE_FUTURE) - assert_event_type(ev, FINISHED) + begin + assert_event_type(ev, FINISHED) + ensure + ev.close + end logger.debug('bidi-client: finished received') end rescue StandardError => e @@ -180,19 +192,23 @@ module Google::RPC count += 1 @call.start_read(read_tag) ev = @cq.pluck(read_tag, INFINITE_FUTURE) - assert_event_type(ev, READ) - - # handle the next event. - if ev.result.nil? - @readq.push(END_OF_READS) - logger.debug('done reading!') - break + begin + assert_event_type(ev, READ) + + # handle the next event. + if ev.result.nil? + @readq.push(END_OF_READS) + logger.debug('done reading!') + break + end + + # push the latest read onto the queue and continue reading + logger.debug("received req.to_s: #{ev.result.to_s}") + res = @unmarshal.call(ev.result.to_s) + @readq.push(res) + ensure + ev.close end - - # push the latest read onto the queue and continue reading - logger.debug("received req.to_s: #{ev.result.to_s}") - res = @unmarshal.call(ev.result.to_s) - @readq.push(res) end rescue StandardError => e diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index 2054d73b48a3f3ad228d4240655bcb72d7df7057..81db68804ec10dfd865df54080cb76fdce7d1ca1 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -217,18 +217,13 @@ module Google::RPC next if ev.nil? if ev.type != SERVER_RPC_NEW logger.warn("bad evt: got:#{ev.type}, want:#{SERVER_RPC_NEW}") + ev.close next end c = new_active_server_call(ev.call, ev.result) if !c.nil? mth = ev.result.method.to_sym - - # NOTE(temiola): This is necessary to allow the C call struct wrapped - # within the active_call created by the event to be GCed; this is - # necessary so that other C-level destructors get called in the - # required order. - ev = nil - + ev.close @pool.schedule(c) do |call| rpc_descs[mth].run_server_method(call, rpc_handlers[mth]) end