diff --git a/src/python/grpcio_testing/grpc_testing/_server/_servicer_context.py b/src/python/grpcio_testing/grpc_testing/_server/_servicer_context.py index 6fa8c6b3ba894f15f40f0ca6d6b8e16983f07d46..c63750f978597cd8046a802c541c0e7866778bbd 100644 --- a/src/python/grpcio_testing/grpc_testing/_server/_servicer_context.py +++ b/src/python/grpcio_testing/grpc_testing/_server/_servicer_context.py @@ -76,6 +76,7 @@ class ServicerContext(grpc.ServicerContext): def abort(self, code, details): with self._rpc._condition: self._rpc._abort(code, details) + raise Exception() def abort_with_status(self, status): raise NotImplementedError() diff --git a/src/python/grpcio_tests/tests/testing/_application_common.py b/src/python/grpcio_tests/tests/testing/_application_common.py index 4e98879607a54832398d61b8e5cf85809c646016..b7d5b236051182fc204eea7a4a78bdb2bdfba2c0 100644 --- a/src/python/grpcio_tests/tests/testing/_application_common.py +++ b/src/python/grpcio_tests/tests/testing/_application_common.py @@ -32,5 +32,10 @@ STREAM_UNARY_RESPONSE = services_pb2.Strange(first_strange_field=17) STREAM_STREAM_REQUEST = requests_pb2.Top(first_top_field=19) STREAM_STREAM_RESPONSE = services_pb2.Bottom(first_bottom_field=23) TWO_STREAM_STREAM_RESPONSES = (STREAM_STREAM_RESPONSE,) * 2 +ABORT_REQUEST = requests_pb2.Up(first_up_field=42) +ABORT_SUCCESS_QUERY = requests_pb2.Up(first_up_field=43) +ABORT_NO_STATUS_RESPONSE = services_pb2.Down(first_down_field=50) +ABORT_SUCCESS_RESPONSE = services_pb2.Down(first_down_field=51) +ABORT_FAILURE_RESPONSE = services_pb2.Down(first_down_field=52) INFINITE_REQUEST_STREAM_TIMEOUT = 0.2 diff --git a/src/python/grpcio_tests/tests/testing/_server_application.py b/src/python/grpcio_tests/tests/testing/_server_application.py index 243c385dafdb5944e4ab57b06826e4f1ab08469a..1dc5e8f391775edd49db1be612670af082b49b26 100644 --- a/src/python/grpcio_tests/tests/testing/_server_application.py +++ b/src/python/grpcio_tests/tests/testing/_server_application.py @@ -15,6 +15,8 @@ import grpc +import threading + # requests_pb2 is a semantic dependency of this module. from tests.testing import _application_common from tests.testing.proto import requests_pb2 # pylint: disable=unused-import @@ -25,9 +27,26 @@ from tests.testing.proto import services_pb2_grpc class FirstServiceServicer(services_pb2_grpc.FirstServiceServicer): """Services RPCs.""" + def __init__(self): + self._abort_lock = threading.RLock() + self._abort_response = _application_common.ABORT_NO_STATUS_RESPONSE + def UnUn(self, request, context): - if _application_common.UNARY_UNARY_REQUEST == request: + if request == _application_common.UNARY_UNARY_REQUEST: return _application_common.UNARY_UNARY_RESPONSE + elif request == _application_common.ABORT_REQUEST: + with self._abort_lock: + try: + context.abort(grpc.StatusCode.PERMISSION_DENIED, + "Denying permission to test abort.") + except Exception as e: # pylint: disable=broad-except + self._abort_response = _application_common.ABORT_SUCCESS_RESPONSE + else: + self._abort_status = _application_common.ABORT_FAILURE_RESPONSE + return None # NOTE: For the linter. + elif request == _application_common.ABORT_SUCCESS_QUERY: + with self._abort_lock: + return self._abort_response else: context.set_code(grpc.StatusCode.INVALID_ARGUMENT) context.set_details('Something is wrong with your request!') diff --git a/src/python/grpcio_tests/tests/testing/_server_test.py b/src/python/grpcio_tests/tests/testing/_server_test.py index 88e3a79ae5961b78f6c31b285aff0a3c609d8e44..45975f229bc8a99654df8f640790dcd35a4fa7bd 100644 --- a/src/python/grpcio_tests/tests/testing/_server_test.py +++ b/src/python/grpcio_tests/tests/testing/_server_test.py @@ -164,6 +164,19 @@ class FirstServiceServicerTest(unittest.TestCase): self.assertIs(code, grpc.StatusCode.DEADLINE_EXCEEDED) + def test_servicer_context_abort(self): + rpc = self._real_time_server.invoke_unary_unary( + _application_testing_common.FIRST_SERVICE_UNUN, (), + _application_common.ABORT_REQUEST, None) + _, _, code, _ = rpc.termination() + self.assertIs(code, grpc.StatusCode.PERMISSION_DENIED) + rpc = self._real_time_server.invoke_unary_unary( + _application_testing_common.FIRST_SERVICE_UNUN, (), + _application_common.ABORT_SUCCESS_QUERY, None) + response, _, code, _ = rpc.termination() + self.assertEqual(_application_common.ABORT_SUCCESS_RESPONSE, response) + self.assertIs(code, grpc.StatusCode.OK) + if __name__ == '__main__': unittest.main(verbosity=2)