Skip to content

Commit

Permalink
Zero resource warnings for twisted.trial._dist.test.
Browse files Browse the repository at this point in the history
python -Wall::ResourceWarning -m twisted.trial --force-gc twisted.trial._dist.test

disttrial now always waits for its worker processes before shutdown;
if they haven't terminated before the parent begins its reactor.stop
call, the parent will wait 10 seconds before sending a kill signal to
the remaining processes and waiting again.  As currently written this
could continue indefinitely if any worker process is uninterrupibly
sleeping.

As a result, test_disttrial can rely on workers' endDeferred Deferreds
firing... except on these tests:

twisted.trial._dist.test.test_disttrial.DistTrialRunnerTests.test_minimalWorker
twisted.trial._dist.test.test_disttrial.DistTrialRunnerTests.test_run
twisted.trial._dist.test.test_disttrial.DistTrialRunnerTests.test_runStopAfterTests
twisted.trial._dist.test.test_disttrial.DistTrialRunnerTests.test_runUncleanWarnings
twisted.trial._dist.test.test_disttrial.DistTrialRunnerTests.test_runUnexpectedError
twisted.trial._dist.test.test_disttrial.DistTrialRunnerTests.test_runUntilFailure
twisted.trial._dist.test.test_disttrial.DistTrialRunnerTests.test_runUsedDirectory

...because they don't advance the disttrial scheduling code enough to
reach shutdown event trigger that cleans up endDeferreds.

This commit must be split up between ResourceWarning mitigations and
disttrial process management.
  • Loading branch information
markrwilliams committed Feb 21, 2018
1 parent 1b52bd8 commit aaa46a9
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 60 deletions.
146 changes: 102 additions & 44 deletions src/twisted/trial/_dist/test/test_disttrial.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,16 @@
import sys

from twisted.internet.protocol import ProcessProtocol
from twisted.internet.defer import fail, succeed
from twisted.internet.task import Clock, Cooperator, deferLater
from twisted.internet.defer import fail, gatherResults, maybeDeferred, succeed
from twisted.internet.task import Cooperator, deferLater
from twisted.internet.main import CONNECTION_DONE
from twisted.internet import reactor
from twisted.internet import reactor, interfaces
from twisted.python.compat import NativeStringIO as StringIO
from twisted.python.failure import Failure
from twisted.python.lockfile import FilesystemLock

from twisted.test.test_cooperator import FakeScheduler
from twisted.test.proto_helpers import MemoryReactorClock

from twisted.trial.unittest import TestCase
from twisted.trial.reporter import Reporter, TreeReporter
Expand All @@ -28,6 +29,8 @@
from twisted.trial._dist.distreporter import DistReporter
from twisted.trial._dist.worker import LocalWorker

from zope.interface import implementer



class FakeTransport(object):
Expand All @@ -44,29 +47,54 @@ def closeChildFD(self, fd):
pass


class FakeReactor(Clock):

@implementer(interfaces.IReactorProcess)
class CountingReactor(MemoryReactorClock):
"""
A simple fake reactor for testing purposes.
A fake reactor that counts the calls to L{IReactorCore.run},
L{IReactorCore.stop}, and L{IReactorProcess.spawnProcess}.
"""
spawnCount = 0
stopCount = 0
runCount = 0

def __init__(self, workers):
MemoryReactorClock.__init__(self)
self._workers = workers


def spawnProcess(self, worker, *args, **kwargs):
"""
See L{IReactorProcess.spawnProcess}.
@param worker: See L{IReactorProcess.spawnProcess}.
@param args: See L{IReactorProcess.spawnProcess}.
@param kwargs: See L{IReactorProcess.spawnProcess}.
"""
self._workers.append(worker)
worker.makeConnection(FakeTransport())
self.spawnCount += 1


def stop(self):
"""
See L{IReactorCore.stop}.
"""
MemoryReactorClock.stop(self)
self.stopCount += 1


def run(self):
"""
See L{IReactorCore.run}.
"""
self.runCount += 1

self.running = True
self.hasRun = True

def addSystemEventTrigger(self, *args, **kw):
pass
for f, args, kwargs in self.whenRunningHooks:
f(*args, **kwargs)



Expand Down Expand Up @@ -97,6 +125,22 @@ def setUp(self):
self.runner._stream = StringIO()


def reap(self, workers, consumeErrors=True):
"""
Reap the workers.
@param workers: The workers to reap.
@type workers: An iterable of L{LocalWorker}
@param consumeErrors: Whether or not to suppress errors.
@type consumeErrors: L{bool}
"""
for worker in workers:
if consumeErrors:
worker.endDeferred.addErrback(lambda _: None)
worker.processEnded(Failure(CONNECTION_DONE))


def getFakeSchedulerAndEternalCooperator(self):
"""
Helper to create fake scheduler and cooperator in tests.
Expand Down Expand Up @@ -167,7 +211,10 @@ def test_run(self):
C{run} starts the reactor exactly once and spawns each of the workers
exactly once.
"""
fakeReactor = FakeReactor()
workers = []
fakeReactor = CountingReactor(workers)
self.addCleanup(self.reap, workers)

suite = TrialSuite()
for i in range(10):
suite.addTest(TestCase())
Expand All @@ -182,9 +229,10 @@ def test_runUsedDirectory(self):
if it is generates a name based on it.
"""

class FakeReactorWithLock(FakeReactor):
class CountingReactorWithLock(CountingReactor):

def spawnProcess(oself, worker, *args, **kwargs):
oself._workers.append(worker)
self.assertEqual(os.path.abspath(worker._logDirectory),
os.path.abspath(
os.path.join(workingDirectory + "-1",
Expand All @@ -203,10 +251,15 @@ def spawnProcess(oself, worker, *args, **kwargs):
self.addCleanup(lock.unlock)
self.runner._workingDirectory = workingDirectory

fakeReactor = FakeReactorWithLock()
workers = []

fakeReactor = CountingReactorWithLock(workers)
self.addCleanup(self.reap, workers)

suite = TrialSuite()
for i in range(10):
suite.addTest(TestCase())

self.runner.run(suite, fakeReactor)


Expand All @@ -215,7 +268,10 @@ def test_minimalWorker(self):
L{DistTrialRunner} doesn't try to start more workers than the number of
tests.
"""
fakeReactor = FakeReactor()
workers = []
fakeReactor = CountingReactor(workers)
self.addCleanup(self.reap, workers)

self.runner.run(TestCase(), fakeReactor)
self.assertEqual(fakeReactor.runCount, 1)
self.assertEqual(fakeReactor.spawnCount, 1)
Expand All @@ -226,7 +282,10 @@ def test_runUncleanWarnings(self):
Running with the C{unclean-warnings} option makes L{DistTrialRunner}
uses the L{UncleanWarningsReporterWrapper}.
"""
fakeReactor = FakeReactor()
workers = []
fakeReactor = CountingReactor(workers)
self.addCleanup(self.reap, workers)

self.runner._uncleanWarnings = True
result = self.runner.run(TestCase(), fakeReactor)
self.assertIsInstance(result, DistReporter)
Expand Down Expand Up @@ -272,9 +331,10 @@ def test_runUnexpectedError(self):
suite catches and fails.
"""

class FakeReactorWithFail(FakeReactor):
class CountingReactorWithFail(CountingReactor):

def spawnProcess(self, worker, *args, **kwargs):
self._workers.append(worker)
worker.makeConnection(FakeTransport())
self.spawnCount += 1
worker._ampProtocol.run = self.failingRun
Expand All @@ -284,7 +344,10 @@ def failingRun(self, case, result):

scheduler, cooperator = self.getFakeSchedulerAndEternalCooperator()

fakeReactor = FakeReactorWithFail()
workers = []
fakeReactor = CountingReactorWithFail(workers)
self.addCleanup(self.reap, workers)

result = self.runner.run(TestCase(), fakeReactor,
cooperator.cooperate)
self.assertEqual(fakeReactor.runCount, 1)
Expand All @@ -298,34 +361,33 @@ def test_runStopAfterTests(self):
L{DistTrialRunner} calls C{reactor.stop} and unlocks the test directory
once the tests have run.
"""
functions = []

class FakeReactorWithSuccess(FakeReactor):
class CountingReactorWithSuccess(CountingReactor):

def spawnProcess(self, worker, *args, **kwargs):
self._workers.append(worker)
worker.makeConnection(FakeTransport())
self.spawnCount += 1
worker._ampProtocol.run = self.succeedingRun

def succeedingRun(self, case, result):
return succeed(None)

def addSystemEventTrigger(
oself, phase, event, function, *args, **kwargs):
self.assertEqual('before', phase)
self.assertEqual('shutdown', event)
functions.append(function)

workingDirectory = self.runner._workingDirectory

fakeReactor = FakeReactorWithSuccess()
workers = []
fakeReactor = CountingReactorWithSuccess(workers)

self.runner.run(TestCase(), fakeReactor)

def check():
localLock = FilesystemLock(workingDirectory + ".lock")
self.assertTrue(localLock.lock())
self.assertEqual(1, fakeReactor.stopCount)

self.assertEqual(list(fakeReactor.triggers.keys()), ["before"])
self.assertEqual(list(fakeReactor.triggers["before"]), ["shutdown"])
self.reap(workers)

return deferLater(reactor, 0, check)


Expand All @@ -335,24 +397,10 @@ def test_runWaitForProcessesDeferreds(self):
reactor is stopping, and then unlocks the test directory, not trying to
stop the reactor again.
"""
functions = []
workers = []

class FakeReactorWithEvent(FakeReactor):

def spawnProcess(self, worker, *args, **kwargs):
worker.makeConnection(FakeTransport())
workers.append(worker)

def addSystemEventTrigger(oself,
phase, event, function, *args, **kwargs):
self.assertEqual('before', phase)
self.assertEqual('shutdown', event)
functions.append(function)

workingDirectory = self.runner._workingDirectory

fakeReactor = FakeReactorWithEvent()
fakeReactor = CountingReactor(workers)
self.runner.run(TestCase(), fakeReactor)

def check(ign):
Expand All @@ -362,10 +410,17 @@ def check(ign):
def realCheck():
localLock = FilesystemLock(workingDirectory + ".lock")
self.assertTrue(localLock.lock())
self.assertEqual(1, fakeReactor.stopCount)
# Stop is not called, as it ought to have been called before
self.assertEqual(0, fakeReactor.stopCount)

workers[0].processEnded(Failure(CONNECTION_DONE))
return functions[-1]().addCallback(check)
self.assertEqual(list(fakeReactor.triggers.keys()), ["before"])
self.assertEqual(list(fakeReactor.triggers["before"]), ["shutdown"])
self.reap(workers)

return gatherResults([
maybeDeferred(f, *a, **kw)
for f, a, kw in fakeReactor.triggers["before"]["shutdown"]
]).addCallback(check)


def test_runUntilFailure(self):
Expand All @@ -375,9 +430,10 @@ def test_runUntilFailure(self):
"""
called = []

class FakeReactorWithSuccess(FakeReactor):
class CountingReactorWithSuccess(CountingReactor):

def spawnProcess(self, worker, *args, **kwargs):
self._workers.append(worker)
worker.makeConnection(FakeTransport())
self.spawnCount += 1
worker._ampProtocol.run = self.succeedingRun
Expand All @@ -388,7 +444,9 @@ def succeedingRun(self, case, result):
return fail(RuntimeError("oops"))
return succeed(None)

fakeReactor = FakeReactorWithSuccess()
workers = []
fakeReactor = CountingReactorWithSuccess(workers)
self.addCleanup(self.reap, workers)

scheduler, cooperator = self.getFakeSchedulerAndEternalCooperator()

Expand Down
39 changes: 24 additions & 15 deletions src/twisted/trial/_dist/test/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,15 +316,32 @@ class LocalWorkerTests(TestCase):
Tests for L{LocalWorker} and L{LocalWorkerTransport}.
"""

def tidyLocalWorker(self, *args, **kwargs):
"""
Create a L{LocalWorker}, connect it to a transport, and ensure
its log files are closed.
@param args: See L{LocalWorker}
@param kwargs: See L{LocalWorker}
@return: a L{LocalWorker} instance
"""
worker = LocalWorker(*args, **kwargs)
worker.makeConnection(FakeTransport())
self.addCleanup(worker._testLog.close)
self.addCleanup(worker._outLog.close)
self.addCleanup(worker._errLog.close)
return worker


def test_childDataReceived(self):
"""
L{LocalWorker.childDataReceived} forwards the received data to linked
L{AMP} protocol if the right file descriptor, otherwise forwards to
C{ProcessProtocol.childDataReceived}.
"""
fakeTransport = FakeTransport()
localWorker = LocalWorker(FakeAMProtocol(), '.', 'test.log')
localWorker.makeConnection(fakeTransport)
localWorker = self.tidyLocalWorker(FakeAMProtocol(), '.', 'test.log')
localWorker._outLog = BytesIO()
localWorker.childDataReceived(4, b"foo")
localWorker.childDataReceived(1, b"bar")
Expand All @@ -337,9 +354,7 @@ def test_outReceived(self):
L{LocalWorker.outReceived} logs the output into its C{_outLog} log
file.
"""
fakeTransport = FakeTransport()
localWorker = LocalWorker(FakeAMProtocol(), '.', 'test.log')
localWorker.makeConnection(fakeTransport)
localWorker = self.tidyLocalWorker(FakeAMProtocol(), '.', 'test.log')
localWorker._outLog = BytesIO()
data = b"The quick brown fox jumps over the lazy dog"
localWorker.outReceived(data)
Expand All @@ -351,9 +366,7 @@ def test_errReceived(self):
L{LocalWorker.errReceived} logs the errors into its C{_errLog} log
file.
"""
fakeTransport = FakeTransport()
localWorker = LocalWorker(FakeAMProtocol(), '.', 'test.log')
localWorker.makeConnection(fakeTransport)
localWorker = self.tidyLocalWorker(FakeAMProtocol(), '.', 'test.log')
localWorker._errLog = BytesIO()
data = b"The quick brown fox jumps over the lazy dog"
localWorker.errReceived(data)
Expand Down Expand Up @@ -401,9 +414,7 @@ def test_connectionLost(self):
L{LocalWorker.connectionLost} closes the log streams.
"""

transport = FakeTransport()
localWorker = LocalWorker(FakeAMProtocol(), '.', 'test.log')
localWorker.makeConnection(transport)
localWorker = self.tidyLocalWorker(FakeAMProtocol(), '.', 'test.log')
localWorker.connectionLost(None)
self.assertTrue(localWorker._outLog.closed)
self.assertTrue(localWorker._errLog.closed)
Expand Down Expand Up @@ -456,10 +467,8 @@ def test_startError(self):
def failCallRemote(command, directory):
return fail(RuntimeError("oops"))

transport = FakeTransport()
protocol = FakeAMProtocol()
protocol.callRemote = failCallRemote
localWorker = LocalWorker(protocol, '.', 'test.log')
localWorker.makeConnection(transport)
self.tidyLocalWorker(protocol, '.', 'test.log')

self.assertEqual([], self.flushLoggedErrors(RuntimeError))
Loading

0 comments on commit aaa46a9

Please sign in to comment.