diff --git a/test/compiler/python_plugin_test.py b/test/compiler/python_plugin_test.py index 653a5ac58c6aeab3c358867b49b794ffe50fd3e2..0e58d912b9a44753a9d18d03e910853b87c44e1a 100644 --- a/test/compiler/python_plugin_test.py +++ b/test/compiler/python_plugin_test.py @@ -36,6 +36,7 @@ import shutil import subprocess import sys import tempfile +import threading import time import unittest @@ -49,13 +50,13 @@ STUB_IDENTIFIER = 'EarlyAdopterTestServiceStub' SERVER_FACTORY_IDENTIFIER = 'early_adopter_create_TestService_server' STUB_FACTORY_IDENTIFIER = 'early_adopter_create_TestService_stub' -# Timeouts and delays. -SHORT_TIMEOUT = 0.1 -NORMAL_TIMEOUT = 1 -LONG_TIMEOUT = 2 -DOES_NOT_MATTER_DELAY = 0 +# The timeout used in tests of RPCs that are supposed to expire. +SHORT_TIMEOUT = 2 +# The timeout used in tests of RPCs that are not supposed to expire. The +# absurdly large value doesn't matter since no passing execution of this test +# module will ever wait the duration. +LONG_TIMEOUT = 600 NO_DELAY = 0 -LONG_DELAY = 1 # Build mode environment variable set by tools/run_tests/run_tests.py. _build_mode = os.environ['CONFIG'] @@ -64,29 +65,36 @@ _build_mode = os.environ['CONFIG'] class _ServicerMethods(object): def __init__(self, test_pb2, delay): + self._condition = threading.Condition() + self._delay = delay self._paused = False - self._failed = False + self._fail = False self._test_pb2 = test_pb2 - self._delay = delay @contextlib.contextmanager def pause(self): # pylint: disable=invalid-name - self._paused = True + with self._condition: + self._paused = True yield - self._paused = False + with self._condition: + self._paused = False + self._condition.notify_all() @contextlib.contextmanager def fail(self): # pylint: disable=invalid-name - self._failed = True + with self._condition: + self._fail = True yield - self._failed = False + with self._condition: + self._fail = False def _control(self): # pylint: disable=invalid-name - if self._failed: - raise ValueError() + with self._condition: + if self._fail: + raise ValueError() + while self._paused: + self._condition.wait() time.sleep(self._delay) - while self._paused: - time.sleep(0) def UnaryCall(self, request, unused_rpc_context): response = self._test_pb2.SimpleResponse() @@ -147,9 +155,8 @@ def _CreateService(test_pb2, delay): waiting for the service. Args: - test_pb2: the test_pb2 module generated by this test - delay: delay in seconds per response from the servicer - timeout: how long the stub will wait for the servicer by default. + test_pb2: The test_pb2 module generated by this test. + delay: Delay in seconds per response from the servicer. Yields: A (servicer_methods, servicer, stub) three-tuple where servicer_methods is @@ -250,7 +257,7 @@ class PythonPluginTest(unittest.TestCase): if exc.errno != errno.ENOENT: raise - # TODO(atash): Figure out which of theses tests is hanging flakily with small + # TODO(atash): Figure out which of these tests is hanging flakily with small # probability. def testImportAttributes(self): @@ -265,34 +272,33 @@ class PythonPluginTest(unittest.TestCase): def testUpDown(self): import test_pb2 with _CreateService( - test_pb2, DOES_NOT_MATTER_DELAY) as (servicer, stub, unused_server): + test_pb2, NO_DELAY) as (servicer, stub, unused_server): request = test_pb2.SimpleRequest(response_size=13) def testUnaryCall(self): import test_pb2 # pylint: disable=g-import-not-at-top with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): + timeout = 6 # TODO(issue 2039): LONG_TIMEOUT like the other methods. request = test_pb2.SimpleRequest(response_size=13) - response = stub.UnaryCall(request, NORMAL_TIMEOUT) + response = stub.UnaryCall(request, timeout) expected_response = methods.UnaryCall(request, 'not a real RpcContext!') self.assertEqual(expected_response, response) def testUnaryCallAsync(self): import test_pb2 # pylint: disable=g-import-not-at-top request = test_pb2.SimpleRequest(response_size=13) - with _CreateService(test_pb2, LONG_DELAY) as ( + with _CreateService(test_pb2, NO_DELAY) as ( methods, stub, unused_server): - start_time = time.clock() - response_future = stub.UnaryCall.async(request, LONG_TIMEOUT) - # Check that we didn't block on the asynchronous call. - self.assertGreater(LONG_DELAY, time.clock() - start_time) + # Check that the call does not block waiting for the server to respond. + with methods.pause(): + response_future = stub.UnaryCall.async(request, LONG_TIMEOUT) response = response_future.result() expected_response = methods.UnaryCall(request, 'not a real RpcContext!') self.assertEqual(expected_response, response) def testUnaryCallAsyncExpired(self): import test_pb2 # pylint: disable=g-import-not-at-top - # set the timeout super low... - with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( + with _CreateService(test_pb2, NO_DELAY) as ( methods, stub, unused_server): request = test_pb2.SimpleRequest(response_size=13) with methods.pause(): @@ -305,7 +311,7 @@ class PythonPluginTest(unittest.TestCase): def testUnaryCallAsyncCancelled(self): import test_pb2 # pylint: disable=g-import-not-at-top request = test_pb2.SimpleRequest(response_size=13) - with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( + with _CreateService(test_pb2, NO_DELAY) as ( methods, stub, unused_server): with methods.pause(): response_future = stub.UnaryCall.async(request, 1) @@ -315,17 +321,17 @@ class PythonPluginTest(unittest.TestCase): def testUnaryCallAsyncFailed(self): import test_pb2 # pylint: disable=g-import-not-at-top request = test_pb2.SimpleRequest(response_size=13) - with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( + with _CreateService(test_pb2, NO_DELAY) as ( methods, stub, unused_server): with methods.fail(): - response_future = stub.UnaryCall.async(request, NORMAL_TIMEOUT) + response_future = stub.UnaryCall.async(request, LONG_TIMEOUT) self.assertIsNotNone(response_future.exception()) def testStreamingOutputCall(self): import test_pb2 # pylint: disable=g-import-not-at-top request = _streaming_output_request(test_pb2) with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): - responses = stub.StreamingOutputCall(request, NORMAL_TIMEOUT) + responses = stub.StreamingOutputCall(request, LONG_TIMEOUT) expected_responses = methods.StreamingOutputCall( request, 'not a real RpcContext!') for expected_response, response in itertools.izip_longest( @@ -337,7 +343,7 @@ class PythonPluginTest(unittest.TestCase): def testStreamingOutputCallExpired(self): import test_pb2 # pylint: disable=g-import-not-at-top request = _streaming_output_request(test_pb2) - with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( + with _CreateService(test_pb2, NO_DELAY) as ( methods, stub, unused_server): with methods.pause(): responses = stub.StreamingOutputCall(request, SHORT_TIMEOUT) @@ -349,7 +355,7 @@ class PythonPluginTest(unittest.TestCase): def testStreamingOutputCallCancelled(self): import test_pb2 # pylint: disable=g-import-not-at-top request = _streaming_output_request(test_pb2) - with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( + with _CreateService(test_pb2, NO_DELAY) as ( unused_methods, stub, unused_server): responses = stub.StreamingOutputCall(request, SHORT_TIMEOUT) next(responses) @@ -362,7 +368,7 @@ class PythonPluginTest(unittest.TestCase): def testStreamingOutputCallFailed(self): import test_pb2 # pylint: disable=g-import-not-at-top request = _streaming_output_request(test_pb2) - with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( + with _CreateService(test_pb2, NO_DELAY) as ( methods, stub, unused_server): with methods.fail(): responses = stub.StreamingOutputCall(request, 1) @@ -375,20 +381,19 @@ class PythonPluginTest(unittest.TestCase): def testStreamingInputCall(self): import test_pb2 # pylint: disable=g-import-not-at-top with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): - response = stub.StreamingInputCall(StreamingInputRequest(test_pb2), - NORMAL_TIMEOUT) + response = stub.StreamingInputCall( + _streaming_input_request_iterator(test_pb2), LONG_TIMEOUT) expected_response = methods.StreamingInputCall( _streaming_input_request_iterator(test_pb2), 'not a real RpcContext!') self.assertEqual(expected_response, response) def testStreamingInputCallAsync(self): import test_pb2 # pylint: disable=g-import-not-at-top - with _CreateService(test_pb2, LONG_DELAY) as ( + with _CreateService(test_pb2, NO_DELAY) as ( methods, stub, unused_server): - start_time = time.clock() - response_future = stub.StreamingInputCall.async( - _streaming_input_request_iterator(test_pb2), LONG_TIMEOUT) - self.assertGreater(LONG_DELAY, time.clock() - start_time) + with methods.pause(): + response_future = stub.StreamingInputCall.async( + _streaming_input_request_iterator(test_pb2), LONG_TIMEOUT) response = response_future.result() expected_response = methods.StreamingInputCall( _streaming_input_request_iterator(test_pb2), 'not a real RpcContext!') @@ -396,8 +401,7 @@ class PythonPluginTest(unittest.TestCase): def testStreamingInputCallAsyncExpired(self): import test_pb2 # pylint: disable=g-import-not-at-top - # set the timeout super low... - with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( + with _CreateService(test_pb2, NO_DELAY) as ( methods, stub, unused_server): with methods.pause(): response_future = stub.StreamingInputCall.async( @@ -409,11 +413,12 @@ class PythonPluginTest(unittest.TestCase): def testStreamingInputCallAsyncCancelled(self): import test_pb2 # pylint: disable=g-import-not-at-top - with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( + with _CreateService(test_pb2, NO_DELAY) as ( methods, stub, unused_server): with methods.pause(): + timeout = 6 # TODO(issue 2039): LONG_TIMEOUT like the other methods. response_future = stub.StreamingInputCall.async( - _streaming_input_request_iterator(test_pb2), NORMAL_TIMEOUT) + _streaming_input_request_iterator(test_pb2), timeout) response_future.cancel() self.assertTrue(response_future.cancelled()) with self.assertRaises(future.CancelledError): @@ -421,7 +426,7 @@ class PythonPluginTest(unittest.TestCase): def testStreamingInputCallAsyncFailed(self): import test_pb2 # pylint: disable=g-import-not-at-top - with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( + with _CreateService(test_pb2, NO_DELAY) as ( methods, stub, unused_server): with methods.fail(): response_future = stub.StreamingInputCall.async( @@ -432,7 +437,7 @@ class PythonPluginTest(unittest.TestCase): import test_pb2 # pylint: disable=g-import-not-at-top with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): responses = stub.FullDuplexCall( - _full_duplex_request_iterator(test_pb2), NORMAL_TIMEOUT) + _full_duplex_request_iterator(test_pb2), LONG_TIMEOUT) expected_responses = methods.FullDuplexCall( _full_duplex_request_iterator(test_pb2), 'not a real RpcContext!') for expected_response, response in itertools.izip_longest( @@ -444,7 +449,7 @@ class PythonPluginTest(unittest.TestCase): def testFullDuplexCallExpired(self): import test_pb2 # pylint: disable=g-import-not-at-top request_iterator = _full_duplex_request_iterator(test_pb2) - with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( + with _CreateService(test_pb2, NO_DELAY) as ( methods, stub, unused_server): with methods.pause(): responses = stub.FullDuplexCall(request_iterator, SHORT_TIMEOUT) @@ -457,7 +462,7 @@ class PythonPluginTest(unittest.TestCase): import test_pb2 # pylint: disable=g-import-not-at-top with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): request_iterator = _full_duplex_request_iterator(test_pb2) - responses = stub.FullDuplexCall(request_iterator, NORMAL_TIMEOUT) + responses = stub.FullDuplexCall(request_iterator, LONG_TIMEOUT) next(responses) responses.cancel() with self.assertRaises(future.CancelledError): @@ -468,10 +473,10 @@ class PythonPluginTest(unittest.TestCase): def testFullDuplexCallFailed(self): import test_pb2 # pylint: disable=g-import-not-at-top request_iterator = _full_duplex_request_iterator(test_pb2) - with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( + with _CreateService(test_pb2, NO_DELAY) as ( methods, stub, unused_server): with methods.fail(): - responses = stub.FullDuplexCall(request_iterator, NORMAL_TIMEOUT) + responses = stub.FullDuplexCall(request_iterator, LONG_TIMEOUT) self.assertIsNotNone(responses) with self.assertRaises(exceptions.ServicerError): next(responses) @@ -480,7 +485,7 @@ class PythonPluginTest(unittest.TestCase): 'forever and fix.') def testHalfDuplexCall(self): import test_pb2 # pylint: disable=g-import-not-at-top - with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( + with _CreateService(test_pb2, NO_DELAY) as ( methods, stub, unused_server): def half_duplex_request_iterator(): request = test_pb2.StreamingOutputCallRequest() @@ -491,32 +496,37 @@ class PythonPluginTest(unittest.TestCase): request.response_parameters.add(size=3, interval_us=0) yield request responses = stub.HalfDuplexCall( - half_duplex_request_iterator(), NORMAL_TIMEOUT) + half_duplex_request_iterator(), LONG_TIMEOUT) expected_responses = methods.HalfDuplexCall( - HalfDuplexRequest(), 'not a real RpcContext!') + half_duplex_request_iterator(), 'not a real RpcContext!') for check in itertools.izip_longest(expected_responses, responses): expected_response, response = check self.assertEqual(expected_response, response) def testHalfDuplexCallWedged(self): import test_pb2 # pylint: disable=g-import-not-at-top + condition = threading.Condition() wait_cell = [False] @contextlib.contextmanager def wait(): # pylint: disable=invalid-name # Where's Python 3's 'nonlocal' statement when you need it? - wait_cell[0] = True + with condition: + wait_cell[0] = True yield - wait_cell[0] = False + with condition: + wait_cell[0] = False + condition.notify_all() def half_duplex_request_iterator(): request = test_pb2.StreamingOutputCallRequest() request.response_parameters.add(size=1, interval_us=0) yield request - while wait_cell[0]: - time.sleep(0.1) + with condition: + while wait_cell[0]: + condition.wait() with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): with wait(): responses = stub.HalfDuplexCall( - half_duplex_request_iterator(), NORMAL_TIMEOUT) + half_duplex_request_iterator(), SHORT_TIMEOUT) # half-duplex waits for the client to send all info with self.assertRaises(exceptions.ExpirationError): next(responses)