forked from twisted/twisted
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path_asynctest.py
405 lines (335 loc) · 14.3 KB
/
_asynctest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
# -*- test-case-name: twisted.trial.test -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Things likely to be used by writers of unit tests.
Maintainer: Jonathan Lange
"""
from __future__ import division, absolute_import
import inspect
import warnings
from zope.interface import implementer
# We can't import reactor at module-level because this code runs before trial
# installs a user-specified reactor, installing the default reactor and
# breaking reactor installation. See also #6047.
from twisted.internet import defer, utils
from twisted.python import failure
from twisted.trial import itrial, util
from twisted.trial._synctest import (
FailTest, SkipTest, SynchronousTestCase)
# Shared module-level list used as the default value of the ``running``
# parameter of TestCase._wait.  _wait appends a marker to it while it is
# blocking and pops the marker when it finishes, so a non-empty list means a
# _wait call is already in progress.
_wait_is_running = []
@implementer(itrial.ITestCase)
class TestCase(SynchronousTestCase):
    """
    A unit test. The atom of the unit testing universe.

    This class extends L{SynchronousTestCase} which extends C{unittest.TestCase}
    from the standard library. The main feature is the ability to return
    C{Deferred}s from tests and fixture methods and to have the suite wait for
    those C{Deferred}s to fire. Also provides new assertions such as
    L{assertFailure}.

    @ivar timeout: A real number of seconds. If set, the test will
        raise an error if it takes longer than C{timeout} seconds.
        If not set, util.DEFAULT_TIMEOUT_DURATION is used.
    """
def __init__(self, methodName='runTest'):
    """
    Construct an asynchronous test case for C{methodName}.

    All of the work is delegated to the superclass initializer.

    @param methodName: The name of a method on C{self}. This method should
        be a unit test. That is, it should be a short method that calls some
        of the assert* methods. If C{methodName} is unspecified,
        L{SynchronousTestCase.runTest} will be used as the test method. This
        is mostly useful for testing Trial.
    """
    super(TestCase, self).__init__(methodName)
def assertFailure(self, deferred, *expectedFailures):
    """
    Assert that C{deferred} errbacks with one of C{expectedFailures}.

    Two handlers are attached to C{deferred}.  If it fires with a success
    result, or with a failure for which C{check(*expectedFailures)} is
    false, C{self.failureException} is raised inside the callback chain.
    If the failure matches, the chain continues with the failure's
    C{value}.

    @param deferred: The Deferred to attach the checking handlers to.
    @param expectedFailures: Exception types handed to the failure's
        C{check} method.

    @return: Whatever C{deferred.addCallbacks} returns.  You will need to
        return this from your test case so that the check is waited for.
    """
    def _unexpectedSuccess(value):
        message = "did not catch an error, instead got %r" % (value,)
        raise self.failureException(message)

    def _checkFailure(reason):
        if not reason.check(*expectedFailures):
            details = ('\nExpected: %r\nGot:\n%s'
                       % (expectedFailures, str(reason)))
            raise self.failureException(details)
        return reason.value

    return deferred.addCallbacks(_unexpectedSuccess, _checkFailure)


# Older spelling of the same assertion.
failUnlessFailure = assertFailure
def _run(self, methodName, result):
    """
    Call the method named C{methodName} on this test, guarded by a timeout.

    The method is invoked through C{defer.maybeDeferred} with this test's
    warning suppressions applied.  A delayed call is scheduled with the
    reactor for C{self.getTimeout()} seconds; it is cancelled as soon as
    the returned Deferred produces a result.

    @param methodName: Name of the fixture or test method to look up on
        C{self} and run.
    @param result: The result object that receives a timeout report when
        the Deferred cannot be errbacked any more.

    @return: A Deferred for the outcome of the method.  If the method is a
        generator function it is never called and the Deferred has already
        failed with C{TypeError}.
    """
    from twisted.internet import reactor
    timeout = self.getTimeout()

    def onTimeout(d):
        e = defer.TimeoutError("%r (%s) still running at %s secs"
                               % (self, methodName, timeout))
        f = failure.Failure(e)
        # try to errback the deferred that the test returns (for no gorram
        # reason) (see issue1005 and test_errorPropagation in
        # test_deferred)
        try:
            d.errback(f)
        except defer.AlreadyCalledError:
            # if the deferred has been called already but the *back chain
            # is still unfinished, crash the reactor and report timeout
            # error ourself.
            reactor.crash()
            self._timedOut = True  # see self._wait
            todo = self.getTodo()
            if todo is not None and todo.expected(f):
                result.addExpectedFailure(self, f, todo)
            else:
                result.addError(self, f)

    # onTimeout may call reactor.crash(), which emits a DeprecationWarning
    # while a test is running; silence it for this internal use.
    onTimeout = utils.suppressWarnings(
        onTimeout, util.suppress(category=DeprecationWarning))
    method = getattr(self, methodName)
    if inspect.isgeneratorfunction(method):
        exc = TypeError(
            '%r is a generator function and therefore will never run' % (
                method,))
        return defer.fail(exc)
    d = defer.maybeDeferred(
        utils.runWithWarningsSuppressed, self._getSuppress(), method)
    call = reactor.callLater(timeout, onTimeout, d)

    def cancelTimeout(passthrough):
        # Explicit form of the former ``active() and cancel() or x`` lambda:
        # the result is always handed on unchanged, whatever cancel()
        # returns.
        if call.active():
            call.cancel()
        return passthrough

    d.addBoth(cancelTimeout)
    return d
def __call__(self, *args, **kwargs):
    """
    Run this test; calling the instance forwards every argument to C{run}.

    @return: Whatever C{self.run(*args, **kwargs)} returns.
    """
    return self.run(*args, **kwargs)
def deferSetUp(self, ignored, result):
    """
    Run C{setUp} and arrange for what follows it.

    On success the chain continues with L{deferTestMethod}; on failure
    with C{_ebDeferSetUp}.  Both receive C{result} as an extra argument.

    @param ignored: Not used.
    @param result: The result object passed along to the next stage.

    @return: The Deferred obtained from C{self._run('setUp', result)}.
    """
    setUpDeferred = self._run('setUp', result)
    setUpDeferred.addCallbacks(
        self.deferTestMethod,
        self._ebDeferSetUp,
        callbackArgs=(result,),
        errbackArgs=(result,))
    return setUpDeferred
def _ebDeferSetUp(self, failure, result):
    """
    Report a failed C{setUp}, then go on to run the cleanups.

    A L{SkipTest} failure is recorded as a skip.  Any other failure is
    recorded as an error, and a C{KeyboardInterrupt} also stops the run.

    @param failure: The failure produced by C{setUp}.
    @param result: The result object to report to.

    @return: Whatever C{self.deferRunCleanups(None, result)} returns.
    """
    if not failure.check(SkipTest):
        result.addError(self, failure)
        if failure.check(KeyboardInterrupt):
            result.stop()
    else:
        result.addSkip(
            self, self._getSkipReason(self.setUp, failure.value))
    return self.deferRunCleanups(None, result)
def deferTestMethod(self, ignored, result):
    """
    Run the test method, then the cleanups, then C{tearDown}.

    The outcome of the test method is reported by C{_cbDeferTestMethod} or
    C{_ebDeferTestMethod}.  Afterwards L{deferRunCleanups} and
    L{deferTearDown} are added with C{addBoth}, in that order.

    @param ignored: Not used.
    @param result: The result object handed to every stage.

    @return: The Deferred obtained from C{self._run} for the test method.
    """
    testDeferred = self._run(self._testMethodName, result)
    testDeferred.addCallbacks(
        self._cbDeferTestMethod,
        self._ebDeferTestMethod,
        callbackArgs=(result,),
        errbackArgs=(result,))
    for nextStage in (self.deferRunCleanups, self.deferTearDown):
        testDeferred.addBoth(nextStage, result)
    return testDeferred
def _cbDeferTestMethod(self, ignored, result):
    """
    Handle a test method that finished without a failure.

    If the test has a todo, the success is reported to C{result} as an
    unexpected success; otherwise the test is marked as passed.

    @param ignored: The test method's result; handed back unchanged.
    @param result: The result object to report an unexpected success to.

    @return: C{ignored}, so the callback chain continues with it.
    """
    # Look the todo up once, the same way _ebDeferTestMethod does, rather
    # than calling getTodo() a second time for the report.
    todo = self.getTodo()
    if todo is not None:
        result.addUnexpectedSuccess(self, todo)
    else:
        self._passed = True
    return ignored
def _ebDeferTestMethod(self, f, result):
    """
    Report a failed test method to C{result}.

    The first matching rule wins:
      1. a todo that expects this failure: expected failure;
      2. C{self.failureException} or L{FailTest}: failure;
      3. C{KeyboardInterrupt}: error, and the run is stopped;
      4. L{SkipTest}: skip;
      5. anything else: error.

    @param f: The failure produced by the test method.
    @param result: The result object to report to.
    """
    todo = self.getTodo()
    if todo is not None and todo.expected(f):
        result.addExpectedFailure(self, f, todo)
        return
    if f.check(self.failureException, FailTest):
        result.addFailure(self, f)
        return
    if f.check(KeyboardInterrupt):
        result.addError(self, f)
        result.stop()
        return
    if f.check(SkipTest):
        result.addSkip(
            self,
            self._getSkipReason(getattr(self, self._testMethodName), f.value))
        return
    result.addError(self, f)
def deferTearDown(self, ignored, result):
    """
    Run C{tearDown}; a failure is handed to C{_ebDeferTearDown}.

    @param ignored: Not used.
    @param result: The result object passed to the errback.

    @return: The Deferred obtained from C{self._run('tearDown', result)}.
    """
    tearDownDeferred = self._run('tearDown', result)
    tearDownDeferred.addErrback(self._ebDeferTearDown, result)
    return tearDownDeferred
def _ebDeferTearDown(self, failure, result):
    """
    Report a failed C{tearDown} as an error and mark the test as not passed.

    A C{KeyboardInterrupt} failure also stops the run.

    @param failure: The failure produced by C{tearDown}.
    @param result: The result object to report to.
    """
    result.addError(self, failure)
    interrupted = failure.check(KeyboardInterrupt)
    if interrupted:
        result.stop()
    self._passed = False
def deferRunCleanups(self, ignored, result):
    """
    Run any scheduled cleanups and report errors (if any) to the result
    object.

    @param ignored: Not used.
    @param result: The result object passed to C{_cbDeferRunCleanups}.

    @return: The Deferred obtained from C{self._runCleanups()}.
    """
    cleanupsDeferred = self._runCleanups()
    cleanupsDeferred.addCallback(self._cbDeferRunCleanups, result)
    return cleanupsDeferred
def _cbDeferRunCleanups(self, cleanupResults, result):
    """
    Report every failed cleanup as an error.

    @param cleanupResults: An iterable of C{(flag, value)} pairs.  Pairs
        whose flag equals C{defer.FAILURE} carry a failure as their value.
    @param result: The result object to report to.  A C{KeyboardInterrupt}
        failure also stops the run.
    """
    for outcome, cleanupFailure in cleanupResults:
        if outcome != defer.FAILURE:
            continue
        result.addError(self, cleanupFailure)
        if cleanupFailure.check(KeyboardInterrupt):
            result.stop()
        self._passed = False
def _cleanUp(self, result):
    """
    Do the per-test cleanup and report a success if the test still counts
    as passed.

    The test is marked as not passed when the janitor's
    C{postCaseCleanup} returns a false value or raises, or when the
    observer collected any logged errors.  Logged errors are then flushed
    and the observer removed.

    @param result: The result object to report to.
    """
    try:
        janitor = util._Janitor(self, result)
        if not janitor.postCaseCleanup():
            self._passed = False
    except:
        # Whatever went wrong during cleanup is reported, not propagated.
        result.addError(self, failure.Failure())
        self._passed = False
    for loggedError in self._observer.getErrors():
        result.addError(self, loggedError)
        self._passed = False
    self.flushLoggedErrors()
    self._removeObserver()
    if self._passed:
        result.addSuccess(self)
def _classCleanUp(self, result):
    """
    Run the janitor's C{postClassCleanup}; anything it raises is reported
    to C{result} as an error instead of propagating.

    @param result: The result object to report to.
    """
    try:
        janitor = util._Janitor(self, result)
        janitor.postClassCleanup()
    except:
        result.addError(self, failure.Failure())
def _makeReactorMethod(self, name):
    """
    Build a replacement for the reactor method called C{name}.

    The replacement emits a C{DeprecationWarning} and then forwards its
    arguments to the original, which it looks up in
    C{self._reactorMethods} on every call.

    @param name: The name of the reactor method being wrapped.

    @return: The wrapping function.
    """
    def _(*a, **kw):
        message = ("reactor.%s cannot be used inside unit tests. "
                   "In the future, using %s will fail the test and may "
                   "crash or hang the test run."
                   % (name, name))
        warnings.warn(message, stacklevel=2, category=DeprecationWarning)
        original = self._reactorMethods[name]
        return original(*a, **kw)
    return _
def _deprecateReactor(self, reactor):
    """
    Deprecate C{crash}, C{iterate} and C{stop} on C{reactor}.

    The original of each method is stored in C{self._reactorMethods}
    under its name, and the attribute on C{reactor} is replaced with the
    wrapper built by L{_makeReactorMethod}.

    @param reactor: The Twisted reactor.
    """
    originals = {}
    self._reactorMethods = originals
    for methodName in ('crash', 'iterate', 'stop'):
        originals[methodName] = getattr(reactor, methodName)
        setattr(reactor, methodName, self._makeReactorMethod(methodName))
def _undeprecateReactor(self, reactor):
    """
    Undo L{_deprecateReactor}.

    Every method saved in C{self._reactorMethods} is put back on
    C{reactor}, and the saved mapping is then replaced with an empty one.

    @param reactor: The Twisted reactor.
    """
    saved = self._reactorMethods
    for methodName, original in saved.items():
        setattr(reactor, methodName, original)
    self._reactorMethods = {}
def _runCleanups(self):
    """
    Run the cleanups added with L{addCleanup}.

    C{self._cleanups} is emptied by repeated C{pop()} calls.  Each popped
    C{(f, args, kwargs)} entry becomes a zero-argument callable, and the
    callables are handed to C{util._runSequentially} in the order they
    were popped.

    @return: Whatever C{util._runSequentially} returns (presumably a
        C{Deferred} that fires when all cleanups are run).
    """
    def _bind(function, positional, keywords):
        # A separate scope per cleanup, so each callable keeps its own
        # function and arguments.
        return lambda: function(*positional, **keywords)

    pending = []
    while len(self._cleanups) > 0:
        function, positional, keywords = self._cleanups.pop()
        pending.append(_bind(function, positional, keywords))
    return util._runSequentially(pending)
def _runFixturesAndTest(self, result):
    """
    Really run C{setUp}, the test method, and C{tearDown}, any of which
    may return L{defer.Deferred}s, then clean up.

    The reactor's C{crash}, C{iterate} and C{stop} are deprecated for the
    duration and always restored.  The per-test and per-class cleanups
    always run once the wait has ended, even if it ended with an
    exception.

    @param result: A L{TestResult} object.
    """
    from twisted.internet import reactor
    self._deprecateReactor(reactor)
    self._timedOut = False
    try:
        wholeRun = self.deferSetUp(None, result)
        try:
            self._wait(wholeRun)
        finally:
            self._cleanUp(result)
            self._classCleanUp(result)
    finally:
        self._undeprecateReactor(reactor)
def addCleanup(self, f, *args, **kwargs):
    """
    Extend the base cleanup feature with support for cleanup functions which
    return Deferreds.

    If the function C{f} returns a Deferred, C{TestCase} will wait until the
    Deferred has fired before proceeding to the next function.

    @param f: The cleanup function to register.
    @param args: Positional arguments forwarded with C{f}.
    @param kwargs: Keyword arguments forwarded with C{f}.

    @return: Whatever the superclass C{addCleanup} returns.
    """
    return super(TestCase, self).addCleanup(f, *args, **kwargs)
def getSuppress(self):
    """
    Public accessor that delegates to C{self._getSuppress()}.

    @return: Whatever C{self._getSuppress()} returns.
    """
    return self._getSuppress()
def getTimeout(self):
    """
    Return the timeout that applies to this test, as a float.

    The value is looked up with C{util.acquireAttribute} over
    C{self._parents} (the instance first, then the class, then the module,
    then packages), falling back to L{util.DEFAULT_TIMEOUT_DURATION} when
    no C{timeout} attribute is found.  See the L{TestCase} docstring for
    more details.

    @return: The timeout converted with C{float}.  If it cannot be
        converted, a C{DeprecationWarning} is issued and
        L{util.DEFAULT_TIMEOUT_DURATION} is returned.
    """
    fallback = util.DEFAULT_TIMEOUT_DURATION
    configured = util.acquireAttribute(self._parents, 'timeout', fallback)
    try:
        seconds = float(configured)
    except (ValueError, TypeError):
        # XXX -- this is here because sometimes people will have methods
        # called 'timeout', or set timeout to 'orange', or something
        # Particularly, test_news.NewsTestCase and ReactorCoreTestCase
        # both do this.
        warnings.warn("'timeout' attribute needs to be a number.",
                      category=DeprecationWarning)
        return util.DEFAULT_TIMEOUT_DURATION
    return seconds
def _wait(self, d, running=_wait_is_running):
    """
    Take a Deferred that only ever callbacks. Block until it happens.

    If C{d} has not fired yet, the reactor is run until C{d} fires (which
    crashes the reactor) or until the reactor is stopped some other way.
    While the reactor runs, C{reactor.stop} is replaced by a function that
    calls C{reactor.crash}.

    @param d: The Deferred to wait for.
    @param running: The list used as the reentrancy guard.  The default is
        deliberately one shared module-level list; a marker is appended
        for the duration of the call.

    @raise RuntimeError: If C{running} is non-empty when called, that is,
        if another C{_wait} is in progress.
    @raise KeyboardInterrupt: If the reactor stopped although C{d}
        produced no result and no timeout was recorded.
    """
    if running:
        raise RuntimeError("_wait is not reentrant")
    from twisted.internet import reactor
    # Collects the result of d.  It is rebound to None in the finally
    # clause below, which turns the two closures that read it into no-ops
    # once this call is over.
    results = []
    def append(any):
        if results is not None:
            results.append(any)
    def crash(ign):
        if results is not None:
            reactor.crash()
    # reactor.crash is wrapped with a deprecation warning while a test
    # runs; suppress that warning for these internal calls.
    crash = utils.suppressWarnings(
        crash, util.suppress(message=r'reactor\.crash cannot be used.*',
                             category=DeprecationWarning))
    def stop():
        reactor.crash()
    stop = utils.suppressWarnings(
        stop, util.suppress(message=r'reactor\.crash cannot be used.*',
                            category=DeprecationWarning))
    # Mark a _wait as in progress; undone in the finally clause.
    running.append(None)
    try:
        d.addBoth(append)
        if results:
            # d might have already been fired, in which case append is
            # called synchronously. Avoid any reactor stuff.
            return
        d.addBoth(crash)
        # Temporarily make reactor.stop crash the reactor instead.
        reactor.stop = stop
        try:
            reactor.run()
        finally:
            del reactor.stop
        # If the reactor was crashed elsewhere due to a timeout, hopefully
        # that crasher also reported an error. Just return.
        # _timedOut is most likely to be set when d has fired but hasn't
        # completed its callback chain (see self._run)
        if results or self._timedOut: # set in _runFixturesAndTest() and _run()
            return
        # If the timeout didn't happen, and we didn't get a result or
        # a failure, then the user probably aborted the test, so let's
        # just raise KeyboardInterrupt.
        # FIXME: imagine this:
        # web/test/test_webclient.py:
        # exc = self.assertRaises(error.Error, wait, method(url))
        #
        # wait() will raise KeyboardInterrupt, and assertRaises will
        # swallow it. Therefore, wait() raising KeyboardInterrupt is
        # insufficient to stop trial. A suggested solution is to have
        # this code set a "stop trial" flag, or otherwise notify trial
        # that it should really try to stop as soon as possible.
        raise KeyboardInterrupt()
    finally:
        results = None
        running.pop()