Skip to content

Commit

Permalink
Beta API clean-ups
Browse files Browse the repository at this point in the history
(1) Renamed the "beta" module "implementations" because it hasn't been
monolithic since "interfaces" was factored out of it a few changes
back.

(2) Moved ChannelConnectivity from grpc.beta.beta to
grpc.beta.interfaces since it is constants that don't depend on the
beta implementation.

(3) Moved the Server interface definition from grpc.beta.beta to
grpc.beta.interfaces since it is an interface.

(4) Dropped the "create_" prefix from "create_<...>_channel" functions
to better match the other creation functions throughout the codebase.
  • Loading branch information
nathanielmanistaatgoogle committed Sep 5, 2015
1 parent 41abb05 commit f65d3c1
Show file tree
Hide file tree
Showing 13 changed files with 216 additions and 212 deletions.
16 changes: 8 additions & 8 deletions src/compiler/python_generator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ bool PrintAlphaServerFactory(const grpc::string& package_qualified_service_name,
}
out->Print("}\n");
out->Print(
"return implementations.server("
"return early_adopter_implementations.server("
"\"$PackageQualifiedServiceName$\","
" method_service_descriptions, port, private_key=private_key,"
" certificate_chain=certificate_chain)\n",
Expand Down Expand Up @@ -422,7 +422,7 @@ bool PrintAlphaStubFactory(const grpc::string& package_qualified_service_name,
}
out->Print("}\n");
out->Print(
"return implementations.stub("
"return early_adopter_implementations.stub("
"\"$PackageQualifiedServiceName$\","
" method_invocation_descriptions, host, port,"
" metadata_transformer=metadata_transformer, secure=secure,"
Expand Down Expand Up @@ -586,13 +586,13 @@ bool PrintBetaServerFactory(const grpc::string& package_qualified_service_name,
"Constructor", name_and_implementation_constructor->second);
}
out->Print("}\n");
out->Print("server_options = beta.server_options("
out->Print("server_options = beta_implementations.server_options("
"request_deserializers=request_deserializers, "
"response_serializers=response_serializers, "
"thread_pool=pool, thread_pool_size=pool_size, "
"default_timeout=default_timeout, "
"maximum_timeout=maximum_timeout)\n");
out->Print("return beta.server(method_implementations, "
out->Print("return beta_implementations.server(method_implementations, "
"options=server_options)\n");
}
return true;
Expand Down Expand Up @@ -685,13 +685,13 @@ bool PrintBetaStubFactory(const grpc::string& package_qualified_service_name,
"Cardinality", name_and_cardinality->second);
}
out->Print("}\n");
out->Print("stub_options = beta.stub_options("
out->Print("stub_options = beta_implementations.stub_options("
"host=host, metadata_transformer=metadata_transformer, "
"request_serializers=request_serializers, "
"response_deserializers=response_deserializers, "
"thread_pool=pool, thread_pool_size=pool_size)\n");
out->Print(
"return beta.dynamic_stub(channel, \'$PackageQualifiedServiceName$\', "
"return beta_implementations.dynamic_stub(channel, \'$PackageQualifiedServiceName$\', "
"cardinalities, options=stub_options)\n",
"PackageQualifiedServiceName", package_qualified_service_name);
}
Expand All @@ -701,9 +701,9 @@ bool PrintBetaStubFactory(const grpc::string& package_qualified_service_name,
bool PrintPreamble(const FileDescriptor* file,
const GeneratorConfiguration& config, Printer* out) {
out->Print("import abc\n");
out->Print("from $Package$ import beta\n",
out->Print("from $Package$ import implementations as beta_implementations\n",
"Package", config.beta_package_root);
out->Print("from $Package$ import implementations\n",
out->Print("from $Package$ import implementations as early_adopter_implementations\n",
"Package", config.early_adopter_package_root);
out->Print("from grpc.framework.alpha import utilities as alpha_utilities\n");
out->Print("from grpc.framework.common import cardinality\n");
Expand Down
16 changes: 12 additions & 4 deletions src/python/grpcio/grpc/beta/_connectivity_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,24 @@
import time

from grpc._adapter import _low
from grpc._adapter import _types
from grpc.beta import interfaces
from grpc.framework.foundation import callable_util

_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = (
'Exception calling channel subscription callback!')

_LOW_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY = {
state: connectivity for state, connectivity in zip(
_types.ConnectivityState, interfaces.ChannelConnectivity)
}


class ConnectivityChannel(object):

def __init__(self, low_channel, mapping):
def __init__(self, low_channel):
self._lock = threading.Lock()
self._low_channel = low_channel
self._mapping = mapping

self._polling = False
self._connectivity = None
Expand Down Expand Up @@ -88,7 +94,8 @@ def _poll_connectivity(self, low_channel, initial_try_to_connect):
try_to_connect = initial_try_to_connect
low_connectivity = low_channel.check_connectivity_state(try_to_connect)
with self._lock:
self._connectivity = self._mapping[low_connectivity]
self._connectivity = _LOW_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[
low_connectivity]
callbacks = tuple(
callback for callback, unused_but_known_to_be_none_connectivity
in self._callbacks_and_connectivities)
Expand All @@ -112,7 +119,8 @@ def _poll_connectivity(self, low_channel, initial_try_to_connect):
if event.success or try_to_connect:
low_connectivity = low_channel.check_connectivity_state(try_to_connect)
with self._lock:
self._connectivity = self._mapping[low_connectivity]
self._connectivity = _LOW_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[
low_connectivity]
if not self._delivering:
callbacks = self._deliveries(self._connectivity)
if callbacks:
Expand Down
6 changes: 3 additions & 3 deletions src/python/grpcio/grpc/beta/_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def _disassemble(grpc_link, end_link, pool, event, grace):
event.set()


class Server(object):
class Server(interfaces.Server):

def __init__(self, grpc_link, end_link, pool):
self._grpc_link = grpc_link
Expand All @@ -82,9 +82,9 @@ def __init__(self, grpc_link, end_link, pool):
def add_insecure_port(self, address):
return self._grpc_link.add_port(address, None)

def add_secure_port(self, address, intermediary_low_server_credentials):
def add_secure_port(self, address, server_credentials):
return self._grpc_link.add_port(
address, intermediary_low_server_credentials)
address, server_credentials._intermediary_low_credentials) # pylint: disable=protected-access

def start(self):
self._grpc_link.join_link(self._end_link)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,39 +40,14 @@
from grpc.beta import _connectivity_channel
from grpc.beta import _server
from grpc.beta import _stub
from grpc.beta import interfaces
from grpc.framework.common import cardinality # pylint: disable=unused-import
from grpc.framework.interfaces.face import face # pylint: disable=unused-import

_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = (
'Exception calling channel subscription callback!')


@enum.unique
class ChannelConnectivity(enum.Enum):
"""Mirrors grpc_connectivity_state in the gRPC Core.
Attributes:
IDLE: The channel is idle.
CONNECTING: The channel is connecting.
READY: The channel is ready to conduct RPCs.
TRANSIENT_FAILURE: The channel has seen a failure from which it expects to
recover.
FATAL_FAILURE: The channel has seen a failure from which it cannot recover.
"""

IDLE = (_types.ConnectivityState.IDLE, 'idle',)
CONNECTING = (_types.ConnectivityState.CONNECTING, 'connecting',)
READY = (_types.ConnectivityState.READY, 'ready',)
TRANSIENT_FAILURE = (
_types.ConnectivityState.TRANSIENT_FAILURE, 'transient failure',)
FATAL_FAILURE = (_types.ConnectivityState.FATAL_FAILURE, 'fatal failure',)

_LOW_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY = {
state: connectivity for state, connectivity in zip(
_types.ConnectivityState, ChannelConnectivity)
}


class ClientCredentials(object):
"""A value encapsulating the data required to create a secure Channel.
Expand Down Expand Up @@ -118,13 +93,14 @@ def __init__(self, low_channel, intermediary_low_channel):
self._low_channel = low_channel
self._intermediary_low_channel = intermediary_low_channel
self._connectivity_channel = _connectivity_channel.ConnectivityChannel(
low_channel, _LOW_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY)
low_channel)

def subscribe(self, callback, try_to_connect=None):
"""Subscribes to this Channel's connectivity.
Args:
callback: A callable to be invoked and passed this Channel's connectivity.
callback: A callable to be invoked and passed an
interfaces.ChannelConnectivity identifying this Channel's connectivity.
The callable will be invoked immediately upon subscription and again for
every change to this Channel's connectivity thereafter until it is
unsubscribed.
Expand All @@ -144,7 +120,7 @@ def unsubscribe(self, callback):
self._connectivity_channel.unsubscribe(callback)


def create_insecure_channel(host, port):
def insecure_channel(host, port):
"""Creates an insecure Channel to a remote host.
Args:
Expand All @@ -159,7 +135,7 @@ def create_insecure_channel(host, port):
return Channel(intermediary_low_channel._internal, intermediary_low_channel) # pylint: disable=protected-access


def create_secure_channel(host, port, client_credentials):
def secure_channel(host, port, client_credentials):
"""Creates a secure Channel to a remote host.
Args:
Expand Down Expand Up @@ -313,86 +289,6 @@ def ssl_server_credentials(
intermediary_low_credentials._internal, intermediary_low_credentials) # pylint: disable=protected-access


class Server(object):
"""Services RPCs."""
__metaclass__ = abc.ABCMeta

@abc.abstractmethod
def add_insecure_port(self, address):
"""Reserves a port for insecure RPC service once this Server becomes active.
This method may only be called before calling this Server's start method is
called.
Args:
address: The address for which to open a port.
Returns:
An integer port on which RPCs will be serviced after this link has been
started. This is typically the same number as the port number contained
in the passed address, but will likely be different if the port number
contained in the passed address was zero.
"""
raise NotImplementedError()

@abc.abstractmethod
def add_secure_port(self, address, server_credentials):
"""Reserves a port for secure RPC service after this Server becomes active.
This method may only be called before calling this Server's start method is
called.
Args:
address: The address for which to open a port.
server_credentials: A ServerCredentials.
Returns:
An integer port on which RPCs will be serviced after this link has been
started. This is typically the same number as the port number contained
in the passed address, but will likely be different if the port number
contained in the passed address was zero.
"""
raise NotImplementedError()

@abc.abstractmethod
def start(self):
"""Starts this Server's service of RPCs.
This method may only be called while the server is not serving RPCs (i.e. it
is not idempotent).
"""
raise NotImplementedError()

@abc.abstractmethod
def stop(self, grace):
"""Stops this Server's service of RPCs.
All calls to this method immediately stop service of new RPCs. When existing
RPCs are aborted is controlled by the grace period parameter passed to this
method.
This method may be called at any time and is idempotent. Passing a smaller
grace value than has been passed in a previous call will have the effect of
stopping the Server sooner. Passing a larger grace value than has been
passed in a previous call will not have the effect of stopping the sooner
later.
Args:
grace: A duration of time in seconds to allow existing RPCs to complete
before being aborted by this Server's stopping. May be zero for
immediate abortion of all in-progress RPCs.
Returns:
A threading.Event that will be set when this Server has completely
stopped. The returned event may not be set until after the full grace
period (if some ongoing RPC continues for the full length of the period)
of it may be set much sooner (such as if this Server had no RPCs underway
at the time it was stopped or if all RPCs that it had underway completed
very early in the grace period).
"""
raise NotImplementedError()


class ServerOptions(object):
"""A value encapsulating the various options for creation of a Server.
Expand Down Expand Up @@ -450,27 +346,8 @@ def server_options(
thread_pool, thread_pool_size, default_timeout, maximum_timeout)


class _Server(Server):

def __init__(self, underserver):
self._underserver = underserver

def add_insecure_port(self, address):
return self._underserver.add_insecure_port(address)

def add_secure_port(self, address, server_credentials):
return self._underserver.add_secure_port(
address, server_credentials._intermediary_low_credentials) # pylint: disable=protected-access

def start(self):
self._underserver.start()

def stop(self, grace):
return self._underserver.stop(grace)


def server(service_implementations, options=None):
"""Creates a Server with which RPCs can be serviced.
"""Creates an interfaces.Server with which RPCs can be serviced.
Args:
service_implementations: A dictionary from service name-method name pair to
Expand All @@ -479,13 +356,12 @@ def server(service_implementations, options=None):
functionality of the returned Server.
Returns:
A Server with which RPCs can be serviced.
An interfaces.Server with which RPCs can be serviced.
"""
effective_options = _EMPTY_SERVER_OPTIONS if options is None else options
underserver = _server.server(
return _server.server(
service_implementations, effective_options.multi_method_implementation,
effective_options.request_deserializers,
effective_options.response_serializers, effective_options.thread_pool,
effective_options.thread_pool_size, effective_options.default_timeout,
effective_options.maximum_timeout)
return _Server(underserver)
Loading

0 comments on commit f65d3c1

Please sign in to comment.