Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
G
Grpc
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Container Registry
Model registry
Operate
Environments
Monitor
Incidents
Service Desk
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
tci-gateway-module
Grpc
Commits
d48d84da
Commit
d48d84da
authored
9 years ago
by
murgatroid99
Browse files
Options
Downloads
Patches
Plain Diff
Ruby: fix some synchronization code in server implementation
parent
e7136590
No related branches found
Branches containing commit
No related tags found
Tags containing commit
No related merge requests found
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
src/ruby/.rubocop.yml
+3
-0
3 additions, 0 deletions
src/ruby/.rubocop.yml
src/ruby/lib/grpc/generic/rpc_server.rb
+55
-41
55 additions, 41 deletions
src/ruby/lib/grpc/generic/rpc_server.rb
src/ruby/spec/generic/rpc_server_spec.rb
+2
-21
2 additions, 21 deletions
src/ruby/spec/generic/rpc_server_spec.rb
with
60 additions
and
62 deletions
src/ruby/.rubocop.yml
+
3
−
0
View file @
d48d84da
...
...
@@ -15,3 +15,6 @@ Metrics/CyclomaticComplexity:
Metrics/PerceivedComplexity
:
Max
:
8
Metrics/ClassLength
:
Max
:
250
This diff is collapsed.
Click to expand it.
src/ruby/lib/grpc/generic/rpc_server.rb
+
55
−
41
View file @
d48d84da
...
...
@@ -107,7 +107,9 @@ module GRPC
# Starts running the jobs in the thread pool.
def
start
fail
'already stopped'
if
@stopped
@stop_mutex
.
synchronize
do
fail
'already stopped'
if
@stopped
end
until
@workers
.
size
==
@size
.
to_i
next_thread
=
Thread
.
new
do
catch
(
:exit
)
do
# allows { throw :exit } to kill a thread
...
...
@@ -264,10 +266,13 @@ module GRPC
@pool
=
Pool
.
new
(
@pool_size
)
@run_cond
=
ConditionVariable
.
new
@run_mutex
=
Mutex
.
new
@running
=
false
# running_state can take 4 values: :not_started, :running, :stopping, and
# :stopped. State transitions can only proceed in that order.
@running_state
=
:not_started
@server
=
RpcServer
.
setup_srv
(
server_override
,
@cq
,
**
kw
)
@stopped
=
false
@stop_mutex
=
Mutex
.
new
# Mutex to synchronize registration of services and registered service
# count. @run_mutex should not be acquired while holding @handle_mutex
@handle_mutex
=
Mutex
.
new
end
# stops a running server
...
...
@@ -275,27 +280,28 @@ module GRPC
# the call has no impact if the server is already stopped, otherwise
# server's current call loop is it's last.
def
stop
return
unless
@running
@stop_mutex
.
synchronize
do
@stopped
=
true
@run_mutex
.
synchronize
do
fail
'Cannot stop before starting'
if
@running_state
==
:not_started
return
if
@running_state
!=
:running
@running_state
=
:stopping
end
deadline
=
from_relative_time
(
@poll_period
)
return
if
@server
.
close
(
@cq
,
deadline
)
deadline
=
from_relative_time
(
@poll_period
)
@server
.
close
(
@cq
,
deadline
)
@pool
.
stop
end
# determines if the server has been stopped
def
stopped?
@stop_mutex
.
synchronize
do
return
@stopped
def
running_state
@run_mutex
.
synchronize
do
return
@running_state
end
end
# determines if the server is currently running
def
running?
@running
running_state
==
:running
end
def
stopped?
running_state
==
:stopped
end
# Is called from other threads to wait for #run to start up the server.
...
...
@@ -304,13 +310,11 @@ module GRPC
#
# @param timeout [Numeric] number of seconds to wait
# @result [true, false] true if the server is running, false otherwise
def
wait_till_running
(
timeout
=
0.1
)
end_time
,
sleep_period
=
Time
.
now
+
timeout
,
(
1.0
*
timeout
)
/
100
while
Time
.
now
<
end_time
@run_mutex
.
synchronize
{
@run_cond
.
wait
(
@run_mutex
)
}
unless
running?
sleep
(
sleep_period
)
def
wait_till_running
(
timeout
=
nil
)
@run_mutex
.
synchronize
do
@run_cond
.
wait
(
@run_mutex
,
timeout
)
if
@running_state
==
:not_started
return
@running_state
==
:running
end
running?
end
# Runs the server in its own thread, then waits for signal INT or TERM on
...
...
@@ -360,11 +364,16 @@ module GRPC
# @param service [Object|Class] a service class or object as described
# above
def
handle
(
service
)
fail
'cannot add services if the server is running'
if
running?
fail
'cannot add services if the server is stopped'
if
stopped?
cls
=
service
.
is_a?
(
Class
)
?
service
:
service
.
class
assert_valid_service_class
(
cls
)
add_rpc_descs_for
(
service
)
@run_mutex
.
synchronize
do
unless
@running_state
==
:not_started
fail
'cannot add services if the server has been started'
end
cls
=
service
.
is_a?
(
Class
)
?
service
:
service
.
class
assert_valid_service_class
(
cls
)
@handle_mutex
.
synchronize
do
add_rpc_descs_for
(
service
)
end
end
end
# runs the server
...
...
@@ -375,16 +384,13 @@ module GRPC
# - #running? returns true after this is called, until #stop cause the
# the server to stop.
def
run
if
rpc_descs
.
size
.
zero?
GRPC
.
logger
.
warn
(
'did not run as no services were present'
)
return
end
@run_mutex
.
synchronize
do
@running
=
true
@run_cond
.
signal
fail
'cannot run without registering services'
if
rpc_descs
.
size
.
zero?
@pool
.
start
@server
.
start
@running_state
=
:running
@run_cond
.
broadcast
end
@pool
.
start
@server
.
start
loop_handle_server_calls
end
...
...
@@ -413,9 +419,9 @@ module GRPC
# handles calls to the server
def
loop_handle_server_calls
fail
'not
running'
unless
@running
fail
'not
started'
if
running_state
==
:not_started
loop_tag
=
Object
.
new
until
stopped?
while
running_state
==
:running
begin
an_rpc
=
@server
.
request_call
(
@cq
,
loop_tag
,
INFINITE_FUTURE
)
break
if
(
!
an_rpc
.
nil?
)
&&
an_rpc
.
call
.
nil?
...
...
@@ -430,11 +436,14 @@ module GRPC
rescue
Core
::
CallError
,
RuntimeError
=>
e
# these might happen for various reasonse. The correct behaviour of
# the server is to log them and continue, if it's not shutting down.
GRPC
.
logger
.
warn
(
"server call failed:
#{
e
}
"
)
unless
stopped?
if
running_state
==
:running
GRPC
.
logger
.
warn
(
"server call failed:
#{
e
}
"
)
end
next
end
end
@running
=
false
# @running_state should be :stopping here
@run_mutex
.
synchronize
{
@running_state
=
:stopped
}
GRPC
.
logger
.
info
(
"stopped:
#{
self
}
"
)
end
...
...
@@ -467,11 +476,15 @@ module GRPC
protected
def
rpc_descs
@rpc_descs
||=
{}
@handle_mutex
.
synchronize
do
return
@rpc_descs
||=
{}
end
end
def
rpc_handlers
@rpc_handlers
||=
{}
@handle_mutex
.
synchronize
do
@rpc_handlers
||=
{}
end
end
def
assert_valid_service_class
(
cls
)
...
...
@@ -484,9 +497,10 @@ module GRPC
cls
.
assert_rpc_descs_have_methods
end
# This should be called while holding @handle_mutex
def
add_rpc_descs_for
(
service
)
cls
=
service
.
is_a?
(
Class
)
?
service
:
service
.
class
specs
,
handlers
=
rpc_descs
,
rpc_handlers
specs
,
handlers
=
(
@
rpc_descs
||=
{}),
(
@
rpc_handlers
||=
{})
cls
.
rpc_descs
.
each_pair
do
|
name
,
spec
|
route
=
"/
#{
cls
.
service_name
}
/
#{
name
}
"
.
to_sym
fail
"already registered: rpc
#{
route
}
from
#{
spec
}
"
if
specs
.
key?
route
...
...
This diff is collapsed.
Click to expand it.
src/ruby/spec/generic/rpc_server_spec.rb
+
2
−
21
View file @
d48d84da
...
...
@@ -220,19 +220,10 @@ describe GRPC::RpcServer do
@srv
=
RpcServer
.
new
(
**
opts
)
end
after
(
:each
)
do
@srv
.
stop
end
it
'starts out false'
do
expect
(
@srv
.
stopped?
).
to
be
(
false
)
end
it
'stays false after a #stop is called before #run'
do
@srv
.
stop
expect
(
@srv
.
stopped?
).
to
be
(
false
)
end
it
'stays false after the server starts running'
,
server:
true
do
@srv
.
handle
(
EchoService
)
t
=
Thread
.
new
{
@srv
.
run
}
...
...
@@ -247,8 +238,8 @@ describe GRPC::RpcServer do
t
=
Thread
.
new
{
@srv
.
run
}
@srv
.
wait_till_running
@srv
.
stop
expect
(
@srv
.
stopped?
).
to
be
(
true
)
t
.
join
expect
(
@srv
.
stopped?
).
to
be
(
true
)
end
end
...
...
@@ -266,9 +257,7 @@ describe GRPC::RpcServer do
server_override:
@server
}
r
=
RpcServer
.
new
(
**
opts
)
r
.
run
expect
(
r
.
running?
).
to
be
(
false
)
r
.
stop
expect
{
r
.
run
}.
to
raise_error
(
RuntimeError
)
end
it
'is true after run is called with a registered service'
do
...
...
@@ -293,10 +282,6 @@ describe GRPC::RpcServer do
@srv
=
RpcServer
.
new
(
**
@opts
)
end
after
(
:each
)
do
@srv
.
stop
end
it
'raises if #run has already been called'
do
@srv
.
handle
(
EchoService
)
t
=
Thread
.
new
{
@srv
.
run
}
...
...
@@ -528,10 +513,6 @@ describe GRPC::RpcServer do
@srv
=
RpcServer
.
new
(
**
server_opts
)
end
after
(
:each
)
do
@srv
.
stop
end
it
'should be added to BadStatus when requests fail'
,
server:
true
do
service
=
FailingService
.
new
@srv
.
handle
(
service
)
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment