48
48
from twisted .internet import task
49
49
from twisted .python .failure import Failure
50
50
from twisted .internet .defer import (
51
- CancelledError , Deferred , succeed , fail
51
+ CancelledError , Deferred , succeed , fail , maybeDeferred
52
52
)
53
53
54
54
from automat import MethodicalMachine
@@ -548,7 +548,8 @@ class _ClientMachine(object):
548
548
549
549
_machine = MethodicalMachine ()
550
550
551
- def __init__ (self , endpoint , factory , retryPolicy , clock , log ):
551
+ def __init__ (self , endpoint , factory , retryPolicy , clock ,
552
+ prepareConnection , log ):
552
553
"""
553
554
@see: L{ClientService.__init__}
554
555
@@ -566,6 +567,7 @@ def __init__(self, endpoint, factory, retryPolicy, clock, log):
566
567
self ._factory = factory
567
568
self ._timeoutForAttempt = retryPolicy
568
569
self ._clock = clock
570
+ self ._prepareConnection = prepareConnection
569
571
self ._connectionInProgress = succeed (None )
570
572
571
573
self ._awaitingConnected = []
@@ -633,10 +635,35 @@ def _connect(self):
633
635
634
636
self ._connectionInProgress = (
635
637
self ._endpoint .connect (factoryProxy )
638
+ .addCallback (self ._runPrepareConnection )
636
639
.addCallback (self ._connectionMade )
637
640
.addErrback (self ._connectionFailed ))
638
641
639
642
643
+ def _runPrepareConnection (self , protocol ):
644
+ """
645
+ Run any C{prepareConnection} callback with the connected protocol,
646
+ ignoring its return value but propagating any failure.
647
+
648
+ @param protocol: The protocol of the connection.
649
+ @type protocol: L{IProtocol}
650
+
651
+ @return: Either:
652
+
653
+ - A L{Deferred} that succeeds with the protocol when the
654
+ C{prepareConnection} callback has executed successfully.
655
+
656
+ - A L{Deferred} that fails when the C{prepareConnection} callback
657
+ throws or returns a failed L{Deferred}.
658
+
659
+ - The protocol, when no C{prepareConnection} callback is defined.
660
+ """
661
+ if self ._prepareConnection :
662
+ return (maybeDeferred (self ._prepareConnection , protocol )
663
+ .addCallback (lambda _ : protocol ))
664
+ return protocol
665
+
666
+
640
667
@_machine .output ()
641
668
def _resetFailedAttempts (self ):
642
669
"""
@@ -1009,7 +1036,9 @@ class ClientService(service.Service, object):
1009
1036
"""
1010
1037
1011
1038
_log = Logger ()
1012
- def __init__ (self , endpoint , factory , retryPolicy = None , clock = None ):
1039
+
1040
+ def __init__ (self , endpoint , factory , retryPolicy = None , clock = None ,
1041
+ prepareConnection = None ):
1013
1042
"""
1014
1043
@param endpoint: A L{stream client endpoint
1015
1044
<interfaces.IStreamClientEndpoint>} provider which will be used to
@@ -1029,13 +1058,36 @@ def __init__(self, endpoint, factory, retryPolicy=None, clock=None):
1029
1058
this attribute will not be serialized, and the default value (the
1030
1059
reactor) will be restored when deserialized.
1031
1060
@type clock: L{IReactorTime}
1061
+
1062
+ @param prepareConnection: A single argument L{callable} that may return
1063
+ a L{Deferred}. It will be called once with the L{protocol
1064
+ <interfaces.IProtocol>} each time a new connection is made. It may
1065
+ call methods on the protocol to prepare it for use (e.g.
1066
+ authenticate) or validate it (check its health).
1067
+
1068
+ The C{prepareConnection} callable may raise an exception or return
1069
+ a L{Deferred} which fails to reject the connection. A rejected
1070
+ connection is not used to fire an L{Deferred} returned by
1071
+ L{whenConnected}. Instead, L{ClientService} handles the failure
1072
+ and continues as if the connection attempt were a failure
1073
+ (incrementing the counter passed to C{retryPolicy}).
1074
+
1075
+ L{Deferred}s returned by L{whenConnected} will not fire until
1076
+ any L{Deferred} returned by the C{prepareConnection} callable
1077
+ fire. Otherwise its successful return value is consumed, but
1078
+ ignored.
1079
+
1080
+ Present Since Twisted NEXT
1081
+
1082
+ @type prepareConnection: L{callable}
1083
+
1032
1084
"""
1033
1085
clock = _maybeGlobalReactor (clock )
1034
1086
retryPolicy = _defaultPolicy if retryPolicy is None else retryPolicy
1035
1087
1036
1088
self ._machine = _ClientMachine (
1037
1089
endpoint , factory , retryPolicy , clock ,
1038
- log = self ._log ,
1090
+ prepareConnection = prepareConnection , log = self ._log ,
1039
1091
)
1040
1092
1041
1093
0 commit comments