diff --git a/docs/core/howto/endpoints.rst b/docs/core/howto/endpoints.rst index da3f2a288fb..43db9ab0890 100644 --- a/docs/core/howto/endpoints.rst +++ b/docs/core/howto/endpoints.rst @@ -99,6 +99,94 @@ implemented like this: .. note:: If you've used ``ClientFactory`` before, keep in mind that the ``connect`` method takes a ``Factory``, not a ``ClientFactory``. Even if you pass a ``ClientFactory`` to ``endpoint.connect``, its ``clientConnectionFailed`` and ``clientConnectionLost`` methods will not be called. + In particular, clients that extend ``ReconnectingClientFactory`` won't reconnect. The next section describes how to set up reconnecting clients on endpoints. + + +Persistent Client Connections +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +:api:`twisted.application.internet.ClientService` can maintain a persistent outgoing connection to a server which can be started and stopped along with your application. + +One popular protocol to maintain a long-lived client connection to is IRC, so for an example of ``ClientService``, here's how you would make a long-lived encrypted connection to an IRC server (other details, like how to authenticate, omitted for brevity): + +.. code-block:: python + + from twisted.internet.protocol import Factory + from twisted.internet.endpoints import clientFromString + from twisted.words.protocols.irc import IRCClient + from twisted.application.internet import ClientService + from twisted.internet import reactor + + myEndpoint = clientFromString(reactor, "tls:example.com:6997") + myFactory = Factory.forProtocol(IRCClient) + + myReconnectingService = ClientService(myEndpoint, myFactory) + +If you already have a parent service, you can add the reconnecting service as a child service: + +.. code-block:: python + + parentService.addService(myReconnectingService) + +If you do not have a parent service, you can start and stop the reconnecting service using its ``startService`` and ``stopService`` methods. + +``ClientService.stopService`` returns a ``Deferred`` that fires once the current connection closes or the current connection attempt is cancelled. + + +Getting The Active Client +------------------------- + +When maintaining a long-lived connection, it's often useful to be able to get the current connection (if the connection is active) or wait for the next connection (if a connection attempt is currently in progress). +For example, we might want to pass our ``ClientService`` from the previous example to some code that can send IRC notifications in response to some external event. +The ``ClientService.whenConnected`` method returns a ``Deferred`` that fires with the next available ``Protocol`` instance. +You can use it like so: + +.. code-block:: python + + waitForConnection = myReconnectingService.whenConnected() + def connectedNow(clientForIRC): + clientForIRC.say("#bot-test", "hello, world!") + waitForConnection.addCallback(connectedNow) + +Keep in mind that you may need to wrap this up for your particular application, since when no existing connection is available, the callback is executed just as soon as the connection is established. +For example, that little snippet is slightly oversimplified: at the time ``connectedNow`` is run, the bot hasn't authenticated or joined the channel yet, so its message will be refused. +A real-life IRC bot would need to have its own method for waiting until the connection is fully ready for chat before chatting. + +Retry Policies +-------------- + +``ClientService`` will immediately attempt an outgoing connection when ``startService`` is called. +If that connection attempt fails for any reason (name resolution, connection refused, network unreachable, and so on), it will retry according to the policy specified in the ``retryPolicy`` constructor argument. +By default, ``ClientService`` will use an exponential backoff algorithm with a minimum delay of 1 second and a maximum delay of 1 minute, and a jitter of up to 1 additional second to prevent stampeding-herd performance cascades. +This is a good default, and if you do not have highly specialized requirements, you probably want to use it. +If you need to tune these parameters, you have two options: + +1. You can pass your own timeout policy to ``ClientService``'s constructor. + A timeout policy is a callable that takes the number of failed attempts, and computes a delay until the next connection attempt. + So, for example, if you are *really really sure* that you want to reconnect *every single second* if the service you are talking to goes down, you can do this: + + .. code-block:: python + + myReconnectingService = ClientService(myEndpoint, myFactory, retryPolicy=lambda ignored: 1) + + Of course, unless you have only one client and only one server and they're both on localhost, this sort of policy is likely to cause massive performance degradation and thundering herd resource contention in the event of your server's failure, so you probably want to take the second option... + +2. You can tweak the default exponential backoff policy with a few parameters by passing the result of :api:`twisted.application.internet.backoffPolicy` to the ``retryPolicy`` argument. + For example, if you want to make it triple the delay between attempts, but start with a faster connection interval (half a second instead of one second), you could do it like so: + + .. code-block:: python + + myReconnectingService = ClientService( + myEndpoint, myFactory, + retryPolicy=backoffPolicy(initialDelay=0.5, factor=3.0) + ) + +.. note:: + + Before endpoints, reconnecting clients were created as subclasses of ``ReconnectingClientFactory``. + These subclasses were required to call ``resetDelay``. + One of the many advantages of using endpoints is that these special subclasses are no longer needed. + ``ClientService`` accepts ordinary ``IProtocolFactory`` providers. Maximizing the Return on your Endpoint Investment diff --git a/twisted/application/internet.py b/twisted/application/internet.py index 8484370ea6c..e6a817f8495 100644 --- a/twisted/application/internet.py +++ b/twisted/application/internet.py @@ -39,10 +39,17 @@ from __future__ import absolute_import, division +from random import random as _goodEnoughRandom + from twisted.python import log +from twisted.logger import Logger + from twisted.application import service from twisted.internet import task -from twisted.internet.defer import CancelledError +from twisted.python.failure import Failure +from twisted.internet.defer import ( + CancelledError, gatherResults, Deferred, succeed, fail +) @@ -392,8 +399,291 @@ def stop(passthrough): +class _ReconnectingProtocolProxy(object): + """ + A proxy for a Protocol to provide connectionLost notification to a client + connection service, in support of reconnecting when connections are lost. + """ + + def __init__(self, protocol, lostNotification): + """ + Create a L{_ReconnectingProtocolProxy}. + + @param protocol: the application-provided L{interfaces.IProtocol} + provider. + @type protocol: provider of L{interfaces.IProtocol} which may + additionally provide L{interfaces.IHalfCloseableProtocol} and + L{interfaces.IFileDescriptorReceiver}. + + @param lostNotification: a 1-argument callable to invoke with the + C{reason} when the connection is lost. + """ + self._protocol = protocol + self._lostNotification = lostNotification + + + def connectionLost(self, reason): + """ + The connection was lost. Relay this information. + + @param reason: The reason the connection was lost. + + @return: the underlying protocol's result + """ + try: + return self._protocol.connectionLost(reason) + finally: + self._lostNotification(reason) + + + def __getattr__(self, item): + return getattr(self._protocol, item) + + + def __repr__(self): + return '<%s wrapping %r>' % ( + self.__class__.__name__, self._protocol) + + + +class _DisconnectFactory(object): + """ + A L{_DisconnectFactory} is a proxy for L{IProtocolFactory} that catches + C{connectionLost} notifications and relays them. + """ + + def __init__(self, protocolFactory, protocolDisconnected): + self._protocolFactory = protocolFactory + self._protocolDisconnected = protocolDisconnected + + + def buildProtocol(self, addr): + """ + Create a L{_ReconnectingProtocolProxy} with the disconnect-notification + callback we were called with. + + @param addr: The address the connection is coming from. + + @return: a L{_ReconnectingProtocolProxy} for a protocol produced by + C{self._protocolFactory} + """ + return _ReconnectingProtocolProxy( + self._protocolFactory.buildProtocol(addr), + self._protocolDisconnected + ) + + + def __getattr__(self, item): + return getattr(self._protocolFactory, item) + + + def __repr__(self): + return '<%s wrapping %r>' % ( + self.__class__.__name__, self._protocolFactory) + + + +def backoffPolicy(initialDelay=1.0, maxDelay=60.0, factor=1.5, + jitter=_goodEnoughRandom): + """ + A timeout policy for L{ClientService} which computes an exponential backoff + interval with configurable parameters. + + @since: 16.1.0 + + @param initialDelay: Delay for the first reconnection attempt (default + 1.0s). + @type initialDelay: L{float} + + @param maxDelay: Maximum number of seconds between connection attempts + (default 60 seconds, or one minute). Note that this value is before + jitter is applied, so the actual maximum possible delay is this value + plus the maximum possible result of C{jitter()}. + @type maxDelay: L{float} + + @param factor: A multiplicative factor by which the delay grows on each + failed reattempt. Default: 1.5. + @type factor: L{float} + + @param jitter: A 0-argument callable that introduces noise into the delay. + By default, C{random.random}, i.e. a pseudorandom floating-point value + between zero and one. + @type jitter: 0-argument callable returning L{float} + + @return: a 1-argument callable that, given an attempt count, returns a + floating point number; the number of seconds to delay. + @rtype: see L{ClientService.__init__}'s C{retryPolicy} argument. + """ + def policy(attempt): + return min(initialDelay * (factor ** attempt), maxDelay) + jitter() + return policy + +_defaultPolicy = backoffPolicy() + + + +def _noop(): + """ + Do nothing; this stands in for C{transport.loseConnection()} and + C{DelayedCall.cancel()} when L{ClientService} is in a state where there's + nothing to do. + """ + + + +class ClientService(service.Service, object): + """ + A L{ClientService} maintains a single outgoing connection to a client + endpoint, reconnecting after a configurable timeout when a connection + fails, either before or after connecting. + + @since: 16.1.0 + """ + + _log = Logger() + + def __init__(self, endpoint, factory, retryPolicy=None, clock=None): + """ + @param endpoint: A L{stream client endpoint + } provider which will be used to + connect when the service starts. + + @param factory: A L{protocol factory } + which will be used to create clients for the endpoint. + + @param retryPolicy: A policy configuring how long L{ClientService} will + wait between attempts to connect to C{endpoint}. + @type retryPolicy: callable taking (the number of failed connection + attempts made in a row (L{int})) and returning the number of + seconds to wait before making another attempt. + + @param clock: The clock used to schedule reconnection. It's mainly + useful to be parametrized in tests. If the factory is serialized, + this attribute will not be serialized, and the default value (the + reactor) will be restored when deserialized. + @type clock: L{IReactorTime} + """ + clock = _maybeGlobalReactor(clock) + retryPolicy = _defaultPolicy if retryPolicy is None else retryPolicy + + self._endpoint = endpoint + self._failedAttempts = 0 + self._stopped = False + self._factory = factory + self._timeoutForAttempt = retryPolicy + self._clock = clock + self._stopRetry = _noop + self._lostDeferred = succeed(None) + self._connectionInProgress = succeed(None) + self._loseConnection = _noop + + self._currentConnection = None + self._awaitingConnected = [] + + + def whenConnected(self): + """ + Retrieve the currently-connected L{Protocol}, or the next one to + connect. + + @return: a Deferred that fires with a protocol produced by the factory + passed to C{__init__} + @rtype: L{Deferred} firing with L{IProtocol} or failing with + L{CancelledError} the service is stopped. + """ + if self._currentConnection is not None: + return succeed(self._currentConnection) + elif self._stopped: + return fail(CancelledError()) + else: + result = Deferred() + self._awaitingConnected.append(result) + return result + + + def _unawait(self, value): + """ + Fire all outstanding L{ClientService.whenConnected} L{Deferred}s. + + @param value: the value to fire the L{Deferred}s with. + """ + self._awaitingConnected, waiting = [], self._awaitingConnected + for w in waiting: + w.callback(value) + + + def startService(self): + """ + Start this L{ClientService}, initiating the connection retry loop. + """ + if self.running: + self._log.warn("Duplicate ClientService.startService {log_source}") + return + super(ClientService, self).startService() + self._failedAttempts = 0 + + def connectNow(): + thisLostDeferred = Deferred() + + def clientConnect(protocol): + self._failedAttempts = 0 + self._loseConnection = protocol.transport.loseConnection + self._lostDeferred = thisLostDeferred + self._currentConnection = protocol._protocol + self._unawait(self._currentConnection) + + def clientDisconnect(reason): + self._currentConnection = None + self._loseConnection = _noop + thisLostDeferred.callback(None) + retry(reason) + + factoryProxy = _DisconnectFactory(self._factory, clientDisconnect) + + self._stopRetry = _noop + self._connectionInProgress = (self._endpoint.connect(factoryProxy) + .addCallback(clientConnect) + .addErrback(retry)) + + def retry(failure): + if not self.running: + return + self._failedAttempts += 1 + delay = self._timeoutForAttempt(self._failedAttempts) + self._log.info("Scheduling retry {attempt} to connect {endpoint} " + "in {delay} seconds.", attempt=self._failedAttempts, + endpoint=self._endpoint, delay=delay) + self._stopRetry = self._clock.callLater(delay, connectNow).cancel + + connectNow() + + + def stopService(self): + """ + Stop attempting to reconnect and close any existing connections. + + @return: a L{Deferred} that fires when all outstanding connections are + closed and all in-progress connection attempts halted. + """ + super(ClientService, self).stopService() + self._stopRetry() + self._stopRetry = _noop + self._connectionInProgress.cancel() + self._loseConnection() + self._currentConnection = None + def finishStopping(result): + if not self.running: + self._stopped = True + self._unawait(Failure(CancelledError())) + return None + return (gatherResults([self._connectionInProgress, self._lostDeferred]) + .addBoth(finishStopping)) + + + __all__ = (['TimerService', 'CooperatorService', 'MulticastServer', - 'StreamServerEndpointService', 'UDPServer'] + - [tran+side + 'StreamServerEndpointService', 'UDPServer', + 'ClientService'] + + [tran + side for tran in 'TCP UNIX SSL UNIXDatagram'.split() for side in 'Server Client'.split()]) diff --git a/twisted/application/test/test_internet.py b/twisted/application/test/test_internet.py index 9242e4c1a29..2e1e58f6566 100644 --- a/twisted/application/test/test_internet.py +++ b/twisted/application/test/test_internet.py @@ -3,6 +3,9 @@ """ Tests for (new code in) L{twisted.application.internet}. + +@var AT_LEAST_ONE_ATTEMPT: At least enough seconds for L{ClientService} to make + one attempt. """ from __future__ import absolute_import, division @@ -12,15 +15,21 @@ from zope.interface import implementer from zope.interface.verify import verifyClass -from twisted.internet.protocol import Factory -from twisted.trial.unittest import TestCase +from twisted.internet.protocol import Factory, Protocol +from twisted.internet.task import Clock +from twisted.trial.unittest import TestCase, SynchronousTestCase from twisted.application import internet from twisted.application.internet import ( - StreamServerEndpointService, TimerService) -from twisted.internet.interfaces import IStreamServerEndpoint, IListeningPort + StreamServerEndpointService, TimerService, ClientService) from twisted.internet.defer import Deferred, CancelledError +from twisted.internet.interfaces import ( + IStreamServerEndpoint, IStreamClientEndpoint, IListeningPort, + IHalfCloseableProtocol, IFileDescriptorReceiver +) from twisted.internet import task from twisted.python.failure import Failure +from twisted.logger import globalLogPublisher, formatEvent +from twisted.test.proto_helpers import StringTransport def fakeTargetFunction(): @@ -66,6 +75,10 @@ def listen(self, factory): """ Return a Deferred and store it for future use. (Implementation of L{IStreamServerEndpoint}). + + @param factory: the factory to listen with + + @return: a L{Deferred} stored in L{FakeServer.result} """ self.listenAttempts += 1 self.factory = factory @@ -108,6 +121,11 @@ class FakePort(object): deferred = None def stopListening(self): + """ + Stop listening. + + @return: a L{Deferred} stored in L{FakePort.deferred} + """ self.deferred = Deferred() return self.deferred @@ -149,8 +167,11 @@ def test_synchronousRaiseRaisesSynchronously(self, thunk=None): behavior of L{twisted.internet.strports.service}, which is to return a service which synchronously raises an exception from C{startService} (so that, among other things, twistd will not start running). However, - since L{IStreamServerEndpoint.listen} may fail asynchronously, it is - a bad idea to rely on this behavior. + since L{IStreamServerEndpoint.listen} may fail asynchronously, it is a + bad idea to rely on this behavior. + + @param thunk: If specified, a callable to execute in place of + C{startService}. """ self.fakeServer.failImmediately = ZeroDivisionError() self.svc._raiseSynchronously = True @@ -399,3 +420,393 @@ def test_pickleTimerServiceNotPickleLoopFinished(self): nothing = object() value = getattr(loadedTimer, "_loopFinished", nothing) self.assertIdentical(nothing, value) + + + +class ConnectInformation(object): + """ + Information about C{endpointForTesting} + + @ivar connectQueue: a L{list} of L{Deferred} returned from C{connect}. If + these are not already fired, you can fire them with no value and they + will trigger building a factory. + + @ivar constructedProtocols: a L{list} of protocols constructed. + + @ivar passedFactories: a L{list} of L{IProtocolFactory}; the ones actually + passed to the underlying endpoint / i.e. the reactor. + """ + def __init__(self): + self.connectQueue = [] + self.constructedProtocols = [] + self.passedFactories = [] + + + +def endpointForTesting(fireImmediately=False): + """ + Make a sample endpoint for testing. + + @param fireImmediately: If true, fire all L{Deferred}s returned from + C{connect} immedaitely. + + @return: a 2-tuple of C{(information, endpoint)}, where C{information} is a + L{ConnectInformation} describing the operations in progress on + C{endpoint}. + """ + @implementer(IStreamClientEndpoint) + class ClientTestEndpoint(object): + def connect(self, factory): + result = Deferred() + info.passedFactories.append(factory) + @result.addCallback + def createProtocol(ignored): + protocol = factory.buildProtocol(None) + info.constructedProtocols.append(protocol) + transport = StringTransport() + protocol.makeConnection(transport) + return protocol + info.connectQueue.append(result) + if fireImmediately: + result.callback(None) + return result + info = ConnectInformation() + return info, ClientTestEndpoint() + + + +def catchLogs(testCase, logPublisher=globalLogPublisher): + """ + Catch the global log stream. + + @param testCase: The test case to add a cleanup to. + + @param logPublisher: the log publisher to add and remove observers for. + + @return: a 0-argument callable that returns a list of textual log messages + for comparison. + @rtype: L{list} of L{unicode} + """ + logs = [] + logPublisher.addObserver(logs.append) + testCase.addCleanup(lambda: logPublisher.removeObserver(logs.append)) + return lambda: [formatEvent(event) for event in logs] + + + +AT_LEAST_ONE_ATTEMPT = 100. + +class ClientServiceTests(SynchronousTestCase): + """ + Tests for L{ClientService}. + """ + + def makeReconnector(self, fireImmediately=True, startService=True, + protocolType=Protocol, **kw): + """ + Create a L{ClientService} along with a L{ConnectInformation} indicating + the connections in progress on its endpoint. + + @param fireImmediately: Should all of the endpoint connection attempts + fire synchronously? + @type fireImmediately: L{bool} + + @param startService: Should the L{ClientService} be started before + being returned? + @type startService: L{bool} + + @param protocolType: a 0-argument callable returning a new L{IProtocol} + provider to be used for application-level protocol connections. + + @param kw: Arbitrary keyword arguments to be passed on to + L{ClientService} + + @return: a 2-tuple of L{ConnectInformation} (for information about test + state) and L{ClientService} (the system under test). The + L{ConnectInformation} has 2 additional attributes; + C{applicationFactory} and C{applicationProtocols}, which refer to + the unwrapped protocol factory and protocol instances passed in to + L{ClientService} respectively. + """ + nkw = {} + nkw.update(clock=Clock()) + nkw.update(kw) + cq, endpoint = endpointForTesting(fireImmediately=fireImmediately) + + # `endpointForTesting` is totally generic to any LLPI client that uses + # endpoints, and maintains all its state internally; however, + # applicationProtocols and applicationFactory are bonus attributes that + # are only specifically interesitng to tests that use wrapper + # protocols. For now, set them here, externally. + + applicationProtocols = cq.applicationProtocols = [] + + class RememberingFactory(Factory, object): + protocol = protocolType + def buildProtocol(self, addr): + result = super(RememberingFactory, self).buildProtocol(addr) + applicationProtocols.append(result) + return result + + cq.applicationFactory = factory = RememberingFactory() + + service = ClientService(endpoint, factory, **nkw) + def stop(): + service._protocol = None + if service.running: + service.stopService() + # Ensure that we don't leave any state in the reactor after + # stopService. + self.assertEqual(service._clock.getDelayedCalls(), []) + self.addCleanup(stop) + if startService: + service.startService() + return cq, service + + + def test_startService(self): + """ + When the service is started, a connection attempt is made. + """ + cq, service = self.makeReconnector(fireImmediately=False) + self.assertEqual(len(cq.connectQueue), 1) + + + def test_startStopFactory(self): + """ + Although somewhat obscure, L{IProtocolFactory} includes both C{doStart} + and C{doStop} methods; ensure that when these methods are called on the + factory that was passed to the reactor, the factory that was passed + from the application receives them. + """ + cq, service = self.makeReconnector() + firstAppFactory = cq.applicationFactory + self.assertEqual(firstAppFactory.numPorts, 0) + firstPassedFactory = cq.passedFactories[0] + firstPassedFactory.doStart() + self.assertEqual(firstAppFactory.numPorts, 1) + + + def test_stopServiceWhileConnected(self): + """ + When the service is stopped, no further connect attempts are made. The + returned L{Deferred} fires when all outstanding connections have been + stopped. + """ + cq, service = self.makeReconnector() + d = service.stopService() + self.assertNoResult(d) + protocol = cq.constructedProtocols[0] + self.assertEqual(protocol.transport.disconnecting, True) + protocol.connectionLost(Failure(Exception())) + self.successResultOf(d) + + + def test_startServiceWhileStopping(self): + """ + When L{ClientService} is stopping - that is, + L{ClientService.stopService} has been called, but the L{Deferred} it + returns has not fired yet - calling L{startService} will cause a new + connection to be made, and new calls to L{whenConnected} to succeed. + """ + cq, service = self.makeReconnector(fireImmediately=False) + cq.connectQueue[0].callback(None) + first = cq.constructedProtocols[0] + stopped = service.stopService() + self.assertNoResult(stopped) + nextProtocol = service.whenConnected() + self.assertNoResult(nextProtocol) + service.startService() + self.assertNoResult(nextProtocol) + self.assertNoResult(stopped) + self.assertEqual(first.transport.disconnecting, True) + cq.connectQueue[1].callback(None) + self.assertEqual(len(cq.constructedProtocols), 2) + self.assertIdentical(self.successResultOf(nextProtocol), + cq.applicationProtocols[1]) + secondStopped = service.stopService() + first.connectionLost(Failure(Exception())) + self.successResultOf(stopped) + self.assertNoResult(secondStopped) + + + def test_startServiceWhileStopped(self): + """ + When L{ClientService} is stopped - that is, + L{ClientService.stopService} has been called and the L{Deferred} it + returns has fired - calling L{startService} will cause a new connection + to be made, and new calls to L{whenConnected} to succeed. + """ + cq, service = self.makeReconnector(fireImmediately=False) + stopped = service.stopService() + self.successResultOf(stopped) + self.failureResultOf(service.whenConnected(), CancelledError) + service.startService() + cq.connectQueue[-1].callback(None) + self.assertIdentical(cq.applicationProtocols[-1], + self.successResultOf(service.whenConnected())) + + + def test_interfacesForTransport(self): + """ + If the protocol objects returned by the factory given to + L{ClientService} provide special "marker" interfaces for their + transport - L{IHalfCloseableProtocol} or L{IFileDescriptorReceiver} - + those interfaces will be provided by the protocol objects passed on to + the reactor. + """ + @implementer(IHalfCloseableProtocol, IFileDescriptorReceiver) + class FancyProtocol(Protocol, object): + """ + Provider of various interfaces. + """ + cq, service = self.makeReconnector(protocolType=FancyProtocol) + reactorFacing = cq.constructedProtocols[0] + self.assertTrue(IFileDescriptorReceiver.providedBy(reactorFacing)) + self.assertTrue(IHalfCloseableProtocol.providedBy(reactorFacing)) + + + def test_stopServiceWhileRetrying(self): + """ + When the service is stopped while retrying, the retry is cancelled. + """ + clock = Clock() + cq, service = self.makeReconnector(fireImmediately=False, clock=clock) + cq.connectQueue[0].errback(Exception()) + clock.advance(AT_LEAST_ONE_ATTEMPT) + self.assertEqual(len(cq.connectQueue), 2) + d = service.stopService() + cq.connectQueue[1].errback(Exception()) + self.successResultOf(d) + + + def test_stopServiceWhileConnecting(self): + """ + When the service is stopped while initially connecting, the connection + attempt is cancelled. + """ + clock = Clock() + cq, service = self.makeReconnector(fireImmediately=False, clock=clock) + self.assertEqual(len(cq.connectQueue), 1) + self.assertNoResult(cq.connectQueue[0]) + d = service.stopService() + self.successResultOf(d) + + + def test_clientConnected(self): + """ + When a client connects, the service keeps a reference to the new + protocol and resets the delay. + """ + clock = Clock() + cq, service = self.makeReconnector(clock=clock) + awaitingProtocol = service.whenConnected() + self.assertEqual(clock.getDelayedCalls(), []) + self.assertIdentical(self.successResultOf(awaitingProtocol), + cq.applicationProtocols[0]) + + + def test_clientConnectionFailed(self): + """ + When a client connection fails, the service removes its reference + to the protocol and tries again after a timeout. + """ + clock = Clock() + cq, service = self.makeReconnector(fireImmediately=False, + clock=clock) + self.assertEqual(len(cq.connectQueue), 1) + cq.connectQueue[0].errback(Failure(Exception())) + whenConnected = service.whenConnected() + self.assertNoResult(whenConnected) + # Don't fail during test tear-down when service shutdown causes all + # waiting connections to fail. + whenConnected.addErrback(lambda ignored: ignored.trap(CancelledError)) + clock.advance(AT_LEAST_ONE_ATTEMPT) + self.assertEqual(len(cq.connectQueue), 2) + + + def test_clientConnectionLost(self): + """ + When a client connection is lost, the service removes its reference + to the protocol and calls retry. + """ + clock = Clock() + cq, service = self.makeReconnector(clock=clock, fireImmediately=False) + self.assertEquals(len(cq.connectQueue), 1) + cq.connectQueue[0].callback(None) + self.assertEquals(len(cq.connectQueue), 1) + self.assertIdentical(self.successResultOf(service.whenConnected()), + cq.applicationProtocols[0]) + cq.constructedProtocols[0].connectionLost(Failure(Exception())) + clock.advance(AT_LEAST_ONE_ATTEMPT) + self.assertEquals(len(cq.connectQueue), 2) + cq.connectQueue[1].callback(None) + self.assertIdentical(self.successResultOf(service.whenConnected()), + cq.applicationProtocols[1]) + + + def test_clientConnectionLostWhileStopping(self): + """ + When a client connection is lost while the service is stopping, the + protocol stopping deferred is called and the reference to the protocol + is removed. + """ + clock = Clock() + cq, service = self.makeReconnector(clock=clock) + d = service.stopService() + cq.constructedProtocols[0].connectionLost(Failure(IndentationError())) + self.failureResultOf(service.whenConnected(), CancelledError) + self.assertTrue(d.called) + + + def test_startTwice(self): + """ + If L{ClientService} is started when it's already started, it will log a + complaint and do nothing else (in particular it will not make + additional connections). + """ + cq, service = self.makeReconnector(fireImmediately=False, + startService=False) + self.assertEqual(len(cq.connectQueue), 0) + service.startService() + self.assertEqual(len(cq.connectQueue), 1) + messages = catchLogs(self) + service.startService() + self.assertEqual(len(cq.connectQueue), 1) + self.assertIn("Duplicate ClientService.startService", messages()[0]) + + + def test_whenConnectedLater(self): + """ + L{ClientService.whenConnected} returns a L{Deferred} that fires when a + connection is established. + """ + clock = Clock() + cq, service = self.makeReconnector(fireImmediately=False, clock=clock) + a = service.whenConnected() + b = service.whenConnected() + self.assertNoResult(a) + self.assertNoResult(b) + cq.connectQueue[0].callback(None) + resultA = self.successResultOf(a) + resultB = self.successResultOf(b) + self.assertIdentical(resultA, resultB) + self.assertIdentical(resultA, cq.applicationProtocols[0]) + + + def test_whenConnectedStopService(self): + """ + L{ClientService.whenConnected} returns a L{Deferred} that fails when + L{ClientService.stopService} is called. + """ + clock = Clock() + cq, service = self.makeReconnector(fireImmediately=False, clock=clock) + a = service.whenConnected() + b = service.whenConnected() + self.assertNoResult(a) + self.assertNoResult(b) + service.stopService() + clock.advance(AT_LEAST_ONE_ATTEMPT) + self.failureResultOf(a, CancelledError) + self.failureResultOf(b, CancelledError) + diff --git a/twisted/topfiles/4735.feature b/twisted/topfiles/4735.feature new file mode 100644 index 00000000000..59dc3dd99e4 --- /dev/null +++ b/twisted/topfiles/4735.feature @@ -0,0 +1 @@ +twisted.application.internet.ClientService, a service that maintains a persistent outgoing endpoint-based connection; a replacement for ReconnectingClientFactory that uses modern APIs.