diff --git a/src/python/grpcio/grpc/framework/core/_end.py b/src/python/grpcio/grpc/framework/core/_end.py index 8e07d9061e017a1d6dab8f85d833c4be9cde89a6..9c615672aa71ee843f976cce15b97f68012bb6e1 100644 --- a/src/python/grpcio/grpc/framework/core/_end.py +++ b/src/python/grpcio/grpc/framework/core/_end.py @@ -85,35 +85,6 @@ def _future_shutdown(lock, cycle, event): return in_future -def _termination_action(lock, stats, operation_id, cycle): - """Constructs the termination action for a single operation. - - Args: - lock: A lock to hold during the termination action. - stats: A mapping from base.Outcome.Kind values to integers to increment - with the outcome kind given to the termination action. - operation_id: The operation ID for the termination action. - cycle: A _Cycle value to be updated during the termination action. - - Returns: - A callable that takes an operation outcome kind as its sole parameter and - that should be used as the termination action for the operation - associated with the given operation ID. - """ - def termination_action(outcome_kind): - with lock: - stats[outcome_kind] += 1 - cycle.operations.pop(operation_id, None) - if not cycle.operations: - for action in cycle.idle_actions: - cycle.pool.submit(action) - cycle.idle_actions = [] - if cycle.grace: - _cancel_futures(cycle.futures) - cycle.pool.shutdown(wait=False) - return termination_action - - class _End(End): """An End implementation.""" @@ -133,6 +104,31 @@ class _End(End): self._cycle = None + def _termination_action(self, operation_id): + """Constructs the termination action for a single operation. + + Args: + operation_id: The operation ID for the termination action. + + Returns: + A callable that takes an operation outcome kind as its sole parameter and + that should be used as the termination action for the operation + associated with the given operation ID. + """ + def termination_action(outcome_kind): + with self._lock: + self._stats[outcome_kind] += 1 + self._cycle.operations.pop(operation_id, None) + if not self._cycle.operations: + for action in self._cycle.idle_actions: + self._cycle.pool.submit(action) + self._cycle.idle_actions = [] + if self._cycle.grace: + _cancel_futures(self._cycle.futures) + self._cycle.pool.shutdown(wait=False) + self._cycle = None + return termination_action + def start(self): """See base.End.start for specification.""" with self._lock: @@ -174,8 +170,7 @@ class _End(End): with self._lock: if self._cycle is None or self._cycle.grace: raise ValueError('Can\'t operate on stopped or stopping End!') - termination_action = _termination_action( - self._lock, self._stats, operation_id, self._cycle) + termination_action = self._termination_action(operation_id) operation = _operation.invocation_operate( operation_id, group, method, subscription, timeout, protocol_options, initial_metadata, payload, completion, self._mate.accept_ticket, @@ -208,8 +203,7 @@ class _End(End): if operation is not None: operation.handle_ticket(ticket) elif self._servicer_package is not None and not self._cycle.grace: - termination_action = _termination_action( - self._lock, self._stats, ticket.operation_id, self._cycle) + termination_action = self._termination_action(ticket.operation_id) operation = _operation.service_operate( self._servicer_package, ticket, self._mate.accept_ticket, termination_action, self._cycle.pool)