Skip to content
Snippets Groups Projects
Commit aee9d1aa authored by Ken Payson's avatar Ken Payson
Browse files

Added check for signals in poll()

Also removed the unused pluck() function
parent 698d3e91
No related branches found
No related tags found
No related merge requests found
......@@ -31,9 +31,6 @@
cdef class CompletionQueue:
cdef grpc_completion_queue *c_completion_queue
cdef object pluck_condition
cdef int num_plucking
cdef int num_polling
cdef bint is_shutting_down
cdef bint is_shutdown
......
......@@ -32,6 +32,8 @@ cimport cpython
import threading
import time
cdef int _INTERRUPT_CHECK_PERIOD_MS = 200
cdef class CompletionQueue:
......@@ -40,9 +42,6 @@ cdef class CompletionQueue:
self.c_completion_queue = grpc_completion_queue_create(NULL)
self.is_shutting_down = False
self.is_shutdown = False
self.pluck_condition = threading.Condition()
self.num_plucking = 0
self.num_polling = 0
cdef _interpret_event(self, grpc_event event):
cdef OperationTag tag = None
......@@ -83,45 +82,27 @@ cdef class CompletionQueue:
def poll(self, Timespec deadline=None):
# We name this 'poll' to avoid problems with CPython's expectations for
# 'special' methods (like next and __next__).
cdef gpr_timespec c_increment
cdef gpr_timespec c_timeout
cdef gpr_timespec c_deadline
with nogil:
c_increment = gpr_time_from_millis(_INTERRUPT_CHECK_PERIOD_MS, GPR_TIMESPAN)
c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME)
if deadline is not None:
c_deadline = deadline.c_time
cdef grpc_event event
# Poll within a critical section to detect contention
with self.pluck_condition:
assert self.num_plucking == 0, 'cannot simultaneously pluck and poll'
self.num_polling += 1
with nogil:
event = grpc_completion_queue_next(
self.c_completion_queue, c_deadline, NULL)
with self.pluck_condition:
self.num_polling -= 1
return self._interpret_event(event)
def pluck(self, OperationTag tag, Timespec deadline=None):
# Plucking a 'None' tag is equivalent to passing control to GRPC core until
# the deadline.
cdef gpr_timespec c_deadline = gpr_inf_future(
GPR_CLOCK_REALTIME)
if deadline is not None:
c_deadline = deadline.c_time
cdef grpc_event event
# Pluck within a critical section to detect contention
with self.pluck_condition:
assert self.num_polling == 0, 'cannot simultaneously pluck and poll'
assert self.num_plucking < GRPC_MAX_COMPLETION_QUEUE_PLUCKERS, (
'cannot pluck more than {} times simultaneously'.format(
GRPC_MAX_COMPLETION_QUEUE_PLUCKERS))
self.num_plucking += 1
with nogil:
event = grpc_completion_queue_pluck(
self.c_completion_queue, <cpython.PyObject *>tag, c_deadline, NULL)
with self.pluck_condition:
self.num_plucking -= 1
if deadline is not None:
c_deadline = deadline.c_time
while True:
c_timeout = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), c_increment)
if gpr_time_cmp(c_timeout, c_deadline) > 0:
c_timeout = c_deadline
event = grpc_completion_queue_next(
self.c_completion_queue, c_timeout, NULL)
if event.type != GRPC_QUEUE_TIMEOUT or gpr_time_cmp(c_timeout, c_deadline) == 0:
break;
# Handle any signals
with gil:
cpython.PyErr_CheckSignals()
return self._interpret_event(event)
def shutdown(self):
......
......@@ -80,6 +80,12 @@ cdef extern from "grpc/_cython/loader.h":
gpr_timespec gpr_convert_clock_type(gpr_timespec t,
gpr_clock_type target_clock) nogil
gpr_timespec gpr_time_from_millis(int64_t ms, gpr_clock_type type) nogil
gpr_timespec gpr_time_add(gpr_timespec a, gpr_timespec b) nogil
int gpr_time_cmp(gpr_timespec a, gpr_timespec b) nogil
ctypedef enum grpc_status_code:
GRPC_STATUS_OK
GRPC_STATUS_CANCELLED
......
......@@ -99,7 +99,7 @@ cdef class Server:
with nogil:
grpc_server_start(self.c_server)
# Ensure the core has gotten a chance to do the start-up work
self.backup_shutdown_queue.pluck(None, Timespec(None))
self.backup_shutdown_queue.poll(Timespec(None))
def add_http2_port(self, address,
ServerCredentials server_credentials=None):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment