Skip to content

Commit

Permalink
non retry mode (python-arq#140)
Browse files Browse the repository at this point in the history
* non retry mode

* update history, fix python-arq#139

* adding max_burst_jobs

* history
  • Loading branch information
samuelcolvin authored Aug 2, 2019
1 parent 06c05d5 commit b2d397a
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 8 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/env
/env/
/env36/
/.idea
__pycache__/
*.py[cod]
Expand Down
4 changes: 3 additions & 1 deletion HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
History
-------

v0.16.1 (unreleased)
v0.16.1 (2019-08-02)
....................
* prevent duplicate ``job_id`` when job result exists, fix #137
* add "don't retry mode" via ``worker.retry_jobs = False``, fix #139
* add ``worker.max_burst_jobs``

v0.16 (2019-07-30)
..................
Expand Down
2 changes: 1 addition & 1 deletion arq/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

__all__ = ['VERSION']

VERSION = StrictVersion('0.16')
VERSION = StrictVersion('0.16.1')
22 changes: 17 additions & 5 deletions arq/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,6 @@ class Retry(RuntimeError):
:param defer: duration to wait before rerunning the job
"""

__slots__ = ('defer_score',)

def __init__(self, defer: Optional[SecondsTimedelta] = None):
    # Normalise the optional defer duration into a millisecond offset;
    # downstream code treats defer_score as ms (it is divided by 1000 when
    # logged as seconds and combined with timestamp_ms() deltas).
    # None (no defer given) is preserved as-is.
    self.defer_score = to_ms(defer)

Expand Down Expand Up @@ -164,6 +162,8 @@ def __init__(
health_check_interval: SecondsTimedelta = 3600,
health_check_key: Optional[str] = None,
ctx: Optional[Dict] = None,
retry_jobs: bool = True,
max_burst_jobs: int = -1,
):
self.functions: Dict[str, Union[Function, CronJob]] = {f.name: f for f in map(func, functions)}
self.queue_name = queue_name
Expand Down Expand Up @@ -205,6 +205,9 @@ def __init__(
self._add_signal_handler(signal.SIGINT, self.handle_sig)
self._add_signal_handler(signal.SIGTERM, self.handle_sig)
self.on_stop = None
# whether or not to retry jobs on Retry and CancelledError
self.retry_jobs = retry_jobs
self.max_burst_jobs = max_burst_jobs

def run(self) -> None:
"""
Expand All @@ -226,13 +229,17 @@ async def async_run(self) -> None:
self.main_task = self.loop.create_task(self.main())
await self.main_task

async def run_check(self) -> int:
async def run_check(self, retry_jobs: Optional[bool] = None, max_burst_jobs: Optional[int] = None) -> int:
"""
Run :func:`arq.worker.Worker.async_run`, check for failed jobs and raise :class:`arq.worker.FailedJobs`
if any jobs have failed.
:return: number of completed jobs
"""
if retry_jobs is not None:
self.retry_jobs = retry_jobs
if max_burst_jobs is not None:
self.max_burst_jobs = max_burst_jobs
await self.async_run()
if self.jobs_failed:
failed_job_results = [r for r in await self.pool.all_job_results() if not r.success]
Expand Down Expand Up @@ -265,6 +272,11 @@ async def main(self):
await self.heart_beat()

if self.burst:
if (
self.max_burst_jobs >= 0
and self.jobs_complete + self.jobs_retried + self.jobs_failed >= self.max_burst_jobs
):
return
queued_jobs = await self.pool.zcard(self.queue_name)
if queued_jobs == 0:
return
Expand Down Expand Up @@ -405,13 +417,13 @@ async def run_job(self, job_id, score): # noqa: C901
except Exception as e:
finished_ms = timestamp_ms()
t = (finished_ms - start_ms) / 1000
if isinstance(e, Retry):
if self.retry_jobs and isinstance(e, Retry):
incr_score = e.defer_score
logger.info('%6.2fs ↻ %s retrying job in %0.2fs', t, ref, (e.defer_score or 0) / 1000)
if e.defer_score:
incr_score = e.defer_score + (timestamp_ms() - score)
self.jobs_retried += 1
elif isinstance(e, asyncio.CancelledError):
elif self.retry_jobs and isinstance(e, asyncio.CancelledError):
logger.info('%6.2fs ↻ %s cancelled, will be run again', t, ref)
self.jobs_retried += 1
else:
Expand Down
32 changes: 32 additions & 0 deletions tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,38 @@ async def retry(ctx):
assert '0.XXs ← testing:retry ●' in log


async def test_job_retry_dont_retry(arq_redis: ArqRedis, worker, caplog):
    """With retry_jobs=False, a job raising Retry must fail outright instead of re-queueing."""

    async def always_retry(ctx):
        raise Retry(defer=0.01)

    caplog.set_level(logging.INFO)
    await arq_redis.enqueue_job('retry', _job_id='testing')
    w: Worker = worker(functions=[func(always_retry, name='retry')])

    with pytest.raises(FailedJobs) as exc_info:
        await w.run_check(retry_jobs=False)
    assert str(exc_info.value) == '1 job failed <Retry defer 0.01s>'

    # the retry arrow must never appear in the log, only the failure line
    assert '↻' not in caplog.text
    assert '! testing:retry failed, Retry: <Retry defer 0.01s>\n' in caplog.text


async def test_job_retry_max_jobs(arq_redis: ArqRedis, worker, caplog):
    """A burst run with max_burst_jobs=1 counts the retried job and stops before re-running it."""
    caplog.set_level(logging.INFO)

    async def always_retry(ctx):
        raise Retry(defer=0.01)

    await arq_redis.enqueue_job('retry', _job_id='testing')
    w: Worker = worker(functions=[func(always_retry, name='retry')])

    # zero completed jobs: the single attempt was a retry, which hit the burst cap
    assert await w.run_check(max_burst_jobs=1) == 0
    assert w.jobs_complete == 0
    assert w.jobs_retried == 1
    assert w.jobs_failed == 0

    # mask timings, then check the job was retried once but never attempted a second try
    log = re.sub(r'(\d+).\d\ds', r'\1.XXs', caplog.text)
    assert '0.XXs ↻ testing:retry retrying job in 0.XXs\n' in log
    assert '0.XXs → testing:retry() try=2\n' not in log


async def test_job_job_not_found(arq_redis: ArqRedis, worker, caplog):
caplog.set_level(logging.INFO)
await arq_redis.enqueue_job('missing', _job_id='testing')
Expand Down

0 comments on commit b2d397a

Please sign in to comment.