diff --git a/.codecov.yml b/.codecov.yml index c9526802..0ea43a02 100644 --- a/.codecov.yml +++ b/.codecov.yml @@ -1,6 +1,9 @@ coverage: precision: 2 range: [95, 100] + status: + patch: false + project: false comment: layout: 'header, diff, flags, files, footer' diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8b7f0460..fb0f71b5 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -49,7 +49,7 @@ jobs: fail-fast: false matrix: os: [ubuntu] - python-version: ['3.6', '3.7', '3.8', '3.9'] + python-version: ['3.7', '3.8', '3.9'] env: PYTHON: ${{ matrix.python-version }} diff --git a/Makefile b/Makefile index 35fd24c5..0155352b 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ .DEFAULT_GOAL := all isort = isort arq tests -black = black -S -l 120 --target-version py37 arq tests +black = black arq tests .PHONY: install install: @@ -15,7 +15,7 @@ format: .PHONY: lint lint: - flake8 arq/ tests/ + flake8 --max-complexity 10 --max-line-length 120 --ignore E203,W503 arq/ tests/ $(isort) --check-only --df $(black) --check diff --git a/arq/cli.py b/arq/cli.py index 8cc03227..97f62992 100644 --- a/arq/cli.py +++ b/arq/cli.py @@ -43,7 +43,7 @@ def cli(*, worker_settings: str, burst: bool, check: bool, watch: str, verbose: else: kwargs = {} if burst is None else {'burst': burst} if watch: - asyncio.get_event_loop().run_until_complete(watch_reload(watch, worker_settings_)) + asyncio.run(watch_reload(watch, worker_settings_)) else: run_worker(worker_settings_, **kwargs) diff --git a/arq/connections.py b/arq/connections.py index 770d86ba..59b6b47f 100644 --- a/arq/connections.py +++ b/arq/connections.py @@ -266,7 +266,10 @@ async def pool_factory(*args: Any, **kwargs: Any) -> Redis: async def log_redis_info(redis: Redis, log_func: Callable[[str], Any]) -> None: with await redis as r: info_server, info_memory, info_clients, key_count = await asyncio.gather( - r.info(section='Server'), r.info(section='Memory'), r.info(section='Clients'), r.dbsize(), + r.info(section='Server'), + r.info(section='Memory'), + r.info(section='Clients'), + r.dbsize(), ) redis_version = info_server.get('server', {}).get('redis_version', '?') diff --git a/arq/version.py b/arq/version.py index 83522fc0..b47d1126 100644 --- a/arq/version.py +++ b/arq/version.py @@ -1,3 +1,5 @@ __all__ = ['VERSION'] -VERSION = 'dev' +# version is set automatically in CI before release, +# see https://gist.github.com/samuelcolvin/da2f521da5d2195fbfd65da3b8f58589 +VERSION = '0.0.dev0' diff --git a/arq/worker.py b/arq/worker.py index 4a32450e..bb299ab1 100644 --- a/arq/worker.py +++ b/arq/worker.py @@ -600,7 +600,13 @@ async def job_failed(exc: BaseException) -> None: await asyncio.shield( self.finish_job( - job_id, finish, result_data, result_timeout_s, keep_result_forever, incr_score, keep_in_progress, + job_id, + finish, + result_data, + result_timeout_s, + keep_result_forever, + incr_score, + keep_in_progress, ) ) @@ -642,7 +648,9 @@ async def finish_failed_job(self, job_id: str, result_data: Optional[bytes]) -> await conn.unwatch() tr = conn.multi_exec() tr.delete( - retry_key_prefix + job_id, in_progress_key_prefix + job_id, job_key_prefix + job_id, + retry_key_prefix + job_id, + in_progress_key_prefix + job_id, + job_key_prefix + job_id, ) tr.zrem(abort_jobs_ss, job_id) tr.zrem(self.queue_name, job_id) @@ -798,5 +806,4 @@ def check_health(settings_cls: 'WorkerSettingsType') -> int: redis_settings = cast(Optional[RedisSettings], cls_kwargs.get('redis_settings')) health_check_key = cast(Optional[str], cls_kwargs.get('health_check_key')) queue_name = cast(Optional[str], cls_kwargs.get('queue_name')) - loop = asyncio.get_event_loop() - return loop.run_until_complete(async_check_health(redis_settings, health_check_key, queue_name)) + return asyncio.run(async_check_health(redis_settings, health_check_key, queue_name)) diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 00000000..de7d5f94 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,59 @@ +[tool.pytest.ini_options] +testpaths = 'tests' +filterwarnings = ['error', 'ignore::DeprecationWarning:aioredis'] +asyncio_mode = 'auto' +timeout = 10 + +[tool.coverage.run] +source = ['src'] +branch = true + +[tool.coverage.report] +precision = 2 +exclude_lines = [ + 'pragma: no cover', + 'raise NotImplementedError', + 'raise NotImplemented', + 'if TYPE_CHECKING:', + '@overload', +] + +[tool.black] +color = true +line-length = 120 +target-version = ['py39'] +skip-string-normalization = true + +[tool.isort] +line_length = 120 +known_third_party = 'foxglove' +multi_line_output = 3 +include_trailing_comma = true +force_grid_wrap = 0 +combine_as_imports = true +color_output = true + +[tool.mypy] +follow_imports = 'silent' +strict_optional = true +warn_redundant_casts = true +warn_unused_ignores = true +disallow_any_generics = true +check_untyped_defs = true +no_implicit_reexport = true +warn_unused_configs = true +disallow_subclassing_any = true +disallow_incomplete_defs = true +disallow_untyped_decorators = true +disallow_untyped_calls = true + +# for strict mypy: (this is the tricky one :-)) +disallow_untyped_defs = true + +# remaining arguments from `mypy --strict` which cause errors +#no_implicit_optional = true +#warn_return_any = true + +[[tool.mypy.overrides]] +module = ['aioredis', 'watchgod'] +ignore_missing_imports = true diff --git a/setup.cfg b/setup.cfg deleted file mode 100644 index 12a7f168..00000000 --- a/setup.cfg +++ /dev/null @@ -1,59 +0,0 @@ -[tool:pytest] -testpaths = tests -timeout = 5 -filterwarnings = - error - ignore::DeprecationWarning:aioredis - -[flake8] -max-complexity = 10 -max-line-length = 120 -ignore = E203, W503 - -[coverage:run] -source = arq -branch = True - -[coverage:report] -precision = 2 -exclude_lines = - pragma: no cover - raise NotImplementedError - raise NotImplemented - if TYPE_CHECKING: - @overload - -[isort] -line_length=120 -known_third_party=pytest -multi_line_output=3 -include_trailing_comma=True -force_grid_wrap=0 -combine_as_imports=True - -[mypy] -follow_imports = silent -strict_optional = True -warn_redundant_casts = True -warn_unused_ignores = True -disallow_any_generics = True -check_untyped_defs = True -no_implicit_reexport = True -warn_unused_configs = True -disallow_subclassing_any = True -disallow_incomplete_defs = True -disallow_untyped_decorators = True -disallow_untyped_calls = True - -# for strict mypy: (this is the tricky one :-)) -disallow_untyped_defs = True - -# remaining arguments from `mypy --strict` which cause errors -;no_implicit_optional = True -;warn_return_any = True - -[mypy-aioredis] -ignore_missing_imports = true - -[mypy-watchgod] -ignore_missing_imports = true diff --git a/setup.py b/setup.py index 4bb91b72..d3bc9120 100644 --- a/setup.py +++ b/setup.py @@ -31,7 +31,6 @@ 'Programming Language :: Python', 'Programming Language :: Python :: 3', 'Programming Language :: Python :: 3 :: Only', - 'Programming Language :: Python :: 3.6', 'Programming Language :: Python :: 3.7', 'Programming Language :: Python :: 3.8', 'Programming Language :: Python :: 3.9', diff --git a/tests/conftest.py b/tests/conftest.py index e071f857..5492abe9 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -9,7 +9,13 @@ from arq.worker import Worker -@pytest.yield_fixture +@pytest.fixture(name='loop') +def _fix_loop(event_loop): + asyncio.set_event_loop(event_loop) + return event_loop + + +@pytest.fixture async def arq_redis(loop): redis_ = await create_redis_pool( ('localhost', 6379), encoding='utf8', loop=loop, commands_factory=ArqRedis, minsize=5 @@ -20,7 +26,7 @@ async def arq_redis(loop): await redis_.wait_closed() -@pytest.yield_fixture +@pytest.fixture async def arq_redis_msgpack(loop): redis_ = await create_redis_pool( ('localhost', 6379), @@ -36,7 +42,7 @@ async def arq_redis_msgpack(loop): await redis_.wait_closed() -@pytest.yield_fixture +@pytest.fixture async def worker(arq_redis): worker_: Worker = None @@ -68,3 +74,20 @@ async def create_pool_(settings, *args, **kwargs): p.close() await asyncio.gather(*[p.wait_closed() for p in pools]) + + +@pytest.fixture(name='cancel_remaining_task') +def fix_cancel_remaining_task(loop): + async def cancel_remaining_task(): + tasks = asyncio.all_tasks(loop) + cancelled = [] + for task in tasks: + # in repr works in 3.7 where get_coro() is not available + if 'cancel_remaining_task()' not in repr(task): + cancelled.append(task) + task.cancel() + await asyncio.gather(*cancelled, return_exceptions=True) + + yield + + loop.run_until_complete(cancel_remaining_task()) diff --git a/tests/requirements-linting.txt b/tests/requirements-linting.txt index 92e22d95..405bd395 100644 --- a/tests/requirements-linting.txt +++ b/tests/requirements-linting.txt @@ -1,8 +1,7 @@ -black==19.10b0 -flake8==3.7.9 -flake8-quotes==3 -isort==5.8.0 -mypy==0.812 -pycodestyle==2.5.0 -pyflakes==2.1.1 -twine==3.1.1 +black==21.12b0 +flake8==4.0.1 +flake8-quotes==3.3.1 +isort[colors]==5.10.1 +mypy==0.931 +pycodestyle==2.8.0 +pyflakes==2.4.0 diff --git a/tests/requirements-testing.txt b/tests/requirements-testing.txt index 420e112c..d23e7c4e 100644 --- a/tests/requirements-testing.txt +++ b/tests/requirements-testing.txt @@ -1,10 +1,10 @@ -coverage==5.1 -msgpack==0.6.1 -pytest==5.3.5 -pytest-aiohttp==0.3.0 -pytest-cov==2.8.1 -pytest-mock==3 -pytest-sugar==0.9.2 -pytest-timeout==1.3.3 -pytest-toolbox==0.4 -twine==3.1.1 +coverage==6.3 +dirty-equals==0.1 +msgpack==1.0.3 +pytest==6.2.5 +pytest-asyncio==0.17.2 +pytest-cov==3.0.0 +pytest-mock==3.7.0 +pytest-sugar==0.9.4 +pytest-timeout==2.1.0 +twine==3.7.1 diff --git a/tests/test_cli.py b/tests/test_cli.py index d44529c3..6db2f405 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1,3 +1,6 @@ +import asyncio + +import pytest from click.testing import CliRunner from arq.cli import cli @@ -19,14 +22,16 @@ def test_help(): assert result.output.startswith('Usage: arq [OPTIONS] WORKER_SETTINGS\n') -def test_run(): +def test_run(loop): runner = CliRunner() result = runner.invoke(cli, ['tests.test_cli.WorkerSettings']) assert result.exit_code == 0 assert 'Starting worker for 1 functions: foobar' in result.output + tasks = asyncio.all_tasks(loop) + assert not tasks -def test_check(): +def test_check(loop): runner = CliRunner() result = runner.invoke(cli, ['tests.test_cli.WorkerSettings', '--check']) assert result.exit_code == 1 @@ -37,7 +42,8 @@ async def mock_awatch(): yield [1] -def test_run_watch(mocker): +@pytest.mark.filterwarnings('ignore::DeprecationWarning') +def test_run_watch(mocker, loop): mocker.patch('watchgod.awatch', return_value=mock_awatch()) runner = CliRunner() result = runner.invoke(cli, ['tests.test_cli.WorkerSettings', '--watch', 'tests']) diff --git a/tests/test_cron.py b/tests/test_cron.py index 8df3da15..2b7555de 100644 --- a/tests/test_cron.py +++ b/tests/test_cron.py @@ -154,7 +154,8 @@ async def try_sleep(ctx): raise asyncio.CancelledError worker: Worker = worker( - cron_jobs=[cron(try_sleep, microsecond=20, run_at_startup=True, max_tries=2)], poll_delay=0.01, + cron_jobs=[cron(try_sleep, microsecond=20, run_at_startup=True, max_tries=2)], + poll_delay=0.01, ) await worker.main() assert worker.jobs_complete == 1 diff --git a/tests/test_jobs.py b/tests/test_jobs.py index 83df2cb0..e8f23216 100644 --- a/tests/test_jobs.py +++ b/tests/test_jobs.py @@ -2,7 +2,7 @@ import pickle import pytest -from pytest_toolbox.comparison import CloseToNow +from dirty_equals import IsNow from arq import Worker, func from arq.connections import ArqRedis, RedisSettings, create_pool @@ -49,11 +49,11 @@ async def foobar(ctx, *args, **kwargs): function='foobar', args=(1, 2), kwargs={'c': 3}, - enqueue_time=CloseToNow(), + enqueue_time=IsNow(tz='utc'), success=True, result=42, - start_time=CloseToNow(), - finish_time=CloseToNow(), + start_time=IsNow(tz='utc'), + finish_time=IsNow(tz='utc'), score=None, queue_name=expected_queue_name, ) @@ -64,11 +64,11 @@ async def foobar(ctx, *args, **kwargs): args=(1, 2), kwargs={'c': 3}, job_try=1, - enqueue_time=CloseToNow(), + enqueue_time=IsNow(tz='utc'), success=True, result=42, - start_time=CloseToNow(), - finish_time=CloseToNow(), + start_time=IsNow(tz='utc'), + finish_time=IsNow(tz='utc'), score=None, queue_name=expected_queue_name, job_id=j.job_id, diff --git a/tests/test_main.py b/tests/test_main.py index 263fe1ac..69e340db 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -9,7 +9,7 @@ import msgpack import pytest -from pytest_toolbox.comparison import AnyInt, CloseToNow +from dirty_equals import IsInt, IsNow from arq.connections import ArqRedis from arq.constants import default_queue_name @@ -130,7 +130,7 @@ async def test_job_info(arq_redis: ArqRedis): t_before = time() j = await arq_redis.enqueue_job('foobar', 123, a=456) info = await j.info() - assert info.enqueue_time == CloseToNow() + assert info.enqueue_time == IsNow(tz='utc') assert info.job_try is None assert info.function == 'foobar' assert info.args == (123,) @@ -172,9 +172,10 @@ async def count(ctx, v): tasks = [] for i in range(50): - tasks.extend( - [arq_redis.enqueue_job('count', i, _job_id=f'v-{i}'), arq_redis.enqueue_job('count', i, _job_id=f'v-{i}')] - ) + tasks += [ + arq_redis.enqueue_job('count', i, _job_id=f'v-{i}'), + arq_redis.enqueue_job('count', i, _job_id=f'v-{i}'), + ] shuffle(tasks) await asyncio.gather(*tasks) @@ -249,24 +250,24 @@ async def test_get_jobs(arq_redis: ArqRedis): 'args': (), 'kwargs': {'a': 1, 'b': 2, 'c': 3}, 'job_try': None, - 'enqueue_time': CloseToNow(), - 'score': AnyInt(), + 'enqueue_time': IsNow(tz='utc'), + 'score': IsInt(), }, { 'function': 'second', 'args': (4,), 'kwargs': {'b': 5, 'c': 6}, 'job_try': None, - 'enqueue_time': CloseToNow(), - 'score': AnyInt(), + 'enqueue_time': IsNow(tz='utc'), + 'score': IsInt(), }, { 'function': 'third', 'args': (7,), 'kwargs': {'b': 8}, 'job_try': None, - 'enqueue_time': CloseToNow(), - 'score': AnyInt(), + 'enqueue_time': IsNow(tz='utc'), + 'score': IsInt(), }, ] assert jobs[0].score < jobs[1].score < jobs[2].score diff --git a/tests/test_utils.py b/tests/test_utils.py index 9f30fb09..4d7c8120 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -3,6 +3,7 @@ from datetime import timedelta import pytest +from aioredis.errors import MasterReplyError from pydantic import BaseModel, validator import arq.typing @@ -26,18 +27,13 @@ async def test_redis_timeout(mocker, create_pool): assert arq.utils.asyncio.sleep.call_count == 5 -async def test_redis_sentinel_failure(create_pool): - """ - FIXME: this is currently causing 3 "Task was destroyed but it is pending!" warnings - """ +async def test_redis_sentinel_failure(create_pool, cancel_remaining_task): settings = RedisSettings() settings.host = [('localhost', 6379), ('localhost', 6379)] settings.sentinel = True - try: + with pytest.raises(MasterReplyError, match='unknown command `SENTINEL`'): pool = await create_pool(settings) await pool.ping('ping') - except Exception as e: - assert 'unknown command `SENTINEL`' in str(e) async def test_redis_success_log(caplog, create_pool): diff --git a/tests/test_worker.py b/tests/test_worker.py index 1822fdf9..e9c8c1c2 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -853,7 +853,11 @@ async def test(ctx): await arq_redis.enqueue_job('func', _job_id='testing') worker: Worker = worker( - functions=[func(test, name='func')], on_job_start=on_start, on_job_end=on_end, job_timeout=0.2, poll_delay=0.1, + functions=[func(test, name='func')], + on_job_start=on_start, + on_job_end=on_end, + job_timeout=0.2, + poll_delay=0.1, ) assert worker.jobs_complete == 0 assert worker.jobs_failed == 0