Skip to content

Commit

Permalink
adding tests and setup
Browse files Browse the repository at this point in the history
  • Loading branch information
samuelcolvin committed Jul 27, 2016
1 parent 7434f59 commit 70dbe0e
Show file tree
Hide file tree
Showing 15 changed files with 287 additions and 15 deletions.
20 changes: 20 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
language: python

cache: pip

services:
- redis-server

python:
- "3.5"

install:
- pip install -e .
- pip install -r tests/requirements.txt

script:
- flake8 arq/ tests/
- py.test --cov=arq

after_success:
- codecov
21 changes: 21 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
The MIT License (MIT)

Copyright (c) 2016 Samuel Colvin

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
arq
===

[![Build Status](https://travis-ci.org/samuelcolvin/arq.svg?branch=master)](https://travis-ci.org/samuelcolvin/arq)

[rq](https://github.com/nvie/rq) meets asyncio using [aioredis](https://github.com/aio-libs/aioredis).
2 changes: 2 additions & 0 deletions arq/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# flake8: noqa
from .utils import *
from .main import *
from .worker import *
from .version import VERSION
22 changes: 11 additions & 11 deletions arq/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import msgpack

from .utils import *
from .utils import RedisMixin, timestamp
from .worker import run_job


Expand Down Expand Up @@ -47,44 +47,44 @@ class Dispatch(RedisMixin):
HIGH_QUEUE = b'high'
DEFAULT_QUEUE = b'dft'
LOW_QUEUE = b'low'
__tasks = set()

DEFAULT_QUEUES = (
HIGH_QUEUE,
DEFAULT_QUEUE,
LOW_QUEUE
)

def __init__(self, **kwargs):
self.arq_tasks = set()
super().__init__(**kwargs)

async def enqueue_job(self, func_name, *args, queue=None, **kwargs):
queue = queue or self.DEFAULT_QUEUE
class_name = self.__class__.__name__
data = self.encode_args(
queued_at=int(timestamp() * 1000),
class_name=class_name,
func_name=func_name,
args=args,
kwargs=kwargs,
)
logger.debug('%s.%s ▶ %s (mode: %s)', class_name, func_name, queue.decode(), mode)
logger.debug('%s.%s ▶ %s (mode: %s)', self.__class__.__name__, func_name, queue.decode(), mode)

if mode.direct or mode.asyncio_loop:
coro = run_job(queue, data, lambda j: self)
if mode.direct:
await coro
else:
self.__tasks.add(self.loop.create_task(coro))
self.arq_tasks.add(self.loop.create_task(coro))
else:
pool = await self.init_redis_pool()
async with pool.get() as redis:
await redis.rpush(queue, data)

@staticmethod
def encode_args(*, queued_at, class_name, func_name, args, kwargs):
return msgpack.packb([queued_at, class_name, func_name, args, kwargs], use_bin_type=True)
def encode_args(self, *, func_name, args, kwargs):
queued_at = int(timestamp() * 1000)
return msgpack.packb([queued_at, self.__class__.__name__, func_name, args, kwargs], use_bin_type=True)

async def close(self):
if mode.asyncio_loop:
asyncio.wait(self.__tasks)
await asyncio.wait(self.arq_tasks, loop=self.loop)
await super().close()


Expand Down
3 changes: 3 additions & 0 deletions arq/version.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from distutils.version import StrictVersion

VERSION = StrictVersion('0.0.1')
2 changes: 1 addition & 1 deletion arq/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

import msgpack

from .utils import *
from .utils import RedisMixin, timestamp, cached_property

__all__ = [
'AbstractWorkManager',
Expand Down
3 changes: 0 additions & 3 deletions requirements.txt

This file was deleted.

22 changes: 22 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[pytest]
testpaths = tests

[flake8]
max-line-length = 120
max-complexity = 12

[coverage:run]
source = arq
branch = True
concurrency = multiprocessing
parallel = False

[coverage:report]
exclude_lines =
pragma: no cover

raise NotImplementedError
raise NotImplemented

[bdist_wheel]
python-tag = py35
47 changes: 47 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from importlib.machinery import SourceFileLoader
from setuptools import setup

description = """
arq
===
"""

# avoid loading the package before requirements are installed:
version = SourceFileLoader('version', 'arq/version.py').load_module()

setup(
name='arq',
version=str(version.VERSION),
description='arq',
long_description=description,
classifiers=[
'Development Status :: 3 - Alpha',
'Programming Language :: Python',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.5',
'Intended Audience :: Developers',
'Intended Audience :: Information Technology',
'Intended Audience :: Science/Research',
'Intended Audience :: System Administrators',
'License :: OSI Approved :: MIT License',
'Operating System :: Unix',
'Topic :: Software Development :: Libraries :: Python Modules',
'Topic :: Internet',
'Topic :: Scientific/Engineering',
'Topic :: System :: Distributed Computing',
'Topic :: System :: Systems Administration',
'Topic :: System :: Monitoring',
],
keywords='arq,asyncio,redis,queue,distributed',
author='Samuel Colvin',
author_email='s@muelcolvin.com',
url='https://github.com/samuelcolvin/arq',
license='MIT',
packages=['arq'],
zip_safe=True,
install_requires=[
'aioredis==0.2.7',
'click==6.6',
'msgpack-python==0.4.7',
],
)
Empty file added tests/__init__.py
Empty file.
88 changes: 88 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import asyncio
import contextlib
import gc
import logging
import os

import pytest


def _teardown_test_loop(_loop):
is_closed = getattr(_loop, 'is_closed')
if is_closed is not None:
closed = is_closed()
else:
closed = _loop._closed
if not closed:
_loop.call_soon(_loop.stop)
_loop.run_forever()
_loop.close()
gc.collect()
asyncio.set_event_loop(None)


@contextlib.contextmanager
def loop_context(existing_loop=None):
if existing_loop:
# loop already exists, pass it straight through
yield existing_loop
else:
_loop = asyncio.new_event_loop()
asyncio.set_event_loop(None)
yield _loop
_teardown_test_loop(_loop)


def pytest_pycollect_makeitem(collector, name, obj):
"""
Fix pytest collecting for coroutines.
"""
if collector.funcnamefilter(name) and asyncio.iscoroutinefunction(obj):
return list(collector._genfunctions(name, obj))


def pytest_pyfunc_call(pyfuncitem):
"""
Run coroutines in an event loop instead of a normal function call.
"""
if asyncio.iscoroutinefunction(pyfuncitem.function):
existing_loop = pyfuncitem.funcargs.get('loop', None)
with loop_context(existing_loop) as _loop:
testargs = {arg: pyfuncitem.funcargs[arg]
for arg in pyfuncitem._fixtureinfo.argnames}

task = _loop.create_task(pyfuncitem.obj(**testargs))
_loop.run_until_complete(task)

return True


@pytest.yield_fixture
def loop():
with loop_context() as _loop:
yield _loop


@pytest.yield_fixture
def debug_logger():
handler = logging.StreamHandler()
logger = logging.getLogger('.')
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)

yield

logger.removeHandler(handler)


@pytest.yield_fixture
def tmpworkdir(tmpdir):
"""
Create a temporary working working directory.
"""
cwd = os.getcwd()
os.chdir(tmpdir.strpath)

yield tmpdir

os.chdir(cwd)
19 changes: 19 additions & 0 deletions tests/fixtures.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from arq import concurrent, Dispatch


class Demo(Dispatch):
@concurrent
async def add_numbers(self, a, b):
with open('add_numbers', 'w') as f:
r = a + b
f.write('{}'.format(r))

@concurrent(Dispatch.HIGH_QUEUE)
async def high_add_numbers(self, a, b, c=4):
with open('high_add_numbers', 'w') as f:
r = a + b + c
f.write('{}'.format(r))

@concurrent
async def boom(self, a, b):
raise RuntimeError('boom')
7 changes: 7 additions & 0 deletions tests/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
codecov
coverage
flake8
pep8
pytest
pytest-cov
pytest-sugar
40 changes: 40 additions & 0 deletions tests/test_dispatch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import pytest

from arq import mode, Dispatch, concurrent

from .fixtures import Demo


async def test_simple_aloop(tmpworkdir, loop):
mode.set_asyncio_loop()
demo = Demo(loop=loop)
r1 = await demo.add_numbers(1, 2)
assert r1 is None
assert len(demo.arq_tasks) == 1
coro = list(demo.arq_tasks)[0]
# this is the run job coroutine
r2 = await coro
assert r2 is None
with open('add_numbers') as f:
assert f.read() == '3'
await demo.close()


async def test_simple_direct(tmpworkdir, loop):
mode.set_direct()
demo = Demo(loop=loop)
print(demo.arq_tasks)
r1 = await demo.add_numbers(1, 2)
assert r1 is None
assert len(demo.arq_tasks) == 0
with open('add_numbers') as f:
assert f.read() == '3'


async def test_bad_def():
with pytest.raises(TypeError) as excinfo:
class BadDispatch(Dispatch):
@concurrent
def just_a_function(self):
pass
assert excinfo.value.args[0] == 'test_bad_def.<locals>.BadDispatch.just_a_function is not a coroutine function'

0 comments on commit 70dbe0e

Please sign in to comment.