Skip to content
Snippets Groups Projects
Commit 6eade73f authored by Masood Malekghassemi's avatar Masood Malekghassemi
Browse files

Merge pull request #709 from nathanielmanistaatgoogle/thread-pool-less-links

Thread-pool-less construction of GRPC links.
parents cc69f8d9 e04e20aa
No related branches found
No related tags found
No related merge requests found
...@@ -41,6 +41,9 @@ from grpc.framework.base.packets import interfaces as ticket_interfaces ...@@ -41,6 +41,9 @@ from grpc.framework.base.packets import interfaces as ticket_interfaces
from grpc.framework.base.packets import null from grpc.framework.base.packets import null
from grpc.framework.base.packets import packets as tickets from grpc.framework.base.packets import packets as tickets
from grpc.framework.foundation import activated from grpc.framework.foundation import activated
from grpc.framework.foundation import logging_pool
_THREAD_POOL_SIZE = 100
@enum.unique @enum.unique
...@@ -353,3 +356,90 @@ class ForeLink(ticket_interfaces.ForeLink, activated.Activated): ...@@ -353,3 +356,90 @@ class ForeLink(ticket_interfaces.ForeLink, activated.Activated):
self._complete(ticket.operation_id, ticket.payload) self._complete(ticket.operation_id, ticket.payload)
else: else:
self._cancel(ticket.operation_id) self._cancel(ticket.operation_id)
class _ActivatedForeLink(ticket_interfaces.ForeLink, activated.Activated):
def __init__(
self, port, request_deserializers, response_serializers,
root_certificates, key_chain_pairs):
self._port = port
self._request_deserializers = request_deserializers
self._response_serializers = response_serializers
self._root_certificates = root_certificates
self._key_chain_pairs = key_chain_pairs
self._lock = threading.Lock()
self._pool = None
self._fore_link = None
self._rear_link = null.NULL_REAR_LINK
def join_rear_link(self, rear_link):
with self._lock:
self._rear_link = null.NULL_REAR_LINK if rear_link is None else rear_link
if self._fore_link is not None:
self._fore_link.join_rear_link(rear_link)
def _start(self):
with self._lock:
self._pool = logging_pool.pool(_THREAD_POOL_SIZE)
self._fore_link = ForeLink(
self._pool, self._request_deserializers, self._response_serializers,
self._root_certificates, self._key_chain_pairs, port=self._port)
self._fore_link.join_rear_link(self._rear_link)
self._fore_link.start()
return self
def _stop(self):
with self._lock:
self._fore_link.stop()
self._fore_link = None
self._pool.shutdown(wait=True)
self._pool = None
def __enter__(self):
return self._start()
def __exit__(self, exc_type, exc_val, exc_tb):
self._stop()
return False
def start(self):
return self._start()
def stop(self):
self._stop()
def port(self):
with self._lock:
return None if self._fore_link is None else self._fore_link.port()
def accept_back_to_front_ticket(self, ticket):
with self._lock:
if self._fore_link is not None:
self._fore_link.accept_back_to_front_ticket(ticket)
def activated_fore_link(
port, request_deserializers, response_serializers, root_certificates,
key_chain_pairs):
"""Creates a ForeLink that is also an activated.Activated.
The returned object is only valid for use between calls to its start and stop
methods (or in context when used as a context manager).
Args:
port: The port on which to serve RPCs, or None for a port to be
automatically selected.
request_deserializers: A dictionary from RPC method names to request object
deserializer behaviors.
response_serializers: A dictionary from RPC method names to response object
serializer behaviors.
root_certificates: The PEM-encoded client root certificates as a bytestring
or None.
key_chain_pairs: A sequence of PEM-encoded private key-certificate chain
pairs.
"""
return _ActivatedForeLink(
port, request_deserializers, response_serializers, root_certificates,
key_chain_pairs)
...@@ -40,6 +40,9 @@ from grpc.framework.base.packets import interfaces as ticket_interfaces ...@@ -40,6 +40,9 @@ from grpc.framework.base.packets import interfaces as ticket_interfaces
from grpc.framework.base.packets import null from grpc.framework.base.packets import null
from grpc.framework.base.packets import packets as tickets from grpc.framework.base.packets import packets as tickets
from grpc.framework.foundation import activated from grpc.framework.foundation import activated
from grpc.framework.foundation import logging_pool
_THREAD_POOL_SIZE = 100
_INVOCATION_EVENT_KINDS = ( _INVOCATION_EVENT_KINDS = (
_low.Event.Kind.METADATA_ACCEPTED, _low.Event.Kind.METADATA_ACCEPTED,
...@@ -361,3 +364,75 @@ class RearLink(ticket_interfaces.RearLink, activated.Activated): ...@@ -361,3 +364,75 @@ class RearLink(ticket_interfaces.RearLink, activated.Activated):
else: else:
# NOTE(nathaniel): All other categories are treated as cancellation. # NOTE(nathaniel): All other categories are treated as cancellation.
self._cancel(ticket.operation_id) self._cancel(ticket.operation_id)
class _ActivatedRearLink(ticket_interfaces.RearLink, activated.Activated):
def __init__(self, host, port, request_serializers, response_deserializers):
self._host = host
self._port = port
self._request_serializers = request_serializers
self._response_deserializers = response_deserializers
self._lock = threading.Lock()
self._pool = None
self._rear_link = None
self._fore_link = null.NULL_FORE_LINK
def join_fore_link(self, fore_link):
with self._lock:
self._fore_link = null.NULL_FORE_LINK if fore_link is None else fore_link
def _start(self):
with self._lock:
self._pool = logging_pool.pool(_THREAD_POOL_SIZE)
self._rear_link = RearLink(
self._host, self._port, self._pool, self._request_serializers,
self._response_deserializers)
self._rear_link.join_fore_link(self._fore_link)
self._rear_link.start()
return self
def _stop(self):
with self._lock:
self._rear_link.stop()
self._rear_link = None
self._pool.shutdown(wait=True)
self._pool = None
def __enter__(self):
return self._start()
def __exit__(self, exc_type, exc_val, exc_tb):
self._stop()
return False
def start(self):
return self._start()
def stop(self):
self._stop()
def accept_front_to_back_ticket(self, ticket):
with self._lock:
if self._rear_link is not None:
self._rear_link.accept_front_to_back_ticket(ticket)
def activated_rear_link(
host, port, request_serializers, response_deserializers):
"""Creates a RearLink that is also an activated.Activated.
The returned object is only valid for use between calls to its start and stop
methods (or in context when used as a context manager).
Args:
host: The host to which to connect for RPC service.
port: The port to which to connect for RPC service.
request_serializers: A dictionary from RPC method name to request object
serializer behavior.
response_deserializers: A dictionary from RPC method name to response
object deserializer behavior.
"""
return _ActivatedRearLink(
host, port, request_serializers, response_deserializers)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment