Skip to content
Snippets Groups Projects
Commit 18720fff authored by Masood Malekghassemi's avatar Masood Malekghassemi
Browse files

Maintain correct queue invariants against core

parent 16e8bbbf
No related branches found
No related tags found
No related merge requests found
# Copyright 2015, Google Inc.
# Copyright 2015-2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
......@@ -31,8 +31,9 @@
cdef class CompletionQueue:
cdef grpc_completion_queue *c_completion_queue
cdef object poll_condition
cdef bint is_polling
cdef object pluck_condition
cdef int num_plucking
cdef int num_polling
cdef bint is_shutting_down
cdef bint is_shutdown
......
......@@ -39,8 +39,9 @@ cdef class CompletionQueue:
self.c_completion_queue = grpc_completion_queue_create(NULL)
self.is_shutting_down = False
self.is_shutdown = False
self.poll_condition = threading.Condition()
self.is_polling = False
self.pluck_condition = threading.Condition()
self.num_plucking = 0
self.num_polling = 0
cdef _interpret_event(self, grpc_event event):
cdef OperationTag tag = None
......@@ -87,19 +88,15 @@ cdef class CompletionQueue:
c_deadline = deadline.c_time
cdef grpc_event event
# Poll within a critical section
# TODO(atash) consider making queue polling contention a hard error to
# enable easier bug discovery
with self.poll_condition:
while self.is_polling:
self.poll_condition.wait(float(deadline) - time.time())
self.is_polling = True
# Poll within a critical section to detect contention
with self.pluck_condition:
assert self.num_plucking == 0, 'cannot simultaneously pluck and poll'
self.num_polling += 1
with nogil:
event = grpc_completion_queue_next(
self.c_completion_queue, c_deadline, NULL)
with self.poll_condition:
self.is_polling = False
self.poll_condition.notify()
with self.pluck_condition:
self.num_polling -= 1
return self._interpret_event(event)
def pluck(self, OperationTag tag, Timespec deadline=None):
......@@ -111,19 +108,18 @@ cdef class CompletionQueue:
c_deadline = deadline.c_time
cdef grpc_event event
# Poll within a critical section
# TODO(atash) consider making queue polling contention a hard error to
# enable easier bug discovery
with self.poll_condition:
while self.is_polling:
self.poll_condition.wait(float(deadline) - time.time())
self.is_polling = True
# Pluck within a critical section to detect contention
with self.pluck_condition:
assert self.num_polling == 0, 'cannot simultaneously pluck and poll'
assert self.num_plucking < GRPC_MAX_COMPLETION_QUEUE_PLUCKERS, (
'cannot pluck more than {} times simultaneously'.format(
GRPC_MAX_COMPLETION_QUEUE_PLUCKERS))
self.num_plucking += 1
with nogil:
event = grpc_completion_queue_pluck(
self.c_completion_queue, <cpython.PyObject *>tag, c_deadline, NULL)
with self.poll_condition:
self.is_polling = False
self.poll_condition.notify()
with self.pluck_condition:
self.num_plucking -= 1
return self._interpret_event(event)
def shutdown(self):
......
......@@ -138,6 +138,8 @@ cdef extern from "grpc/_cython/loader.h":
const int GRPC_WRITE_NO_COMPRESS
const int GRPC_WRITE_USED_MASK
const int GRPC_MAX_COMPLETION_QUEUE_PLUCKERS
ctypedef struct grpc_completion_queue:
# We don't care about the internals (and in fact don't know them)
pass
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment