Skip to content

Commit

Permalink
Merge pull request grpc#3260 from nathanielmanistaatgoogle/protocol-o…
Browse files Browse the repository at this point in the history
…bjects

Plumb protocol objects through RPC Framework core
  • Loading branch information
soltanmm committed Sep 7, 2015
2 parents d91e5c6 + 13db8e5 commit a4836ad
Show file tree
Hide file tree
Showing 12 changed files with 319 additions and 24 deletions.
4 changes: 3 additions & 1 deletion src/python/grpcio/grpc/_links/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,10 +167,12 @@ def _on_service_acceptance_event(self, event, server):
request_deserializer, response_serializer, 1, _Read.READING, None, 1,
_HighWrite.OPEN, _LowWrite.OPEN, False, None, None, None,
set((_READ, _FINISH,)))
protocol = links.Protocol(
links.Protocol.Kind.SERVICER_CONTEXT, 'TODO: Service Context Object!')
ticket = links.Ticket(
call, 0, group, method, links.Ticket.Subscription.FULL,
service_acceptance.deadline - time.time(), None, event.metadata, None,
None, None, None, None, 'TODO: Service Context Object!')
None, None, None, None, protocol)
self._relay.add_value(ticket)

def _on_read_event(self, event):
Expand Down
17 changes: 12 additions & 5 deletions src/python/grpcio/grpc/framework/core/_ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ class _IngestionManager(_interfaces.IngestionManager):

def __init__(
self, lock, pool, subscription, subscription_creator, termination_manager,
transmission_manager, expiration_manager):
transmission_manager, expiration_manager, protocol_manager):
"""Constructor.
Args:
Expand All @@ -157,12 +157,14 @@ def __init__(
transmission_manager: The _interfaces.TransmissionManager for the
operation.
expiration_manager: The _interfaces.ExpirationManager for the operation.
protocol_manager: The _interfaces.ProtocolManager for the operation.
"""
self._lock = lock
self._pool = pool
self._termination_manager = termination_manager
self._transmission_manager = transmission_manager
self._expiration_manager = expiration_manager
self._protocol_manager = protocol_manager

if subscription is None:
self._subscription_creator = subscription_creator
Expand Down Expand Up @@ -296,6 +298,8 @@ def _create(self, subscription_creator, group, name):
self._abort_and_notify(
base.Outcome.Kind.REMOTE_FAILURE, code, details)
elif outcome.return_value.subscription.kind is base.Subscription.Kind.FULL:
self._protocol_manager.set_protocol_receiver(
outcome.return_value.subscription.protocol_receiver)
self._operator_post_create(outcome.return_value.subscription)
else:
# TODO(nathaniel): Support other subscriptions.
Expand Down Expand Up @@ -378,7 +382,7 @@ def advance(self, initial_metadata, payload, completion, allowance):

def invocation_ingestion_manager(
subscription, lock, pool, termination_manager, transmission_manager,
expiration_manager):
expiration_manager, protocol_manager):
"""Creates an IngestionManager appropriate for invocation-side use.
Args:
Expand All @@ -390,18 +394,20 @@ def invocation_ingestion_manager(
transmission_manager: The _interfaces.TransmissionManager for the
operation.
expiration_manager: The _interfaces.ExpirationManager for the operation.
protocol_manager: The _interfaces.ProtocolManager for the operation.
Returns:
An IngestionManager appropriate for invocation-side use.
"""
return _IngestionManager(
lock, pool, subscription, None, termination_manager, transmission_manager,
expiration_manager)
expiration_manager, protocol_manager)


def service_ingestion_manager(
servicer, operation_context, output_operator, lock, pool,
termination_manager, transmission_manager, expiration_manager):
termination_manager, transmission_manager, expiration_manager,
protocol_manager):
"""Creates an IngestionManager appropriate for service-side use.
The returned IngestionManager will require its set_group_and_name method to be
Expand All @@ -420,6 +426,7 @@ def service_ingestion_manager(
transmission_manager: The _interfaces.TransmissionManager for the
operation.
expiration_manager: The _interfaces.ExpirationManager for the operation.
protocol_manager: The _interfaces.ProtocolManager for the operation.
Returns:
An IngestionManager appropriate for service-side use.
Expand All @@ -428,4 +435,4 @@ def service_ingestion_manager(
servicer, operation_context, output_operator)
return _IngestionManager(
lock, pool, None, subscription_creator, termination_manager,
transmission_manager, expiration_manager)
transmission_manager, expiration_manager, protocol_manager)
25 changes: 25 additions & 0 deletions src/python/grpcio/grpc/framework/core/_interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,31 @@ def terminate(self):
raise NotImplementedError()


class ProtocolManager(object):
"""A manager of protocol-specific values passing through an operation."""
__metaclass__ = abc.ABCMeta

@abc.abstractmethod
def set_protocol_receiver(self, protocol_receiver):
"""Registers the customer object that will receive protocol objects.
Args:
protocol_receiver: A base.ProtocolReceiver to which protocol objects for
the operation should be passed.
"""
raise NotImplementedError()

@abc.abstractmethod
def accept_protocol_context(self, protocol_context):
"""Accepts the protocol context object for the operation.
Args:
protocol_context: An object designated for use as the protocol context
of the operation, with further semantics implementation-determined.
"""
raise NotImplementedError()


class EmissionManager(base.Operator):
"""A manager of values emitted by customer code."""
__metaclass__ = abc.ABCMeta
Expand Down
16 changes: 12 additions & 4 deletions src/python/grpcio/grpc/framework/core/_operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from grpc.framework.core import _expiration
from grpc.framework.core import _ingestion
from grpc.framework.core import _interfaces
from grpc.framework.core import _protocol
from grpc.framework.core import _reception
from grpc.framework.core import _termination
from grpc.framework.core import _transmission
Expand Down Expand Up @@ -123,16 +124,19 @@ def invocation_operate(
operation_id, ticket_sink, lock, pool, termination_manager)
expiration_manager = _expiration.invocation_expiration_manager(
timeout, lock, termination_manager, transmission_manager)
protocol_manager = _protocol.invocation_protocol_manager(
subscription, lock, pool, termination_manager, transmission_manager,
expiration_manager)
operation_context = _context.OperationContext(
lock, termination_manager, transmission_manager, expiration_manager)
emission_manager = _emission.EmissionManager(
lock, termination_manager, transmission_manager, expiration_manager)
ingestion_manager = _ingestion.invocation_ingestion_manager(
subscription, lock, pool, termination_manager, transmission_manager,
expiration_manager)
expiration_manager, protocol_manager)
reception_manager = _reception.ReceptionManager(
termination_manager, transmission_manager, expiration_manager,
ingestion_manager)
protocol_manager, ingestion_manager)

termination_manager.set_expiration_manager(expiration_manager)
transmission_manager.set_expiration_manager(expiration_manager)
Expand Down Expand Up @@ -174,16 +178,20 @@ def service_operate(
ticket.timeout, servicer_package.default_timeout,
servicer_package.maximum_timeout, lock, termination_manager,
transmission_manager)
protocol_manager = _protocol.service_protocol_manager(
lock, pool, termination_manager, transmission_manager,
expiration_manager)
operation_context = _context.OperationContext(
lock, termination_manager, transmission_manager, expiration_manager)
emission_manager = _emission.EmissionManager(
lock, termination_manager, transmission_manager, expiration_manager)
ingestion_manager = _ingestion.service_ingestion_manager(
servicer_package.servicer, operation_context, emission_manager, lock,
pool, termination_manager, transmission_manager, expiration_manager)
pool, termination_manager, transmission_manager, expiration_manager,
protocol_manager)
reception_manager = _reception.ReceptionManager(
termination_manager, transmission_manager, expiration_manager,
ingestion_manager)
protocol_manager, ingestion_manager)

termination_manager.set_expiration_manager(expiration_manager)
transmission_manager.set_expiration_manager(expiration_manager)
Expand Down
176 changes: 176 additions & 0 deletions src/python/grpcio/grpc/framework/core/_protocol.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

"""State and behavior for passing protocol objects in an operation."""

import collections
import enum

from grpc.framework.core import _constants
from grpc.framework.core import _interfaces
from grpc.framework.core import _utilities
from grpc.framework.foundation import callable_util
from grpc.framework.interfaces.base import base

_EXCEPTION_LOG_MESSAGE = 'Exception delivering protocol object!'

_LOCAL_FAILURE_OUTCOME = _utilities.Outcome(
base.Outcome.Kind.LOCAL_FAILURE, None, None)


class _Awaited(
collections.namedtuple('_Awaited', ('kind', 'value',))):

@enum.unique
class Kind(enum.Enum):
NOT_YET_ARRIVED = 'not yet arrived'
ARRIVED = 'arrived'

_NOT_YET_ARRIVED = _Awaited(_Awaited.Kind.NOT_YET_ARRIVED, None)
_ARRIVED_AND_NONE = _Awaited(_Awaited.Kind.ARRIVED, None)


class _Transitory(
collections.namedtuple('_Transitory', ('kind', 'value',))):

@enum.unique
class Kind(enum.Enum):
NOT_YET_SEEN = 'not yet seen'
PRESENT = 'present'
GONE = 'gone'

_NOT_YET_SEEN = _Transitory(_Transitory.Kind.NOT_YET_SEEN, None)
_GONE = _Transitory(_Transitory.Kind.GONE, None)


class _ProtocolManager(_interfaces.ProtocolManager):
"""An implementation of _interfaces.ExpirationManager."""

def __init__(
self, protocol_receiver, lock, pool, termination_manager,
transmission_manager, expiration_manager):
"""Constructor.
Args:
protocol_receiver: An _Awaited wrapping of the base.ProtocolReceiver to
which protocol objects should be passed during the operation. May be
of kind _Awaited.Kind.NOT_YET_ARRIVED if the customer's subscription is
not yet known and may be of kind _Awaited.Kind.ARRIVED but with a value
of None if the customer's subscription did not include a
ProtocolReceiver.
lock: The operation-wide lock.
pool: A thread pool.
termination_manager: The _interfaces.TerminationManager for the operation.
transmission_manager: The _interfaces.TransmissionManager for the
operation.
expiration_manager: The _interfaces.ExpirationManager for the operation.
"""
self._lock = lock
self._pool = pool
self._termination_manager = termination_manager
self._transmission_manager = transmission_manager
self._expiration_manager = expiration_manager

self._protocol_receiver = protocol_receiver
self._context = _NOT_YET_SEEN

def _abort_and_notify(self, outcome):
if self._termination_manager.outcome is None:
self._termination_manager.abort(outcome)
self._transmission_manager.abort(outcome)
self._expiration_manager.terminate()

def _deliver(self, behavior, value):
def deliver():
delivery_outcome = callable_util.call_logging_exceptions(
behavior, _EXCEPTION_LOG_MESSAGE, value)
if delivery_outcome.kind is callable_util.Outcome.Kind.RAISED:
with self._lock:
self._abort_and_notify(_LOCAL_FAILURE_OUTCOME)
self._pool.submit(
callable_util.with_exceptions_logged(
deliver, _constants.INTERNAL_ERROR_LOG_MESSAGE))

def set_protocol_receiver(self, protocol_receiver):
"""See _interfaces.ProtocolManager.set_protocol_receiver for spec."""
self._protocol_receiver = _Awaited(_Awaited.Kind.ARRIVED, protocol_receiver)
if (self._context.kind is _Transitory.Kind.PRESENT and
protocol_receiver is not None):
self._deliver(protocol_receiver.context, self._context.value)
self._context = _GONE

def accept_protocol_context(self, protocol_context):
"""See _interfaces.ProtocolManager.accept_protocol_context for spec."""
if self._protocol_receiver.kind is _Awaited.Kind.ARRIVED:
if self._protocol_receiver.value is not None:
self._deliver(self._protocol_receiver.value.context, protocol_context)
self._context = _GONE
else:
self._context = _Transitory(_Transitory.Kind.PRESENT, protocol_context)


def invocation_protocol_manager(
subscription, lock, pool, termination_manager, transmission_manager,
expiration_manager):
"""Creates an _interfaces.ProtocolManager for invocation-side use.
Args:
subscription: The local customer's subscription to the operation.
lock: The operation-wide lock.
pool: A thread pool.
termination_manager: The _interfaces.TerminationManager for the operation.
transmission_manager: The _interfaces.TransmissionManager for the
operation.
expiration_manager: The _interfaces.ExpirationManager for the operation.
"""
if subscription.kind is base.Subscription.Kind.FULL:
awaited_protocol_receiver = _Awaited(
_Awaited.Kind.ARRIVED, subscription.protocol_receiver)
else:
awaited_protocol_receiver = _ARRIVED_AND_NONE
return _ProtocolManager(
awaited_protocol_receiver, lock, pool, termination_manager,
transmission_manager, expiration_manager)


def service_protocol_manager(
lock, pool, termination_manager, transmission_manager, expiration_manager):
"""Creates an _interfaces.ProtocolManager for service-side use.
Args:
lock: The operation-wide lock.
pool: A thread pool.
termination_manager: The _interfaces.TerminationManager for the operation.
transmission_manager: The _interfaces.TransmissionManager for the
operation.
expiration_manager: The _interfaces.ExpirationManager for the operation.
"""
return _ProtocolManager(
_NOT_YET_ARRIVED, lock, pool, termination_manager, transmission_manager,
expiration_manager)
14 changes: 13 additions & 1 deletion src/python/grpcio/grpc/framework/core/_reception.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,23 +51,31 @@
base.Outcome.Kind.RECEPTION_FAILURE, None, None)


def _carrying_protocol_context(ticket):
return ticket.protocol is not None and ticket.protocol.kind in (
links.Protocol.Kind.INVOCATION_CONTEXT,
links.Protocol.Kind.SERVICER_CONTEXT,)


class ReceptionManager(_interfaces.ReceptionManager):
"""A ReceptionManager based around a _Receiver passed to it."""

def __init__(
self, termination_manager, transmission_manager, expiration_manager,
ingestion_manager):
protocol_manager, ingestion_manager):
"""Constructor.
Args:
termination_manager: The operation's _interfaces.TerminationManager.
transmission_manager: The operation's _interfaces.TransmissionManager.
expiration_manager: The operation's _interfaces.ExpirationManager.
protocol_manager: The operation's _interfaces.ProtocolManager.
ingestion_manager: The operation's _interfaces.IngestionManager.
"""
self._termination_manager = termination_manager
self._transmission_manager = transmission_manager
self._expiration_manager = expiration_manager
self._protocol_manager = protocol_manager
self._ingestion_manager = ingestion_manager

self._lowest_unseen_sequence_number = 0
Expand Down Expand Up @@ -100,6 +108,10 @@ def _sequence_failure(self, ticket):
def _process_one(self, ticket):
if ticket.sequence_number == 0:
self._ingestion_manager.set_group_and_method(ticket.group, ticket.method)
if _carrying_protocol_context(ticket):
self._protocol_manager.accept_protocol_context(ticket.protocol.value)
else:
self._protocol_manager.accept_protocol_context(None)
if ticket.timeout is not None:
self._expiration_manager.change_timeout(ticket.timeout)
if ticket.termination is None:
Expand Down
Loading

0 comments on commit a4836ad

Please sign in to comment.