From 70dbe0e857387ecd4996e2e9bd76816f5b71e0c5 Mon Sep 17 00:00:00 2001 From: Samuel Colvin Date: Wed, 27 Jul 2016 22:15:41 +0100 Subject: [PATCH] adding tests and setup --- .travis.yml | 20 ++++++++++ LICENSE | 21 ++++++++++ README.md | 6 +++ arq/__init__.py | 2 + arq/main.py | 22 +++++------ arq/version.py | 3 ++ arq/worker.py | 2 +- requirements.txt | 3 -- setup.cfg | 22 +++++++++++ setup.py | 47 ++++++++++++++++++++++ tests/__init__.py | 0 tests/conftest.py | 88 ++++++++++++++++++++++++++++++++++++++++++ tests/fixtures.py | 19 +++++++++ tests/requirements.txt | 7 ++++ tests/test_dispatch.py | 40 +++++++++++++++++++ 15 files changed, 287 insertions(+), 15 deletions(-) create mode 100644 .travis.yml create mode 100644 LICENSE create mode 100644 README.md create mode 100644 arq/version.py delete mode 100644 requirements.txt create mode 100644 setup.cfg create mode 100644 setup.py create mode 100644 tests/__init__.py create mode 100644 tests/conftest.py create mode 100644 tests/fixtures.py create mode 100644 tests/requirements.txt create mode 100644 tests/test_dispatch.py diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 00000000..b522ef4d --- /dev/null +++ b/.travis.yml @@ -0,0 +1,20 @@ +language: python + +cache: pip + +services: + - redis-server + +python: + - "3.5" + +install: + - pip install -e . + - pip install -r tests/requirements.txt + +script: + - flake8 arq/ tests/ + - py.test --cov=arq + +after_success: + - codecov diff --git a/LICENSE b/LICENSE new file mode 100644 index 00000000..65575b60 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2016 Samuel Colvin + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 00000000..24a6296c --- /dev/null +++ b/README.md @@ -0,0 +1,6 @@ +arq +=== + +[![Build Status](https://travis-ci.org/samuelcolvin/arq.svg?branch=master)](https://travis-ci.org/samuelcolvin/arq) + +[rq](https://github.com/nvie/rq) meets asyncio using [aioredis](https://github.com/aio-libs/aioredis). diff --git a/arq/__init__.py b/arq/__init__.py index eb542278..a73ad91d 100644 --- a/arq/__init__.py +++ b/arq/__init__.py @@ -1,3 +1,5 @@ +# flake8: noqa from .utils import * from .main import * from .worker import * +from .version import VERSION diff --git a/arq/main.py b/arq/main.py index 85999640..d11d4723 100644 --- a/arq/main.py +++ b/arq/main.py @@ -5,7 +5,7 @@ import msgpack -from .utils import * +from .utils import RedisMixin, timestamp from .worker import run_job @@ -47,7 +47,6 @@ class Dispatch(RedisMixin): HIGH_QUEUE = b'high' DEFAULT_QUEUE = b'dft' LOW_QUEUE = b'low' - __tasks = set() DEFAULT_QUEUES = ( HIGH_QUEUE, @@ -55,36 +54,37 @@ class Dispatch(RedisMixin): LOW_QUEUE ) + def __init__(self, **kwargs): + self.arq_tasks = set() + super().__init__(**kwargs) + async def enqueue_job(self, func_name, *args, queue=None, **kwargs): queue = queue or self.DEFAULT_QUEUE - class_name = self.__class__.__name__ data = self.encode_args( - queued_at=int(timestamp() * 1000), - class_name=class_name, func_name=func_name, args=args, kwargs=kwargs, ) - logger.debug('%s.%s ▶ %s (mode: %s)', class_name, func_name, queue.decode(), mode) + logger.debug('%s.%s ▶ %s (mode: %s)', self.__class__.__name__, func_name, queue.decode(), mode) if mode.direct or mode.asyncio_loop: coro = run_job(queue, data, lambda j: self) if mode.direct: await coro else: - self.__tasks.add(self.loop.create_task(coro)) + self.arq_tasks.add(self.loop.create_task(coro)) else: pool = await self.init_redis_pool() async with pool.get() as redis: await redis.rpush(queue, data) - @staticmethod - def encode_args(*, queued_at, class_name, func_name, args, kwargs): - return msgpack.packb([queued_at, class_name, func_name, args, kwargs], use_bin_type=True) + def encode_args(self, *, func_name, args, kwargs): + queued_at = int(timestamp() * 1000) + return msgpack.packb([queued_at, self.__class__.__name__, func_name, args, kwargs], use_bin_type=True) async def close(self): if mode.asyncio_loop: - asyncio.wait(self.__tasks) + await asyncio.wait(self.arq_tasks, loop=self.loop) await super().close() diff --git a/arq/version.py b/arq/version.py new file mode 100644 index 00000000..446f1dbf --- /dev/null +++ b/arq/version.py @@ -0,0 +1,3 @@ +from distutils.version import StrictVersion + +VERSION = StrictVersion('0.0.1') diff --git a/arq/worker.py b/arq/worker.py index 3c4767d9..d7d37bae 100644 --- a/arq/worker.py +++ b/arq/worker.py @@ -8,7 +8,7 @@ import msgpack -from .utils import * +from .utils import RedisMixin, timestamp, cached_property __all__ = [ 'AbstractWorkManager', diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index b3fa8293..00000000 --- a/requirements.txt +++ /dev/null @@ -1,3 +0,0 @@ -aioredis==0.2.7 -click==6.6 -msgpack-python==0.4.7 diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 00000000..16dc9c33 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,22 @@ +[pytest] +testpaths = tests + +[flake8] +max-line-length = 120 +max-complexity = 12 + +[coverage:run] +source = arq +branch = True +concurrency = multiprocessing +parallel = False + +[coverage:report] +exclude_lines = + pragma: no cover + + raise NotImplementedError + raise NotImplemented + +[bdist_wheel] +python-tag = py35 diff --git a/setup.py b/setup.py new file mode 100644 index 00000000..5b8f809a --- /dev/null +++ b/setup.py @@ -0,0 +1,47 @@ +from importlib.machinery import SourceFileLoader +from setuptools import setup + +description = """ +arq +=== +""" + +# avoid loading the package before requirements are installed: +version = SourceFileLoader('version', 'arq/version.py').load_module() + +setup( + name='arq', + version=str(version.VERSION), + description='arq', + long_description=description, + classifiers=[ + 'Development Status :: 3 - Alpha', + 'Programming Language :: Python', + 'Programming Language :: Python :: 3', + 'Programming Language :: Python :: 3.5', + 'Intended Audience :: Developers', + 'Intended Audience :: Information Technology', + 'Intended Audience :: Science/Research', + 'Intended Audience :: System Administrators', + 'License :: OSI Approved :: MIT License', + 'Operating System :: Unix', + 'Topic :: Software Development :: Libraries :: Python Modules', + 'Topic :: Internet', + 'Topic :: Scientific/Engineering', + 'Topic :: System :: Distributed Computing', + 'Topic :: System :: Systems Administration', + 'Topic :: System :: Monitoring', + ], + keywords='arq,asyncio,redis,queue,distributed', + author='Samuel Colvin', + author_email='s@muelcolvin.com', + url='https://github.com/samuelcolvin/arq', + license='MIT', + packages=['arq'], + zip_safe=True, + install_requires=[ + 'aioredis==0.2.7', + 'click==6.6', + 'msgpack-python==0.4.7', + ], +) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 00000000..0f817862 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,88 @@ +import asyncio +import contextlib +import gc +import logging +import os + +import pytest + + +def _teardown_test_loop(_loop): + is_closed = getattr(_loop, 'is_closed') + if is_closed is not None: + closed = is_closed() + else: + closed = _loop._closed + if not closed: + _loop.call_soon(_loop.stop) + _loop.run_forever() + _loop.close() + gc.collect() + asyncio.set_event_loop(None) + + +@contextlib.contextmanager +def loop_context(existing_loop=None): + if existing_loop: + # loop already exists, pass it straight through + yield existing_loop + else: + _loop = asyncio.new_event_loop() + asyncio.set_event_loop(None) + yield _loop + _teardown_test_loop(_loop) + + +def pytest_pycollect_makeitem(collector, name, obj): + """ + Fix pytest collecting for coroutines. + """ + if collector.funcnamefilter(name) and asyncio.iscoroutinefunction(obj): + return list(collector._genfunctions(name, obj)) + + +def pytest_pyfunc_call(pyfuncitem): + """ + Run coroutines in an event loop instead of a normal function call. + """ + if asyncio.iscoroutinefunction(pyfuncitem.function): + existing_loop = pyfuncitem.funcargs.get('loop', None) + with loop_context(existing_loop) as _loop: + testargs = {arg: pyfuncitem.funcargs[arg] + for arg in pyfuncitem._fixtureinfo.argnames} + + task = _loop.create_task(pyfuncitem.obj(**testargs)) + _loop.run_until_complete(task) + + return True + + +@pytest.yield_fixture +def loop(): + with loop_context() as _loop: + yield _loop + + +@pytest.yield_fixture +def debug_logger(): + handler = logging.StreamHandler() + logger = logging.getLogger('.') + logger.addHandler(handler) + logger.setLevel(logging.DEBUG) + + yield + + logger.removeHandler(handler) + + +@pytest.yield_fixture +def tmpworkdir(tmpdir): + """ + Create a temporary working working directory. + """ + cwd = os.getcwd() + os.chdir(tmpdir.strpath) + + yield tmpdir + + os.chdir(cwd) diff --git a/tests/fixtures.py b/tests/fixtures.py new file mode 100644 index 00000000..94df0c89 --- /dev/null +++ b/tests/fixtures.py @@ -0,0 +1,19 @@ +from arq import concurrent, Dispatch + + +class Demo(Dispatch): + @concurrent + async def add_numbers(self, a, b): + with open('add_numbers', 'w') as f: + r = a + b + f.write('{}'.format(r)) + + @concurrent(Dispatch.HIGH_QUEUE) + async def high_add_numbers(self, a, b, c=4): + with open('high_add_numbers', 'w') as f: + r = a + b + c + f.write('{}'.format(r)) + + @concurrent + async def boom(self, a, b): + raise RuntimeError('boom') diff --git a/tests/requirements.txt b/tests/requirements.txt new file mode 100644 index 00000000..16de66a2 --- /dev/null +++ b/tests/requirements.txt @@ -0,0 +1,7 @@ +codecov +coverage +flake8 +pep8 +pytest +pytest-cov +pytest-sugar diff --git a/tests/test_dispatch.py b/tests/test_dispatch.py new file mode 100644 index 00000000..f5e75806 --- /dev/null +++ b/tests/test_dispatch.py @@ -0,0 +1,40 @@ +import pytest + +from arq import mode, Dispatch, concurrent + +from .fixtures import Demo + + +async def test_simple_aloop(tmpworkdir, loop): + mode.set_asyncio_loop() + demo = Demo(loop=loop) + r1 = await demo.add_numbers(1, 2) + assert r1 is None + assert len(demo.arq_tasks) == 1 + coro = list(demo.arq_tasks)[0] + # this is the run job coroutine + r2 = await coro + assert r2 is None + with open('add_numbers') as f: + assert f.read() == '3' + await demo.close() + + +async def test_simple_direct(tmpworkdir, loop): + mode.set_direct() + demo = Demo(loop=loop) + print(demo.arq_tasks) + r1 = await demo.add_numbers(1, 2) + assert r1 is None + assert len(demo.arq_tasks) == 0 + with open('add_numbers') as f: + assert f.read() == '3' + + +async def test_bad_def(): + with pytest.raises(TypeError) as excinfo: + class BadDispatch(Dispatch): + @concurrent + def just_a_function(self): + pass + assert excinfo.value.args[0] == 'test_bad_def..BadDispatch.just_a_function is not a coroutine function'