Skip to content

Commit

Permalink
Merge pull request grpc#3231 from nathanielmanistaatgoogle/protocol-o…
Browse files Browse the repository at this point in the history
…bjects

Initial work on protocol objects
  • Loading branch information
soltanmm committed Sep 4, 2015
2 parents 79e042d + d2aa1cf commit 4e3a113
Show file tree
Hide file tree
Showing 11 changed files with 217 additions and 120 deletions.
8 changes: 4 additions & 4 deletions src/python/grpcio/grpc/framework/core/_end.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ def stop(self, grace):

def operate(
self, group, method, subscription, timeout, initial_metadata=None,
payload=None, completion=None):
payload=None, completion=None, protocol_options=None):
"""See base.End.operate for specification."""
operation_id = uuid.uuid4()
with self._lock:
Expand All @@ -177,9 +177,9 @@ def operate(
termination_action = _termination_action(
self._lock, self._stats, operation_id, self._cycle)
operation = _operation.invocation_operate(
operation_id, group, method, subscription, timeout, initial_metadata,
payload, completion, self._mate.accept_ticket, termination_action,
self._cycle.pool)
operation_id, group, method, subscription, timeout, protocol_options,
initial_metadata, payload, completion, self._mate.accept_ticket,
termination_action, self._cycle.pool)
self._cycle.operations[operation_id] = operation
return operation.context, operation.operator

Expand Down
4 changes: 2 additions & 2 deletions src/python/grpcio/grpc/framework/core/_interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ class TransmissionManager(object):

@abc.abstractmethod
def kick_off(
self, group, method, timeout, initial_metadata, payload, completion,
allowance):
self, group, method, timeout, protocol_options, initial_metadata,
payload, completion, allowance):
"""Transmits the values associated with operation invocation."""
raise NotImplementedError()

Expand Down
10 changes: 7 additions & 3 deletions src/python/grpcio/grpc/framework/core/_operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,9 @@ def abort(self, outcome_kind):


def invocation_operate(
operation_id, group, method, subscription, timeout, initial_metadata,
payload, completion, ticket_sink, termination_action, pool):
operation_id, group, method, subscription, timeout, protocol_options,
initial_metadata, payload, completion, ticket_sink, termination_action,
pool):
"""Constructs objects necessary for front-side operation management.
Args:
Expand All @@ -95,6 +96,8 @@ def invocation_operate(
subscription: A base.Subscription describing the customer's interest in the
results of the operation.
timeout: A length of time in seconds to allow for the operation.
protocol_options: A transport-specific, application-specific, and/or
protocol-specific value relating to the invocation. May be None.
initial_metadata: An initial metadata value to be sent to the other side of
the operation. May be None if the initial metadata will be passed later or
if there will be no initial metadata passed at all.
Expand Down Expand Up @@ -136,7 +139,8 @@ def invocation_operate(
emission_manager.set_ingestion_manager(ingestion_manager)

transmission_manager.kick_off(
group, method, timeout, initial_metadata, payload, completion, None)
group, method, timeout, protocol_options, initial_metadata, payload,
completion, None)

return _EasyOperation(
lock, termination_manager, transmission_manager, expiration_manager,
Expand Down
7 changes: 4 additions & 3 deletions src/python/grpcio/grpc/framework/core/_transmission.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,18 +207,19 @@ def transmit(ticket):
self._transmitting = True

def kick_off(
self, group, method, timeout, initial_metadata, payload, completion,
allowance):
self, group, method, timeout, protocol_options, initial_metadata,
payload, completion, allowance):
"""See _interfaces.TransmissionManager.kickoff for specification."""
# TODO(nathaniel): Support other subscriptions.
subscription = links.Ticket.Subscription.FULL
terminal_metadata, code, message, termination = _explode_completion(
completion)
self._remote_allowance = 1 if payload is None else 0
protocol = links.Protocol(links.Protocol.Kind.CALL_OPTION, protocol_options)
ticket = links.Ticket(
self._operation_id, 0, group, method, subscription, timeout, allowance,
initial_metadata, payload, terminal_metadata, code, message,
termination, None)
termination, protocol)
self._lowest_unused_sequence_number = 1
self._transmit(ticket)

Expand Down
71 changes: 44 additions & 27 deletions src/python/grpcio/grpc/framework/crust/_calls.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,14 @@
_EMPTY_COMPLETION = utilities.completion(None, None, None)


def _invoke(end, group, method, timeout, initial_metadata, payload, complete):
def _invoke(
end, group, method, timeout, protocol_options, initial_metadata, payload,
complete):
rendezvous = _control.Rendezvous(None, None)
operation_context, operator = end.operate(
group, method, utilities.full_subscription(rendezvous), timeout,
initial_metadata=initial_metadata, payload=payload,
completion=_EMPTY_COMPLETION if complete else None)
protocol_options=protocol_options, initial_metadata=initial_metadata,
payload=payload, completion=_EMPTY_COMPLETION if complete else None)
rendezvous.set_operator_and_context(operator, operation_context)
outcome = operation_context.add_termination_callback(rendezvous.set_outcome)
if outcome is not None:
Expand Down Expand Up @@ -93,36 +95,43 @@ def in_pool():


def blocking_unary_unary(
end, group, method, timeout, with_call, initial_metadata, payload):
end, group, method, timeout, with_call, protocol_options, initial_metadata,
payload):
"""Services in a blocking fashion a unary-unary servicer method."""
rendezvous, unused_operation_context, unused_outcome = _invoke(
end, group, method, timeout, initial_metadata, payload, True)
end, group, method, timeout, protocol_options, initial_metadata, payload,
True)
if with_call:
return next(rendezvous), rendezvous
else:
return next(rendezvous)


def future_unary_unary(end, group, method, timeout, initial_metadata, payload):
def future_unary_unary(
end, group, method, timeout, protocol_options, initial_metadata, payload):
"""Services a value-in value-out servicer method by returning a Future."""
rendezvous, unused_operation_context, unused_outcome = _invoke(
end, group, method, timeout, initial_metadata, payload, True)
end, group, method, timeout, protocol_options, initial_metadata, payload,
True)
return rendezvous


def inline_unary_stream(end, group, method, timeout, initial_metadata, payload):
def inline_unary_stream(
end, group, method, timeout, protocol_options, initial_metadata, payload):
"""Services a value-in stream-out servicer method."""
rendezvous, unused_operation_context, unused_outcome = _invoke(
end, group, method, timeout, initial_metadata, payload, True)
end, group, method, timeout, protocol_options, initial_metadata, payload,
True)
return rendezvous


def blocking_stream_unary(
end, group, method, timeout, with_call, initial_metadata, payload_iterator,
pool):
end, group, method, timeout, with_call, protocol_options, initial_metadata,
payload_iterator, pool):
"""Services in a blocking fashion a stream-in value-out servicer method."""
rendezvous, operation_context, outcome = _invoke(
end, group, method, timeout, initial_metadata, None, False)
end, group, method, timeout, protocol_options, initial_metadata, None,
False)
if outcome is None:
def in_pool():
for payload in payload_iterator:
Expand All @@ -141,10 +150,12 @@ def in_pool():


def future_stream_unary(
end, group, method, timeout, initial_metadata, payload_iterator, pool):
end, group, method, timeout, protocol_options, initial_metadata,
payload_iterator, pool):
"""Services a stream-in value-out servicer method by returning a Future."""
rendezvous, operation_context, outcome = _invoke(
end, group, method, timeout, initial_metadata, None, False)
end, group, method, timeout, protocol_options, initial_metadata, None,
False)
if outcome is None:
def in_pool():
for payload in payload_iterator:
Expand All @@ -155,10 +166,12 @@ def in_pool():


def inline_stream_stream(
end, group, method, timeout, initial_metadata, payload_iterator, pool):
end, group, method, timeout, protocol_options, initial_metadata,
payload_iterator, pool):
"""Services a stream-in stream-out servicer method."""
rendezvous, operation_context, outcome = _invoke(
end, group, method, timeout, initial_metadata, None, False)
end, group, method, timeout, protocol_options, initial_metadata, None,
False)
if outcome is None:
def in_pool():
for payload in payload_iterator:
Expand All @@ -169,36 +182,40 @@ def in_pool():


def event_unary_unary(
end, group, method, timeout, initial_metadata, payload, receiver,
abortion_callback, pool):
end, group, method, timeout, protocol_options, initial_metadata, payload,
receiver, abortion_callback, pool):
rendezvous, operation_context, outcome = _invoke(
end, group, method, timeout, initial_metadata, payload, True)
end, group, method, timeout, protocol_options, initial_metadata, payload,
True)
return _event_return_unary(
receiver, abortion_callback, rendezvous, operation_context, outcome, pool)


def event_unary_stream(
end, group, method, timeout, initial_metadata, payload,
end, group, method, timeout, protocol_options, initial_metadata, payload,
receiver, abortion_callback, pool):
rendezvous, operation_context, outcome = _invoke(
end, group, method, timeout, initial_metadata, payload, True)
end, group, method, timeout, protocol_options, initial_metadata, payload,
True)
return _event_return_stream(
receiver, abortion_callback, rendezvous, operation_context, outcome, pool)


def event_stream_unary(
end, group, method, timeout, initial_metadata, receiver, abortion_callback,
pool):
end, group, method, timeout, protocol_options, initial_metadata, receiver,
abortion_callback, pool):
rendezvous, operation_context, outcome = _invoke(
end, group, method, timeout, initial_metadata, None, False)
end, group, method, timeout, protocol_options, initial_metadata, None,
False)
return _event_return_unary(
receiver, abortion_callback, rendezvous, operation_context, outcome, pool)


def event_stream_stream(
end, group, method, timeout, initial_metadata, receiver, abortion_callback,
pool):
end, group, method, timeout, protocol_options, initial_metadata, receiver,
abortion_callback, pool):
rendezvous, operation_context, outcome = _invoke(
end, group, method, timeout, initial_metadata, None, False)
end, group, method, timeout, protocol_options, initial_metadata, None,
False)
return _event_return_stream(
receiver, abortion_callback, rendezvous, operation_context, outcome, pool)
4 changes: 4 additions & 0 deletions src/python/grpcio/grpc/framework/crust/_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,10 @@ def add_abortion_callback(self, abortion_callback):
else:
return self._termination.abortion

def protocol_context(self):
with self._condition:
raise NotImplementedError('TODO: protocol context implementation!')

def initial_metadata(self):
with self._condition:
while True:
Expand Down
3 changes: 3 additions & 0 deletions src/python/grpcio/grpc/framework/crust/_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ def add_abortion_callback(self, abortion_callback):
def cancel(self):
self._rendezvous.cancel()

def protocol_context(self):
return self._rendezvous.protocol_context()

def invocation_metadata(self):
return self._rendezvous.initial_metadata()

Expand Down
Loading

0 comments on commit 4e3a113

Please sign in to comment.