From e0cd916988ebed6d01c26a4d3e9128aa2bf22a7d Mon Sep 17 00:00:00 2001 From: Ross Nordstrom Date: Sun, 19 Mar 2023 06:02:21 -0600 Subject: [PATCH 1/8] Docs: Add details about reusing a unique job id (#391) See: * https://github.com/samuelcolvin/arq/pull/138#issuecomment-523589365 * https://github.com/samuelcolvin/arq/issues/221 --- docs/index.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/index.rst b/docs/index.rst index eb6ad372..58af8221 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -136,7 +136,7 @@ Sometimes you want a job to only be run once at a time (eg. a backup) or once fo invoices for a particular company). *arq* supports this via custom job ids, see :func:`arq.connections.ArqRedis.enqueue_job`. It guarantees -that a job with a particular ID cannot be enqueued again until its execution has finished. +that a job with a particular ID cannot be enqueued again until its execution has finished and its result has cleared. To control when a finished job's result clears, you can use the `keep_result` setting on your worker, see :func:`arq.worker.func`. .. literalinclude:: examples/job_ids.py From 9109c2e59d2b13fa59d246da03d19d7844a6fa19 Mon Sep 17 00:00:00 2001 From: stradivari96 Date: Sat, 20 May 2023 17:36:34 +0200 Subject: [PATCH 2/8] Delete setup.py (#398) --- setup.py | 28 ---------------------------- 1 file changed, 28 deletions(-) delete mode 100644 setup.py diff --git a/setup.py b/setup.py deleted file mode 100644 index 94cd7b8d..00000000 --- a/setup.py +++ /dev/null @@ -1,28 +0,0 @@ -import sys - -sys.stderr.write( - """ -=============================== -Unsupported installation method -=============================== -arq no longer supports installation with `python setup.py install`. -Please use `python -m pip install .` instead. -""" -) -sys.exit(1) - - -# The below code will never execute, however GitHub is particularly -# picky about where it finds Python packaging metadata. -# See: https://github.com/github/feedback/discussions/6456 -# -# To be removed once GitHub catches up. 
-
-setup(
- name='arq',
- install_requires=[
- 'redis[hiredis]>=4.2.0',
- 'click>=8.0',
- 'typing-extensions>=4.1.0',
- ],
-)

From ab2dda2011ab27007650c4918d3704f3bf7ac13d Mon Sep 17 00:00:00 2001
From: Rishabh Mittal
Date: Mon, 30 Oct 2023 23:39:26 +0530
Subject: [PATCH 3/8] =?UTF-8?q?=F0=9F=94=A8=20Adding=20a=20job=20counter?=
 =?UTF-8?q?=20to=20address=20Semaphore=20issues=20(#408)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* 🔨 Adding a job counter to address Semaphore issues
* 🧪 Test function for semaphore blocker

---
arq/worker.py | 25 +++++++++++++++++++++----
tests/test_worker.py | 30 ++++++++++++++++++++++++++++++
2 files changed, 51 insertions(+), 4 deletions(-)

diff --git a/arq/worker.py b/arq/worker.py
index 81afd5b7..398409b5 100644
--- a/arq/worker.py
+++ b/arq/worker.py
@@ -236,7 +236,11 @@ def __init__(
self.on_job_start = on_job_start
self.on_job_end = on_job_end
self.after_job_end = after_job_end
- self.sem = asyncio.BoundedSemaphore(max_jobs)
+
+ self.max_jobs = max_jobs
+ self.sem = asyncio.BoundedSemaphore(max_jobs + 1)
+ self.job_counter: int = 0
+
self.job_timeout_s = to_seconds(job_timeout)
self.keep_result_s = to_seconds(keep_result)
self.keep_result_forever = keep_result_forever
@@ -374,13 +378,13 @@ async def _poll_iteration(self) -> None:
return
count = min(burst_jobs_remaining, count)
if self.allow_pick_jobs:
- async with self.sem: # don't bother with zrangebyscore until we have "space" to run the jobs
+ if self.job_counter < self.max_jobs:
now = timestamp_ms()
job_ids = await self.pool.zrangebyscore(
self.queue_name, min=float('-inf'), start=self._queue_read_offset, num=count, max=now
)
- await self.start_jobs(job_ids)
+ await self.start_jobs(job_ids)
if self.allow_abort_jobs:
await self._cancel_aborted_jobs()
@@ -419,12 +423,23 @@ async def _cancel_aborted_jobs(self) -> None:
self.aborting_tasks.update(aborted)
await self.pool.zrem(abort_jobs_ss, *aborted)
+ def _release_sem_dec_counter_on_complete(self) -> None:
+ self.job_counter = self.job_counter - 1
+ self.sem.release()
+
async def start_jobs(self, job_ids: List[bytes]) -> None:
"""
For each job id, get the job definition, check it's not running and start it in a task
"""
for job_id_b in job_ids:
await self.sem.acquire()
+
+ if self.job_counter >= self.max_jobs:
+ self.sem.release()
+ return None
+
+ self.job_counter = self.job_counter + 1
+
job_id = job_id_b.decode()
in_progress_key = in_progress_key_prefix + job_id
async with self.pool.pipeline(transaction=True) as pipe:
@@ -433,6 +448,7 @@ async def start_jobs(self, job_ids: List[bytes]) -> None:
score = await pipe.zscore(self.queue_name, job_id)
if ongoing_exists or not score:
# job already started elsewhere, or already finished and removed from queue
+ self.job_counter = self.job_counter - 1
self.sem.release()
logger.debug('job %s already running elsewhere', job_id)
continue
@@ -445,11 +461,12 @@ async def start_jobs(self, job_ids: List[bytes]) -> None:
await pipe.execute()
except (ResponseError, WatchError):
# job already started elsewhere since we got 'existing'
+ self.job_counter = self.job_counter - 1
self.sem.release()
logger.debug('multi-exec error, job %s already started elsewhere', job_id)
else:
t = self.loop.create_task(self.run_job(job_id, int(score)))
- t.add_done_callback(lambda _: self.sem.release())
+ t.add_done_callback(lambda _: self._release_sem_dec_counter_on_complete())
self.tasks[job_id] = t
async def run_job(self, job_id: str, score: int) -> None: # noqa: C901
diff --git 
a/tests/test_worker.py b/tests/test_worker.py
index aa56085b..23dd91d2 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -984,6 +984,36 @@ async def test(ctx):
assert result['called'] == 4
+async def test_job_cancel_on_max_jobs(arq_redis: ArqRedis, worker, caplog):
+ async def longfunc(ctx):
+ await asyncio.sleep(3600)
+
+ async def wait_and_abort(job, delay=0.1):
+ await asyncio.sleep(delay)
+ assert await job.abort() is True
+
+ caplog.set_level(logging.INFO)
+ await arq_redis.zadd(abort_jobs_ss, {b'foobar': int(1e9)})
+ job = await arq_redis.enqueue_job('longfunc', _job_id='testing')
+
+ worker: Worker = worker(
+ functions=[func(longfunc, name='longfunc')], allow_abort_jobs=True, poll_delay=0.1, max_jobs=1
+ )
+ assert worker.jobs_complete == 0
+ assert worker.jobs_failed == 0
+ assert worker.jobs_retried == 0
+ await asyncio.gather(wait_and_abort(job), worker.main())
+ await worker.main()
+ assert worker.jobs_complete == 0
+ assert worker.jobs_failed == 1
+ assert worker.jobs_retried == 0
+ log = re.sub(r'\d+.\d\ds', 'X.XXs', '\n'.join(r.message for r in caplog.records))
+ assert 'X.XXs → testing:longfunc()\n X.XXs ⊘ testing:longfunc aborted' in log
+ assert worker.aborting_tasks == set()
+ assert worker.tasks == {}
+ assert worker.job_tasks == {}
+
+
async def test_worker_timezone_defaults_to_system_timezone(worker):
worker = worker(functions=[func(foobar)])
assert worker.timezone is not None

From ec1532b29d490e66b5c22424b51781865ea4cf44 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jonas=20Kr=C3=BCger=20Svensson?=
Date: Fri, 29 Mar 2024 15:36:05 +0100
Subject: [PATCH 4/8] docs: add documentation on how to retrieve running jobs (#377)

---
docs/examples/job_ids.py | 10 ++++++++++
1 file changed, 10 insertions(+)

diff --git a/docs/examples/job_ids.py b/docs/examples/job_ids.py
index 9de9b8cc..1148591c 100644
--- a/docs/examples/job_ids.py
+++ b/docs/examples/job_ids.py
@@ -2,6 +2,8 @@
from arq import create_pool
from arq.connections import RedisSettings
+from arq.jobs import Job
+
async def the_task(ctx):
print('running the task with id', ctx['job_id'])
@@ -37,6 +39,14 @@ async def main():
> None
"""
+ # you can retrieve jobs by using arq.jobs.Job
+ await redis.enqueue_job('the_task', _job_id='my_job')
+ job5 = Job(job_id='my_job', redis=redis)
+ print(job5)
+ """
+
+ """
+
class WorkerSettings:
functions = [the_task]

From e27ad93c1bd93624acfa9ef9ee67239cf4301dc7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jonas=20Kr=C3=BCger=20Svensson?=
Date: Mon, 1 Apr 2024 12:23:14 +0200
Subject: [PATCH 5/8] feat: add job_id to JobDef, closing #376 (#378)

---
arq/connections.py | 1 +
arq/jobs.py | 6 +++++-
arq/worker.py | 3 +++
tests/test_jobs.py | 24 +++++++++++++++++++-----
tests/test_main.py | 9 ++++++---
5 files changed, 34 insertions(+), 9 deletions(-)

diff --git a/arq/connections.py b/arq/connections.py
index d4fc4434..69ac8ce2 100644
--- a/arq/connections.py
+++ b/arq/connections.py
@@ -193,6 +193,7 @@ async def _get_job_def(self, job_id: bytes, score: int) -> JobDef:
assert v is not None, f'job "{key}" not found'
jd = deserialize_job(v, deserializer=self.job_deserializer)
jd.score = score
+ jd.job_id = job_id.decode()
return jd
async def queued_jobs(self, *, queue_name: Optional[str] = None) -> List[JobDef]:
diff --git a/arq/jobs.py b/arq/jobs.py
index 8028cbe7..d0c0a5ef 100644
--- a/arq/jobs.py
+++ b/arq/jobs.py
@@ -47,6 +47,7 @@ class JobDef:
job_try: int
enqueue_time: datetime
score: Optional[int]
+ job_id: Optional[str]
def __post_init__(self) -> None: if
isinstance(self.score, float): @@ -60,7 +61,6 @@ class JobResult(JobDef): start_time: datetime finish_time: datetime queue_name: str - job_id: Optional[str] = None class Job: @@ -238,6 +238,7 @@ def serialize_result( finished_ms: int, ref: str, queue_name: str, + job_id: str, *, serializer: Optional[Serializer] = None, ) -> Optional[bytes]: @@ -252,6 +253,7 @@ def serialize_result( 'st': start_ms, 'ft': finished_ms, 'q': queue_name, + 'id': job_id, } if serializer is None: serializer = pickle.dumps @@ -281,6 +283,7 @@ def deserialize_job(r: bytes, *, deserializer: Optional[Deserializer] = None) -> job_try=d['t'], enqueue_time=ms_to_datetime(d['et']), score=None, + job_id=None, ) except Exception as e: raise DeserializationError('unable to deserialize job') from e @@ -315,6 +318,7 @@ def deserialize_result(r: bytes, *, deserializer: Optional[Deserializer] = None) start_time=ms_to_datetime(d['st']), finish_time=ms_to_datetime(d['ft']), queue_name=d.get('q', ''), + job_id=d.get('id', ''), ) except Exception as e: raise DeserializationError('unable to deserialize job result') from e diff --git a/arq/worker.py b/arq/worker.py index 398409b5..7ff5393a 100644 --- a/arq/worker.py +++ b/arq/worker.py @@ -501,6 +501,7 @@ async def job_failed(exc: BaseException) -> None: ref=f'{job_id}:{function_name}', serializer=self.job_serializer, queue_name=self.queue_name, + job_id=job_id, ) await asyncio.shield(self.finish_failed_job(job_id, result_data_)) @@ -556,6 +557,7 @@ async def job_failed(exc: BaseException) -> None: timestamp_ms(), ref, self.queue_name, + job_id=job_id, serializer=self.job_serializer, ) return await asyncio.shield(self.finish_failed_job(job_id, result_data)) @@ -649,6 +651,7 @@ async def job_failed(exc: BaseException) -> None: finished_ms, ref, self.queue_name, + job_id=job_id, serializer=self.job_serializer, ) diff --git a/tests/test_jobs.py b/tests/test_jobs.py index 634a8b03..f8f6c8c4 100644 --- a/tests/test_jobs.py +++ b/tests/test_jobs.py @@ -2,7 +2,7 @@ import pickle import pytest -from dirty_equals import IsNow +from dirty_equals import IsNow, IsStr from arq import Worker, func from arq.connections import ArqRedis, RedisSettings, create_pool @@ -89,6 +89,7 @@ async def foobar(ctx, *args, **kwargs): finish_time=IsNow(tz='utc'), score=None, queue_name=expected_queue_name, + job_id=IsStr(), ) results = await arq_redis.all_job_results() assert results == [ @@ -139,9 +140,9 @@ class Foobar: def __getstate__(self): raise TypeError("this doesn't pickle") - r1 = serialize_result('foobar', (1,), {}, 1, 123, True, Foobar(), 123, 123, 'testing', 'test-queue') + r1 = serialize_result('foobar', (1,), {}, 1, 123, True, Foobar(), 123, 123, 'testing', 'test-queue', 'job_1') assert isinstance(r1, bytes) - r2 = serialize_result('foobar', (Foobar(),), {}, 1, 123, True, Foobar(), 123, 123, 'testing', 'test-queue') + r2 = serialize_result('foobar', (Foobar(),), {}, 1, 123, True, Foobar(), 123, 123, 'testing', 'test-queue', 'job_1') assert r2 is None @@ -154,7 +155,19 @@ def custom_serializer(x): return b'0123456789' r1 = serialize_result( - 'foobar', (1,), {}, 1, 123, True, Foobar(), 123, 123, 'testing', 'test-queue', serializer=custom_serializer + 'foobar', + (1,), + {}, + 1, + 123, + True, + Foobar(), + 123, + 123, + 'testing', + 'test-queue', + 'job_1', + serializer=custom_serializer, ) assert r1 == b'0123456789' r2 = serialize_result( @@ -169,6 +182,7 @@ def custom_serializer(x): 123, 'testing', 'test-queue', + 'job_1', serializer=custom_serializer, ) assert r2 == b'0123456789' @@ -213,7 +227,7 
@@ async def test_get_job_result(arq_redis: ArqRedis): async def test_result_pole_delay_dep(arq_redis: ArqRedis): j = Job('foobar', arq_redis) - r = serialize_result('foobar', (1,), {}, 1, 123, True, 42, 123, 123, 'testing', 'test-queue') + r = serialize_result('foobar', (1,), {}, 1, 123, True, 42, 123, 123, 'testing', 'test-queue', 'job_1') await arq_redis.set(result_key_prefix + j.job_id, r) with pytest.warns( DeprecationWarning, match='"pole_delay" is deprecated, use the correct spelling "poll_delay" instead' diff --git a/tests/test_main.py b/tests/test_main.py index 7c3a9835..198c815b 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -238,11 +238,11 @@ async def foobar(ctx): async def test_get_jobs(arq_redis: ArqRedis): - await arq_redis.enqueue_job('foobar', a=1, b=2, c=3) + await arq_redis.enqueue_job('foobar', a=1, b=2, c=3, _job_id='1') await asyncio.sleep(0.01) - await arq_redis.enqueue_job('second', 4, b=5, c=6) + await arq_redis.enqueue_job('second', 4, b=5, c=6, _job_id='2') await asyncio.sleep(0.01) - await arq_redis.enqueue_job('third', 7, b=8) + await arq_redis.enqueue_job('third', 7, b=8, _job_id='3') jobs = await arq_redis.queued_jobs() assert [dataclasses.asdict(j) for j in jobs] == [ { @@ -252,6 +252,7 @@ async def test_get_jobs(arq_redis: ArqRedis): 'job_try': None, 'enqueue_time': IsNow(tz='utc'), 'score': IsInt(), + 'job_id': '1', }, { 'function': 'second', @@ -260,6 +261,7 @@ async def test_get_jobs(arq_redis: ArqRedis): 'job_try': None, 'enqueue_time': IsNow(tz='utc'), 'score': IsInt(), + 'job_id': '2', }, { 'function': 'third', @@ -268,6 +270,7 @@ async def test_get_jobs(arq_redis: ArqRedis): 'job_try': None, 'enqueue_time': IsNow(tz='utc'), 'score': IsInt(), + 'job_id': '3', }, ] assert jobs[0].score < jobs[1].score < jobs[2].score From b59e71674634ceb53e2f6a55fcd6d3c6e65b8598 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Kr=C3=BCger=20Svensson?= Date: Mon, 1 Apr 2024 12:30:38 +0200 Subject: [PATCH 6/8] chore: update dependencies, fixing tests (#382) * chore: update dependencies, fixing tests * Commit message --------- Co-authored-by: Marcelo Trylesinski --- arq/cli.py | 2 +- requirements/docs.txt | 4 ++-- requirements/linting.txt | 10 ++-------- requirements/pyproject.txt | 14 +++----------- requirements/testing.in | 2 +- requirements/testing.txt | 18 +++++------------- 6 files changed, 14 insertions(+), 36 deletions(-) diff --git a/arq/cli.py b/arq/cli.py index e4d2ef96..3d3aa300 100644 --- a/arq/cli.py +++ b/arq/cli.py @@ -60,7 +60,7 @@ async def watch_reload(path: str, worker_settings: 'WorkerSettingsType') -> None except ImportError as e: # pragma: no cover raise ImportError('watchfiles not installed, use `pip install watchfiles`') from e - loop = asyncio.get_event_loop() + loop = asyncio.get_running_loop() stop_event = asyncio.Event() def worker_on_stop(s: Signals) -> None: diff --git a/requirements/docs.txt b/requirements/docs.txt index 5d1651e5..7473b36b 100644 --- a/requirements/docs.txt +++ b/requirements/docs.txt @@ -1,5 +1,5 @@ # -# This file is autogenerated by pip-compile with python 3.9 +# This file is autogenerated by pip-compile with python 3.11 # To update, run: # # pip-compile --output-file=requirements/docs.txt requirements/docs.in @@ -35,7 +35,7 @@ requests==2.28.1 snowballstemmer==2.2.0 # via sphinx sphinx==5.1.1 - # via -r docs.in + # via -r requirements/docs.in sphinxcontrib-applehelp==1.0.2 # via sphinx sphinxcontrib-devhelp==1.0.2 diff --git a/requirements/linting.txt b/requirements/linting.txt index 57176e06..faf0a6ba 100644 
--- a/requirements/linting.txt +++ b/requirements/linting.txt @@ -1,5 +1,5 @@ # -# This file is autogenerated by pip-compile with python 3.9 +# This file is autogenerated by pip-compile with python 3.11 # To update, run: # # pip-compile --output-file=requirements/linting.txt requirements/linting.in @@ -34,15 +34,9 @@ pycodestyle==2.9.1 # via flake8 pyflakes==2.5.0 # via flake8 -tomli==2.0.1 - # via - # black - # mypy types-pytz==2022.2.1.0 # via -r requirements/linting.in types-redis==4.2.8 # via -r requirements/linting.in typing-extensions==4.3.0 - # via - # black - # mypy + # via mypy diff --git a/requirements/pyproject.txt b/requirements/pyproject.txt index 5c605c88..c2c38af6 100644 --- a/requirements/pyproject.txt +++ b/requirements/pyproject.txt @@ -1,5 +1,5 @@ # -# This file is autogenerated by pip-compile with python 3.9 +# This file is autogenerated by pip-compile with python 3.11 # To update, run: # # pip-compile --extra=watch --output-file=requirements/pyproject.txt pyproject.toml @@ -10,17 +10,11 @@ async-timeout==4.0.2 # via redis click==8.1.3 # via arq (pyproject.toml) -deprecated==1.2.13 - # via redis -hiredis==2.0.0 +hiredis==2.1.0 # via redis idna==3.3 # via anyio -packaging==21.3 - # via redis -pyparsing==3.0.9 - # via packaging -redis[hiredis]==4.3.4 +redis[hiredis]==4.4.0 # via arq (pyproject.toml) sniffio==1.2.0 # via anyio @@ -28,5 +22,3 @@ typing-extensions==4.3.0 # via arq (pyproject.toml) watchfiles==0.16.1 # via arq (pyproject.toml) -wrapt==1.14.1 - # via deprecated diff --git a/requirements/testing.in b/requirements/testing.in index 2b39e898..5a32ec5d 100644 --- a/requirements/testing.in +++ b/requirements/testing.in @@ -3,7 +3,7 @@ dirty-equals>=0.4,<1 msgpack>=1,<2 pydantic>=1.9.2,<2 pytest>=7,<8 -pytest-asyncio>=0.19,<0.20 +pytest-asyncio>=0.20.3 pytest-mock>=3,<4 pytest-sugar>=0.9,<1 pytest-timeout>=2,<3 diff --git a/requirements/testing.txt b/requirements/testing.txt index 243d054a..a639ab8a 100644 --- a/requirements/testing.txt +++ b/requirements/testing.txt @@ -1,5 +1,5 @@ # -# This file is autogenerated by pip-compile with python 3.9 +# This file is autogenerated by pip-compile with python 3.11 # To update, run: # # pip-compile --output-file=requirements/testing.txt requirements/testing.in @@ -10,8 +10,6 @@ attrs==22.1.0 # via pytest coverage[toml]==6.4.4 # via -r requirements/testing.in -deprecated==1.2.13 - # via redis dirty-equals==0.4 # via -r requirements/testing.in iniconfig==1.1.1 @@ -22,7 +20,6 @@ packaging==21.3 # via # pytest # pytest-sugar - # redis pluggy==1.0.0 # via pytest psutil==5.9.1 @@ -40,7 +37,7 @@ pytest==7.1.2 # pytest-mock # pytest-sugar # pytest-timeout -pytest-asyncio==0.19.0 +pytest-asyncio==0.20.3 # via -r requirements/testing.in pytest-mock==3.8.2 # via -r requirements/testing.in @@ -52,21 +49,16 @@ pytz==2022.2.1 # via # -r requirements/testing.in # dirty-equals -# manually removed to avoid conflict with redis version from pyproject.toml -# redis==4.2.2 -# # via redislite +redis==4.4.0 + # via redislite redislite==6.2.805324 # via -r requirements/testing.in termcolor==1.1.0 # via pytest-sugar tomli==2.0.1 - # via - # coverage - # pytest + # via pytest typing-extensions==4.3.0 # via pydantic -wrapt==1.14.1 - # via deprecated # The following packages are considered to be unsafe in a requirements file: # setuptools From 5769e1095cfdd20af7b135c36670272559eb3352 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Kr=C3=BCger=20Svensson?= Date: Mon, 1 Apr 2024 12:32:25 +0200 Subject: [PATCH 7/8] refactor: refactor all asserts into raise 
, close #371 (#379) --- arq/connections.py | 14 ++++++++------ arq/cron.py | 11 +++++++---- arq/worker.py | 9 ++++++--- 3 files changed, 21 insertions(+), 13 deletions(-) diff --git a/arq/connections.py b/arq/connections.py index 69ac8ce2..b3a91b48 100644 --- a/arq/connections.py +++ b/arq/connections.py @@ -50,7 +50,8 @@ class RedisSettings: @classmethod def from_dsn(cls, dsn: str) -> 'RedisSettings': conf = urlparse(dsn) - assert conf.scheme in {'redis', 'rediss', 'unix'}, 'invalid DSN scheme' + if conf.scheme not in {'redis', 'rediss', 'unix'}: + raise RuntimeError('invalid DSN scheme') query_db = parse_qs(conf.query).get('db') if query_db: # e.g. redis://localhost:6379?db=1 @@ -138,7 +139,8 @@ async def enqueue_job( _queue_name = self.default_queue_name job_id = _job_id or uuid4().hex job_key = job_key_prefix + job_id - assert not (_defer_until and _defer_by), "use either 'defer_until' or 'defer_by' or neither, not both" + if _defer_until and _defer_by: + raise RuntimeError("use either 'defer_until' or 'defer_by' or neither, not both") defer_by_ms = to_ms(_defer_by) expires_ms = to_ms(_expires) @@ -190,7 +192,8 @@ async def all_job_results(self) -> List[JobResult]: async def _get_job_def(self, job_id: bytes, score: int) -> JobDef: key = job_key_prefix + job_id.decode() v = await self.get(key) - assert v is not None, f'job "{key}" not found' + if v is None: + raise RuntimeError(f'job "{key}" not found') jd = deserialize_job(v, deserializer=self.job_deserializer) jd.score = score jd.job_id = job_id.decode() @@ -222,9 +225,8 @@ async def create_pool( """ settings: RedisSettings = RedisSettings() if settings_ is None else settings_ - assert not ( - type(settings.host) is str and settings.sentinel - ), "str provided for 'host' but 'sentinel' is true; list of sentinels expected" + if isinstance(settings.host, str) and settings.sentinel: + raise RuntimeError("str provided for 'host' but 'sentinel' is true; list of sentinels expected") if settings.sentinel: diff --git a/arq/cron.py b/arq/cron.py index 2eca6c75..f62ea0bd 100644 --- a/arq/cron.py +++ b/arq/cron.py @@ -58,9 +58,10 @@ def _get_next_dt(dt_: datetime, options: Options) -> Optional[datetime]: # noqa next_v = getattr(dt_, field) if isinstance(v, int): mismatch = next_v != v - else: - assert isinstance(v, (set, list, tuple)), v + elif isinstance(v, (set, list, tuple)): mismatch = next_v not in v + else: + raise RuntimeError(v) # print(field, v, next_v, mismatch) if mismatch: micro = max(dt_.microsecond - options.microsecond, 0) @@ -82,7 +83,8 @@ def _get_next_dt(dt_: datetime, options: Options) -> Optional[datetime]: # noqa elif field == 'second': return dt_ + timedelta(seconds=1) - timedelta(microseconds=micro) else: - assert field == 'microsecond', field + if field != 'microsecond': + raise RuntimeError(field) return dt_ + timedelta(microseconds=options.microsecond - dt_.microsecond) return None @@ -173,7 +175,8 @@ def cron( else: coroutine_ = coroutine - assert asyncio.iscoroutinefunction(coroutine_), f'{coroutine_} is not a coroutine function' + if not asyncio.iscoroutinefunction(coroutine_): + raise RuntimeError(f'{coroutine_} is not a coroutine function') timeout = to_seconds(timeout) keep_result = to_seconds(keep_result) diff --git a/arq/worker.py b/arq/worker.py index 7ff5393a..2bdab0f0 100644 --- a/arq/worker.py +++ b/arq/worker.py @@ -85,7 +85,8 @@ def func( else: coroutine_ = coroutine - assert asyncio.iscoroutinefunction(coroutine_), f'{coroutine_} is not a coroutine function' + if not 
asyncio.iscoroutinefunction(coroutine_): + raise RuntimeError(f'{coroutine_} is not a coroutine function') timeout = to_seconds(timeout) keep_result = to_seconds(keep_result) @@ -226,10 +227,12 @@ def __init__( self.queue_name = queue_name self.cron_jobs: List[CronJob] = [] if cron_jobs is not None: - assert all(isinstance(cj, CronJob) for cj in cron_jobs), 'cron_jobs, must be instances of CronJob' + if not all(isinstance(cj, CronJob) for cj in cron_jobs): + raise RuntimeError('cron_jobs, must be instances of CronJob') self.cron_jobs = list(cron_jobs) self.functions.update({cj.name: cj for cj in self.cron_jobs}) - assert len(self.functions) > 0, 'at least one function or cron_job must be registered' + if len(self.functions) == 0: + raise RuntimeError('at least one function or cron_job must be registered') self.burst = burst self.on_startup = on_startup self.on_shutdown = on_shutdown From 94cd8782b4f0764a17962186a349d32125cb98e3 Mon Sep 17 00:00:00 2001 From: Piotr Janiszewski <13190648+iamlikeme@users.noreply.github.com> Date: Mon, 1 Apr 2024 12:33:12 +0200 Subject: [PATCH 8/8] Fix: timezone info occasionally removed from cron job execution time (#383) * Expose the bug in tests * Do not remove timezone when incrementing month in _get_next_dt --- arq/cron.py | 4 +-- tests/test_cron.py | 88 ++++++++++++++++++++++++++++++++++++---------- 2 files changed, 71 insertions(+), 21 deletions(-) diff --git a/arq/cron.py b/arq/cron.py index f62ea0bd..53f053f7 100644 --- a/arq/cron.py +++ b/arq/cron.py @@ -67,9 +67,9 @@ def _get_next_dt(dt_: datetime, options: Options) -> Optional[datetime]: # noqa micro = max(dt_.microsecond - options.microsecond, 0) if field == 'month': if dt_.month == 12: - return datetime(dt_.year + 1, 1, 1) + return datetime(dt_.year + 1, 1, 1, tzinfo=dt_.tzinfo) else: - return datetime(dt_.year, dt_.month + 1, 1) + return datetime(dt_.year, dt_.month + 1, 1, tzinfo=dt_.tzinfo) elif field in ('day', 'weekday'): return ( dt_ diff --git a/tests/test_cron.py b/tests/test_cron.py index 5300041d..16b5b507 100644 --- a/tests/test_cron.py +++ b/tests/test_cron.py @@ -1,7 +1,7 @@ import asyncio import logging import re -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from random import random import pytest @@ -12,37 +12,87 @@ from arq.constants import in_progress_key_prefix from arq.cron import cron, next_cron +tz = timezone(offset=timedelta(hours=3)) + @pytest.mark.parametrize( 'previous,expected,kwargs', [ - (datetime(2016, 6, 1, 12, 10, 10), datetime(2016, 6, 1, 12, 10, 20, microsecond=123_456), dict(second=20)), - (datetime(2016, 6, 1, 12, 10, 10), datetime(2016, 6, 1, 12, 11, 0, microsecond=123_456), dict(minute=11)), - (datetime(2016, 6, 1, 12, 10, 10), datetime(2016, 6, 1, 12, 10, 20), dict(second=20, microsecond=0)), - (datetime(2016, 6, 1, 12, 10, 10), datetime(2016, 6, 1, 12, 11, 0), dict(minute=11, microsecond=0)), ( - datetime(2016, 6, 1, 12, 10, 11), - datetime(2017, 6, 1, 12, 10, 10, microsecond=123_456), + datetime(2016, 6, 1, 12, 10, 10, tzinfo=tz), + datetime(2016, 6, 1, 12, 10, 20, microsecond=123_456, tzinfo=tz), + dict(second=20), + ), + ( + datetime(2016, 6, 1, 12, 10, 10, tzinfo=tz), + datetime(2016, 6, 1, 12, 11, 0, microsecond=123_456, tzinfo=tz), + dict(minute=11), + ), + ( + datetime(2016, 6, 1, 12, 10, 10, tzinfo=tz), + datetime(2016, 6, 1, 12, 10, 20, tzinfo=tz), + dict(second=20, microsecond=0), + ), + ( + datetime(2016, 6, 1, 12, 10, 10, tzinfo=tz), + datetime(2016, 6, 1, 12, 11, 0, tzinfo=tz), + dict(minute=11, 
microsecond=0), + ), + ( + datetime(2016, 6, 1, 12, 10, 11, tzinfo=tz), + datetime(2017, 6, 1, 12, 10, 10, microsecond=123_456, tzinfo=tz), dict(month=6, day=1, hour=12, minute=10, second=10), ), ( - datetime(2016, 6, 1, 12, 10, 10, microsecond=1), - datetime(2016, 7, 1, 12, 10, 10), + datetime(2016, 6, 1, 12, 10, 10, microsecond=1, tzinfo=tz), + datetime(2016, 7, 1, 12, 10, 10, tzinfo=tz), dict(day=1, hour=12, minute=10, second=10, microsecond=0), ), - (datetime(2032, 1, 31, 0, 0, 0), datetime(2032, 2, 28, 0, 0, 0, microsecond=123_456), dict(day=28)), - (datetime(2032, 1, 1, 0, 5), datetime(2032, 1, 1, 4, 0, microsecond=123_456), dict(hour=4)), - (datetime(2032, 1, 1, 0, 0), datetime(2032, 1, 1, 4, 2, microsecond=123_456), dict(hour=4, minute={2, 4, 6})), - (datetime(2032, 1, 1, 0, 5), datetime(2032, 1, 1, 4, 2, microsecond=123_456), dict(hour=4, minute={2, 4, 6})), - (datetime(2032, 2, 5, 0, 0, 0), datetime(2032, 3, 31, 0, 0, 0, microsecond=123_456), dict(day=31)), ( - datetime(2001, 1, 1, 0, 0, 0), # Monday - datetime(2001, 1, 7, 0, 0, 0, microsecond=123_456), + datetime(2032, 1, 31, 0, 0, 0, tzinfo=tz), + datetime(2032, 2, 28, 0, 0, 0, microsecond=123_456, tzinfo=tz), + dict(day=28), + ), + ( + datetime(2032, 1, 1, 0, 5, tzinfo=tz), + datetime(2032, 1, 1, 4, 0, microsecond=123_456, tzinfo=tz), + dict(hour=4), + ), + ( + datetime(2032, 1, 1, 0, 0, tzinfo=tz), + datetime(2032, 1, 1, 4, 2, microsecond=123_456, tzinfo=tz), + dict(hour=4, minute={2, 4, 6}), + ), + ( + datetime(2032, 1, 1, 0, 5, tzinfo=tz), + datetime(2032, 1, 1, 4, 2, microsecond=123_456, tzinfo=tz), + dict(hour=4, minute={2, 4, 6}), + ), + ( + datetime(2032, 2, 5, 0, 0, 0, tzinfo=tz), + datetime(2032, 3, 31, 0, 0, 0, microsecond=123_456, tzinfo=tz), + dict(day=31), + ), + ( + datetime(2001, 1, 1, 0, 0, 0, tzinfo=tz), # Monday + datetime(2001, 1, 7, 0, 0, 0, microsecond=123_456, tzinfo=tz), dict(weekday='Sun'), # Sunday ), - (datetime(2001, 1, 1, 0, 0, 0), datetime(2001, 1, 7, 0, 0, 0, microsecond=123_456), dict(weekday=6)), # Sunday - (datetime(2001, 1, 1, 0, 0, 0), datetime(2001, 11, 7, 0, 0, 0, microsecond=123_456), dict(month=11, weekday=2)), - (datetime(2001, 1, 1, 0, 0, 0), datetime(2001, 1, 3, 0, 0, 0, microsecond=123_456), dict(weekday='wed')), + ( + datetime(2001, 1, 1, 0, 0, 0, tzinfo=tz), + datetime(2001, 1, 7, 0, 0, 0, microsecond=123_456, tzinfo=tz), + dict(weekday=6), + ), # Sunday + ( + datetime(2001, 1, 1, 0, 0, 0, tzinfo=tz), + datetime(2001, 11, 7, 0, 0, 0, microsecond=123_456, tzinfo=tz), + dict(month=11, weekday=2), + ), + ( + datetime(2001, 1, 1, 0, 0, 0, tzinfo=tz), + datetime(2001, 1, 3, 0, 0, 0, microsecond=123_456, tzinfo=tz), + dict(weekday='wed'), + ), ], ) def test_next_cron(previous, expected, kwargs):