Skip to content
Snippets Groups Projects
Commit 2fe7cb7d authored by Nathaniel Manista's avatar Nathaniel Manista
Browse files

Add an early adopter public API.

parent 81337bb4
No related branches found
No related tags found
No related merge requests found
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import abc
import collections
from _framework.face import interfaces as face_interfaces
from grpc_early_adopter import interfaces
class _InlineUnaryUnaryMethod(face_interfaces.InlineValueInValueOutMethod):
def __init__(self, unary_unary_rpc_method):
self._method = unary_unary_rpc_method
def service(self, request, context):
"""See face_interfaces.InlineValueInValueOutMethod.service for spec."""
return self._method.service_unary_unary(request)
class _InlineUnaryStreamMethod(face_interfaces.InlineValueInStreamOutMethod):
def __init__(self, unary_stream_rpc_method):
self._method = unary_stream_rpc_method
def service(self, request, context):
"""See face_interfaces.InlineValueInStreamOutMethod.service for spec."""
return self._method.service_unary_stream(request)
class _InlineStreamUnaryMethod(face_interfaces.InlineStreamInValueOutMethod):
def __init__(self, stream_unary_rpc_method):
self._method = stream_unary_rpc_method
def service(self, request_iterator, context):
"""See face_interfaces.InlineStreamInValueOutMethod.service for spec."""
return self._method.service_stream_unary(request_iterator)
class _InlineStreamStreamMethod(face_interfaces.InlineStreamInStreamOutMethod):
def __init__(self, stream_stream_rpc_method):
self._method = stream_stream_rpc_method
def service(self, request_iterator, context):
"""See face_interfaces.InlineStreamInStreamOutMethod.service for spec."""
return self._method.service_stream_stream(request_iterator)
class Breakdown(object):
"""An intermediate representation of implementations of RPC methods.
Attributes:
unary_unary_methods:
unary_stream_methods:
stream_unary_methods:
stream_stream_methods:
request_serializers:
request_deserializers:
response_serializers:
response_deserializers:
"""
__metaclass__ = abc.ABCMeta
class _EasyBreakdown(
Breakdown,
collections.namedtuple(
'_EasyBreakdown',
['unary_unary_methods', 'unary_stream_methods', 'stream_unary_methods',
'stream_stream_methods', 'request_serializers',
'request_deserializers', 'response_serializers',
'response_deserializers'])):
pass
def break_down(methods):
"""Breaks down RPC methods.
Args:
methods: A dictionary from RPC mthod name to
interfaces.RpcMethod object describing the RPCs.
Returns:
A Breakdown corresponding to the given methods.
"""
unary_unary = {}
unary_stream = {}
stream_unary = {}
stream_stream = {}
request_serializers = {}
request_deserializers = {}
response_serializers = {}
response_deserializers = {}
for name, method in methods.iteritems():
cardinality = method.cardinality()
if cardinality is interfaces.Cardinality.UNARY_UNARY:
unary_unary[name] = _InlineUnaryUnaryMethod(method)
elif cardinality is interfaces.Cardinality.UNARY_STREAM:
unary_stream[name] = _InlineUnaryStreamMethod(method)
elif cardinality is interfaces.Cardinality.STREAM_UNARY:
stream_unary[name] = _InlineStreamUnaryMethod(method)
elif cardinality is interfaces.Cardinality.STREAM_STREAM:
stream_stream[name] = _InlineStreamStreamMethod(method)
request_serializers[name] = method.serialize_request
request_deserializers[name] = method.deserialize_request
response_serializers[name] = method.serialize_response
response_deserializers[name] = method.deserialize_response
return _EasyBreakdown(
unary_unary, unary_stream, stream_unary, stream_stream,
request_serializers, request_deserializers, response_serializers,
response_deserializers)
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Entry points into GRPC."""
import threading
from _adapter import fore
from _framework.base.packets import implementations as _tickets_implementations
from _framework.face import implementations as _face_implementations
from _framework.foundation import logging_pool
from grpc_early_adopter import _face_utilities
from grpc_early_adopter import interfaces
_MEGA_TIMEOUT = 60 * 60 * 24
_THREAD_POOL_SIZE = 80
class _Server(interfaces.Server):
def __init__(self, breakdown, port, private_key, certificate_chain):
self._lock = threading.Lock()
self._breakdown = breakdown
self._port = port
self._private_key = private_key
self._certificate_chain = certificate_chain
self._pool = None
self._fore_link = None
self._back = None
def start(self):
"""See interfaces.Server.start for specification."""
with self._lock:
if self._pool is None:
self._pool = logging_pool.pool(_THREAD_POOL_SIZE)
servicer = _face_implementations.servicer(
self._pool,
inline_value_in_value_out_methods=self._breakdown.unary_unary_methods,
inline_value_in_stream_out_methods=self._breakdown.unary_stream_methods,
inline_stream_in_value_out_methods=self._breakdown.stream_unary_methods,
inline_stream_in_stream_out_methods=self._breakdown.stream_stream_methods)
self._fore_link = fore.ForeLink(
self._pool, self._breakdown.request_deserializers,
self._breakdown.response_serializers, None,
((self._private_key, self._certificate_chain),), port=self._port)
port = self._fore_link.start()
self._back = _tickets_implementations.back(
servicer, self._pool, self._pool, self._pool, _MEGA_TIMEOUT,
_MEGA_TIMEOUT)
self._fore_link.join_rear_link(self._back)
self._back.join_fore_link(self._fore_link)
return port
else:
raise ValueError('Server currently running!')
def stop(self):
"""See interfaces.Server.stop for specification."""
with self._lock:
if self._pool is None:
raise ValueError('Server not running!')
else:
self._fore_link.stop()
self._pool.shutdown(wait=True)
self._pool = None
def _build_server(methods, port, private_key, certificate_chain):
breakdown = _face_utilities.break_down(methods)
return _Server(breakdown, port, private_key, certificate_chain)
def insecure_server(methods, port):
"""Constructs an insecure interfaces.Server.
Args:
methods: A dictionary from RPC method name to
interfaces.RpcMethod object describing the RPCs to be
serviced by the created server.
port: The port on which to serve.
Returns:
An interfaces.Server that will run with no security and
service unsecured raw requests.
"""
return _build_server(methods, port, None, None)
def secure_server(methods, port, private_key, certificate_chain):
"""Constructs a secure interfaces.Server.
Args:
methods: A dictionary from RPC method name to
interfaces.RpcMethod object describing the RPCs to be
serviced by the created server.
port: The port on which to serve.
private_key: A pem-encoded private key.
certificate_chain: A pem-encoded certificate chain.
Returns:
An interfaces.Server that will serve secure traffic.
"""
return _build_server(methods, port, private_key, certificate_chain)
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Interfaces of GRPC."""
import abc
import enum
@enum.unique
class Cardinality(enum.Enum):
"""Constants for the four cardinalities of RPC."""
UNARY_UNARY = 'request-unary/response-unary'
UNARY_STREAM = 'request-unary/response-streaming'
STREAM_UNARY = 'request-streaming/response-unary'
STREAM_STREAM = 'request-streaming/response-streaming'
class RpcMethod(object):
"""A sum type for the implementation of an RPC method."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def cardinality(self):
"""Identifies the cardinality of this RpcMethod.
Returns:
A Cardinality value identifying whether or not this
RpcMethod is request-unary or request-streaming and
whether or not it is response-unary or
response-streaming.
"""
raise NotImplementedError()
@abc.abstractmethod
def serialize_request(self, request):
"""Serializes a request value.
Args:
request: A request value appropriate for this RpcMethod.
Returns:
The serialization of the given request value as a
bytestring.
"""
raise NotImplementedError()
@abc.abstractmethod
def deserialize_request(self, serialized_request):
"""Deserializes a request value.
Args:
serialized_request: A bytestring that is the
serialization of a request value appropriate for this
RpcMethod.
Returns:
A request value corresponding to the given bytestring.
"""
raise NotImplementedError()
@abc.abstractmethod
def serialize_response(self, response):
"""Serializes a response value.
Args:
response: A response value appropriate for this RpcMethod.
Returns:
The serialization of the given response value as a
bytestring.
"""
raise NotImplementedError()
@abc.abstractmethod
def deserialize_response(self, serialized_response):
"""Deserializes a response value.
Args:
serialized_response: A bytestring that is the
serialization of a response value appropriate for this
RpcMethod.
Returns:
A response value corresponding to the given bytestring.
"""
raise NotImplementedError()
@abc.abstractmethod
def service_unary_unary(self, request):
"""Carries out this RPC.
This method may only be called if the cardinality of this
RpcMethod is Cardinality.UNARY_UNARY.
Args:
request: A request value appropriate for this RpcMethod.
Returns:
A response value appropriate for this RpcMethod.
"""
raise NotImplementedError()
@abc.abstractmethod
def service_unary_stream(self, request):
"""Carries out this RPC.
This method may only be called if the cardinality of this
RpcMethod is Cardinality.UNARY_STREAM.
Args:
request: A request value appropriate for this RpcMethod.
Yields:
Zero or more response values appropriate for this
RpcMethod.
"""
raise NotImplementedError()
@abc.abstractmethod
def service_stream_unary(self, request_iterator):
"""Carries out this RPC.
This method may only be called if the cardinality of this
RpcMethod is Cardinality.STREAM_UNARY.
Args:
request_iterator: An iterator of request values
appropriate for this RpcMethod.
Returns:
A response value appropriate for this RpcMethod.
"""
raise NotImplementedError()
@abc.abstractmethod
def service_stream_stream(self, request_iterator):
"""Carries out this RPC.
This method may only be called if the cardinality of this
RpcMethod is Cardinality.STREAM_STREAM.
Args:
request_iterator: An iterator of request values
appropriate for this RpcMethod.
Yields:
Zero or more response values appropraite for this
RpcMethod.
"""
raise NotImplementedError()
class Server(object):
"""A GRPC Server."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def start(self):
"""Instructs this server to commence service of RPCs."""
raise NotImplementedError()
@abc.abstractmethod
def stop(self):
"""Instructs this server to halt service of RPCs."""
raise NotImplementedError()
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Utilities for use with GRPC."""
from grpc_early_adopter import interfaces
class _RpcMethod(interfaces.RpcMethod):
def __init__(
self, cardinality, unary_unary, unary_stream, stream_unary,
stream_stream, request_serializer, request_deserializer,
response_serializer, response_deserializer):
self._cardinality = cardinality
self._unary_unary = unary_unary
self._unary_stream = unary_stream
self._stream_unary = stream_unary
self._stream_stream = stream_stream
self._request_serializer = request_serializer
self._request_deserializer = request_deserializer
self._response_serializer = response_serializer
self._response_deserializer = response_deserializer
def cardinality(self):
"""See interfaces.RpcMethod.cardinality for specification."""
return self._cardinality
def serialize_request(self, request):
"""See interfaces.RpcMethod.serialize_request for specification."""
return self._request_serializer(request)
def deserialize_request(self, serialized_request):
"""See interfaces.RpcMethod.deserialize_request for specification."""
return self._request_deserializer(serialized_request)
def serialize_response(self, response):
"""See interfaces.RpcMethod.serialize_response for specification."""
return self._response_serializer(response)
def deserialize_response(self, serialized_response):
"""See interfaces.RpcMethod.deserialize_response for specification."""
return self._response_deserializer(serialized_response)
def service_unary_unary(self, request):
"""See interfaces.RpcMethod.service_unary_unary for specification."""
return self._unary_unary(request)
def service_unary_stream(self, request):
"""See interfaces.RpcMethod.service_unary_stream for specification."""
return self._unary_stream(request)
def service_stream_unary(self, request_iterator):
"""See interfaces.RpcMethod.service_stream_unary for specification."""
return self._stream_unary(request_iterator)
def service_stream_stream(self, request_iterator):
"""See interfaces.RpcMethod.service_stream_stream for specification."""
return self._stream_stream(request_iterator)
def unary_unary_rpc_method(
behavior, request_serializer, request_deserializer, response_serializer,
response_deserializer):
"""Constructs an interfaces.RpcMethod for the given behavior.
Args:
behavior: A callable that implements a unary-unary RPC
method that accepts a single request and returns a single
response.
request_serializer: A callable that when called on a request
value returns a bytestring corresponding to that value.
request_deserializer: A callable that when called on a
bytestring returns the request value corresponding to that
bytestring.
response_serializer: A callable that when called on a
response value returns the bytestring corresponding to
that value.
response_deserializer: A callable that when called on a
bytestring returns the response value corresponding to
that bytestring.
Returns:
An interfaces.RpcMethod constructed from the given
arguments representing a unary-request/unary-response RPC
method.
"""
return _RpcMethod(
interfaces.Cardinality.UNARY_UNARY, behavior, None, None, None,
request_serializer, request_deserializer, response_serializer,
response_deserializer)
def unary_stream_rpc_method(
behavior, request_serializer, request_deserializer, response_serializer,
response_deserializer):
"""Constructs an interfaces.RpcMethod for the given behavior.
Args:
behavior: A callable that implements a unary-stream RPC
method that accepts a single request and returns an
iterator of zero or more responses.
request_serializer: A callable that when called on a request
value returns a bytestring corresponding to that value.
request_deserializer: A callable that when called on a
bytestring returns the request value corresponding to that
bytestring.
response_serializer: A callable that when called on a
response value returns the bytestring corresponding to
that value.
response_deserializer: A callable that when called on a
bytestring returns the response value corresponding to
that bytestring.
Returns:
An interfaces.RpcMethod constructed from the given
arguments representing a unary-request/streaming-response
RPC method.
"""
return _RpcMethod(
interfaces.Cardinality.UNARY_STREAM, None, behavior, None, None,
request_serializer, request_deserializer, response_serializer,
response_deserializer)
def stream_unary_rpc_method(
behavior, request_serializer, request_deserializer, response_serializer,
response_deserializer):
"""Constructs an interfaces.RpcMethod for the given behavior.
Args:
behavior: A callable that implements a stream-unary RPC
method that accepts an iterator of zero or more requests
and returns a single response.
request_serializer: A callable that when called on a request
value returns a bytestring corresponding to that value.
request_deserializer: A callable that when called on a
bytestring returns the request value corresponding to that
bytestring.
response_serializer: A callable that when called on a
response value returns the bytestring corresponding to
that value.
response_deserializer: A callable that when called on a
bytestring returns the response value corresponding to
that bytestring.
Returns:
An interfaces.RpcMethod constructed from the given
arguments representing a streaming-request/unary-response
RPC method.
"""
return _RpcMethod(
interfaces.Cardinality.STREAM_UNARY, None, None, behavior, None,
request_serializer, request_deserializer, response_serializer,
response_deserializer)
def stream_stream_rpc_method(
behavior, request_serializer, request_deserializer, response_serializer,
response_deserializer):
"""Constructs an interfaces.RpcMethod for the given behavior.
Args:
behavior: A callable that implements a stream-stream RPC
method that accepts an iterator of zero or more requests
and returns an iterator of zero or more responses.
request_serializer: A callable that when called on a request
value returns a bytestring corresponding to that value.
request_deserializer: A callable that when called on a
bytestring returns the request value corresponding to that
bytestring.
response_serializer: A callable that when called on a
response value returns the bytestring corresponding to
that value.
response_deserializer: A callable that when called on a
bytestring returns the response value corresponding to
that bytestring.
Returns:
An interfaces.RpcMethod constructed from the given
arguments representing a
streaming-request/streaming-response RPC method.
"""
return _RpcMethod(
interfaces.Cardinality.STREAM_STREAM, None, None, None, behavior,
request_serializer, request_deserializer, response_serializer,
response_deserializer)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment