Skip to content
Snippets Groups Projects
Commit 2274c7f0 authored by Nathaniel Manista's avatar Nathaniel Manista
Browse files

Merge pull request #5537 from soltanmm/triceratops-again

Maintain correct invariants against core.
parents f7d6d382 18720fff
No related branches found
No related tags found
No related merge requests found
# Copyright 2015, Google Inc. # Copyright 2015-2016, Google Inc.
# All rights reserved. # All rights reserved.
# #
# Redistribution and use in source and binary forms, with or without # Redistribution and use in source and binary forms, with or without
...@@ -31,8 +31,9 @@ ...@@ -31,8 +31,9 @@
cdef class CompletionQueue: cdef class CompletionQueue:
cdef grpc_completion_queue *c_completion_queue cdef grpc_completion_queue *c_completion_queue
cdef object poll_condition cdef object pluck_condition
cdef bint is_polling cdef int num_plucking
cdef int num_polling
cdef bint is_shutting_down cdef bint is_shutting_down
cdef bint is_shutdown cdef bint is_shutdown
......
...@@ -39,8 +39,9 @@ cdef class CompletionQueue: ...@@ -39,8 +39,9 @@ cdef class CompletionQueue:
self.c_completion_queue = grpc_completion_queue_create(NULL) self.c_completion_queue = grpc_completion_queue_create(NULL)
self.is_shutting_down = False self.is_shutting_down = False
self.is_shutdown = False self.is_shutdown = False
self.poll_condition = threading.Condition() self.pluck_condition = threading.Condition()
self.is_polling = False self.num_plucking = 0
self.num_polling = 0
cdef _interpret_event(self, grpc_event event): cdef _interpret_event(self, grpc_event event):
cdef OperationTag tag = None cdef OperationTag tag = None
...@@ -87,19 +88,15 @@ cdef class CompletionQueue: ...@@ -87,19 +88,15 @@ cdef class CompletionQueue:
c_deadline = deadline.c_time c_deadline = deadline.c_time
cdef grpc_event event cdef grpc_event event
# Poll within a critical section # Poll within a critical section to detect contention
# TODO(atash) consider making queue polling contention a hard error to with self.pluck_condition:
# enable easier bug discovery assert self.num_plucking == 0, 'cannot simultaneously pluck and poll'
with self.poll_condition: self.num_polling += 1
while self.is_polling:
self.poll_condition.wait(float(deadline) - time.time())
self.is_polling = True
with nogil: with nogil:
event = grpc_completion_queue_next( event = grpc_completion_queue_next(
self.c_completion_queue, c_deadline, NULL) self.c_completion_queue, c_deadline, NULL)
with self.poll_condition: with self.pluck_condition:
self.is_polling = False self.num_polling -= 1
self.poll_condition.notify()
return self._interpret_event(event) return self._interpret_event(event)
def pluck(self, OperationTag tag, Timespec deadline=None): def pluck(self, OperationTag tag, Timespec deadline=None):
...@@ -111,19 +108,18 @@ cdef class CompletionQueue: ...@@ -111,19 +108,18 @@ cdef class CompletionQueue:
c_deadline = deadline.c_time c_deadline = deadline.c_time
cdef grpc_event event cdef grpc_event event
# Poll within a critical section # Pluck within a critical section to detect contention
# TODO(atash) consider making queue polling contention a hard error to with self.pluck_condition:
# enable easier bug discovery assert self.num_polling == 0, 'cannot simultaneously pluck and poll'
with self.poll_condition: assert self.num_plucking < GRPC_MAX_COMPLETION_QUEUE_PLUCKERS, (
while self.is_polling: 'cannot pluck more than {} times simultaneously'.format(
self.poll_condition.wait(float(deadline) - time.time()) GRPC_MAX_COMPLETION_QUEUE_PLUCKERS))
self.is_polling = True self.num_plucking += 1
with nogil: with nogil:
event = grpc_completion_queue_pluck( event = grpc_completion_queue_pluck(
self.c_completion_queue, <cpython.PyObject *>tag, c_deadline, NULL) self.c_completion_queue, <cpython.PyObject *>tag, c_deadline, NULL)
with self.poll_condition: with self.pluck_condition:
self.is_polling = False self.num_plucking -= 1
self.poll_condition.notify()
return self._interpret_event(event) return self._interpret_event(event)
def shutdown(self): def shutdown(self):
......
...@@ -138,6 +138,8 @@ cdef extern from "grpc/_cython/loader.h": ...@@ -138,6 +138,8 @@ cdef extern from "grpc/_cython/loader.h":
const int GRPC_WRITE_NO_COMPRESS const int GRPC_WRITE_NO_COMPRESS
const int GRPC_WRITE_USED_MASK const int GRPC_WRITE_USED_MASK
const int GRPC_MAX_COMPLETION_QUEUE_PLUCKERS
ctypedef struct grpc_completion_queue: ctypedef struct grpc_completion_queue:
# We don't care about the internals (and in fact don't know them) # We don't care about the internals (and in fact don't know them)
pass pass
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment