Skip to content
Snippets Groups Projects
Commit 97993b65 authored by Nathaniel Manista's avatar Nathaniel Manista
Browse files

Fix a thread leak bug.

Successful operations were leaking the thread used for expiration
monitoring. This change ensures that the ExpirationManager for the
operation always has its abort() method called when the
TerminationManager for the operation judges the operation to have
terminated.
parent aad03253
No related branches found
No related tags found
No related merge requests found
...@@ -215,6 +215,7 @@ def _front_operate( ...@@ -215,6 +215,7 @@ def _front_operate(
lock, termination_manager, transmission_manager, ingestion_manager, lock, termination_manager, transmission_manager, ingestion_manager,
expiration_manager) expiration_manager)
termination_manager.set_expiration_manager(expiration_manager)
transmission_manager.set_ingestion_and_expiration_managers( transmission_manager.set_ingestion_and_expiration_managers(
ingestion_manager, expiration_manager) ingestion_manager, expiration_manager)
operation_context.set_ingestion_and_expiration_managers( operation_context.set_ingestion_and_expiration_managers(
...@@ -340,6 +341,7 @@ def _back_operate( ...@@ -340,6 +341,7 @@ def _back_operate(
lock, termination_manager, transmission_manager, ingestion_manager, lock, termination_manager, transmission_manager, ingestion_manager,
expiration_manager) expiration_manager)
termination_manager.set_expiration_manager(expiration_manager)
transmission_manager.set_ingestion_and_expiration_managers( transmission_manager.set_ingestion_and_expiration_managers(
ingestion_manager, expiration_manager) ingestion_manager, expiration_manager)
operation_context.set_ingestion_and_expiration_managers( operation_context.set_ingestion_and_expiration_managers(
......
...@@ -41,6 +41,11 @@ class TerminationManager(object): ...@@ -41,6 +41,11 @@ class TerminationManager(object):
"""An object responsible for handling the termination of an operation.""" """An object responsible for handling the termination of an operation."""
__metaclass__ = abc.ABCMeta __metaclass__ = abc.ABCMeta
@abc.abstractmethod
def set_expiration_manager(self, expiration_manager):
"""Sets the ExpirationManager with which this object will cooperate."""
raise NotImplementedError()
@abc.abstractmethod @abc.abstractmethod
def is_active(self): def is_active(self):
"""Reports whether or not the operation is active. """Reports whether or not the operation is active.
...@@ -169,6 +174,7 @@ class IngestionManager(stream.Consumer): ...@@ -169,6 +174,7 @@ class IngestionManager(stream.Consumer):
@abc.abstractmethod @abc.abstractmethod
def set_expiration_manager(self, expiration_manager): def set_expiration_manager(self, expiration_manager):
"""Sets the ExpirationManager with which this object will cooperate.""" """Sets the ExpirationManager with which this object will cooperate."""
raise NotImplementedError()
@abc.abstractmethod @abc.abstractmethod
def start(self, requirement): def start(self, requirement):
......
...@@ -86,11 +86,15 @@ class _TerminationManager(_interfaces.TerminationManager): ...@@ -86,11 +86,15 @@ class _TerminationManager(_interfaces.TerminationManager):
self._action = action self._action = action
self._local_failure = local_failure self._local_failure = local_failure
self._has_locally_failed = False self._has_locally_failed = False
self._expiration_manager = None
self._outstanding_requirements = set(requirements) self._outstanding_requirements = set(requirements)
self._kind = None self._kind = None
self._callbacks = [] self._callbacks = []
def set_expiration_manager(self, expiration_manager):
self._expiration_manager = expiration_manager
def _terminate(self, kind): def _terminate(self, kind):
"""Terminates the operation. """Terminates the operation.
...@@ -100,6 +104,7 @@ class _TerminationManager(_interfaces.TerminationManager): ...@@ -100,6 +104,7 @@ class _TerminationManager(_interfaces.TerminationManager):
packets.Kind.TRANSMISSION_FAILURE, packets.Kind.SERVICER_FAILURE, or packets.Kind.TRANSMISSION_FAILURE, packets.Kind.SERVICER_FAILURE, or
packets.Kind.SERVICED_FAILURE. packets.Kind.SERVICED_FAILURE.
""" """
self._expiration_manager.abort()
self._outstanding_requirements = None self._outstanding_requirements = None
callbacks = list(self._callbacks) callbacks = list(self._callbacks)
self._callbacks = None self._callbacks = None
......
...@@ -41,6 +41,7 @@ from grpc.framework.face.testing import stock_service ...@@ -41,6 +41,7 @@ from grpc.framework.face.testing import stock_service
from grpc.framework.face.testing import test_case from grpc.framework.face.testing import test_case
_TIMEOUT = 3 _TIMEOUT = 3
_LONG_TIMEOUT = 45
class BlockingInvocationInlineServiceTestCase( class BlockingInvocationInlineServiceTestCase(
...@@ -82,7 +83,7 @@ class BlockingInvocationInlineServiceTestCase( ...@@ -82,7 +83,7 @@ class BlockingInvocationInlineServiceTestCase(
request = test_messages.request() request = test_messages.request()
response = self.stub.blocking_value_in_value_out( response = self.stub.blocking_value_in_value_out(
name, request, _TIMEOUT) name, request, _LONG_TIMEOUT)
test_messages.verify(request, response, self) test_messages.verify(request, response, self)
...@@ -93,7 +94,7 @@ class BlockingInvocationInlineServiceTestCase( ...@@ -93,7 +94,7 @@ class BlockingInvocationInlineServiceTestCase(
request = test_messages.request() request = test_messages.request()
response_iterator = self.stub.inline_value_in_stream_out( response_iterator = self.stub.inline_value_in_stream_out(
name, request, _TIMEOUT) name, request, _LONG_TIMEOUT)
responses = list(response_iterator) responses = list(response_iterator)
test_messages.verify(request, responses, self) test_messages.verify(request, responses, self)
...@@ -105,7 +106,7 @@ class BlockingInvocationInlineServiceTestCase( ...@@ -105,7 +106,7 @@ class BlockingInvocationInlineServiceTestCase(
requests = test_messages.requests() requests = test_messages.requests()
response = self.stub.blocking_stream_in_value_out( response = self.stub.blocking_stream_in_value_out(
name, iter(requests), _TIMEOUT) name, iter(requests), _LONG_TIMEOUT)
test_messages.verify(requests, response, self) test_messages.verify(requests, response, self)
...@@ -116,7 +117,7 @@ class BlockingInvocationInlineServiceTestCase( ...@@ -116,7 +117,7 @@ class BlockingInvocationInlineServiceTestCase(
requests = test_messages.requests() requests = test_messages.requests()
response_iterator = self.stub.inline_stream_in_stream_out( response_iterator = self.stub.inline_stream_in_stream_out(
name, iter(requests), _TIMEOUT) name, iter(requests), _LONG_TIMEOUT)
responses = list(response_iterator) responses = list(response_iterator)
test_messages.verify(requests, responses, self) test_messages.verify(requests, responses, self)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment