Skip to content

Commit

Permalink
Refactoring within Python RPC Framework
Browse files Browse the repository at this point in the history
The assembly and face layers were mostly redundant except that the
assembly layer had far-better interfaces and the face layer had more
of a reason to exist. Now they are merged.
  • Loading branch information
nathanielmanistaatgoogle committed Mar 7, 2015
1 parent 3631e82 commit 5c87a30
Show file tree
Hide file tree
Showing 20 changed files with 664 additions and 1,054 deletions.
30 changes: 5 additions & 25 deletions src/python/src/grpc/_adapter/_face_test_case.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,31 +50,12 @@ class FaceTestCase(test_case.FaceTestCase, coverage.BlockingCoverage):
"""Provides abstract Face-layer tests a GRPC-backed implementation."""

def set_up_implementation(
self,
name,
methods,
inline_value_in_value_out_methods,
inline_value_in_stream_out_methods,
inline_stream_in_value_out_methods,
inline_stream_in_stream_out_methods,
event_value_in_value_out_methods,
event_value_in_stream_out_methods,
event_stream_in_value_out_methods,
event_stream_in_stream_out_methods,
multi_method):
self, name, methods, method_implementations,
multi_method_implementation):
pool = logging_pool.pool(_MAXIMUM_POOL_SIZE)

servicer = face_implementations.servicer(
pool,
inline_value_in_value_out_methods=inline_value_in_value_out_methods,
inline_value_in_stream_out_methods=inline_value_in_stream_out_methods,
inline_stream_in_value_out_methods=inline_stream_in_value_out_methods,
inline_stream_in_stream_out_methods=inline_stream_in_stream_out_methods,
event_value_in_value_out_methods=event_value_in_value_out_methods,
event_value_in_stream_out_methods=event_value_in_stream_out_methods,
event_stream_in_value_out_methods=event_stream_in_value_out_methods,
event_stream_in_stream_out_methods=event_stream_in_stream_out_methods,
multi_method=multi_method)
pool, method_implementations, multi_method_implementation)

serialization = serial.serialization(methods)

Expand All @@ -96,9 +77,8 @@ def set_up_implementation(
rear_link.join_fore_link(front)
front.join_rear_link(rear_link)

server = face_implementations.server()
stub = face_implementations.stub(front, pool)
return server, stub, (rear_link, fore_link, front, back)
stub = face_implementations.generic_stub(front, pool)
return stub, (rear_link, fore_link, front, back)

def tear_down_implementation(self, memo):
rear_link, fore_link, front, back = memo
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,20 @@
import abc
import collections

# assembly_interfaces is referenced from specification in this module.
from grpc.framework.assembly import interfaces as assembly_interfaces # pylint: disable=unused-import
from grpc.framework.assembly import utilities as assembly_utilities
# face_interfaces is referenced from specification in this module.
from grpc.framework.common import cardinality
from grpc.framework.face import interfaces as face_interfaces # pylint: disable=unused-import
from grpc.framework.face import utilities as face_utilities
from grpc.early_adopter import _reexport
from grpc.early_adopter import interfaces


# TODO(issue 726): Kill the "implementations" attribute of this in favor
# of the same-information-less-bogusly-represented "cardinalities".
class InvocationBreakdown(object):
"""An intermediate representation of invocation-side views of RPC methods.
Attributes:
cardinalities: A dictionary from RPC method name to interfaces.Cardinality
value.
implementations: A dictionary from RPC method name to
assembly_interfaces.MethodImplementation describing the method.
request_serializers: A dictionary from RPC method name to callable
behavior to be used serializing request values for the RPC.
response_deserializers: A dictionary from RPC method name to callable
Expand All @@ -59,17 +56,16 @@ class _EasyInvocationBreakdown(
InvocationBreakdown,
collections.namedtuple(
'_EasyInvocationBreakdown',
('cardinalities', 'implementations', 'request_serializers',
'response_deserializers'))):
('cardinalities', 'request_serializers', 'response_deserializers'))):
pass


class ServiceBreakdown(object):
"""An intermediate representation of service-side views of RPC methods.
Attributes:
implementations: A dictionary from RPC method name
assembly_interfaces.MethodImplementation implementing the RPC method.
implementations: A dictionary from RPC method name to
face_interfaces.MethodImplementation implementing the RPC method.
request_deserializers: A dictionary from RPC method name to callable
behavior to be used deserializing request values for the RPC.
response_serializers: A dictionary from RPC method name to callable
Expand Down Expand Up @@ -97,25 +93,14 @@ def break_down_invocation(method_descriptions):
An InvocationBreakdown corresponding to the given method descriptions.
"""
cardinalities = {}
implementations = {}
request_serializers = {}
response_deserializers = {}
for name, method_description in method_descriptions.iteritems():
cardinality = method_description.cardinality()
cardinalities[name] = cardinality
if cardinality is interfaces.Cardinality.UNARY_UNARY:
implementations[name] = assembly_utilities.unary_unary_inline(None)
elif cardinality is interfaces.Cardinality.UNARY_STREAM:
implementations[name] = assembly_utilities.unary_stream_inline(None)
elif cardinality is interfaces.Cardinality.STREAM_UNARY:
implementations[name] = assembly_utilities.stream_unary_inline(None)
elif cardinality is interfaces.Cardinality.STREAM_STREAM:
implementations[name] = assembly_utilities.stream_stream_inline(None)
cardinalities[name] = method_description.cardinality()
request_serializers[name] = method_description.serialize_request
response_deserializers[name] = method_description.deserialize_response
return _EasyInvocationBreakdown(
cardinalities, implementations, request_serializers,
response_deserializers)
cardinalities, request_serializers, response_deserializers)


def break_down_service(method_descriptions):
Expand All @@ -139,28 +124,28 @@ def service(
service_behavior=method_description.service_unary_unary):
return service_behavior(
request, _reexport.rpc_context(face_rpc_context))
implementations[name] = assembly_utilities.unary_unary_inline(service)
implementations[name] = face_utilities.unary_unary_inline(service)
elif cardinality is interfaces.Cardinality.UNARY_STREAM:
def service(
request, face_rpc_context,
service_behavior=method_description.service_unary_stream):
return service_behavior(
request, _reexport.rpc_context(face_rpc_context))
implementations[name] = assembly_utilities.unary_stream_inline(service)
implementations[name] = face_utilities.unary_stream_inline(service)
elif cardinality is interfaces.Cardinality.STREAM_UNARY:
def service(
request_iterator, face_rpc_context,
service_behavior=method_description.service_stream_unary):
return service_behavior(
request_iterator, _reexport.rpc_context(face_rpc_context))
implementations[name] = assembly_utilities.stream_unary_inline(service)
implementations[name] = face_utilities.stream_unary_inline(service)
elif cardinality is interfaces.Cardinality.STREAM_STREAM:
def service(
request_iterator, face_rpc_context,
service_behavior=method_description.service_stream_stream):
return service_behavior(
request_iterator, _reexport.rpc_context(face_rpc_context))
implementations[name] = assembly_utilities.stream_stream_inline(service)
implementations[name] = face_utilities.stream_stream_inline(service)
request_deserializers[name] = method_description.deserialize_request
response_serializers[name] = method_description.serialize_response

Expand Down
47 changes: 32 additions & 15 deletions src/python/src/grpc/early_adopter/_reexport.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,20 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

from grpc.framework.common import cardinality
from grpc.framework.face import exceptions as face_exceptions
from grpc.framework.face import interfaces as face_interfaces
from grpc.framework.foundation import future
from grpc.early_adopter import exceptions
from grpc.early_adopter import interfaces

_EARLY_ADOPTER_CARDINALITY_TO_COMMON_CARDINALITY = {
interfaces.Cardinality.UNARY_UNARY: cardinality.Cardinality.UNARY_UNARY,
interfaces.Cardinality.UNARY_STREAM: cardinality.Cardinality.UNARY_STREAM,
interfaces.Cardinality.STREAM_UNARY: cardinality.Cardinality.STREAM_UNARY,
interfaces.Cardinality.STREAM_STREAM: cardinality.Cardinality.STREAM_STREAM,
}

_ABORTION_REEXPORT = {
face_interfaces.Abortion.CANCELLED: interfaces.Abortion.CANCELLED,
face_interfaces.Abortion.EXPIRED: interfaces.Abortion.EXPIRED,
Expand Down Expand Up @@ -142,28 +150,28 @@ def add_abortion_callback(self, abortion_callback):

class _UnaryUnarySyncAsync(interfaces.UnaryUnarySyncAsync):

def __init__(self, face_unary_unary_sync_async):
self._underlying = face_unary_unary_sync_async
def __init__(self, face_unary_unary_multi_callable):
self._underlying = face_unary_unary_multi_callable

def __call__(self, request, timeout):
return _call_reexporting_errors(
self._underlying, request, timeout)

def async(self, request, timeout):
return _ReexportedFuture(self._underlying.async(request, timeout))
return _ReexportedFuture(self._underlying.future(request, timeout))


class _StreamUnarySyncAsync(interfaces.StreamUnarySyncAsync):

def __init__(self, face_stream_unary_sync_async):
self._underlying = face_stream_unary_sync_async
def __init__(self, face_stream_unary_multi_callable):
self._underlying = face_stream_unary_multi_callable

def __call__(self, request_iterator, timeout):
return _call_reexporting_errors(
self._underlying, request_iterator, timeout)

def async(self, request_iterator, timeout):
return _ReexportedFuture(self._underlying.async(request_iterator, timeout))
return _ReexportedFuture(self._underlying.future(request_iterator, timeout))


class _Stub(interfaces.Stub):
Expand All @@ -182,31 +190,40 @@ def __exit__(self, exc_type, exc_val, exc_tb):

def __getattr__(self, attr):
underlying_attr = self._assembly_stub.__getattr__(attr)
cardinality = self._cardinalities.get(attr)
method_cardinality = self._cardinalities.get(attr)
# TODO(nathaniel): unify this trick with its other occurrence in the code.
if cardinality is None:
for name, cardinality in self._cardinalities.iteritems():
if method_cardinality is None:
for name, method_cardinality in self._cardinalities.iteritems():
last_slash_index = name.rfind('/')
if 0 <= last_slash_index and name[last_slash_index + 1:] == attr:
break
else:
raise AttributeError(attr)
if cardinality is interfaces.Cardinality.UNARY_UNARY:
if method_cardinality is interfaces.Cardinality.UNARY_UNARY:
return _UnaryUnarySyncAsync(underlying_attr)
elif cardinality is interfaces.Cardinality.UNARY_STREAM:
elif method_cardinality is interfaces.Cardinality.UNARY_STREAM:
return lambda request, timeout: _CancellableIterator(
underlying_attr(request, timeout))
elif cardinality is interfaces.Cardinality.STREAM_UNARY:
elif method_cardinality is interfaces.Cardinality.STREAM_UNARY:
return _StreamUnarySyncAsync(underlying_attr)
elif cardinality is interfaces.Cardinality.STREAM_STREAM:
elif method_cardinality is interfaces.Cardinality.STREAM_STREAM:
return lambda request_iterator, timeout: _CancellableIterator(
underlying_attr(request_iterator, timeout))
else:
raise AttributeError(attr)


def common_cardinalities(early_adopter_cardinalities):
common_cardinalities = {}
for name, early_adopter_cardinality in early_adopter_cardinalities.iteritems():
common_cardinalities[name] = _EARLY_ADOPTER_CARDINALITY_TO_COMMON_CARDINALITY[
early_adopter_cardinality]
return common_cardinalities


def rpc_context(face_rpc_context):
return _RpcContext(face_rpc_context)


def stub(assembly_stub, cardinalities):
return _Stub(assembly_stub, cardinalities)
def stub(face_stub, cardinalities):
return _Stub(face_stub, cardinalities)
11 changes: 6 additions & 5 deletions src/python/src/grpc/early_adopter/implementations.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

from grpc._adapter import fore as _fore
from grpc._adapter import rear as _rear
from grpc.early_adopter import _assembly_utilities
from grpc.early_adopter import _face_utilities
from grpc.early_adopter import _reexport
from grpc.early_adopter import interfaces
from grpc.framework.assembly import implementations as _assembly_implementations
Expand Down Expand Up @@ -95,12 +95,13 @@ def port(self):

def _build_stub(breakdown, activated_rear_link):
assembly_stub = _assembly_implementations.assemble_dynamic_inline_stub(
breakdown.implementations, activated_rear_link)
_reexport.common_cardinalities(breakdown.cardinalities),
activated_rear_link)
return _reexport.stub(assembly_stub, breakdown.cardinalities)


def _build_server(methods, port, private_key, certificate_chain):
breakdown = _assembly_utilities.break_down_service(methods)
breakdown = _face_utilities.break_down_service(methods)
return _Server(breakdown, port, private_key, certificate_chain)


Expand All @@ -117,7 +118,7 @@ def insecure_stub(methods, host, port):
Returns:
An interfaces.Stub affording RPC invocation.
"""
breakdown = _assembly_utilities.break_down_invocation(methods)
breakdown = _face_utilities.break_down_invocation(methods)
activated_rear_link = _rear.activated_rear_link(
host, port, breakdown.request_serializers,
breakdown.response_deserializers)
Expand Down Expand Up @@ -147,7 +148,7 @@ def secure_stub(
Returns:
An interfaces.Stub affording RPC invocation.
"""
breakdown = _assembly_utilities.break_down_invocation(methods)
breakdown = _face_utilities.break_down_invocation(methods)
activated_rear_link = _rear.secure_activated_rear_link(
host, port, breakdown.request_serializers,
breakdown.response_deserializers, root_certificates, private_key,
Expand Down
Loading

0 comments on commit 5c87a30

Please sign in to comment.