working on full mypy coverage (#181)
* working on full mypy coverage

* fix errors

* mypy coverage for worker.py

* typing tests and coverage

* fix linting

* fixing use of Protocol and Literal in older python (see the sketch below)

* trying to fix coverage

* fix coverage

* fix coverage

* tweak cli.py
samuelcolvin authored Apr 24, 2020
1 parent 20b40fd commit eee0b66
Showing 20 changed files with 378 additions and 216 deletions.
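The "fixing use of Protocol and Literal in older python" bullet refers to the fact that both names only joined the stdlib typing module in Python 3.8; on 3.6 and 3.7 they must come from the typing_extensions backport. A minimal version-gated import sketch (the actual shim presumably lives in arq/typing.py, which is not among the diffs shown here; the class and alias below are purely illustrative):

import sys

# Protocol and Literal are stdlib from Python 3.8; earlier versions
# need the typing_extensions backport instead.
if sys.version_info >= (3, 8):
    from typing import Literal, Protocol
else:
    from typing_extensions import Literal, Protocol


class StartupHook(Protocol):
    # hypothetical structural type: anything callable like this matches
    def __call__(self, verbose: bool) -> None: ...


LogLevel = Literal['debug', 'info', 'warning', 'error']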
8 changes: 5 additions & 3 deletions .codecov.yml
@@ -1,4 +1,6 @@
 coverage:
-  status:
-    patch: false
-    project: false
+  precision: 2
+  range: [95, 100]
+
+comment:
+  layout: 'header, diff, flags, files, footer'
4 changes: 2 additions & 2 deletions .coveragerc
@@ -6,6 +6,6 @@ concurrency = multiprocessing
 [report]
 exclude_lines =
     pragma: no cover
-
     raise NotImplementedError
-    raise NotImplemented
+    if TYPE_CHECKING:
+    @overload
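The two new exclusions cover constructs that exist only for the type checker: an if TYPE_CHECKING: block is never entered at runtime, and @overload stub bodies are shadowed by the real implementation, so both would otherwise count as permanently uncovered lines. A small illustration (hypothetical function, not from arq):

from typing import TYPE_CHECKING, Union, overload

if TYPE_CHECKING:
    # runs under mypy only; at runtime TYPE_CHECKING is False,
    # so coverage would report this block as a permanent miss
    from ssl import SSLContext


@overload
def double(x: int) -> int: ...
@overload
def double(x: float) -> float: ...
def double(x: Union[int, float]) -> Union[int, float]:
    # only this implementation body ever runs; the stubs above exist
    # purely to give mypy the two precise signatures
    return x * 2


assert double(2) == 4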
3 changes: 3 additions & 0 deletions .github/workflows/ci.yml
@@ -46,6 +46,9 @@ jobs:
     - name: lint
       run: make lint
 
+    - name: mypy
+      run: make mypy
+
     - name: test
       run: |
         make test
1 change: 1 addition & 0 deletions HISTORY.rst
@@ -7,6 +7,7 @@ v0.19.0 (unreleased)
 ....................
 * Python 3.8 support, #178
 * fix concurrency with multiple workers, #180
+* full mypy coverage, #181
 
 v0.18.4 (2019-12-19)
 ....................
2 changes: 1 addition & 1 deletion LICENSE
@@ -1,6 +1,6 @@
 The MIT License (MIT)
 
-Copyright (c) 2016 Samuel Colvin
+Copyright (c) 2017, 2018, 2019, 2020 Samuel Colvin and other contributors
 
 Permission is hereby granted, free of charge, to any person obtaining a copy
 of this software and associated documentation files (the "Software"), to deal
3 changes: 3 additions & 0 deletions MANIFEST.in
@@ -0,0 +1,3 @@
+include LICENSE
+include README.md
+include HISTORY.rst
12 changes: 9 additions & 3 deletions Makefile
@@ -24,11 +24,16 @@ test:
 	pytest --cov=arq
 
 .PHONY: testcov
-testcov:
-	pytest --cov=arq && (echo "building coverage html"; coverage html)
+testcov: test
+	@echo "building coverage html"
+	@coverage html
 
+.PHONY: mypy
+mypy:
+	mypy arq
+
 .PHONY: all
-all: lint testcov
+all: lint mypy testcov
 
 .PHONY: clean
 clean:
@@ -38,6 +43,7 @@ clean:
 	rm -f `find . -type f -name '.*~' `
 	rm -rf .cache
 	rm -rf .pytest_cache
+	rm -rf .mypy_cache
 	rm -rf htmlcov
 	rm -rf *.egg-info
 	rm -f .coverage
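The new target simply runs mypy over the arq package, make all now gates on lint, type check and tests together, and make clean knows about mypy's on-disk cache. "Full mypy coverage" in practice usually also means strict options such as disallow_untyped_defs in the project's mypy configuration; that config is not part of the diffs shown here. The practical effect, sketched with an illustrative function:

from typing import Optional

# With disallow_untyped_defs enabled, mypy rejects the bare version:
#     def backoff(retries, delay=None): ...
# Every parameter and the return type must be spelled out:
def backoff(retries: int, delay: Optional[float] = None) -> float:
    """Total seconds spent retrying with a fixed per-retry delay."""
    return retries * (delay if delay is not None else 1.0)


assert backoff(5) == 5.0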
25 changes: 17 additions & 8 deletions arq/cli.py
@@ -3,6 +3,7 @@
 import os
 import sys
 from signal import Signals
+from typing import TYPE_CHECKING, cast
 
 import click
 from pydantic.utils import import_string
@@ -11,6 +12,9 @@
 from .version import VERSION
 from .worker import check_health, create_worker, run_worker
 
+if TYPE_CHECKING:
+    from .typing import WorkerSettingsType
+
 burst_help = 'Batch mode: exit once no jobs are found in any queue.'
 health_check_help = 'Health Check: run a health check and exit.'
 watch_help = 'Watch a directory and reload the worker upon changes.'
@@ -24,37 +28,42 @@
 @click.option('--check', is_flag=True, help=health_check_help)
 @click.option('--watch', type=click.Path(exists=True, dir_okay=True, file_okay=False), help=watch_help)
 @click.option('-v', '--verbose', is_flag=True, help=verbose_help)
-def cli(*, worker_settings, burst, check, watch, verbose):
+def cli(*, worker_settings: str, burst: bool, check: bool, watch: str, verbose: bool) -> None:
     """
     Job queues in python with asyncio and redis.
 
     CLI to run the arq worker.
     """
     sys.path.append(os.getcwd())
-    worker_settings = import_string(worker_settings)
+    worker_settings_ = cast('WorkerSettingsType', import_string(worker_settings))
     logging.config.dictConfig(default_log_config(verbose))
 
     if check:
-        exit(check_health(worker_settings))
+        exit(check_health(worker_settings_))
     else:
         kwargs = {} if burst is None else {'burst': burst}
         if watch:
-            loop = asyncio.get_event_loop()
-            loop.run_until_complete(watch_reload(watch, worker_settings, loop))
+            asyncio.get_event_loop().run_until_complete(watch_reload(watch, worker_settings_))
         else:
-            run_worker(worker_settings, **kwargs)
+            run_worker(worker_settings_, **kwargs)
 
 
-async def watch_reload(path, worker_settings, loop):
+async def watch_reload(path: str, worker_settings: 'WorkerSettingsType') -> None:
     try:
         from watchgod import awatch
     except ImportError as e:  # pragma: no cover
         raise ImportError('watchgod not installed, use `pip install watchgod`') from e
 
+    loop = asyncio.get_event_loop()
     stop_event = asyncio.Event()
+
+    def worker_on_stop(s: Signals) -> None:
+        if s != Signals.SIGUSR1:  # pragma: no cover
+            stop_event.set()
+
     worker = create_worker(worker_settings)
     try:
-        worker.on_stop = lambda s: s != Signals.SIGUSR1 and stop_event.set()
+        worker.on_stop = worker_on_stop
         loop.create_task(worker.async_run())
         async for _ in awatch(path, stop_event=stop_event):
             print('\nfiles changed, reloading arq worker...')
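Two recurring mypy patterns appear in this file. pydantic's import_string is dynamic, so its result is Any and gets cast to the expected settings type (which is imported under TYPE_CHECKING, i.e. only while type checking). And the on_stop lambda becomes a named function, since lambdas cannot carry annotations and the old "s != ... and stop_event.set()" expression evaluates to a bool-or-None value rather than the plain None the callback slot expects. A condensed, self-contained sketch with stand-in names:

from signal import Signals
from typing import Any, Callable, List, Type, cast


class WorkerSettings:  # stand-in for a user-supplied settings class
    queue_name = 'arq:queue'


def import_string(path: str) -> Any:  # mimics the untyped dynamic import
    return WorkerSettings


# cast() records what the dynamically imported object really is
settings = cast(Type[WorkerSettings], import_string('app.WorkerSettings'))

stopped: List[Signals] = []


def worker_on_stop(s: Signals) -> None:
    # a named def can be fully annotated; a lambda cannot
    if s != Signals.SIGUSR1:
        stopped.append(s)


on_stop: Callable[[Signals], None] = worker_on_stop
on_stop(Signals.SIGTERM)
assert stopped == [Signals.SIGTERM]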
32 changes: 17 additions & 15 deletions arq/connections.py
@@ -5,7 +5,7 @@
 from datetime import datetime, timedelta
 from operator import attrgetter
 from ssl import SSLContext
-from typing import Any, List, Optional, Tuple, Union
+from typing import Any, Callable, List, Optional, Tuple, Union
 from uuid import uuid4
 
 import aioredis
Expand All @@ -30,23 +30,23 @@ class RedisSettings:
port: int = 6379
database: int = 0
password: Optional[str] = None
ssl: [bool, None, SSLContext] = None
ssl: Union[bool, None, SSLContext] = None
conn_timeout: int = 1
conn_retries: int = 5
conn_retry_delay: int = 1

sentinel: bool = False
sentinel_master: str = 'mymaster'

def __repr__(self):
def __repr__(self) -> str:
return '<RedisSettings {}>'.format(' '.join(f'{k}={v}' for k, v in self.__dict__.items()))


# extra time after the job is expected to start when the job key should expire, 1 day in ms
expires_extra_ms = 86_400_000


class ArqRedis(Redis):
class ArqRedis(Redis): # type: ignore
"""
Thin subclass of ``aioredis.Redis`` which adds :func:`arq.connections.enqueue_job`.
@@ -58,10 +58,10 @@ class ArqRedis(Redis):
 
     def __init__(
         self,
-        pool_or_conn,
+        pool_or_conn: Any,
         job_serializer: Optional[Serializer] = None,
         job_deserializer: Optional[Deserializer] = None,
-        **kwargs,
+        **kwargs: Any,
     ) -> None:
         self.job_serializer = job_serializer
         self.job_deserializer = job_deserializer
@@ -108,7 +108,7 @@ async def enqueue_job(
         job_result_exists = pipe.exists(result_key_prefix + job_id)
         await pipe.execute()
         if await job_exists or await job_result_exists:
-            return
+            return None
 
         enqueue_time_ms = timestamp_ms()
         if _defer_until is not None:
@@ -130,13 +130,15 @@
                 # job got enqueued since we checked 'job_exists'
                 # https://github.com/samuelcolvin/arq/issues/131, avoid warnings in log
                 await asyncio.gather(*tr._results, return_exceptions=True)
-                return
+                return None
         return Job(job_id, redis=self, _queue_name=_queue_name, _deserializer=self.job_deserializer)
 
-    async def _get_job_result(self, key) -> JobResult:
+    async def _get_job_result(self, key: str) -> JobResult:
         job_id = key[len(result_key_prefix) :]
         job = Job(job_id, self, _deserializer=self.job_deserializer)
         r = await job.result_info()
+        if r is None:
+            raise KeyError(f'job "{key}" not found')
         r.job_id = job_id
         return r

@@ -148,7 +150,7 @@ async def all_job_results(self) -> List[JobResult]:
         results = await asyncio.gather(*[self._get_job_result(k) for k in keys])
         return sorted(results, key=attrgetter('enqueue_time'))
 
-    async def _get_job_def(self, job_id, score) -> JobDef:
+    async def _get_job_def(self, job_id: str, score: int) -> JobDef:
         v = await self.get(job_key_prefix + job_id, encoding=None)
         jd = deserialize_job(v, deserializer=self.job_deserializer)
         jd.score = score
@@ -163,7 +165,7 @@ async def queued_jobs(self, *, queue_name: str = default_queue_name) -> List[Job
 
 
 async def create_pool(
-    settings: RedisSettings = None,
+    settings_: RedisSettings = None,
     *,
     retry: int = 0,
     job_serializer: Optional[Serializer] = None,
@@ -175,16 +177,16 @@
     Similar to ``aioredis.create_redis_pool`` except it returns a :class:`arq.connections.ArqRedis` instance,
     thus allowing job enqueuing.
     """
-    settings = settings or RedisSettings()
+    settings: RedisSettings = RedisSettings() if settings_ is None else settings_
 
     assert not (
         type(settings.host) is str and settings.sentinel
     ), "str provided for 'host' but 'sentinel' is true; list of sentinels expected"
 
     if settings.sentinel:
-        addr = settings.host
+        addr: Any = settings.host
 
-        async def pool_factory(*args, **kwargs):
+        async def pool_factory(*args: Any, **kwargs: Any) -> Redis:
             client = await aioredis.sentinel.create_sentinel_pool(*args, ssl=settings.ssl, **kwargs)
             return client.master_for(settings.sentinel_master)
@@ -222,7 +224,7 @@ async def pool_factory(
 )
 
 
-async def log_redis_info(redis, log_func):
+async def log_redis_info(redis: Redis, log_func: Callable[[str], Any]) -> None:
     with await redis as r:
         info, key_count = await asyncio.gather(r.info(), r.dbsize())
         log_func(
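Three of the changes above are pure typing fixes: the old annotation ssl: [bool, None, SSLContext] was a list literal rather than a type and becomes a proper Union; the settings parameter is renamed to settings_ so the function can bind a fresh, non-Optional settings variable instead of re-assigning the parameter at a different type; and _get_job_result now raises when result_info() returns None, narrowing the Optional so the declared JobResult return type checks. The narrowing pattern in isolation (simplified stand-ins, not arq's real classes):

from dataclasses import dataclass
from typing import Optional


@dataclass
class JobResult:  # simplified stand-in for arq's JobResult
    job_id: Optional[str] = None


def result_info(found: bool) -> Optional[JobResult]:
    return JobResult() if found else None


def get_job_result(key: str) -> JobResult:
    r = result_info(found=True)
    if r is None:
        # raising narrows r from Optional[JobResult] to JobResult,
        # so the attribute assignment below type-checks
        raise KeyError(f'job "{key}" not found')
    r.job_id = key
    return r


assert get_job_result('arq:result:abc').job_id == 'arq:result:abc'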