diff --git a/HISTORY.rst b/HISTORY.rst
index 8f169dc6..af6d9982 100644
--- a/HISTORY.rst
+++ b/HISTORY.rst
@@ -3,6 +3,10 @@ History
 -------
 
+v0.16 (unreleased)
+......................
+* improved error reporting when a job is aborted (e.g. function not found)
+
 v0.16.0b3 (2019-05-14)
 ......................
 * fix semaphore on worker with many expired jobs
 
diff --git a/arq/worker.py b/arq/worker.py
index 1fbb83ec..eace6918 100644
--- a/arq/worker.py
+++ b/arq/worker.py
@@ -100,6 +100,13 @@ def __str__(self):
         return repr(self)
 
 
+class JobExecutionFailed(RuntimeError):
+    def __eq__(self, other):
+        if isinstance(other, JobExecutionFailed):
+            return self.args == other.args
+        return False
+
+
 class FailedJobs(RuntimeError):
     def __init__(self, count, job_results):
         self.count = count
@@ -108,9 +115,9 @@ def __init__(self, count, job_results):
     def __str__(self):
         if self.count == 1 and self.job_results:
             exc = self.job_results[0].result
-            return f'1 job failed "{exc.__class__.__name__}: {exc}"'
+            return f'1 job failed {exc!r}'
         else:
-            return f'{self.count} jobs failed'
+            return f'{self.count} jobs failed:\n' + '\n'.join(repr(r.result) for r in self.job_results)
 
     def __repr__(self):
         return f'<{str(self)}>'
@@ -291,6 +298,7 @@ async def run_jobs(self, job_ids):
             self.tasks.append(t)
 
     async def run_job(self, job_id, score):  # noqa: C901
+        start_ms = timestamp_ms()
         v, job_try, _ = await asyncio.gather(
             self.pool.get(job_key_prefix + job_id, encoding=None),
             self.pool.incr(retry_key_prefix + job_id),
@@ -299,7 +307,19 @@ async def run_job(self, job_id, score):  # noqa: C901
         if not v:
             logger.warning('job %s expired', job_id)
             self.jobs_failed += 1
-            return await asyncio.shield(self.abort_job(job_id))
+            result_data = pickle_result(
+                '',
+                (),
+                {},
+                job_try,
+                0,
+                False,
+                JobExecutionFailed('job expired'),
+                start_ms,
+                timestamp_ms(),
+                f'{job_id}:',
+            )
+            return await asyncio.shield(self.abort_job(job_id, result_data))
 
         function_name, args, kwargs, enqueue_job_try, enqueue_time_ms = unpickle_job_raw(v)
 
@@ -308,7 +328,19 @@
         except KeyError:
             logger.warning('job %s, function %r not found', job_id, function_name)
             self.jobs_failed += 1
-            return await asyncio.shield(self.abort_job(job_id))
+            result_data = pickle_result(
+                function_name,
+                args,
+                kwargs,
+                job_try,
+                enqueue_time_ms,
+                False,
+                JobExecutionFailed(f'function {function_name!r} not found'),
+                start_ms,
+                timestamp_ms(),
+                f'{job_id}:{function_name}',
+            )
+            return await asyncio.shield(self.abort_job(job_id, result_data))
 
         if hasattr(function, 'next_run'):
             # cron_job
@@ -325,7 +357,19 @@
             t = (timestamp_ms() - enqueue_time_ms) / 1000
             logger.warning('%6.2fs ! %s max retries %d exceeded', t, ref, max_tries)
             self.jobs_failed += 1
-            return await asyncio.shield(self.abort_job(job_id))
+            result_data = pickle_result(
+                function_name,
+                args,
+                kwargs,
+                job_try,
+                enqueue_time_ms,
+                False,
+                JobExecutionFailed(f'max {max_tries} retries exceeded'),
+                start_ms,
+                timestamp_ms(),
+                ref,
+            )
+            return await asyncio.shield(self.abort_job(job_id, result_data))
 
         result = no_result
         exc_extra = None
@@ -393,7 +437,9 @@
 
         await asyncio.shield(self.finish_job(job_id, finish, result_data, result_timeout_s, incr_score))
 
-    async def finish_job(self, job_id, finish, result_data, result_timeout_s, incr_score):
+    async def finish_job(
+        self, job_id: str, finish: bool, result_data: bytes, result_timeout_s: Optional[int], incr_score: int
+    ):
         with await self.pool as conn:
             await conn.unwatch()
             tr = conn.multi_exec()
@@ -408,12 +454,13 @@ async def finish_job(self, job_id, finish, result_data, result_timeout_s, incr_s
                 tr.delete(*delete_keys)
             await tr.execute()
 
-    async def abort_job(self, job_id):
+    async def abort_job(self, job_id: str, result_data: bytes):
         with await self.pool as conn:
             await conn.unwatch()
             tr = conn.multi_exec()
             tr.delete(retry_key_prefix + job_id, in_progress_key_prefix + job_id, job_key_prefix + job_id)
             tr.zrem(self.queue_name, job_id)
+            tr.setex(result_key_prefix + job_id, self.keep_result_s, result_data)
             await tr.execute()
 
     async def heart_beat(self):
diff --git a/tests/test_worker.py b/tests/test_worker.py
index 723c24a4..80fb6196 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -9,7 +9,7 @@
 
 from arq.connections import ArqRedis
 from arq.constants import default_queue_name, health_check_key_suffix, job_key_prefix
-from arq.worker import FailedJobs, Retry, Worker, async_check_health, check_health, func, run_worker
+from arq.worker import FailedJobs, JobExecutionFailed, Retry, Worker, async_check_health, check_health, func, run_worker
 
 
 async def foobar(ctx):
@@ -122,6 +122,18 @@ async def test_job_job_not_found(arq_redis: ArqRedis, worker, caplog):
     assert "job testing, function 'missing' not found" in log
 
 
+async def test_job_job_not_found_run_check(arq_redis: ArqRedis, worker, caplog):
+    caplog.set_level(logging.INFO)
+    await arq_redis.enqueue_job('missing', _job_id='testing')
+    worker: Worker = worker(functions=[foobar])
+    with pytest.raises(FailedJobs) as exc_info:
+        await worker.run_check()
+
+    assert exc_info.value.count == 1
+    assert len(exc_info.value.job_results) == 1
+    assert exc_info.value.job_results[0].result == JobExecutionFailed("function 'missing' not found")
+
+
 async def test_retry_lots(arq_redis: ArqRedis, worker, caplog):
     async def retry(ctx):
         raise Retry()
@@ -138,6 +150,17 @@ async def retry(ctx):
     assert ' X.XXs ! testing:retry max retries 5 exceeded' in log
 
 
+async def test_retry_lots_check(arq_redis: ArqRedis, worker, caplog):
+    async def retry(ctx):
+        raise Retry()
+
+    caplog.set_level(logging.INFO)
+    await arq_redis.enqueue_job('retry', _job_id='testing')
+    worker: Worker = worker(functions=[func(retry, name='retry')])
+    with pytest.raises(FailedJobs, match='max 5 retries exceeded'):
+        await worker.run_check()
+
+
 async def test_cancel_error(arq_redis: ArqRedis, worker, caplog):
     async def retry(ctx):
         if ctx['job_try'] == 1:
@@ -172,6 +195,20 @@ async def test_job_expired(arq_redis: ArqRedis, worker, caplog):
     assert 'job testing expired' in log
 
 
+async def test_job_expired_run_check(arq_redis: ArqRedis, worker, caplog):
+    caplog.set_level(logging.INFO)
+    await arq_redis.enqueue_job('foobar', _job_id='testing')
+    await arq_redis.delete(job_key_prefix + 'testing')
+    worker: Worker = worker(functions=[foobar])
+    with pytest.raises(FailedJobs) as exc_info:
+        await worker.run_check()
+
+    assert str(exc_info.value) == "1 job failed JobExecutionFailed('job expired')"
+    assert exc_info.value.count == 1
+    assert len(exc_info.value.job_results) == 1
+    assert exc_info.value.job_results[0].result == JobExecutionFailed('job expired')
+
+
 async def test_job_old(arq_redis: ArqRedis, worker, caplog):
     caplog.set_level(logging.INFO)
     await arq_redis.enqueue_job('foobar', _job_id='testing', _defer_by=-2)
@@ -311,7 +348,7 @@ async def test_run_check_passes(arq_redis: ArqRedis, worker):
 async def test_run_check_error(arq_redis: ArqRedis, worker):
     await arq_redis.enqueue_job('fails')
     worker: Worker = worker(functions=[func(fails, name='fails')])
-    with pytest.raises(FailedJobs, match='1 job failed "TypeError: my type error"'):
+    with pytest.raises(FailedJobs, match=r"1 job failed TypeError\('my type error'\)"):
        await worker.run_check()
 
 
@@ -319,7 +356,7 @@ async def test_run_check_error2(arq_redis: ArqRedis, worker):
     await arq_redis.enqueue_job('fails')
     await arq_redis.enqueue_job('fails')
     worker: Worker = worker(functions=[func(fails, name='fails')])
-    with pytest.raises(FailedJobs, match='2 jobs failed') as exc_info:
+    with pytest.raises(FailedJobs, match='2 jobs failed:\n') as exc_info:
         await worker.run_check()
 
     assert len(exc_info.value.job_results) == 2
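
Note for reviewers: a minimal end-to-end sketch of what this change enables, assuming arq at this revision and a local Redis on the default port. The `foobar` function and the `testing` job id are placeholders mirroring the tests above; the job deliberately references the unregistered function `'missing'`. Before this patch an aborted job left nothing behind; now `abort_job()` stores a pickled `JobExecutionFailed`, so `run_check()` can surface it:

```python
import asyncio

from arq.connections import RedisSettings, create_pool
from arq.worker import FailedJobs, JobExecutionFailed, Worker


async def foobar(ctx):
    # placeholder function; the job enqueued below refers to 'missing',
    # which is deliberately not registered with the worker
    return 42


async def main():
    redis = await create_pool(RedisSettings())  # assumes redis://localhost:6379
    await redis.enqueue_job('missing', _job_id='testing')

    worker = Worker(functions=[foobar], redis_pool=redis, burst=True)
    try:
        await worker.run_check()
    except FailedJobs as exc:
        # the aborted job now carries a stored JobExecutionFailed result
        print(exc)  # 1 job failed JobExecutionFailed("function 'missing' not found")
        assert exc.job_results[0].result == JobExecutionFailed("function 'missing' not found")

    redis.close()
    await redis.wait_closed()


asyncio.run(main())
```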
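And a self-contained sketch of the reworked `FailedJobs.__str__` (single failure vs. the new multi-line listing). The `SimpleNamespace` objects are stand-ins for arq's job result records, since `__str__` only reads each entry's `.result` attribute; no Redis is needed:

```python
from types import SimpleNamespace

from arq.worker import FailedJobs, JobExecutionFailed

# stand-ins for arq job results; FailedJobs.__str__ only touches `.result`
expired = SimpleNamespace(result=JobExecutionFailed('job expired'))
type_err = SimpleNamespace(result=TypeError('my type error'))

print(FailedJobs(1, [expired]))
# 1 job failed JobExecutionFailed('job expired')

print(FailedJobs(2, [expired, type_err]))
# 2 jobs failed:
# JobExecutionFailed('job expired')
# TypeError('my type error')
```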