improved error when a job is aborted
samuelcolvin committed Jul 30, 2019
1 parent 11f7ccc commit b50824b
Showing 3 changed files with 98 additions and 10 deletions.
4 changes: 4 additions & 0 deletions HISTORY.rst
@@ -3,6 +3,10 @@
History
-------

v0.16 (unreleased)
......................
* improved error when a job is aborted (eg. function not found)

v0.16.0b3 (2019-05-14)
......................
* fix semaphore on worker with many expired jobs
61 changes: 54 additions & 7 deletions arq/worker.py
@@ -100,6 +100,13 @@ def __str__(self):
return repr(self)


class JobExecutionFailed(RuntimeError):
def __eq__(self, other):
if isinstance(other, JobExecutionFailed):
return self.args == other.args
return False


class FailedJobs(RuntimeError):
def __init__(self, count, job_results):
self.count = count
@@ -108,9 +115,9 @@ def __init__(self, count, job_results):
def __str__(self):
if self.count == 1 and self.job_results:
exc = self.job_results[0].result
return f'1 job failed "{exc.__class__.__name__}: {exc}"'
return f'1 job failed {exc!r}'
else:
return f'{self.count} jobs failed'
return f'{self.count} jobs failed:\n' + '\n'.join(repr(r.result) for r in self.job_results)

def __repr__(self):
return f'<{str(self)}>'
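
For illustration only (not part of the commit): roughly how the reworked FailedJobs messages read once job results carry the underlying exception. FakeJobResult is a stand-in for arq's stored job result objects, of which only the .result attribute is used here; repr formatting shown for Python 3.7+.

from arq.worker import FailedJobs, JobExecutionFailed

class FakeJobResult:
    # stand-in for arq's job result records; FailedJobs only reads .result
    def __init__(self, result):
        self.result = result

print(FailedJobs(1, [FakeJobResult(JobExecutionFailed('job expired'))]))
# 1 job failed JobExecutionFailed('job expired')

print(FailedJobs(2, [FakeJobResult(TypeError('my type error')), FakeJobResult(JobExecutionFailed('job expired'))]))
# 2 jobs failed:
# TypeError('my type error')
# JobExecutionFailed('job expired')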
@@ -291,6 +298,7 @@ async def run_jobs(self, job_ids):
self.tasks.append(t)

async def run_job(self, job_id, score): # noqa: C901
start_ms = timestamp_ms()
v, job_try, _ = await asyncio.gather(
self.pool.get(job_key_prefix + job_id, encoding=None),
self.pool.incr(retry_key_prefix + job_id),
@@ -299,7 +307,19 @@ async def run_job(self, job_id, score):  # noqa: C901
if not v:
logger.warning('job %s expired', job_id)
self.jobs_failed += 1
return await asyncio.shield(self.abort_job(job_id))
result_data = pickle_result(
'<unknown>',
(),
{},
job_try,
0,
False,
JobExecutionFailed('job expired'),
start_ms,
timestamp_ms(),
f'{job_id}:<unknown function>',
)
return await asyncio.shield(self.abort_job(job_id, result_data))

function_name, args, kwargs, enqueue_job_try, enqueue_time_ms = unpickle_job_raw(v)

@@ -308,7 +328,19 @@ async def run_job(self, job_id, score):  # noqa: C901
except KeyError:
logger.warning('job %s, function %r not found', job_id, function_name)
self.jobs_failed += 1
return await asyncio.shield(self.abort_job(job_id))
result_data = pickle_result(
function_name,
args,
kwargs,
job_try,
enqueue_time_ms,
False,
JobExecutionFailed(f'function {function_name!r} not found'),
start_ms,
timestamp_ms(),
f'{job_id}:{function_name}',
)
return await asyncio.shield(self.abort_job(job_id, result_data))

if hasattr(function, 'next_run'):
# cron_job
@@ -325,7 +357,19 @@ async def run_job(self, job_id, score):  # noqa: C901
t = (timestamp_ms() - enqueue_time_ms) / 1000
logger.warning('%6.2fs ! %s max retries %d exceeded', t, ref, max_tries)
self.jobs_failed += 1
return await asyncio.shield(self.abort_job(job_id))
result_data = pickle_result(
function_name,
args,
kwargs,
job_try,
enqueue_time_ms,
False,
JobExecutionFailed(f'max {max_tries} retries exceeded'),
start_ms,
timestamp_ms(),
ref,
)
return await asyncio.shield(self.abort_job(job_id, result_data))

result = no_result
exc_extra = None
@@ -393,7 +437,9 @@ async def run_job(self, job_id, score):  # noqa: C901

await asyncio.shield(self.finish_job(job_id, finish, result_data, result_timeout_s, incr_score))

async def finish_job(self, job_id, finish, result_data, result_timeout_s, incr_score):
async def finish_job(
self, job_id: str, finish: bool, result_data: bytes, result_timeout_s: Optional[int], incr_score: int
):
with await self.pool as conn:
await conn.unwatch()
tr = conn.multi_exec()
@@ -408,12 +454,13 @@ async def finish_job(self, job_id, finish, result_data, result_timeout_s, incr_s
tr.delete(*delete_keys)
await tr.execute()

async def abort_job(self, job_id):
async def abort_job(self, job_id: str, result_data: bytes):
with await self.pool as conn:
await conn.unwatch()
tr = conn.multi_exec()
tr.delete(retry_key_prefix + job_id, in_progress_key_prefix + job_id, job_key_prefix + job_id)
tr.zrem(self.queue_name, job_id)
tr.setex(result_key_prefix + job_id, self.keep_result_s, result_data)
await tr.execute()

async def heart_beat(self):
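Taken together, the worker.py changes mean an aborted job (expired, unknown function, or out of retries) now leaves a pickled JobExecutionFailed under its result key instead of failing silently, so Worker.run_check and job result lookups can report why. Below is a rough end-to-end sketch of how that surfaces to a caller; it assumes a local Redis instance, Python 3.7+, and the arq v0.16 API as I recall it (create_pool, Worker(redis_pool=..., burst=True), run_check), so names may differ slightly.

import asyncio

from arq.connections import create_pool
from arq.worker import FailedJobs, Worker, func


async def foobar(ctx):
    return 42


async def main():
    redis = await create_pool()  # default RedisSettings -> localhost:6379
    # enqueue a job for a function this worker does not define
    await redis.enqueue_job('missing', _job_id='demo')
    worker = Worker(functions=[func(foobar, name='foobar')], redis_pool=redis, burst=True)
    try:
        await worker.run_check()  # runs the burst worker, raises FailedJobs if any job failed
    except FailedJobs as e:
        # before this commit the failure was only logged; now the stored result explains it
        print(e)  # 1 job failed JobExecutionFailed("function 'missing' not found")


asyncio.run(main())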
43 changes: 40 additions & 3 deletions tests/test_worker.py
@@ -9,7 +9,7 @@

from arq.connections import ArqRedis
from arq.constants import default_queue_name, health_check_key_suffix, job_key_prefix
from arq.worker import FailedJobs, Retry, Worker, async_check_health, check_health, func, run_worker
from arq.worker import FailedJobs, JobExecutionFailed, Retry, Worker, async_check_health, check_health, func, run_worker


async def foobar(ctx):
@@ -122,6 +122,18 @@ async def test_job_job_not_found(arq_redis: ArqRedis, worker, caplog):
assert "job testing, function 'missing' not found" in log


async def test_job_job_not_found_run_check(arq_redis: ArqRedis, worker, caplog):
caplog.set_level(logging.INFO)
await arq_redis.enqueue_job('missing', _job_id='testing')
worker: Worker = worker(functions=[foobar])
with pytest.raises(FailedJobs) as exc_info:
await worker.run_check()

assert exc_info.value.count == 1
assert len(exc_info.value.job_results) == 1
assert exc_info.value.job_results[0].result == JobExecutionFailed("function 'missing' not found")


async def test_retry_lots(arq_redis: ArqRedis, worker, caplog):
async def retry(ctx):
raise Retry()
@@ -138,6 +150,17 @@ async def retry(ctx):
assert ' X.XXs ! testing:retry max retries 5 exceeded' in log


async def test_retry_lots_check(arq_redis: ArqRedis, worker, caplog):
async def retry(ctx):
raise Retry()

caplog.set_level(logging.INFO)
await arq_redis.enqueue_job('retry', _job_id='testing')
worker: Worker = worker(functions=[func(retry, name='retry')])
with pytest.raises(FailedJobs, match='max 5 retries exceeded'):
await worker.run_check()


async def test_cancel_error(arq_redis: ArqRedis, worker, caplog):
async def retry(ctx):
if ctx['job_try'] == 1:
@@ -172,6 +195,20 @@ async def test_job_expired(arq_redis: ArqRedis, worker, caplog):
assert 'job testing expired' in log


async def test_job_expired_run_check(arq_redis: ArqRedis, worker, caplog):
caplog.set_level(logging.INFO)
await arq_redis.enqueue_job('foobar', _job_id='testing')
await arq_redis.delete(job_key_prefix + 'testing')
worker: Worker = worker(functions=[foobar])
with pytest.raises(FailedJobs) as exc_info:
await worker.run_check()

assert str(exc_info.value) == "1 job failed JobExecutionFailed('job expired')"
assert exc_info.value.count == 1
assert len(exc_info.value.job_results) == 1
assert exc_info.value.job_results[0].result == JobExecutionFailed('job expired')


async def test_job_old(arq_redis: ArqRedis, worker, caplog):
caplog.set_level(logging.INFO)
await arq_redis.enqueue_job('foobar', _job_id='testing', _defer_by=-2)
@@ -311,15 +348,15 @@ async def test_run_check_passes(arq_redis: ArqRedis, worker):
async def test_run_check_error(arq_redis: ArqRedis, worker):
await arq_redis.enqueue_job('fails')
worker: Worker = worker(functions=[func(fails, name='fails')])
with pytest.raises(FailedJobs, match='1 job failed "TypeError: my type error"'):
with pytest.raises(FailedJobs, match=r"1 job failed TypeError\('my type error'\)"):
await worker.run_check()


async def test_run_check_error2(arq_redis: ArqRedis, worker):
await arq_redis.enqueue_job('fails')
await arq_redis.enqueue_job('fails')
worker: Worker = worker(functions=[func(fails, name='fails')])
with pytest.raises(FailedJobs, match='2 jobs failed') as exc_info:
with pytest.raises(FailedJobs, match='2 jobs failed:\n') as exc_info:
await worker.run_check()
assert len(exc_info.value.job_results) == 2
