Skip to content
Snippets Groups Projects
Commit d5229da0 authored by Craig Tiller's avatar Craig Tiller
Browse files

Merge pull request #16 from tbetbetbe/you-complete-me

Initial progress on ruby shutdown api migration
parents 48dd6bfb b1fa5d46
No related branches found
No related tags found
No related merge requests found
...@@ -142,8 +142,16 @@ grpc_event grpc_rb_completion_queue_pluck_event(VALUE self, VALUE tag, ...@@ -142,8 +142,16 @@ grpc_event grpc_rb_completion_queue_pluck_event(VALUE self, VALUE tag,
MEMZERO(&next_call, next_call_stack, 1); MEMZERO(&next_call, next_call_stack, 1);
TypedData_Get_Struct(self, grpc_completion_queue, TypedData_Get_Struct(self, grpc_completion_queue,
&grpc_rb_completion_queue_data_type, next_call.cq); &grpc_rb_completion_queue_data_type, next_call.cq);
next_call.timeout = grpc_rb_time_timeval(timeout, /* absolute time*/ 0); if (TYPE(timeout) == T_NIL) {
next_call.tag = ROBJECT(tag); next_call.timeout = gpr_inf_future;
} else {
next_call.timeout = grpc_rb_time_timeval(timeout, /* absolute time*/ 0);
}
if (TYPE(tag) == T_NIL) {
next_call.tag = NULL;
} else {
next_call.tag = ROBJECT(tag);
}
next_call.event.type = GRPC_QUEUE_TIMEOUT; next_call.event.type = GRPC_QUEUE_TIMEOUT;
rb_thread_call_without_gvl(grpc_rb_completion_queue_pluck_no_gil, rb_thread_call_without_gvl(grpc_rb_completion_queue_pluck_no_gil,
(void *)&next_call, NULL, NULL); (void *)&next_call, NULL, NULL);
......
...@@ -210,7 +210,7 @@ static VALUE grpc_rb_server_request_call(VALUE self, VALUE cqueue, ...@@ -210,7 +210,7 @@ static VALUE grpc_rb_server_request_call(VALUE self, VALUE cqueue,
VALUE result; VALUE result;
TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s); TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s);
if (s->wrapped == NULL) { if (s->wrapped == NULL) {
rb_raise(rb_eRuntimeError, "closed!"); rb_raise(rb_eRuntimeError, "destroyed!");
return Qnil; return Qnil;
} else { } else {
grpc_request_call_stack_init(&st); grpc_request_call_stack_init(&st);
...@@ -259,21 +259,69 @@ static VALUE grpc_rb_server_start(VALUE self) { ...@@ -259,21 +259,69 @@ static VALUE grpc_rb_server_start(VALUE self) {
grpc_rb_server *s = NULL; grpc_rb_server *s = NULL;
TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s); TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s);
if (s->wrapped == NULL) { if (s->wrapped == NULL) {
rb_raise(rb_eRuntimeError, "closed!"); rb_raise(rb_eRuntimeError, "destroyed!");
} else { } else {
grpc_server_start(s->wrapped); grpc_server_start(s->wrapped);
} }
return Qnil; return Qnil;
} }
static VALUE grpc_rb_server_destroy(VALUE self) { /*
call-seq:
cq = CompletionQueue.new
server = Server.new(cq, {'arg1': 'value1'})
... // do stuff with server
...
... // to shutdown the server
server.destroy(cq)
... // to shutdown the server with a timeout
server.destroy(cq, timeout)
Destroys server instances. */
static VALUE grpc_rb_server_destroy(int argc, VALUE *argv, VALUE self) {
VALUE cqueue = Qnil;
VALUE timeout = Qnil;
grpc_completion_queue *cq = NULL;
grpc_event ev;
grpc_rb_server *s = NULL; grpc_rb_server *s = NULL;
/* "11" == 1 mandatory args, 1 (timeout) is optional */
rb_scan_args(argc, argv, "11", &cqueue, &timeout);
cq = grpc_rb_get_wrapped_completion_queue(cqueue);
TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s); TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s);
if (s->wrapped != NULL) { if (s->wrapped != NULL) {
grpc_server_shutdown(s->wrapped); grpc_server_shutdown_and_notify(s->wrapped, cq, NULL);
ev = grpc_rb_completion_queue_pluck_event(cqueue, Qnil, timeout);
if (!ev.success) {
rb_warn("server shutdown failed, there will be a LEAKED object warning");
return Qnil;
/*
TODO: renable the rb_raise below.
At the moment if the timeout is INFINITE_FUTURE as recommended, the
pluck blocks forever, even though
the outstanding server_request_calls correctly fail on the other
thread that they are running on.
it's almost as if calls that fail on the other thread do not get
cleaned up by shutdown request, even though it caused htem to
terminate.
rb_raise(rb_eRuntimeError, "grpc server shutdown did not succeed");
return Qnil;
The workaround is just to use a timeout and return without really
shutting down the server, and rely on the grpc core garbage collection
it down as a 'LEAKED OBJECT'.
*/
}
grpc_server_destroy(s->wrapped); grpc_server_destroy(s->wrapped);
s->wrapped = NULL; s->wrapped = NULL;
s->mark = Qnil;
} }
return Qnil; return Qnil;
} }
...@@ -302,7 +350,7 @@ static VALUE grpc_rb_server_add_http2_port(int argc, VALUE *argv, VALUE self) { ...@@ -302,7 +350,7 @@ static VALUE grpc_rb_server_add_http2_port(int argc, VALUE *argv, VALUE self) {
TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s); TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s);
if (s->wrapped == NULL) { if (s->wrapped == NULL) {
rb_raise(rb_eRuntimeError, "closed!"); rb_raise(rb_eRuntimeError, "destroyed!");
return Qnil; return Qnil;
} else if (rb_creds == Qnil) { } else if (rb_creds == Qnil) {
recvd_port = grpc_server_add_http2_port(s->wrapped, StringValueCStr(port)); recvd_port = grpc_server_add_http2_port(s->wrapped, StringValueCStr(port));
...@@ -315,7 +363,7 @@ static VALUE grpc_rb_server_add_http2_port(int argc, VALUE *argv, VALUE self) { ...@@ -315,7 +363,7 @@ static VALUE grpc_rb_server_add_http2_port(int argc, VALUE *argv, VALUE self) {
creds = grpc_rb_get_wrapped_server_credentials(rb_creds); creds = grpc_rb_get_wrapped_server_credentials(rb_creds);
recvd_port = recvd_port =
grpc_server_add_secure_http2_port(s->wrapped, StringValueCStr(port), grpc_server_add_secure_http2_port(s->wrapped, StringValueCStr(port),
creds); creds);
if (recvd_port == 0) { if (recvd_port == 0) {
rb_raise(rb_eRuntimeError, rb_raise(rb_eRuntimeError,
"could not add secure port %s to server, not sure why", "could not add secure port %s to server, not sure why",
...@@ -341,7 +389,7 @@ void Init_grpc_server() { ...@@ -341,7 +389,7 @@ void Init_grpc_server() {
rb_define_method(grpc_rb_cServer, "request_call", rb_define_method(grpc_rb_cServer, "request_call",
grpc_rb_server_request_call, 3); grpc_rb_server_request_call, 3);
rb_define_method(grpc_rb_cServer, "start", grpc_rb_server_start, 0); rb_define_method(grpc_rb_cServer, "start", grpc_rb_server_start, 0);
rb_define_method(grpc_rb_cServer, "destroy", grpc_rb_server_destroy, 0); rb_define_method(grpc_rb_cServer, "destroy", grpc_rb_server_destroy, -1);
rb_define_alias(grpc_rb_cServer, "close", "destroy"); rb_define_alias(grpc_rb_cServer, "close", "destroy");
rb_define_method(grpc_rb_cServer, "add_http2_port", rb_define_method(grpc_rb_cServer, "add_http2_port",
grpc_rb_server_add_http2_port, grpc_rb_server_add_http2_port,
......
...@@ -278,7 +278,9 @@ module GRPC ...@@ -278,7 +278,9 @@ module GRPC
@stopped = true @stopped = true
end end
@pool.stop @pool.stop
@server.close deadline = from_relative_time(@poll_period)
@server.close(@cq, deadline)
end end
# determines if the server has been stopped # determines if the server has been stopped
...@@ -410,17 +412,18 @@ module GRPC ...@@ -410,17 +412,18 @@ module GRPC
# handles calls to the server # handles calls to the server
def loop_handle_server_calls def loop_handle_server_calls
fail 'not running' unless @running fail 'not running' unless @running
request_call_tag = Object.new loop_tag = Object.new
until stopped? until stopped?
deadline = from_relative_time(@poll_period) deadline = from_relative_time(@poll_period)
begin begin
an_rpc = @server.request_call(@cq, request_call_tag, deadline) an_rpc = @server.request_call(@cq, loop_tag, deadline)
c = new_active_server_call(an_rpc)
rescue Core::CallError, RuntimeError => e rescue Core::CallError, RuntimeError => e
# can happen during server shutdown # these might happen for various reasonse. The correct behaviour of
# the server is to log them and continue.
GRPC.logger.warn("server call failed: #{e}") GRPC.logger.warn("server call failed: #{e}")
next next
end end
c = new_active_server_call(an_rpc)
unless c.nil? unless c.nil?
mth = an_rpc.method.to_sym mth = an_rpc.method.to_sym
@pool.schedule(c) do |call| @pool.schedule(c) do |call|
......
...@@ -42,11 +42,8 @@ shared_context 'setup: tags' do ...@@ -42,11 +42,8 @@ shared_context 'setup: tags' do
let(:sent_message) { 'sent message' } let(:sent_message) { 'sent message' }
let(:reply_text) { 'the reply' } let(:reply_text) { 'the reply' }
before(:example) do before(:example) do
@server_finished_tag = Object.new @client_tag = Object.new
@client_finished_tag = Object.new
@client_metadata_tag = Object.new
@server_tag = Object.new @server_tag = Object.new
@tag = Object.new
end end
def deadline def deadline
...@@ -351,7 +348,7 @@ describe 'the http client/server' do ...@@ -351,7 +348,7 @@ describe 'the http client/server' do
after(:example) do after(:example) do
@ch.close @ch.close
@server.close @server.close(@server_queue, deadline)
end end
it_behaves_like 'basic GRPC message delivery is OK' do it_behaves_like 'basic GRPC message delivery is OK' do
...@@ -377,7 +374,7 @@ describe 'the secure http client/server' do ...@@ -377,7 +374,7 @@ describe 'the secure http client/server' do
end end
after(:example) do after(:example) do
@server.close @server.close(@server_queue, deadline)
end end
it_behaves_like 'basic GRPC message delivery is OK' do it_behaves_like 'basic GRPC message delivery is OK' do
......
...@@ -51,7 +51,7 @@ describe GRPC::ActiveCall do ...@@ -51,7 +51,7 @@ describe GRPC::ActiveCall do
end end
after(:each) do after(:each) do
@server.close @server.close(@server_queue, deadline)
end end
describe 'restricted view methods' do describe 'restricted view methods' do
......
...@@ -54,6 +54,7 @@ describe 'ClientStub' do ...@@ -54,6 +54,7 @@ describe 'ClientStub' do
before(:each) do before(:each) do
Thread.abort_on_exception = true Thread.abort_on_exception = true
@server = nil @server = nil
@server_queue = nil
@method = 'an_rpc_method' @method = 'an_rpc_method'
@pass = OK @pass = OK
@fail = INTERNAL @fail = INTERNAL
...@@ -61,7 +62,7 @@ describe 'ClientStub' do ...@@ -61,7 +62,7 @@ describe 'ClientStub' do
end end
after(:each) do after(:each) do
@server.close unless @server.nil? @server.close(@server_queue) unless @server_queue.nil?
end end
describe '#new' do describe '#new' do
......
...@@ -136,10 +136,6 @@ describe GRPC::RpcServer do ...@@ -136,10 +136,6 @@ describe GRPC::RpcServer do
@ch = GRPC::Core::Channel.new(@host, nil) @ch = GRPC::Core::Channel.new(@host, nil)
end end
after(:each) do
@server.close
end
describe '#new' do describe '#new' do
it 'can be created with just some args' do it 'can be created with just some args' do
opts = { a_channel_arg: 'an_arg' } opts = { a_channel_arg: 'an_arg' }
...@@ -344,10 +340,6 @@ describe GRPC::RpcServer do ...@@ -344,10 +340,6 @@ describe GRPC::RpcServer do
@srv = RpcServer.new(**server_opts) @srv = RpcServer.new(**server_opts)
end end
after(:each) do
@srv.stop
end
it 'should return NOT_FOUND status on unknown methods', server: true do it 'should return NOT_FOUND status on unknown methods', server: true do
@srv.handle(EchoService) @srv.handle(EchoService)
t = Thread.new { @srv.run } t = Thread.new { @srv.run }
...@@ -527,10 +519,6 @@ describe GRPC::RpcServer do ...@@ -527,10 +519,6 @@ describe GRPC::RpcServer do
@srv = RpcServer.new(**server_opts) @srv = RpcServer.new(**server_opts)
end end
after(:each) do
@srv.stop
end
it 'should send connect metadata to the client', server: true do it 'should send connect metadata to the client', server: true do
service = EchoService.new service = EchoService.new
@srv.handle(service) @srv.handle(service)
......
...@@ -54,7 +54,7 @@ describe Server do ...@@ -54,7 +54,7 @@ describe Server do
it 'fails if the server is closed' do it 'fails if the server is closed' do
s = Server.new(@cq, nil) s = Server.new(@cq, nil)
s.close s.close(@cq)
expect { s.start }.to raise_error(RuntimeError) expect { s.start }.to raise_error(RuntimeError)
end end
end end
...@@ -62,19 +62,19 @@ describe Server do ...@@ -62,19 +62,19 @@ describe Server do
describe '#destroy' do describe '#destroy' do
it 'destroys a server ok' do it 'destroys a server ok' do
s = start_a_server s = start_a_server
blk = proc { s.destroy } blk = proc { s.destroy(@cq) }
expect(&blk).to_not raise_error expect(&blk).to_not raise_error
end end
it 'can be called more than once without error' do it 'can be called more than once without error' do
s = start_a_server s = start_a_server
begin begin
blk = proc { s.destroy } blk = proc { s.destroy(@cq) }
expect(&blk).to_not raise_error expect(&blk).to_not raise_error
blk.call blk.call
expect(&blk).to_not raise_error expect(&blk).to_not raise_error
ensure ensure
s.close s.close(@cq)
end end
end end
end end
...@@ -83,16 +83,16 @@ describe Server do ...@@ -83,16 +83,16 @@ describe Server do
it 'closes a server ok' do it 'closes a server ok' do
s = start_a_server s = start_a_server
begin begin
blk = proc { s.close } blk = proc { s.close(@cq) }
expect(&blk).to_not raise_error expect(&blk).to_not raise_error
ensure ensure
s.close s.close(@cq)
end end
end end
it 'can be called more than once without error' do it 'can be called more than once without error' do
s = start_a_server s = start_a_server
blk = proc { s.close } blk = proc { s.close(@cq) }
expect(&blk).to_not raise_error expect(&blk).to_not raise_error
blk.call blk.call
expect(&blk).to_not raise_error expect(&blk).to_not raise_error
...@@ -105,14 +105,14 @@ describe Server do ...@@ -105,14 +105,14 @@ describe Server do
blk = proc do blk = proc do
s = Server.new(@cq, nil) s = Server.new(@cq, nil)
s.add_http2_port('localhost:0') s.add_http2_port('localhost:0')
s.close s.close(@cq)
end end
expect(&blk).to_not raise_error expect(&blk).to_not raise_error
end end
it 'fails if the server is closed' do it 'fails if the server is closed' do
s = Server.new(@cq, nil) s = Server.new(@cq, nil)
s.close s.close(@cq)
expect { s.add_http2_port('localhost:0') }.to raise_error(RuntimeError) expect { s.add_http2_port('localhost:0') }.to raise_error(RuntimeError)
end end
end end
...@@ -123,14 +123,14 @@ describe Server do ...@@ -123,14 +123,14 @@ describe Server do
blk = proc do blk = proc do
s = Server.new(@cq, nil) s = Server.new(@cq, nil)
s.add_http2_port('localhost:0', cert) s.add_http2_port('localhost:0', cert)
s.close s.close(@cq)
end end
expect(&blk).to_not raise_error expect(&blk).to_not raise_error
end end
it 'fails if the server is closed' do it 'fails if the server is closed' do
s = Server.new(@cq, nil) s = Server.new(@cq, nil)
s.close s.close(@cq)
blk = proc { s.add_http2_port('localhost:0', cert) } blk = proc { s.add_http2_port('localhost:0', cert) }
expect(&blk).to raise_error(RuntimeError) expect(&blk).to raise_error(RuntimeError)
end end
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment