Skip to content

Commit

Permalink
moving testing utils to pytest-toolbox
Browse files Browse the repository at this point in the history
  • Loading branch information
samuelcolvin committed Dec 6, 2016
1 parent da70284 commit ea03a63
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 193 deletions.
190 changes: 2 additions & 188 deletions arq/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,12 @@
:mod:`testing`
==============
pytest plugin and other utilities useful when writing tests for code using arq.
include the plugin in your tests' `conftest.py` file with::
pytest_plugins = 'arq.testing'
Utils for testing arq.
See arq's own tests for examples of usage.
"""
import asyncio
import contextlib
import io
import logging
import os

import pytest
from aioredis import create_redis

from .utils import RedisMixin, timestamp
from .worker import BaseWorker
Expand Down Expand Up @@ -129,181 +119,5 @@ def mock_data(self, data):

class MockRedisWorker(MockRedisMixin, BaseWorker):
    """
    Variant of BaseWorker which executes jobs from MockRedis rather than real redis.
    """


@contextlib.contextmanager
def loop_context(existing_loop=None):
    """
    Context manager which creates an asyncio event loop and tears it down on exit.

    :param existing_loop: if supplied this loop is passed straight through and no new loop
      is created (nor torn down — the caller owns its lifecycle).
    """
    if existing_loop:
        # loop already exists, pass it straight through
        yield existing_loop
    else:
        _loop = asyncio.new_event_loop()
        # don't install the new loop as the thread's current loop
        asyncio.set_event_loop(None)
        try:
            yield _loop
        finally:
            # always clean up, even if the body raised; previously an exception in the
            # body skipped this entirely and leaked the loop
            _loop.stop()
            # run one final iteration so callbacks scheduled via call_soon get a
            # chance to execute before the loop is closed
            _loop.run_forever()
            _loop.close()
            asyncio.set_event_loop(None)


def pytest_pycollect_makeitem(collector, name, obj):
    """
    Fix pytest collection so coroutine functions are picked up as test items.
    """
    if not collector.funcnamefilter(name):
        return None
    if not asyncio.iscoroutinefunction(obj):
        return None
    return list(collector._genfunctions(name, obj))


def pytest_pyfunc_call(pyfuncitem):
    """
    Execute coroutine test functions inside an event loop rather than as plain calls.
    """
    func = pyfuncitem.function
    if not asyncio.iscoroutinefunction(func):
        # not a coroutine: let pytest run it normally
        return None
    # reuse the test's "loop" fixture if it requested one, otherwise create a fresh loop
    with loop_context(pyfuncitem.funcargs.get('loop')) as event_loop:
        call_kwargs = {arg_name: pyfuncitem.funcargs[arg_name]
                       for arg_name in pyfuncitem._fixtureinfo.argnames}
        task = event_loop.create_task(pyfuncitem.obj(**call_kwargs))
        event_loop.run_until_complete(task)
    return True


@pytest.fixture
def loop():
    """
    Fixture providing a fresh asyncio event loop via loop_context().

    NOTE: switched from the deprecated ``pytest.yield_fixture`` to ``pytest.fixture``,
    which supports yield fixtures since pytest 3.0.
    """
    with loop_context() as _loop:
        yield _loop


@pytest.fixture
def tmpworkdir(tmpdir):
    """
    Run the test with a temporary working directory as the current directory.

    The original working directory is restored afterwards, even if teardown
    is reached via an error (try/finally — previously a failure could leave
    the process chdir'd into the deleted tmpdir).
    """
    cwd = os.getcwd()
    os.chdir(tmpdir.strpath)
    try:
        yield tmpdir
    finally:
        os.chdir(cwd)


@pytest.yield_fixture
def redis_conn(loop):
    """
    yield fixture which creates a redis connection, and flushes redis before the test.

    Note: redis is not flushed after the test both for performance and to allow later debugging.
    """
    async def _flush_and_connect():
        redis = await create_redis(('localhost', 6379), loop=loop)
        await redis.flushall()
        return redis

    redis = loop.run_until_complete(_flush_and_connect())
    # attach the loop so test helpers can reach it from the connection object
    redis.loop = loop
    yield redis

    redis.close()
    loop.run_until_complete(redis.wait_closed())


LOGS = ('arq.main', 'arq.work', 'arq.jobs')


class StreamLog:
    """
    Log stream object which allows one or more logs to be captured and tested.
    """
    def __init__(self):
        # removed a dead ``self.handler = None`` store which was immediately overwritten
        self.stream = io.StringIO()
        self.handler = logging.StreamHandler(stream=self.stream)
        self.loggers = []
        self.set_loggers()

    def set_loggers(self, *log_names, level=logging.INFO, fmt='%(name)s: %(message)s'):
        """
        Attach the capture handler to the named loggers (defaults to LOGS).

        Any loggers captured by a previous call are detached first.

        :param log_names: logger names to capture
        :param level: level applied to every captured logger
        :param fmt: logging format string for captured records
        """
        if self.loggers:
            self.finish()
        log_names = log_names or LOGS
        self.loggers = [logging.getLogger(log_name) for log_name in log_names]
        self.handler.setFormatter(logging.Formatter(fmt))
        for logger in self.loggers:
            # make sure the logger is active and routed into our stream
            logger.disabled = False
            logger.addHandler(self.handler)
        self.set_level(level)

    def set_level(self, level):
        """Set the level of every captured logger."""
        for logger in self.loggers:
            logger.setLevel(level)

    def set_different_level(self, **levels):
        """
        Set levels for individual loggers, eg.
        ``set_different_level(**{'arq.work': logging.DEBUG})``.
        """
        for log_name, level in levels.items():
            logger = logging.getLogger(log_name)
            logger.setLevel(level)

    @property
    def log(self):
        """Everything captured so far, as a single string."""
        self.stream.seek(0)
        return self.stream.read()

    def finish(self):
        """Detach the capture handler from all captured loggers."""
        for logger in self.loggers:
            logger.removeHandler(self.handler)

    def __contains__(self, item):
        return item in self.log

    def __str__(self):
        return 'caplog:\n' + self.log

    def __repr__(self):
        return '< caplog: {!r}>'.format(self.log)


@pytest.yield_fixture
def caplog():
    """
    Capture arq's logs, similar to pytest's "capsys" but for logging rather
    than stdout/stderr.

    See StreamLog for details on configuration and tests for examples of usage.
    """
    capture = StreamLog()
    yield capture
    capture.finish()


@pytest.fixture
def debug():
    """
    Fixture which causes all arq logs to display at DEBUG level. For debugging
    purposes only, should always be removed before committing.
    """
    # TODO: could be extended to also work as a context manager and allow more control.
    handler = logging.StreamHandler()
    handler.setLevel(logging.DEBUG)
    handler.setFormatter(logging.Formatter('%(asctime)s %(name)8s %(levelname)8s: %(message)s', datefmt='%H:%M:%S'))
    for logger_name in LOGS:
        logger = logging.getLogger(logger_name)
        logger.addHandler(handler)
        logger.setLevel(logging.DEBUG)

    yield

    # restore the loggers to their default state
    for logger_name in LOGS:
        logger = logging.getLogger(logger_name)
        logger.removeHandler(handler)
        logger.setLevel(logging.NOTSET)
27 changes: 26 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,27 @@
import pytest

from aioredis import create_redis

from .fixtures import DemoActor, MockRedisDemoActor, MockRedisWorker

pytest_plugins = 'arq.testing'

@pytest.yield_fixture
def redis_conn(loop):
    """
    yield fixture which creates a redis connection, and flushes redis before the test.

    Note: redis is not flushed after the test both for performance and to allow later debugging.
    """
    async def _connect():
        redis = await create_redis(('localhost', 6379), loop=loop)
        await redis.flushall()
        return redis

    redis = loop.run_until_complete(_connect())
    # expose the loop on the connection for test helpers
    redis.loop = loop
    yield redis

    redis.close()
    loop.run_until_complete(redis.wait_closed())


@pytest.yield_fixture
Expand All @@ -25,3 +44,9 @@ def mock_actor_worker(mock_actor):
_worker.mock_data = mock_actor.mock_data
yield mock_actor, _worker
mock_actor.loop.run_until_complete(_worker.close())


@pytest.fixture
def caplog(caplog):
    """
    Override the underlying ``caplog`` fixture (presumably provided by
    pytest-toolbox, added to requirements in this change — confirm) so it
    captures arq's loggers with a plain ``name: message`` format.
    """
    caplog.set_loggers(log_names=('arq.main', 'arq.work', 'arq.jobs'), fmt='%(name)s: %(message)s')
    return caplog
1 change: 1 addition & 0 deletions tests/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pytest-aiohttp==0.1.3
pytest-cov==2.4.0
pytest-isort==0.1.0
pytest-sugar==0.7.1
pytest-toolbox==0.1
pytz==2016.10
Sphinx==1.4.6
typed-ast==0.6.1
3 changes: 0 additions & 3 deletions tests/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,6 @@ async def test_dispatch_work(tmpworkdir, loop, caplog, redis_conn):
'task complete, 1 jobs done, 0 failed\n'
'task complete, 2 jobs done, 0 failed\n'
'shutting down worker after 0.0XXs ◆ 2 jobs done ◆ 0 failed ◆ 0 timed out\n') == log
# quick check of caplog's str and repr
assert str(caplog).startswith('caplog:\nMockRedisDemoActor')
assert repr(caplog).startswith("< caplog: 'MockRedisDemoActor")


async def test_handle_exception(loop, caplog):
Expand Down
2 changes: 1 addition & 1 deletion tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ async def test_queue_not_found(loop):


async def test_mock_timeout(loop, caplog):
caplog.set_loggers('arq.main', 'arq.work', 'arq.mock', level=logging.DEBUG)
caplog.set_loggers(log_names=('arq.main', 'arq.work', 'arq.mock'), level=logging.DEBUG)
worker = MockRedisWorkerQuit(loop=loop)
actor = MockRedisDemoActor(loop=loop)
worker.mock_data = actor.mock_data
Expand Down

0 comments on commit ea03a63

Please sign in to comment.