
Commit a5e6742

uprev test dependencies (#331)
* uprev test dependencies

* add requirements-setup.txt, fix mypy

* simplify file structure

* using newer redis_types

* fix linting
samuelcolvin authored Aug 23, 2022
1 parent 980d85e commit a5e6742
Showing 18 changed files with 176 additions and 191 deletions.
7 changes: 3 additions & 4 deletions .github/workflows/ci.yml
@@ -21,8 +21,7 @@ jobs:
         with:
           python-version: 3.9
 
-      - run: pip install -r tests/requirements-linting.txt
-      - run: pip install .
+      - run: pip install -r requirements/linting.txt -r requirements/setup.txt
 
       - run: make lint

@@ -39,7 +38,7 @@ jobs:
         with:
           python-version: 3.9
 
-      - run: pip install -r docs/requirements.txt
+      - run: pip install -r requirements/docs.txt -r requirements/setup.txt
       - run: pip install .
 
       - run: make docs
@@ -73,7 +72,7 @@ jobs:
         with:
           python-version: ${{ matrix.python-version }}
 
-      - run: pip install -r tests/requirements-testing.txt
+      - run: pip install -r requirements/testing.txt -r requirements/setup.txt
       - run: pip install .[watch]
 
       - run: make test
2 changes: 1 addition & 1 deletion Makefile
@@ -5,7 +5,7 @@ black = black arq tests
 .PHONY: install
 install:
 	pip install -U pip setuptools
-	pip install -r requirements.txt
+	pip install -r requirements/all.txt
 	pip install -e .[watch]
 
 .PHONY: format
30 changes: 19 additions & 11 deletions arq/connections.py
Expand Up @@ -4,7 +4,7 @@
from dataclasses import dataclass
from datetime import datetime, timedelta
from operator import attrgetter
from typing import Any, Callable, List, Optional, Tuple, Union
from typing import TYPE_CHECKING, Any, Callable, List, Optional, Tuple, Union
from urllib.parse import urlparse
from uuid import uuid4

@@ -67,7 +67,13 @@ def __repr__(self) -> str:
 expires_extra_ms = 86_400_000
 
 
-class ArqRedis(Redis):  # type: ignore[misc]
+if TYPE_CHECKING:
+    BaseRedis = Redis[bytes]
+else:
+    BaseRedis = Redis
+
+
+class ArqRedis(BaseRedis):
     """
     Thin subclass of ``redis.asyncio.Redis`` which adds :func:`arq.connections.enqueue_job`.
@@ -147,8 +153,8 @@ async def enqueue_job(
 
             job = serialize_job(function, args, kwargs, _job_try, enqueue_time_ms, serializer=self.job_serializer)
             pipe.multi()
-            pipe.psetex(job_key, expires_ms, job)
-            pipe.zadd(_queue_name, {job_id: score})
+            pipe.psetex(job_key, expires_ms, job)  # type: ignore[no-untyped-call]
+            pipe.zadd(_queue_name, {job_id: score})  # type: ignore[unused-coroutine]
             try:
                 await pipe.execute()
             except WatchError:
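
Note: this hunk sits inside enqueue_job's optimistic-locking block: the job key is WATCHed, PSETEX and ZADD are queued after pipe.multi(), and a concurrent enqueue of the same job id shows up as WatchError from execute(). A minimal sketch of that redis-py pattern (generic key and value, not arq's actual code):

    from redis.asyncio import Redis
    from redis.exceptions import WatchError

    async def set_once(redis: 'Redis[bytes]', key: str, value: bytes) -> bool:
        """Set key only if it is absent, using WATCH/MULTI optimistic locking."""
        async with redis.pipeline(transaction=True) as pipe:
            await pipe.watch(key)  # immediate mode: commands run as they are awaited
            if await pipe.get(key) is not None:
                return False  # the equivalent of arq's existing-job early return
            pipe.multi()  # back to buffered, transactional mode
            pipe.set(key, value)  # type: ignore[unused-coroutine]
            try:
                await pipe.execute()  # fails if the key changed since watch()
            except WatchError:
                return False  # another client won the race
            return True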
@@ -174,7 +180,9 @@ async def all_job_results(self) -> List[JobResult]:
         return sorted(results, key=attrgetter('enqueue_time'))
 
     async def _get_job_def(self, job_id: bytes, score: int) -> JobDef:
-        v = await self.get(job_key_prefix + job_id.decode())
+        key = job_key_prefix + job_id.decode()
+        v = await self.get(key)
+        assert v is not None, f'job "{key}" not found'
         jd = deserialize_job(v, deserializer=self.job_deserializer)
         jd.score = score
         return jd
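
Note: under the types-redis stubs get() returns Optional[bytes], and deserialize_job needs bytes, so the added assert both narrows the type for mypy and fails with a clear message if the job key has already expired. The narrowing pattern on its own (hypothetical helper, not arq API):

    from typing import Optional

    from redis.asyncio import Redis

    async def get_required(redis: 'Redis[bytes]', key: str) -> bytes:
        v: Optional[bytes] = await redis.get(key)  # None once the key has expired
        assert v is not None, f'key "{key}" not found'  # narrows Optional[bytes] to bytes
        return v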
@@ -209,7 +217,7 @@ async def create_pool(
     if settings.sentinel:
 
         def pool_factory(*args: Any, **kwargs: Any) -> ArqRedis:
-            client = Sentinel(
+            client = Sentinel(  # type: ignore[misc]
                 *args,
                 sentinels=settings.host,
                 ssl=settings.ssl,
@@ -262,12 +270,12 @@ def pool_factory(*args: Any, **kwargs: Any) -> ArqRedis:
     return pool
 
 
-async def log_redis_info(redis: Redis, log_func: Callable[[str], Any]) -> None:
+async def log_redis_info(redis: 'Redis[bytes]', log_func: Callable[[str], Any]) -> None:
     async with redis.pipeline(transaction=True) as pipe:
-        pipe.info(section='Server')
-        pipe.info(section='Memory')
-        pipe.info(section='Clients')
-        pipe.dbsize()
+        pipe.info(section='Server')  # type: ignore[unused-coroutine]
+        pipe.info(section='Memory')  # type: ignore[unused-coroutine]
+        pipe.info(section='Clients')  # type: ignore[unused-coroutine]
+        pipe.dbsize()  # type: ignore[unused-coroutine]
     info_server, info_memory, info_clients, key_count = await pipe.execute()
 
     redis_version = info_server.get('redis_version', '?')
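
Note: the TYPE_CHECKING indirection introduced above is the usual workaround for a class that is generic only in its stubs: types-redis parameterises Redis by response type, but subscripting Redis presumably fails at runtime in this redis-py version, so the subscripted form must stay behind TYPE_CHECKING. A standalone sketch of the pattern:

    from typing import TYPE_CHECKING

    from redis.asyncio import Redis

    if TYPE_CHECKING:
        # evaluated only by the type checker, where the stubs make Redis generic
        BaseRedis = Redis[bytes]
    else:
        # at runtime, fall back to the plain (non-subscriptable) class
        BaseRedis = Redis

    class TypedRedis(BaseRedis):
        """Behaves exactly like Redis, but type-checks as Redis[bytes]."""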
5 changes: 3 additions & 2 deletions arq/jobs.py
@@ -69,7 +69,7 @@ class Job:
     def __init__(
         self,
         job_id: str,
-        redis: Redis,
+        redis: 'Redis[bytes]',
         _queue_name: str = default_queue_name,
         _deserializer: Optional[Deserializer] = None,
     ):
@@ -118,7 +118,8 @@ async def info(self) -> Optional[JobDef]:
         if v:
             info = deserialize_job(v, deserializer=self._deserializer)
         if info:
-            info.score = await self._redis.zscore(self._queue_name, self.job_id)
+            s = await self._redis.zscore(self._queue_name, self.job_id)
+            info.score = None if s is None else int(s)
         return info
 
     async def result_info(self) -> Optional[JobResult]:
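
Note: the split assignment works around a typing mismatch: the stubs declare zscore() as returning Optional[float], while JobDef.score is, judging by this conversion, Optional[int]. The same fix in isolation (hypothetical helper; arq scores are millisecond timestamps, so the int() cast is lossless):

    from typing import Optional

    from redis.asyncio import Redis

    async def queue_score(redis: 'Redis[bytes]', queue: str, job_id: str) -> Optional[int]:
        s = await redis.zscore(queue, job_id)  # Optional[float]: None if not queued
        return None if s is None else int(s)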
44 changes: 25 additions & 19 deletions arq/worker.py
@@ -358,8 +358,10 @@ async def _cancel_aborted_jobs(self) -> None:
         Go through job_ids in the abort_jobs_ss sorted set and cancel those tasks.
         """
         async with self.pool.pipeline(transaction=True) as pipe:
-            pipe.zrange(abort_jobs_ss, start=0, end=-1)
-            pipe.zremrangebyscore(abort_jobs_ss, min=timestamp_ms() + abort_job_max_age, max=float('inf'))
+            pipe.zrange(abort_jobs_ss, start=0, end=-1)  # type: ignore[unused-coroutine]
+            pipe.zremrangebyscore(  # type: ignore[unused-coroutine]
+                abort_jobs_ss, min=timestamp_ms() + abort_job_max_age, max=float('inf')
+            )
             abort_job_ids, _ = await pipe.execute()
 
         aborted: Set[str] = set()
@@ -396,26 +398,28 @@ async def start_jobs(self, job_ids: List[bytes]) -> None:
                     continue
 
                 pipe.multi()
-                pipe.psetex(in_progress_key, int(self.in_progress_timeout_s * 1000), b'1')
+                pipe.psetex(  # type: ignore[no-untyped-call]
+                    in_progress_key, int(self.in_progress_timeout_s * 1000), b'1'
+                )
                 try:
                     await pipe.execute()
                 except (ResponseError, WatchError):
                     # job already started elsewhere since we got 'existing'
                     self.sem.release()
                     logger.debug('multi-exec error, job %s already started elsewhere', job_id)
                 else:
-                    t = self.loop.create_task(self.run_job(job_id, score))
+                    t = self.loop.create_task(self.run_job(job_id, int(score)))
                     t.add_done_callback(lambda _: self.sem.release())
                     self.tasks[job_id] = t

     async def run_job(self, job_id: str, score: int) -> None:  # noqa: C901
         start_ms = timestamp_ms()
         async with self.pool.pipeline(transaction=True) as pipe:
-            pipe.get(job_key_prefix + job_id)
-            pipe.incr(retry_key_prefix + job_id)
-            pipe.expire(retry_key_prefix + job_id, 88400)
+            pipe.get(job_key_prefix + job_id)  # type: ignore[unused-coroutine]
+            pipe.incr(retry_key_prefix + job_id)  # type: ignore[unused-coroutine]
+            pipe.expire(retry_key_prefix + job_id, 88400)  # type: ignore[unused-coroutine]
             if self.allow_abort_jobs:
-                pipe.zrem(abort_jobs_ss, job_id)
+                pipe.zrem(abort_jobs_ss, job_id)  # type: ignore[unused-coroutine]
                 v, job_try, _, abort_job = await pipe.execute()
             else:
                 v, job_try, _ = await pipe.execute()
@@ -622,35 +626,35 @@ async def finish_job(
             if keep_in_progress is None:
                 delete_keys += [in_progress_key]
             else:
-                tr.pexpire(in_progress_key, to_ms(keep_in_progress))
+                tr.pexpire(in_progress_key, to_ms(keep_in_progress))  # type: ignore[unused-coroutine]
 
             if finish:
                 if result_data:
                     expire = None if keep_result_forever else result_timeout_s
-                    tr.set(result_key_prefix + job_id, result_data, px=to_ms(expire))
+                    tr.set(result_key_prefix + job_id, result_data, px=to_ms(expire))  # type: ignore[unused-coroutine]
                 delete_keys += [retry_key_prefix + job_id, job_key_prefix + job_id]
-                tr.zrem(abort_jobs_ss, job_id)
-                tr.zrem(self.queue_name, job_id)
+                tr.zrem(abort_jobs_ss, job_id)  # type: ignore[unused-coroutine]
+                tr.zrem(self.queue_name, job_id)  # type: ignore[unused-coroutine]
             elif incr_score:
-                tr.zincrby(self.queue_name, incr_score, job_id)
+                tr.zincrby(self.queue_name, incr_score, job_id)  # type: ignore[unused-coroutine]
             if delete_keys:
-                tr.delete(*delete_keys)
+                tr.delete(*delete_keys)  # type: ignore[unused-coroutine]
             await tr.execute()

     async def finish_failed_job(self, job_id: str, result_data: Optional[bytes]) -> None:
         async with self.pool.pipeline(transaction=True) as tr:
-            tr.delete(
+            tr.delete(  # type: ignore[unused-coroutine]
                 retry_key_prefix + job_id,
                 in_progress_key_prefix + job_id,
                 job_key_prefix + job_id,
             )
-            tr.zrem(abort_jobs_ss, job_id)
-            tr.zrem(self.queue_name, job_id)
+            tr.zrem(abort_jobs_ss, job_id)  # type: ignore[unused-coroutine]
+            tr.zrem(self.queue_name, job_id)  # type: ignore[unused-coroutine]
             # result_data would only be None if serializing the result fails
             keep_result = self.keep_result_forever or self.keep_result_s > 0
             if result_data is not None and keep_result:  # pragma: no branch
                 expire = 0 if self.keep_result_forever else self.keep_result_s
-                tr.set(result_key_prefix + job_id, result_data, px=to_ms(expire))
+                tr.set(result_key_prefix + job_id, result_data, px=to_ms(expire))  # type: ignore[unused-coroutine]
             await tr.execute()
 
     async def heart_beat(self) -> None:
@@ -703,7 +707,9 @@ async def record_health(self) -> None:
             f'{datetime.now():%b-%d %H:%M:%S} j_complete={self.jobs_complete} j_failed={self.jobs_failed} '
             f'j_retried={self.jobs_retried} j_ongoing={pending_tasks} queued={queued}'
         )
-        await self.pool.psetex(self.health_check_key, int((self.health_check_interval + 1) * 1000), info.encode())
+        await self.pool.psetex(  # type: ignore[no-untyped-call]
+            self.health_check_key, int((self.health_check_interval + 1) * 1000), info.encode()
+        )
         log_suffix = info[info.index('j_complete=') :]
         if self._last_health_check_log and log_suffix != self._last_health_check_log:
             logger.info('recording health: %s', info)
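
Note: the suppressions added throughout this file come in two flavours. psetex() needs # type: ignore[no-untyped-call] because it is unannotated in the types-redis stubs, while the rest need # type: ignore[unused-coroutine]: the stubs give Pipeline command methods the same coroutine-returning signatures as Redis, but calling them on a pipeline only buffers the command (nothing is awaited until execute()), so mypy's unused-coroutine check fires on every queued call. A minimal sketch of the buffering pattern under those stubs (hypothetical keys):

    from redis.asyncio import Redis

    async def bump_counter(redis: 'Redis[bytes]') -> int:
        async with redis.pipeline(transaction=True) as pipe:
            # each call buffers one command and returns the pipeline itself;
            # the stubs type it as a coroutine, hence the suppressions
            pipe.incr('stats:jobs')  # type: ignore[unused-coroutine]
            pipe.expire('stats:jobs', 60)  # type: ignore[unused-coroutine]
            count, _ = await pipe.execute()  # one round trip, results in call order
        return count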
3 changes: 0 additions & 3 deletions requirements.txt

This file was deleted.

4 changes: 4 additions & 0 deletions requirements/all.txt
@@ -0,0 +1,4 @@
+-r ./docs.txt
+-r ./linting.txt
+-r ./testing.txt
+-r ./setup.txt
File renamed without changes.
4 changes: 2 additions & 2 deletions docs/requirements.txt → requirements/docs.txt
@@ -2,7 +2,7 @@
 # This file is autogenerated by pip-compile with python 3.9
 # To update, run:
 #
-#    pip-compile --output-file=docs/requirements.txt docs/requirements.in
+#    pip-compile --output-file=requirements/docs.txt requirements/docs.in
 #
 alabaster==0.7.12
     # via sphinx
@@ -37,7 +37,7 @@ requests==2.28.1
 snowballstemmer==2.2.0
     # via sphinx
 sphinx==5.1.1
-    # via -r docs/requirements.in
+    # via -r requirements/docs.in
 sphinxcontrib-applehelp==1.0.2
     # via sphinx
 sphinxcontrib-devhelp==1.0.2
9 changes: 9 additions & 0 deletions requirements/linting.in
@@ -0,0 +1,9 @@
+black==22.6.0
+flake8==5.0.4
+flake8-quotes==3.3.1
+isort[colors]==5.10.1
+mypy==0.971
+pycodestyle==2.9.1
+pyflakes==2.5.0
+types-pytz==2022.2.1.0
+types_redis==4.2.8
34 changes: 17 additions & 17 deletions tests/requirements-linting.txt → requirements/linting.txt
@@ -2,26 +2,26 @@
 # This file is autogenerated by pip-compile with python 3.9
 # To update, run:
 #
-#    pip-compile --output-file=requirements-linting.txt requirements-linting.in
+#    pip-compile --output-file=requirements/linting.txt requirements/linting.in
 #
 black==22.6.0
-    # via -r requirements-linting.in
+    # via -r requirements/linting.in
 click==8.1.3
     # via black
 colorama==0.4.5
     # via isort
-flake8==4.0.1
+flake8==5.0.4
     # via
-    #   -r requirements-linting.in
+    #   -r requirements/linting.in
     #   flake8-quotes
 flake8-quotes==3.3.1
-    # via -r requirements-linting.in
+    # via -r requirements/linting.in
 isort[colors]==5.10.1
-    # via -r requirements-linting.in
-mccabe==0.6.1
+    # via -r requirements/linting.in
+mccabe==0.7.0
     # via flake8
-mypy==0.931
-    # via -r requirements-linting.in
+mypy==0.971
+    # via -r requirements/linting.in
 mypy-extensions==0.4.3
     # via
     #   black
@@ -30,22 +30,22 @@ pathspec==0.9.0
     # via black
 platformdirs==2.5.2
     # via black
-pycodestyle==2.8.0
+pycodestyle==2.9.1
     # via
-    #   -r requirements-linting.in
+    #   -r requirements/linting.in
     #   flake8
-pyflakes==2.4.0
+pyflakes==2.5.0
     # via
-    #   -r requirements-linting.in
+    #   -r requirements/linting.in
     #   flake8
 tomli==2.0.1
     # via
     #   black
     #   mypy
-types-pytz==2021.3.5
-    # via -r requirements-linting.in
-types-redis==4.1.17
-    # via -r requirements-linting.in
+types-pytz==2022.2.1.0
+    # via -r requirements/linting.in
+types-redis==4.2.8
+    # via -r requirements/linting.in
 typing-extensions==4.3.0
     # via
     #   black
28 changes: 28 additions & 0 deletions requirements/setup.txt
@@ -0,0 +1,28 @@
+#
+# This file is autogenerated by pip-compile with python 3.9
+# To update, run:
+#
+#    pip-compile --output-file=requirements/setup.txt setup.py
+#
+async-timeout==4.0.2
+    # via redis
+click==8.1.3
+    # via arq (setup.py)
+deprecated==1.2.13
+    # via redis
+hiredis==2.0.0
+    # via redis
+packaging==21.3
+    # via redis
+pydantic==1.9.2
+    # via arq (setup.py)
+pyparsing==3.0.9
+    # via packaging
+redis[hiredis]==4.2.2
+    # via arq (setup.py)
+typing-extensions==4.3.0
+    # via
+    #   arq (setup.py)
+    #   pydantic
+wrapt==1.14.1
+    # via deprecated
9 changes: 9 additions & 0 deletions requirements/testing.in
@@ -0,0 +1,9 @@
+coverage[toml]==6.4.4
+dirty-equals==0.4
+msgpack==1.0.4
+pytest==7.1.2
+pytest-asyncio==0.19.0
+pytest-mock==3.8.2
+pytest-sugar==0.9.5
+pytest-timeout==2.1.0
+pytz==2022.2.1
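
Note: the new layout separates hand-maintained top-level pins (the requirements/*.in files, plus setup.py for runtime dependencies) from the pip-compile-generated lock files (requirements/*.txt); requirements/all.txt simply chains the four lock files with -r includes for `make install`. Going by the autogenerated headers above, each lock file is refreshed with commands along the lines of:

    pip-compile --output-file=requirements/linting.txt requirements/linting.in
    pip-compile --output-file=requirements/setup.txt setup.py

with presumably the analogous invocations for docs and testing.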

