Skip to content

Commit

Permalink
Change managers to pass each other Outcomes
Browse files Browse the repository at this point in the history
This refactoring greatly reduces the base-internal use of packet kinds
and will make much simpler a future change distinguishing front-to-back
packet kinds from back-to-front packet kinds.
  • Loading branch information
nathanielmanistaatgoogle committed Mar 16, 2015
1 parent d3d55c7 commit ae3e5b5
Show file tree
Hide file tree
Showing 10 changed files with 130 additions and 129 deletions.
5 changes: 3 additions & 2 deletions src/python/src/grpc/framework/base/packets/_cancellation.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

"""State and behavior for operation cancellation."""

from grpc.framework.base import interfaces as base_interfaces
from grpc.framework.base.packets import _interfaces
from grpc.framework.base.packets import packets

Expand Down Expand Up @@ -58,7 +59,7 @@ def __init__(
def cancel(self):
"""See _interfaces.CancellationManager.cancel for specification."""
with self._lock:
self._termination_manager.abort(packets.Kind.CANCELLATION)
self._transmission_manager.abort(packets.Kind.CANCELLATION)
self._termination_manager.abort(base_interfaces.Outcome.CANCELLED)
self._transmission_manager.abort(base_interfaces.Outcome.CANCELLED)
self._ingestion_manager.abort()
self._expiration_manager.abort()
8 changes: 4 additions & 4 deletions src/python/src/grpc/framework/base/packets/_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,9 @@

import time

# _interfaces and packets are referenced from specification in this module.
# _interfaces is referenced from specification in this module.
from grpc.framework.base import interfaces as base_interfaces
from grpc.framework.base.packets import _interfaces # pylint: disable=unused-import
from grpc.framework.base.packets import packets # pylint: disable=unused-import


class OperationContext(base_interfaces.OperationContext):
Expand All @@ -48,8 +47,9 @@ def __init__(
Args:
lock: The operation-wide lock.
operation_id: An object identifying the operation.
local_failure: Whichever one of packets.Kind.SERVICED_FAILURE or
packets.Kind.SERVICER_FAILURE describes local failure of customer code.
local_failure: Whichever one of base_interfaces.Outcome.SERVICED_FAILURE
or base_interfaces.Outcome.SERVICER_FAILURE describes local failure of
customer code.
termination_manager: The _interfaces.TerminationManager for the operation.
transmission_manager: The _interfaces.TransmissionManager for the
operation.
Expand Down
22 changes: 11 additions & 11 deletions src/python/src/grpc/framework/base/packets/_emission.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,29 +29,29 @@

"""State and behavior for handling emitted values."""

# packets is referenced from specifications in this module.
from grpc.framework.base import interfaces as base_interfaces
from grpc.framework.base.packets import _interfaces
from grpc.framework.base.packets import packets # pylint: disable=unused-import


class _EmissionManager(_interfaces.EmissionManager):
"""An implementation of _interfaces.EmissionManager."""

def __init__(
self, lock, failure_kind, termination_manager, transmission_manager):
self, lock, failure_outcome, termination_manager, transmission_manager):
"""Constructor.
Args:
lock: The operation-wide lock.
failure_kind: Whichever one of packets.Kind.SERVICED_FAILURE or
packets.Kind.SERVICER_FAILURE describes this object's methods being
called inappropriately by customer code.
failure_outcome: Whichever one of
base_interfaces.Outcome.SERVICED_FAILURE or
base_interfaces.Outcome.SERVICER_FAILURE describes this object's
methods being called inappropriately by customer code.
termination_manager: The _interfaces.TerminationManager for the operation.
transmission_manager: The _interfaces.TransmissionManager for the
operation.
"""
self._lock = lock
self._failure_kind = failure_kind
self._failure_outcome = failure_outcome
self._termination_manager = termination_manager
self._transmission_manager = transmission_manager
self._ingestion_manager = None
Expand All @@ -65,8 +65,8 @@ def set_ingestion_manager_and_expiration_manager(
self._expiration_manager = expiration_manager

def _abort(self):
self._termination_manager.abort(self._failure_kind)
self._transmission_manager.abort(self._failure_kind)
self._termination_manager.abort(self._failure_outcome)
self._transmission_manager.abort(self._failure_outcome)
self._ingestion_manager.abort()
self._expiration_manager.abort()

Expand Down Expand Up @@ -106,7 +106,7 @@ def front_emission_manager(lock, termination_manager, transmission_manager):
An _interfaces.EmissionManager appropriate for front-side use.
"""
return _EmissionManager(
lock, packets.Kind.SERVICED_FAILURE, termination_manager,
lock, base_interfaces.Outcome.SERVICED_FAILURE, termination_manager,
transmission_manager)


Expand All @@ -122,5 +122,5 @@ def back_emission_manager(lock, termination_manager, transmission_manager):
An _interfaces.EmissionManager appropriate for back-side use.
"""
return _EmissionManager(
lock, packets.Kind.SERVICER_FAILURE, termination_manager,
lock, base_interfaces.Outcome.SERVICER_FAILURE, termination_manager,
transmission_manager)
4 changes: 2 additions & 2 deletions src/python/src/grpc/framework/base/packets/_ends.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ def _front_operate(
lock, transmission_pool, callback, operation_id, name,
subscription.kind, trace_id, timeout, termination_manager)
operation_context = _context.OperationContext(
lock, operation_id, packets.Kind.SERVICED_FAILURE,
lock, operation_id, base_interfaces.Outcome.SERVICED_FAILURE,
termination_manager, transmission_manager)
emission_manager = _emission.front_emission_manager(
lock, termination_manager, transmission_manager)
Expand Down Expand Up @@ -327,7 +327,7 @@ def _back_operate(
lock, transmission_pool, callback, ticket.operation_id,
termination_manager, ticket.subscription)
operation_context = _context.OperationContext(
lock, ticket.operation_id, packets.Kind.SERVICER_FAILURE,
lock, ticket.operation_id, base_interfaces.Outcome.SERVICER_FAILURE,
termination_manager, transmission_manager)
emission_manager = _emission.back_emission_manager(
lock, termination_manager, transmission_manager)
Expand Down
6 changes: 3 additions & 3 deletions src/python/src/grpc/framework/base/packets/_expiration.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@

import time

from grpc.framework.base import interfaces as base_interfaces
from grpc.framework.base.packets import _interfaces
from grpc.framework.base.packets import packets
from grpc.framework.foundation import later


Expand Down Expand Up @@ -73,8 +73,8 @@ def _expire(self, index):
with self._lock:
if self._future is not None and index == self._index:
self._future = None
self._termination_manager.abort(packets.Kind.EXPIRATION)
self._transmission_manager.abort(packets.Kind.EXPIRATION)
self._termination_manager.abort(base_interfaces.Outcome.EXPIRED)
self._transmission_manager.abort(base_interfaces.Outcome.EXPIRED)
self._ingestion_manager.abort()

def start(self):
Expand Down
40 changes: 22 additions & 18 deletions src/python/src/grpc/framework/base/packets/_ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ class _IngestionManager(_interfaces.IngestionManager):
"""An implementation of _interfaces.IngestionManager."""

def __init__(
self, lock, pool, consumer_creator, failure_kind, termination_manager,
self, lock, pool, consumer_creator, failure_outcome, termination_manager,
transmission_manager):
"""Constructor.
Expand All @@ -216,16 +216,18 @@ def __init__(
consumer_creator: A _ConsumerCreator wrapping the portion of customer code
that when called returns the stream.Consumer with which the customer
code will ingest payload values.
failure_kind: Whichever one of packets.Kind.SERVICED_FAILURE or
packets.Kind.SERVICER_FAILURE describes local failure of customer code.
failure_outcome: Whichever one of
interfaces.Outcome.SERVICED_FAILURE or
interfaces.Outcome.SERVICER_FAILURE describes local failure of
customer code.
termination_manager: The _interfaces.TerminationManager for the operation.
transmission_manager: The _interfaces.TransmissionManager for the
operation.
"""
self._lock = lock
self._pool = pool
self._consumer_creator = consumer_creator
self._failure_kind = failure_kind
self._failure_outcome = failure_outcome
self._termination_manager = termination_manager
self._transmission_manager = transmission_manager
self._expiration_manager = None
Expand Down Expand Up @@ -299,12 +301,12 @@ def _process(self, wrapped_ingestion_consumer, payload, complete):
else:
with self._lock:
if self._pending_ingestion is not None:
self._abort_and_notify(self._failure_kind)
self._abort_and_notify(self._failure_outcome)
self._processing = False
return
else:
with self._lock:
self._abort_and_notify(self._failure_kind)
self._abort_and_notify(self._failure_outcome)
self._processing = False
return

Expand All @@ -316,16 +318,16 @@ def initialize():
_CREATE_CONSUMER_EXCEPTION_LOG_MESSAGE, requirement)
if consumer_creation_outcome.return_value is None:
with self._lock:
self._abort_and_notify(self._failure_kind)
self._abort_and_notify(self._failure_outcome)
self._processing = False
elif consumer_creation_outcome.return_value.remote_error:
with self._lock:
self._abort_and_notify(packets.Kind.RECEPTION_FAILURE)
self._abort_and_notify(interfaces.Outcome.RECEPTION_FAILURE)
self._processing = False
elif consumer_creation_outcome.return_value.abandoned:
with self._lock:
if self._pending_ingestion is not None:
self._abort_and_notify(self._failure_kind)
self._abort_and_notify(self._failure_outcome)
self._processing = False
else:
wrapped_ingestion_consumer = _WrappedConsumer(
Expand All @@ -346,7 +348,7 @@ def initialize():

def consume(self, payload):
if self._ingestion_complete:
self._abort_and_notify(self._failure_kind)
self._abort_and_notify(self._failure_outcome)
elif self._pending_ingestion is not None:
if self._processing:
self._pending_ingestion.append(payload)
Expand All @@ -359,7 +361,7 @@ def consume(self, payload):

def terminate(self):
if self._ingestion_complete:
self._abort_and_notify(self._failure_kind)
self._abort_and_notify(self._failure_outcome)
else:
self._ingestion_complete = True
if self._pending_ingestion is not None and not self._processing:
Expand All @@ -371,7 +373,7 @@ def terminate(self):

def consume_and_terminate(self, payload):
if self._ingestion_complete:
self._abort_and_notify(self._failure_kind)
self._abort_and_notify(self._failure_outcome)
else:
self._ingestion_complete = True
if self._pending_ingestion is not None:
Expand All @@ -397,19 +399,20 @@ def front_ingestion_manager(
Args:
lock: The operation-wide lock.
pool: A thread pool in which to execute customer code.
subscription: A base_interfaces.ServicedSubscription indicating the
subscription: A interfaces.ServicedSubscription indicating the
customer's interest in the results of the operation.
termination_manager: The _interfaces.TerminationManager for the operation.
transmission_manager: The _interfaces.TransmissionManager for the
operation.
operation_context: A base_interfaces.OperationContext for the operation.
operation_context: A interfaces.OperationContext for the operation.
Returns:
An IngestionManager appropriate for front-side use.
"""
ingestion_manager = _IngestionManager(
lock, pool, _FrontConsumerCreator(subscription, operation_context),
packets.Kind.SERVICED_FAILURE, termination_manager, transmission_manager)
interfaces.Outcome.SERVICED_FAILURE, termination_manager,
transmission_manager)
ingestion_manager.start(None)
return ingestion_manager

Expand All @@ -422,11 +425,11 @@ def back_ingestion_manager(
Args:
lock: The operation-wide lock.
pool: A thread pool in which to execute customer code.
servicer: A base_interfaces.Servicer for servicing the operation.
servicer: A interfaces.Servicer for servicing the operation.
termination_manager: The _interfaces.TerminationManager for the operation.
transmission_manager: The _interfaces.TransmissionManager for the
operation.
operation_context: A base_interfaces.OperationContext for the operation.
operation_context: A interfaces.OperationContext for the operation.
emission_consumer: The _interfaces.EmissionConsumer for the operation.
Returns:
Expand All @@ -435,5 +438,6 @@ def back_ingestion_manager(
ingestion_manager = _IngestionManager(
lock, pool, _BackConsumerCreator(
servicer, operation_context, emission_consumer),
packets.Kind.SERVICER_FAILURE, termination_manager, transmission_manager)
interfaces.Outcome.SERVICER_FAILURE, termination_manager,
transmission_manager)
return ingestion_manager
8 changes: 4 additions & 4 deletions src/python/src/grpc/framework/base/packets/_interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,11 @@ def ingestion_complete(self):
raise NotImplementedError()

@abc.abstractmethod
def abort(self, kind):
def abort(self, outcome):
"""Indicates that the operation must abort for the indicated reason.
Args:
kind: A value of packets.Kind indicating operation abortion.
outcome: A base_interfaces.Outcome indicating operation abortion.
"""
raise NotImplementedError()

Expand All @@ -109,11 +109,11 @@ def inmit(self, emission, complete):
raise NotImplementedError()

@abc.abstractmethod
def abort(self, kind):
def abort(self, outcome):
"""Indicates that the operation has aborted for the indicated reason.
Args:
kind: A value of packets.Kind indicating operation abortion.
outcome: A base_interfaces.Outcome indicating operation abortion.
"""
raise NotImplementedError()

Expand Down
Loading

0 comments on commit ae3e5b5

Please sign in to comment.