Skip to content

Commit

Permalink
Merge pull request grpc#1058 from nathanielmanistaatgoogle/python-ref…
Browse files Browse the repository at this point in the history
…actoring

Use distinct enums for distinct ticket types
  • Loading branch information
soltanmm committed Mar 17, 2015
2 parents afdc24a + e2e443c commit 758a0c2
Show file tree
Hide file tree
Showing 7 changed files with 172 additions and 111 deletions.
63 changes: 41 additions & 22 deletions src/python/src/grpc/_adapter/_links_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,11 @@ def testZeroMessageRoundTrip(self):
test_fore_link = _test_links.ForeLink(None, None)
def rear_action(front_to_back_ticket, fore_link):
if front_to_back_ticket.kind in (
tickets.Kind.COMPLETION, tickets.Kind.ENTIRE):
tickets.FrontToBackPacket.Kind.COMPLETION,
tickets.FrontToBackPacket.Kind.ENTIRE):
back_to_front_ticket = tickets.BackToFrontPacket(
front_to_back_ticket.operation_id, 0, tickets.Kind.COMPLETION, None)
front_to_back_ticket.operation_id, 0,
tickets.BackToFrontPacket.Kind.COMPLETION, None)
fore_link.accept_back_to_front_ticket(back_to_front_ticket)
test_rear_link = _test_links.RearLink(rear_action, None)

Expand All @@ -81,20 +83,24 @@ def rear_action(front_to_back_ticket, fore_link):
rear_link.start()

front_to_back_ticket = tickets.FrontToBackPacket(
test_operation_id, 0, tickets.Kind.ENTIRE, test_method,
interfaces.ServicedSubscription.Kind.FULL, None, None, _TIMEOUT)
test_operation_id, 0, tickets.FrontToBackPacket.Kind.ENTIRE,
test_method, interfaces.ServicedSubscription.Kind.FULL, None, None,
_TIMEOUT)
rear_link.accept_front_to_back_ticket(front_to_back_ticket)

with test_fore_link.condition:
while (not test_fore_link.tickets or
test_fore_link.tickets[-1].kind is tickets.Kind.CONTINUATION):
test_fore_link.tickets[-1].kind is
tickets.BackToFrontPacket.Kind.CONTINUATION):
test_fore_link.condition.wait()

rear_link.stop()
fore_link.stop()

with test_fore_link.condition:
self.assertIs(test_fore_link.tickets[-1].kind, tickets.Kind.COMPLETION)
self.assertIs(
test_fore_link.tickets[-1].kind,
tickets.BackToFrontPacket.Kind.COMPLETION)

def testEntireRoundTrip(self):
test_operation_id = object()
Expand All @@ -109,11 +115,15 @@ def rear_action(front_to_back_ticket, fore_link):
else:
payload = test_back_to_front_datum
terminal = front_to_back_ticket.kind in (
tickets.Kind.COMPLETION, tickets.Kind.ENTIRE)
tickets.FrontToBackPacket.Kind.COMPLETION,
tickets.FrontToBackPacket.Kind.ENTIRE)
if payload is not None or terminal:
if terminal:
kind = tickets.BackToFrontPacket.Kind.COMPLETION
else:
kind = tickets.BackToFrontPacket.Kind.CONTINUATION
back_to_front_ticket = tickets.BackToFrontPacket(
front_to_back_ticket.operation_id, rear_sequence_number[0],
tickets.Kind.COMPLETION if terminal else tickets.Kind.CONTINUATION,
front_to_back_ticket.operation_id, rear_sequence_number[0], kind,
payload)
rear_sequence_number[0] += 1
fore_link.accept_back_to_front_ticket(back_to_front_ticket)
Expand All @@ -135,14 +145,15 @@ def rear_action(front_to_back_ticket, fore_link):
rear_link.start()

front_to_back_ticket = tickets.FrontToBackPacket(
test_operation_id, 0, tickets.Kind.ENTIRE, test_method,
interfaces.ServicedSubscription.Kind.FULL, None,
test_operation_id, 0, tickets.FrontToBackPacket.Kind.ENTIRE,
test_method, interfaces.ServicedSubscription.Kind.FULL, None,
test_front_to_back_datum, _TIMEOUT)
rear_link.accept_front_to_back_ticket(front_to_back_ticket)

with test_fore_link.condition:
while (not test_fore_link.tickets or
test_fore_link.tickets[-1].kind is not tickets.Kind.COMPLETION):
test_fore_link.tickets[-1].kind is not
tickets.BackToFrontPacket.Kind.COMPLETION):
test_fore_link.condition.wait()

rear_link.stop()
Expand Down Expand Up @@ -172,11 +183,15 @@ def rear_action(front_to_back_ticket, fore_link):
else:
response = None
terminal = front_to_back_ticket.kind in (
tickets.Kind.COMPLETION, tickets.Kind.ENTIRE)
tickets.FrontToBackPacket.Kind.COMPLETION,
tickets.FrontToBackPacket.Kind.ENTIRE)
if response is not None or terminal:
if terminal:
kind = tickets.BackToFrontPacket.Kind.COMPLETION
else:
kind = tickets.BackToFrontPacket.Kind.CONTINUATION
back_to_front_ticket = tickets.BackToFrontPacket(
front_to_back_ticket.operation_id, rear_sequence_number[0],
tickets.Kind.COMPLETION if terminal else tickets.Kind.CONTINUATION,
front_to_back_ticket.operation_id, rear_sequence_number[0], kind,
response)
rear_sequence_number[0] += 1
fore_link.accept_back_to_front_ticket(back_to_front_ticket)
Expand All @@ -199,25 +214,29 @@ def rear_action(front_to_back_ticket, fore_link):
rear_link.start()

commencement_ticket = tickets.FrontToBackPacket(
test_operation_id, 0, tickets.Kind.COMMENCEMENT, test_method,
interfaces.ServicedSubscription.Kind.FULL, None, None, _TIMEOUT)
test_operation_id, 0, tickets.FrontToBackPacket.Kind.COMMENCEMENT,
test_method, interfaces.ServicedSubscription.Kind.FULL, None, None,
_TIMEOUT)
fore_sequence_number = 1
rear_link.accept_front_to_back_ticket(commencement_ticket)
for request in scenario.requests():
continuation_ticket = tickets.FrontToBackPacket(
test_operation_id, fore_sequence_number, tickets.Kind.CONTINUATION,
None, None, None, request, None)
test_operation_id, fore_sequence_number,
tickets.FrontToBackPacket.Kind.CONTINUATION, None, None, None,
request, None)
fore_sequence_number += 1
rear_link.accept_front_to_back_ticket(continuation_ticket)
completion_ticket = tickets.FrontToBackPacket(
test_operation_id, fore_sequence_number, tickets.Kind.COMPLETION, None,
None, None, None, None)
test_operation_id, fore_sequence_number,
tickets.FrontToBackPacket.Kind.COMPLETION, None, None, None, None,
None)
fore_sequence_number += 1
rear_link.accept_front_to_back_ticket(completion_ticket)

with test_fore_link.condition:
while (not test_fore_link.tickets or
test_fore_link.tickets[-1].kind is not tickets.Kind.COMPLETION):
test_fore_link.tickets[-1].kind is not
tickets.BackToFrontPacket.Kind.COMPLETION):
test_fore_link.condition.wait()

rear_link.stop()
Expand Down
12 changes: 8 additions & 4 deletions src/python/src/grpc/_adapter/_lonely_rear_link_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,22 +76,26 @@ def _perform_lonely_client_test_with_ticket_kind(
with fore_link.condition:
while True:
if (fore_link.tickets and
fore_link.tickets[-1].kind is not packets.Kind.CONTINUATION):
fore_link.tickets[-1].kind is not
packets.BackToFrontPacket.Kind.CONTINUATION):
break
fore_link.condition.wait()

rear_link.stop()

with fore_link.condition:
self.assertIsNot(fore_link.tickets[-1].kind, packets.Kind.COMPLETION)
self.assertIsNot(
fore_link.tickets[-1].kind,
packets.BackToFrontPacket.Kind.COMPLETION)

@unittest.skip('TODO(nathaniel): This seems to have broken in the last few weeks; fix it.')
def testLonelyClientCommencementPacket(self):
self._perform_lonely_client_test_with_ticket_kind(
packets.Kind.COMMENCEMENT)
packets.FrontToBackPacket.Kind.COMMENCEMENT)

def testLonelyClientEntirePacket(self):
self._perform_lonely_client_test_with_ticket_kind(packets.Kind.ENTIRE)
self._perform_lonely_client_test_with_ticket_kind(
packets.FrontToBackPacket.Kind.ENTIRE)


if __name__ == '__main__':
Expand Down
28 changes: 15 additions & 13 deletions src/python/src/grpc/_adapter/fore.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def _on_service_acceptance_event(self, event, server):
self._response_serializers[method])

ticket = tickets.FrontToBackPacket(
call, 0, tickets.Kind.COMMENCEMENT, method,
call, 0, tickets.FrontToBackPacket.Kind.COMMENCEMENT, method,
interfaces.ServicedSubscription.Kind.FULL, None, None,
service_acceptance.deadline - time.time())
self._rear_link.accept_front_to_back_ticket(ticket)
Expand All @@ -146,13 +146,13 @@ def _on_read_event(self, event):
rpc_state.sequence_number += 1
if event.bytes is None:
ticket = tickets.FrontToBackPacket(
call, sequence_number, tickets.Kind.COMPLETION, None, None, None,
None, None)
call, sequence_number, tickets.FrontToBackPacket.Kind.COMPLETION,
None, None, None, None, None)
else:
call.read(call)
ticket = tickets.FrontToBackPacket(
call, sequence_number, tickets.Kind.CONTINUATION, None, None, None,
rpc_state.deserializer(event.bytes), None)
call, sequence_number, tickets.FrontToBackPacket.Kind.CONTINUATION,
None, None, None, rpc_state.deserializer(event.bytes), None)

self._rear_link.accept_front_to_back_ticket(ticket)

Expand Down Expand Up @@ -181,7 +181,8 @@ def _on_complete_event(self, event):
sequence_number = rpc_state.sequence_number
rpc_state.sequence_number += 1
ticket = tickets.FrontToBackPacket(
call, sequence_number, tickets.Kind.TRANSMISSION_FAILURE, None, None,
call, sequence_number,
tickets.FrontToBackPacket.Kind.TRANSMISSION_FAILURE, None, None,
None, None, None)
self._rear_link.accept_front_to_back_ticket(ticket)

Expand All @@ -200,16 +201,17 @@ def _on_finish_event(self, event):
rpc_state.sequence_number += 1
if code is _low.Code.CANCELLED:
ticket = tickets.FrontToBackPacket(
call, sequence_number, tickets.Kind.CANCELLATION, None, None, None,
None, None)
call, sequence_number, tickets.FrontToBackPacket.Kind.CANCELLATION,
None, None, None, None, None)
elif code is _low.Code.EXPIRED:
ticket = tickets.FrontToBackPacket(
call, sequence_number, tickets.Kind.EXPIRATION, None, None, None,
None, None)
call, sequence_number, tickets.FrontToBackPacket.Kind.EXPIRATION,
None, None, None, None, None)
else:
# TODO(nathaniel): Better mapping of codes to ticket-categories
ticket = tickets.FrontToBackPacket(
call, sequence_number, tickets.Kind.TRANSMISSION_FAILURE, None, None,
call, sequence_number,
tickets.FrontToBackPacket.Kind.TRANSMISSION_FAILURE, None, None,
None, None, None)
self._rear_link.accept_front_to_back_ticket(ticket)

Expand Down Expand Up @@ -351,9 +353,9 @@ def accept_back_to_front_ticket(self, ticket):
if self._server is None:
return

if ticket.kind is tickets.Kind.CONTINUATION:
if ticket.kind is tickets.BackToFrontPacket.Kind.CONTINUATION:
self._continue(ticket.operation_id, ticket.payload)
elif ticket.kind is tickets.Kind.COMPLETION:
elif ticket.kind is tickets.BackToFrontPacket.Kind.COMPLETION:
self._complete(ticket.operation_id, ticket.payload)
else:
self._cancel(ticket.operation_id)
28 changes: 14 additions & 14 deletions src/python/src/grpc/_adapter/rear.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ def _on_write_event(self, operation_id, event, rpc_state):
rpc_state.active = False
ticket = tickets.BackToFrontPacket(
operation_id, rpc_state.common.sequence_number,
tickets.Kind.TRANSMISSION_FAILURE, None)
tickets.BackToFrontPacket.Kind.TRANSMISSION_FAILURE, None)
rpc_state.common.sequence_number += 1
self._fore_link.accept_back_to_front_ticket(ticket)

Expand All @@ -165,7 +165,8 @@ def _on_read_event(self, operation_id, event, rpc_state):

ticket = tickets.BackToFrontPacket(
operation_id, rpc_state.common.sequence_number,
tickets.Kind.CONTINUATION, rpc_state.common.deserializer(event.bytes))
tickets.BackToFrontPacket.Kind.CONTINUATION,
rpc_state.common.deserializer(event.bytes))
rpc_state.common.sequence_number += 1
self._fore_link.accept_back_to_front_ticket(ticket)

Expand All @@ -175,7 +176,7 @@ def _on_complete_event(self, operation_id, event, rpc_state):
rpc_state.active = False
ticket = tickets.BackToFrontPacket(
operation_id, rpc_state.common.sequence_number,
tickets.Kind.TRANSMISSION_FAILURE, None)
tickets.BackToFrontPacket.Kind.TRANSMISSION_FAILURE, None)
rpc_state.common.sequence_number += 1
self._fore_link.accept_back_to_front_ticket(ticket)

Expand All @@ -188,17 +189,16 @@ def _on_finish_event(self, operation_id, event, rpc_state):
"""Handle termination of an RPC."""
# TODO(nathaniel): Cover all statuses.
if event.status.code is _low.Code.OK:
category = tickets.Kind.COMPLETION
kind = tickets.BackToFrontPacket.Kind.COMPLETION
elif event.status.code is _low.Code.CANCELLED:
# TODO(issue 752): Use a CANCELLATION ticket kind here.
category = tickets.Kind.SERVICER_FAILURE
kind = tickets.BackToFrontPacket.Kind.SERVICER_FAILURE
elif event.status.code is _low.Code.EXPIRED:
category = tickets.Kind.EXPIRATION
kind = tickets.BackToFrontPacket.Kind.EXPIRATION
else:
category = tickets.Kind.TRANSMISSION_FAILURE
kind = tickets.BackToFrontPacket.Kind.TRANSMISSION_FAILURE
ticket = tickets.BackToFrontPacket(
operation_id, rpc_state.common.sequence_number, category,
None)
operation_id, rpc_state.common.sequence_number, kind, None)
rpc_state.common.sequence_number += 1
self._fore_link.accept_back_to_front_ticket(ticket)

Expand Down Expand Up @@ -372,17 +372,17 @@ def accept_front_to_back_ticket(self, ticket):
if self._completion_queue is None:
return

if ticket.kind is tickets.Kind.COMMENCEMENT:
if ticket.kind is tickets.FrontToBackPacket.Kind.COMMENCEMENT:
self._commence(
ticket.operation_id, ticket.name, ticket.payload, ticket.timeout)
elif ticket.kind is tickets.Kind.CONTINUATION:
elif ticket.kind is tickets.FrontToBackPacket.Kind.CONTINUATION:
self._continue(ticket.operation_id, ticket.payload)
elif ticket.kind is tickets.Kind.COMPLETION:
elif ticket.kind is tickets.FrontToBackPacket.Kind.COMPLETION:
self._complete(ticket.operation_id, ticket.payload)
elif ticket.kind is tickets.Kind.ENTIRE:
elif ticket.kind is tickets.FrontToBackPacket.Kind.ENTIRE:
self._entire(
ticket.operation_id, ticket.name, ticket.payload, ticket.timeout)
elif ticket.kind is tickets.Kind.CANCELLATION:
elif ticket.kind is tickets.FrontToBackPacket.Kind.CANCELLATION:
self._cancel(ticket.operation_id)
else:
# NOTE(nathaniel): All other categories are treated as cancellation.
Expand Down
31 changes: 18 additions & 13 deletions src/python/src/grpc/framework/base/packets/_reception.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@
from grpc.framework.base.packets import _interfaces
from grpc.framework.base.packets import packets

_INITIAL_FRONT_TO_BACK_PACKET_KINDS = (
packets.FrontToBackPacket.Kind.COMMENCEMENT,
packets.FrontToBackPacket.Kind.ENTIRE,
)


class _Receiver(object):
"""Common specification of different packet-handling behavior."""
Expand Down Expand Up @@ -151,15 +156,15 @@ def _abortive(self, packet):
A base_interfaces.Outcome value describing operation abortion if the
packet is abortive or None if the packet is not abortive.
"""
if packet.kind is packets.Kind.CANCELLATION:
if packet.kind is packets.FrontToBackPacket.Kind.CANCELLATION:
return base_interfaces.Outcome.CANCELLED
elif packet.kind is packets.Kind.EXPIRATION:
elif packet.kind is packets.FrontToBackPacket.Kind.EXPIRATION:
return base_interfaces.Outcome.EXPIRED
elif packet.kind is packets.Kind.SERVICED_FAILURE:
elif packet.kind is packets.FrontToBackPacket.Kind.SERVICED_FAILURE:
return base_interfaces.Outcome.SERVICED_FAILURE
elif packet.kind is packets.Kind.RECEPTION_FAILURE:
elif packet.kind is packets.FrontToBackPacket.Kind.RECEPTION_FAILURE:
return base_interfaces.Outcome.SERVICED_FAILURE
elif (packet.kind in (packets.Kind.COMMENCEMENT, packets.Kind.ENTIRE) and
elif (packet.kind in _INITIAL_FRONT_TO_BACK_PACKET_KINDS and
self._first_packet_seen):
return base_interfaces.Outcome.RECEPTION_FAILURE
elif self._last_packet_seen:
Expand All @@ -179,14 +184,14 @@ def receive(self, packet):
if packet.timeout is not None:
self._expiration_manager.change_timeout(packet.timeout)

if packet.kind is packets.Kind.COMMENCEMENT:
if packet.kind is packets.FrontToBackPacket.Kind.COMMENCEMENT:
self._first_packet_seen = True
self._ingestion_manager.start(packet.name)
if packet.payload is not None:
self._ingestion_manager.consume(packet.payload)
elif packet.kind is packets.Kind.CONTINUATION:
elif packet.kind is packets.FrontToBackPacket.Kind.CONTINUATION:
self._ingestion_manager.consume(packet.payload)
elif packet.kind is packets.Kind.COMPLETION:
elif packet.kind is packets.FrontToBackPacket.Kind.COMPLETION:
self._last_packet_seen = True
if packet.payload is None:
self._ingestion_manager.terminate()
Expand Down Expand Up @@ -239,11 +244,11 @@ def _abortive(self, packet):
A base_interfaces.Outcome value describing operation abortion if the
packet is abortive or None if the packet is not abortive.
"""
if packet.kind is packets.Kind.EXPIRATION:
if packet.kind is packets.BackToFrontPacket.Kind.EXPIRATION:
return base_interfaces.Outcome.EXPIRED
elif packet.kind is packets.Kind.SERVICER_FAILURE:
elif packet.kind is packets.BackToFrontPacket.Kind.SERVICER_FAILURE:
return base_interfaces.Outcome.SERVICER_FAILURE
elif packet.kind is packets.Kind.RECEPTION_FAILURE:
elif packet.kind is packets.BackToFrontPacket.Kind.RECEPTION_FAILURE:
return base_interfaces.Outcome.SERVICER_FAILURE
elif self._last_packet_seen:
return base_interfaces.Outcome.RECEPTION_FAILURE
Expand All @@ -259,9 +264,9 @@ def abort_if_abortive(self, packet):

def receive(self, packet):
"""See _Receiver.receive for specification."""
if packet.kind is packets.Kind.CONTINUATION:
if packet.kind is packets.BackToFrontPacket.Kind.CONTINUATION:
self._ingestion_manager.consume(packet.payload)
elif packet.kind is packets.Kind.COMPLETION:
elif packet.kind is packets.BackToFrontPacket.Kind.COMPLETION:
self._last_packet_seen = True
if packet.payload is None:
self._ingestion_manager.terminate()
Expand Down
Loading

0 comments on commit 758a0c2

Please sign in to comment.