Skip to content

Commit

Permalink
WIP - tests pass
Browse files Browse the repository at this point in the history
  • Loading branch information
markrwilliams committed Feb 21, 2018
1 parent c164919 commit 1b52bd8
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 21 deletions.
47 changes: 36 additions & 11 deletions src/twisted/trial/_dist/disttrial.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from twisted.python.filepath import FilePath
from twisted.python.modules import theSystemPath
from twisted.internet.defer import DeferredList
from twisted.internet.defer import DeferredList, Deferred
from twisted.internet.task import cooperate

from twisted.trial.util import _unusedTestDirectory
Expand All @@ -38,6 +38,7 @@ class DistTrialRunner(object):
@ivar _reporterFactory: the reporter class to be used.
"""
_terminationTimeout = 2
_distReporterFactory = DistReporter

def _makeResult(self):
Expand Down Expand Up @@ -200,6 +201,7 @@ def run(self, suite, reactor=None, cooperate=cooperate,
processEndDeferreds = [worker.endDeferred for worker in workers]
self.launchWorkerProcesses(reactor.spawnProcess, workers,
self._workerArguments)
processes = [worker.transport for worker in workers]

def runTests():
testCases = iter(list(_iterateTests(suite)))
Expand All @@ -225,25 +227,48 @@ def nextRun(ign):

def stop(ign):
testDirLock.unlock()
if not stopping:
stopping.append(None)
reactor.stop()

def beforeShutDown():
def removeProcess(result, process):
processes.remove(process)
return result

for process, endDeferred in list(zip(processes,
processEndDeferreds)):
# Worker processes exit when the pipe they receive
# commands on is closed
process.closeChildFD(_WORKER_AMP_STDIN)
endDeferred.addBoth(removeProcess, process)

if not stopping:
stopping.append(None)
d = DeferredList(processEndDeferreds, consumeErrors=True)
return d.addCallback(continueShutdown)
reactor.stop()

def continueShutdown(ign):
self.writeResults(result)
return ign
def waitForProcesses():
processesEnded = DeferredList(processEndDeferreds,
consumeErrors=True)
timer = Deferred().addTimeout(self._terminationTimeout, reactor)
timed = DeferredList(
[processesEnded, timer],
fireOnOneCallback=True,
fireOnOneErrback=True,
consumeErrors=True,
)
timed.addErrback(killProcessesAndWaitAgain)
return timed

def killProcessesAndWaitAgain(error):
for process in list(processes):
process.signalProcess("KILL")

return waitForProcesses()

d = runTests()
d.addCallback(nextRun)
d.addBoth(stop)

reactor.addSystemEventTrigger('before', 'shutdown', beforeShutDown)
reactor.addSystemEventTrigger(
'before', 'shutdown', stopping.append, None)
reactor.addSystemEventTrigger('before', 'shutdown', waitForProcesses)
reactor.run()

return result
Expand Down
20 changes: 10 additions & 10 deletions src/twisted/trial/_dist/test/test_disttrial.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from twisted.internet.protocol import ProcessProtocol
from twisted.internet.defer import fail, succeed
from twisted.internet.task import Cooperator, deferLater
from twisted.internet.task import Clock, Cooperator, deferLater
from twisted.internet.main import CONNECTION_DONE
from twisted.internet import reactor
from twisted.python.compat import NativeStringIO as StringIO
Expand Down Expand Up @@ -40,9 +40,11 @@ def writeToChild(self, fd, data):
Ignore write calls.
"""

def closeChildFD(self, fd):
pass


class FakeReactor(object):
class FakeReactor(Clock):
"""
A simple fake reactor for testing purposes.
"""
Expand Down Expand Up @@ -308,7 +310,8 @@ def spawnProcess(self, worker, *args, **kwargs):
def succeedingRun(self, case, result):
return succeed(None)

def addSystemEventTrigger(oself, phase, event, function):
def addSystemEventTrigger(
oself, phase, event, function, *args, **kwargs):
self.assertEqual('before', phase)
self.assertEqual('shutdown', event)
functions.append(function)
Expand All @@ -322,9 +325,6 @@ def check():
localLock = FilesystemLock(workingDirectory + ".lock")
self.assertTrue(localLock.lock())
self.assertEqual(1, fakeReactor.stopCount)
# We don't wait for the process deferreds here, so nothing is
# returned by the function before shutdown
self.assertIdentical(None, functions[0]())

return deferLater(reactor, 0, check)

Expand All @@ -344,7 +344,8 @@ def spawnProcess(self, worker, *args, **kwargs):
worker.makeConnection(FakeTransport())
workers.append(worker)

def addSystemEventTrigger(oself, phase, event, function):
def addSystemEventTrigger(oself,
phase, event, function, *args, **kwargs):
self.assertEqual('before', phase)
self.assertEqual('shutdown', event)
functions.append(function)
Expand All @@ -361,11 +362,10 @@ def check(ign):
def realCheck():
localLock = FilesystemLock(workingDirectory + ".lock")
self.assertTrue(localLock.lock())
# Stop is not called, as it ought to have been called before
self.assertEqual(0, fakeReactor.stopCount)
self.assertEqual(1, fakeReactor.stopCount)

workers[0].processEnded(Failure(CONNECTION_DONE))
return functions[0]().addCallback(check)
return functions[-1]().addCallback(check)


def test_runUntilFailure(self):
Expand Down
1 change: 1 addition & 0 deletions src/twisted/trial/_dist/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def run(self, testCase):
"""
if _PY3:
testCase = testCase.decode("utf-8")
print("CASE", testCase)
case = self._loader.loadByName(testCase)
suite = TrialSuite([case], self._forceGarbageCollection)
suite.run(self._result)
Expand Down

0 comments on commit 1b52bd8

Please sign in to comment.