Skip to content
Snippets Groups Projects
Commit 0c3ea08b authored by Jan Tattermusch's avatar Jan Tattermusch Committed by GitHub
Browse files

Merge pull request #6970 from nathanielmanistaatgoogle/with_call

Change with_call from (Unary, Stream)UnaryMultiCallable parameter to (Unary, Stream)UnaryMultiCallable attribute
parents 5988716d ffaafe6f
No related branches found
No related tags found
No related merge requests found
...@@ -436,9 +436,7 @@ class UnaryUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)): ...@@ -436,9 +436,7 @@ class UnaryUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
"""Affords invoking a unary-unary RPC.""" """Affords invoking a unary-unary RPC."""
@abc.abstractmethod @abc.abstractmethod
def __call__( def __call__(self, request, timeout=None, metadata=None, credentials=None):
self, request, timeout=None, metadata=None, credentials=None,
with_call=False):
"""Synchronously invokes the underlying RPC. """Synchronously invokes the underlying RPC.
Args: Args:
...@@ -447,12 +445,30 @@ class UnaryUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)): ...@@ -447,12 +445,30 @@ class UnaryUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
metadata: An optional sequence of pairs of bytes to be transmitted to the metadata: An optional sequence of pairs of bytes to be transmitted to the
service-side of the RPC. service-side of the RPC.
credentials: An optional CallCredentials for the RPC. credentials: An optional CallCredentials for the RPC.
with_call: Whether or not to include return a Call for the RPC in addition
to the response.
Returns: Returns:
The response value for the RPC, and a Call for the RPC if with_call was The response value for the RPC.
set to True at invocation.
Raises:
RpcError: Indicating that the RPC terminated with non-OK status. The
raised RpcError will also be a Call for the RPC affording the RPC's
metadata, status code, and details.
"""
raise NotImplementedError()
@abc.abstractmethod
def with_call(self, request, timeout=None, metadata=None, credentials=None):
"""Synchronously invokes the underlying RPC.
Args:
request: The request value for the RPC.
timeout: An optional durating of time in seconds to allow for the RPC.
metadata: An optional sequence of pairs of bytes to be transmitted to the
service-side of the RPC.
credentials: An optional CallCredentials for the RPC.
Returns:
The response value for the RPC and a Call value for the RPC.
Raises: Raises:
RpcError: Indicating that the RPC terminated with non-OK status. The RpcError: Indicating that the RPC terminated with non-OK status. The
...@@ -508,8 +524,7 @@ class StreamUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)): ...@@ -508,8 +524,7 @@ class StreamUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
@abc.abstractmethod @abc.abstractmethod
def __call__( def __call__(
self, request_iterator, timeout=None, metadata=None, credentials=None, self, request_iterator, timeout=None, metadata=None, credentials=None):
with_call=False):
"""Synchronously invokes the underlying RPC. """Synchronously invokes the underlying RPC.
Args: Args:
...@@ -518,8 +533,6 @@ class StreamUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)): ...@@ -518,8 +533,6 @@ class StreamUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
metadata: An optional sequence of pairs of bytes to be transmitted to the metadata: An optional sequence of pairs of bytes to be transmitted to the
service-side of the RPC. service-side of the RPC.
credentials: An optional CallCredentials for the RPC. credentials: An optional CallCredentials for the RPC.
with_call: Whether or not to include return a Call for the RPC in addition
to the response.
Returns: Returns:
The response value for the RPC, and a Call for the RPC if with_call was The response value for the RPC, and a Call for the RPC if with_call was
...@@ -532,6 +545,28 @@ class StreamUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)): ...@@ -532,6 +545,28 @@ class StreamUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
""" """
raise NotImplementedError() raise NotImplementedError()
@abc.abstractmethod
def with_call(
self, request_iterator, timeout=None, metadata=None, credentials=None):
"""Synchronously invokes the underlying RPC.
Args:
request_iterator: An iterator that yields request values for the RPC.
timeout: An optional duration of time in seconds to allow for the RPC.
metadata: An optional sequence of pairs of bytes to be transmitted to the
service-side of the RPC.
credentials: An optional CallCredentials for the RPC.
Returns:
The response value for the RPC and a Call for the RPC.
Raises:
RpcError: Indicating that the RPC terminated with non-OK status. The
raised RpcError will also be a Call for the RPC affording the RPC's
metadata, status code, and details.
"""
raise NotImplementedError()
@abc.abstractmethod @abc.abstractmethod
def future( def future(
self, request_iterator, timeout=None, metadata=None, credentials=None): self, request_iterator, timeout=None, metadata=None, credentials=None):
......
...@@ -449,9 +449,7 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): ...@@ -449,9 +449,7 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
) )
return state, operations, deadline, deadline_timespec, None return state, operations, deadline, deadline_timespec, None
def __call__( def _blocking(self, request, timeout, metadata, credentials):
self, request, timeout=None, metadata=None, credentials=None,
with_call=False):
state, operations, deadline, deadline_timespec, rendezvous = self._prepare( state, operations, deadline, deadline_timespec, rendezvous = self._prepare(
request, timeout, metadata) request, timeout, metadata)
if rendezvous: if rendezvous:
...@@ -464,7 +462,15 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): ...@@ -464,7 +462,15 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
call.set_credentials(credentials._credentials) call.set_credentials(credentials._credentials)
call.start_batch(cygrpc.Operations(operations), None) call.start_batch(cygrpc.Operations(operations), None)
_handle_event(completion_queue.poll(), state, self._response_deserializer) _handle_event(completion_queue.poll(), state, self._response_deserializer)
return _end_unary_response_blocking(state, with_call, deadline) return state, deadline
def __call__(self, request, timeout=None, metadata=None, credentials=None):
state, deadline, = self._blocking(request, timeout, metadata, credentials)
return _end_unary_response_blocking(state, False, deadline)
def with_call(self, request, timeout=None, metadata=None, credentials=None):
state, deadline, = self._blocking(request, timeout, metadata, credentials)
return _end_unary_response_blocking(state, True, deadline)
def future(self, request, timeout=None, metadata=None, credentials=None): def future(self, request, timeout=None, metadata=None, credentials=None):
state, operations, deadline, deadline_timespec, rendezvous = self._prepare( state, operations, deadline, deadline_timespec, rendezvous = self._prepare(
...@@ -532,9 +538,7 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): ...@@ -532,9 +538,7 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
self._request_serializer = request_serializer self._request_serializer = request_serializer
self._response_deserializer = response_deserializer self._response_deserializer = response_deserializer
def __call__( def _blocking(self, request_iterator, timeout, metadata, credentials):
self, request_iterator, timeout=None, metadata=None, credentials=None,
with_call=False):
deadline, deadline_timespec = _deadline(timeout) deadline, deadline_timespec = _deadline(timeout)
state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None) state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
completion_queue = cygrpc.CompletionQueue() completion_queue = cygrpc.CompletionQueue()
...@@ -563,7 +567,19 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): ...@@ -563,7 +567,19 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
state.condition.notify_all() state.condition.notify_all()
if not state.due: if not state.due:
break break
return _end_unary_response_blocking(state, with_call, deadline) return state, deadline
def __call__(
self, request_iterator, timeout=None, metadata=None, credentials=None):
state, deadline, = self._blocking(
request_iterator, timeout, metadata, credentials)
return _end_unary_response_blocking(state, False, deadline)
def with_call(
self, request_iterator, timeout=None, metadata=None, credentials=None):
state, deadline, = self._blocking(
request_iterator, timeout, metadata, credentials)
return _end_unary_response_blocking(state, True, deadline)
def future( def future(
self, request_iterator, timeout=None, metadata=None, credentials=None): self, request_iterator, timeout=None, metadata=None, credentials=None):
......
...@@ -186,9 +186,9 @@ def _blocking_unary_unary( ...@@ -186,9 +186,9 @@ def _blocking_unary_unary(
response_deserializer=response_deserializer) response_deserializer=response_deserializer)
effective_metadata = _effective_metadata(metadata, metadata_transformer) effective_metadata = _effective_metadata(metadata, metadata_transformer)
if with_call: if with_call:
response, call = multi_callable( response, call = multi_callable.with_call(
request, timeout=timeout, metadata=effective_metadata, request, timeout=timeout, metadata=effective_metadata,
credentials=_credentials(protocol_options), with_call=True) credentials=_credentials(protocol_options))
return response, _Rendezvous(None, None, call) return response, _Rendezvous(None, None, call)
else: else:
return multi_callable( return multi_callable(
...@@ -237,9 +237,9 @@ def _blocking_stream_unary( ...@@ -237,9 +237,9 @@ def _blocking_stream_unary(
response_deserializer=response_deserializer) response_deserializer=response_deserializer)
effective_metadata = _effective_metadata(metadata, metadata_transformer) effective_metadata = _effective_metadata(metadata, metadata_transformer)
if with_call: if with_call:
response, call = multi_callable( response, call = multi_callable.with_call(
request_iterator, timeout=timeout, metadata=effective_metadata, request_iterator, timeout=timeout, metadata=effective_metadata,
credentials=_credentials(protocol_options), with_call=True) credentials=_credentials(protocol_options))
return response, _Rendezvous(None, None, call) return response, _Rendezvous(None, None, call)
else: else:
return multi_callable( return multi_callable(
......
...@@ -173,8 +173,8 @@ class MetadataTest(unittest.TestCase): ...@@ -173,8 +173,8 @@ class MetadataTest(unittest.TestCase):
def testUnaryUnary(self): def testUnaryUnary(self):
multi_callable = self._channel.unary_unary(_UNARY_UNARY) multi_callable = self._channel.unary_unary(_UNARY_UNARY)
unused_response, call = multi_callable( unused_response, call = multi_callable.with_call(
_REQUEST, metadata=_CLIENT_METADATA, with_call=True) _REQUEST, metadata=_CLIENT_METADATA)
self.assertTrue(test_common.metadata_transmitted( self.assertTrue(test_common.metadata_transmitted(
_SERVER_INITIAL_METADATA, call.initial_metadata())) _SERVER_INITIAL_METADATA, call.initial_metadata()))
self.assertTrue(test_common.metadata_transmitted( self.assertTrue(test_common.metadata_transmitted(
...@@ -192,9 +192,9 @@ class MetadataTest(unittest.TestCase): ...@@ -192,9 +192,9 @@ class MetadataTest(unittest.TestCase):
def testStreamUnary(self): def testStreamUnary(self):
multi_callable = self._channel.stream_unary(_STREAM_UNARY) multi_callable = self._channel.stream_unary(_STREAM_UNARY)
unused_response, call = multi_callable( unused_response, call = multi_callable.with_call(
[_REQUEST] * test_constants.STREAM_LENGTH, [_REQUEST] * test_constants.STREAM_LENGTH,
metadata=_CLIENT_METADATA, with_call=True) metadata=_CLIENT_METADATA)
self.assertTrue(test_common.metadata_transmitted( self.assertTrue(test_common.metadata_transmitted(
_SERVER_INITIAL_METADATA, call.initial_metadata())) _SERVER_INITIAL_METADATA, call.initial_metadata()))
self.assertTrue(test_common.metadata_transmitted( self.assertTrue(test_common.metadata_transmitted(
......
...@@ -27,7 +27,7 @@ ...@@ -27,7 +27,7 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Test of gRPC Python's application-layer API.""" """Test of RPCs made against gRPC Python's application-layer API."""
import itertools import itertools
import threading import threading
...@@ -216,10 +216,9 @@ class RPCTest(unittest.TestCase): ...@@ -216,10 +216,9 @@ class RPCTest(unittest.TestCase):
expected_response = self._handler.handle_unary_unary(request, None) expected_response = self._handler.handle_unary_unary(request, None)
multi_callable = _unary_unary_multi_callable(self._channel) multi_callable = _unary_unary_multi_callable(self._channel)
response, call = multi_callable( response, call = multi_callable.with_call(
request, metadata=( request, metadata=(
(b'test', b'SuccessfulUnaryRequestBlockingUnaryResponseWithCall'),), (b'test', b'SuccessfulUnaryRequestBlockingUnaryResponseWithCall'),))
with_call=True)
self.assertEqual(expected_response, response) self.assertEqual(expected_response, response)
self.assertIs(grpc.StatusCode.OK, call.code()) self.assertIs(grpc.StatusCode.OK, call.code())
...@@ -266,11 +265,11 @@ class RPCTest(unittest.TestCase): ...@@ -266,11 +265,11 @@ class RPCTest(unittest.TestCase):
request_iterator = iter(requests) request_iterator = iter(requests)
multi_callable = _stream_unary_multi_callable(self._channel) multi_callable = _stream_unary_multi_callable(self._channel)
response, call = multi_callable( response, call = multi_callable.with_call(
request_iterator, request_iterator,
metadata=( metadata=(
(b'test', b'SuccessfulStreamRequestBlockingUnaryResponseWithCall'), (b'test', b'SuccessfulStreamRequestBlockingUnaryResponseWithCall'),
), with_call=True) ))
self.assertEqual(expected_response, response) self.assertEqual(expected_response, response)
self.assertIs(grpc.StatusCode.OK, call.code()) self.assertIs(grpc.StatusCode.OK, call.code())
...@@ -525,10 +524,9 @@ class RPCTest(unittest.TestCase): ...@@ -525,10 +524,9 @@ class RPCTest(unittest.TestCase):
multi_callable = _unary_unary_multi_callable(self._channel) multi_callable = _unary_unary_multi_callable(self._channel)
with self._control.pause(): with self._control.pause():
with self.assertRaises(grpc.RpcError) as exception_context: with self.assertRaises(grpc.RpcError) as exception_context:
multi_callable( multi_callable.with_call(
request, timeout=test_constants.SHORT_TIMEOUT, request, timeout=test_constants.SHORT_TIMEOUT,
metadata=((b'test', b'ExpiredUnaryRequestBlockingUnaryResponse'),), metadata=((b'test', b'ExpiredUnaryRequestBlockingUnaryResponse'),))
with_call=True)
self.assertIsNotNone(exception_context.exception.initial_metadata()) self.assertIsNotNone(exception_context.exception.initial_metadata())
self.assertIs( self.assertIs(
...@@ -640,10 +638,9 @@ class RPCTest(unittest.TestCase): ...@@ -640,10 +638,9 @@ class RPCTest(unittest.TestCase):
multi_callable = _unary_unary_multi_callable(self._channel) multi_callable = _unary_unary_multi_callable(self._channel)
with self._control.fail(): with self._control.fail():
with self.assertRaises(grpc.RpcError) as exception_context: with self.assertRaises(grpc.RpcError) as exception_context:
multi_callable( multi_callable.with_call(
request, request,
metadata=((b'test', b'FailedUnaryRequestBlockingUnaryResponse'),), metadata=((b'test', b'FailedUnaryRequestBlockingUnaryResponse'),))
with_call=True)
self.assertIs(grpc.StatusCode.UNKNOWN, exception_context.exception.code()) self.assertIs(grpc.StatusCode.UNKNOWN, exception_context.exception.code())
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment