diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index bee32c9629e48d772bbaf9009b709330211f9340..3ac735a4ec99596d6505de97ac3995f41ca79af6 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -435,10 +435,10 @@ def _end_unary_response_blocking(state, with_call, deadline): class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): def __init__( - self, channel, create_managed_call, method, request_serializer, + self, channel, managed_call, method, request_serializer, response_deserializer): self._channel = channel - self._create_managed_call = create_managed_call + self._managed_call = managed_call self._method = method self._request_serializer = request_serializer self._response_deserializer = response_deserializer @@ -490,23 +490,24 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): if rendezvous: return rendezvous else: - call = self._create_managed_call( + call, drive_call = self._managed_call( None, 0, self._method, None, deadline_timespec) if credentials is not None: call.set_credentials(credentials._credentials) event_handler = _event_handler(state, call, self._response_deserializer) with state.condition: call.start_client_batch(cygrpc.Operations(operations), event_handler) + drive_call() return _Rendezvous(state, call, self._response_deserializer, deadline) class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): def __init__( - self, channel, create_managed_call, method, request_serializer, + self, channel, managed_call, method, request_serializer, response_deserializer): self._channel = channel - self._create_managed_call = create_managed_call + self._managed_call = managed_call self._method = method self._request_serializer = request_serializer self._response_deserializer = response_deserializer @@ -518,7 +519,7 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): raise rendezvous else: state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None) - call = self._create_managed_call( + call, drive_call = self._managed_call( None, 0, self._method, None, deadline_timespec) if credentials is not None: call.set_credentials(credentials._credentials) @@ -536,16 +537,17 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS), ) call.start_client_batch(cygrpc.Operations(operations), event_handler) + drive_call() return _Rendezvous(state, call, self._response_deserializer, deadline) class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): def __init__( - self, channel, create_managed_call, method, request_serializer, + self, channel, managed_call, method, request_serializer, response_deserializer): self._channel = channel - self._create_managed_call = create_managed_call + self._managed_call = managed_call self._method = method self._request_serializer = request_serializer self._response_deserializer = response_deserializer @@ -597,7 +599,7 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): self, request_iterator, timeout=None, metadata=None, credentials=None): deadline, deadline_timespec = _deadline(timeout) state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None) - call = self._create_managed_call( + call, drive_call = self._managed_call( None, 0, self._method, None, deadline_timespec) if credentials is not None: call.set_credentials(credentials._credentials) @@ -614,6 +616,7 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS), ) call.start_client_batch(cygrpc.Operations(operations), event_handler) + drive_call() _consume_request_iterator( request_iterator, state, call, self._request_serializer) return _Rendezvous(state, call, self._response_deserializer, deadline) @@ -622,10 +625,10 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): def __init__( - self, channel, create_managed_call, method, request_serializer, + self, channel, managed_call, method, request_serializer, response_deserializer): self._channel = channel - self._create_managed_call = create_managed_call + self._managed_call = managed_call self._method = method self._request_serializer = request_serializer self._response_deserializer = response_deserializer @@ -634,7 +637,7 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): self, request_iterator, timeout=None, metadata=None, credentials=None): deadline, deadline_timespec = _deadline(timeout) state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None) - call = self._create_managed_call( + call, drive_call = self._managed_call( None, 0, self._method, None, deadline_timespec) if credentials is not None: call.set_credentials(credentials._credentials) @@ -650,6 +653,7 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS), ) call.start_client_batch(cygrpc.Operations(operations), event_handler) + drive_call() _consume_request_iterator( request_iterator, state, call, self._request_serializer) return _Rendezvous(state, call, self._response_deserializer, deadline) @@ -687,16 +691,13 @@ def _run_channel_spin_thread(state): channel_spin_thread.start() -def _create_channel_managed_call(state): - def create_channel_managed_call(parent, flags, method, host, deadline): - """Creates a managed cygrpc.Call. +def _channel_managed_call_management(state): + def create(parent, flags, method, host, deadline): + """Creates a managed cygrpc.Call and a function to call to drive it. - Callers of this function must conduct at least one operation on the returned - call. The tags associated with operations conducted on the returned call - must be no-argument callables that return None to indicate that this channel - should continue polling for events associated with the call and return the - call itself to indicate that no more events associated with the call will be - generated. + If operations are successfully added to the returned cygrpc.Call, the + returned function must be called. If operations are not successfully added + to the returned cygrpc.Call, the returned function must not be called. Args: parent: A cygrpc.Call to be used as the parent of the created call. @@ -706,18 +707,22 @@ def _create_channel_managed_call(state): deadline: A cygrpc.Timespec to be the deadline of the created call. Returns: - A cygrpc.Call with which to conduct an RPC. + A cygrpc.Call with which to conduct an RPC and a function to call if + operations are successfully started on the call. """ - with state.lock: - call = state.channel.create_call( - parent, flags, state.completion_queue, method, host, deadline) - if state.managed_calls is None: - state.managed_calls = set((call,)) - _run_channel_spin_thread(state) - else: - state.managed_calls.add(call) - return call - return create_channel_managed_call + call = state.channel.create_call( + parent, flags, state.completion_queue, method, host, deadline) + + def drive(): + with state.lock: + if state.managed_calls is None: + state.managed_calls = set((call,)) + _run_channel_spin_thread(state) + else: + state.managed_calls.add(call) + + return call, drive + return create class _ChannelConnectivityState(object): @@ -881,25 +886,25 @@ class Channel(grpc.Channel): def unary_unary( self, method, request_serializer=None, response_deserializer=None): return _UnaryUnaryMultiCallable( - self._channel, _create_channel_managed_call(self._call_state), + self._channel, _channel_managed_call_management(self._call_state), _common.encode(method), request_serializer, response_deserializer) def unary_stream( self, method, request_serializer=None, response_deserializer=None): return _UnaryStreamMultiCallable( - self._channel, _create_channel_managed_call(self._call_state), + self._channel, _channel_managed_call_management(self._call_state), _common.encode(method), request_serializer, response_deserializer) def stream_unary( self, method, request_serializer=None, response_deserializer=None): return _StreamUnaryMultiCallable( - self._channel, _create_channel_managed_call(self._call_state), + self._channel, _channel_managed_call_management(self._call_state), _common.encode(method), request_serializer, response_deserializer) def stream_stream( self, method, request_serializer=None, response_deserializer=None): return _StreamStreamMultiCallable( - self._channel, _create_channel_managed_call(self._call_state), + self._channel, _channel_managed_call_management(self._call_state), _common.encode(method), request_serializer, response_deserializer) def __del__(self):