diff --git a/.gitignore b/.gitignore index 3d3ecc0a..eb0d9b8d 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ -/env +/env/ +/env36/ /.idea __pycache__/ *.py[cod] diff --git a/HISTORY.rst b/HISTORY.rst index f77b2b83..305906ac 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -3,9 +3,11 @@ History ------- -v0.16.1 (unreleased) +v0.16.1 (2019-08-02) .................... * prevent duplicate ``job_id`` when job result exists, fix #137 +* add "don't retry mode" via ``worker.retry_jobs = False``, fix #139 +* add ``worker.max_burst_jobs`` v0.16 (2019-07-30) .................. diff --git a/arq/version.py b/arq/version.py index 560a9b71..3cbfc4e6 100644 --- a/arq/version.py +++ b/arq/version.py @@ -2,4 +2,4 @@ __all__ = ['VERSION'] -VERSION = StrictVersion('0.16') +VERSION = StrictVersion('0.16.1') diff --git a/arq/worker.py b/arq/worker.py index eace6918..1c422090 100644 --- a/arq/worker.py +++ b/arq/worker.py @@ -88,8 +88,6 @@ class Retry(RuntimeError): :param defer: duration to wait before rerunning the job """ - __slots__ = ('defer_score',) - def __init__(self, defer: Optional[SecondsTimedelta] = None): self.defer_score = to_ms(defer) @@ -164,6 +162,8 @@ def __init__( health_check_interval: SecondsTimedelta = 3600, health_check_key: Optional[str] = None, ctx: Optional[Dict] = None, + retry_jobs: bool = True, + max_burst_jobs: int = -1, ): self.functions: Dict[str, Union[Function, CronJob]] = {f.name: f for f in map(func, functions)} self.queue_name = queue_name @@ -205,6 +205,9 @@ def __init__( self._add_signal_handler(signal.SIGINT, self.handle_sig) self._add_signal_handler(signal.SIGTERM, self.handle_sig) self.on_stop = None + # whether or not to retry jobs on Retry and CancelledError + self.retry_jobs = retry_jobs + self.max_burst_jobs = max_burst_jobs def run(self) -> None: """ @@ -226,13 +229,17 @@ async def async_run(self) -> None: self.main_task = self.loop.create_task(self.main()) await self.main_task - async def run_check(self) -> int: + async def run_check(self, retry_jobs: Optional[bool] = None, max_burst_jobs: Optional[int] = None) -> int: """ Run :func:`arq.worker.Worker.async_run`, check for failed jobs and raise :class:`arq.worker.FailedJobs` if any jobs have failed. :return: number of completed jobs """ + if retry_jobs is not None: + self.retry_jobs = retry_jobs + if max_burst_jobs is not None: + self.max_burst_jobs = max_burst_jobs await self.async_run() if self.jobs_failed: failed_job_results = [r for r in await self.pool.all_job_results() if not r.success] @@ -265,6 +272,11 @@ async def main(self): await self.heart_beat() if self.burst: + if ( + self.max_burst_jobs >= 0 + and self.jobs_complete + self.jobs_retried + self.jobs_failed >= self.max_burst_jobs + ): + return queued_jobs = await self.pool.zcard(self.queue_name) if queued_jobs == 0: return @@ -405,13 +417,13 @@ async def run_job(self, job_id, score): # noqa: C901 except Exception as e: finished_ms = timestamp_ms() t = (finished_ms - start_ms) / 1000 - if isinstance(e, Retry): + if self.retry_jobs and isinstance(e, Retry): incr_score = e.defer_score logger.info('%6.2fs ↻ %s retrying job in %0.2fs', t, ref, (e.defer_score or 0) / 1000) if e.defer_score: incr_score = e.defer_score + (timestamp_ms() - score) self.jobs_retried += 1 - elif isinstance(e, asyncio.CancelledError): + elif self.retry_jobs and isinstance(e, asyncio.CancelledError): logger.info('%6.2fs ↻ %s cancelled, will be run again', t, ref) self.jobs_retried += 1 else: diff --git a/tests/test_worker.py b/tests/test_worker.py index c3054913..9fc8f271 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -110,6 +110,38 @@ async def retry(ctx): assert '0.XXs ← testing:retry ●' in log +async def test_job_retry_dont_retry(arq_redis: ArqRedis, worker, caplog): + async def retry(ctx): + raise Retry(defer=0.01) + + caplog.set_level(logging.INFO) + await arq_redis.enqueue_job('retry', _job_id='testing') + worker: Worker = worker(functions=[func(retry, name='retry')]) + with pytest.raises(FailedJobs) as exc_info: + await worker.run_check(retry_jobs=False) + assert str(exc_info.value) == '1 job failed ' + + assert '↻' not in caplog.text + assert '! testing:retry failed, Retry: \n' in caplog.text + + +async def test_job_retry_max_jobs(arq_redis: ArqRedis, worker, caplog): + async def retry(ctx): + raise Retry(defer=0.01) + + caplog.set_level(logging.INFO) + await arq_redis.enqueue_job('retry', _job_id='testing') + worker: Worker = worker(functions=[func(retry, name='retry')]) + assert await worker.run_check(max_burst_jobs=1) == 0 + assert worker.jobs_complete == 0 + assert worker.jobs_retried == 1 + assert worker.jobs_failed == 0 + + log = re.sub(r'(\d+).\d\ds', r'\1.XXs', caplog.text) + assert '0.XXs ↻ testing:retry retrying job in 0.XXs\n' in log + assert '0.XXs → testing:retry() try=2\n' not in log + + async def test_job_job_not_found(arq_redis: ArqRedis, worker, caplog): caplog.set_level(logging.INFO) await arq_redis.enqueue_job('missing', _job_id='testing')