Skip to content

Commit

Permalink
prevent duplicate jobs when result exists (python-arq#138)
Browse files Browse the repository at this point in the history
  • Loading branch information
samuelcolvin authored Aug 2, 2019
1 parent c4e89b3 commit 06c05d5
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 1 deletion.
4 changes: 4 additions & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
History
-------

v0.16.1 (unreleased)
....................
* prevent duplicate ``job_id`` when job result exists, fix #137

v0.16 (2019-07-30)
..................
* improved error when a job is aborted (eg. function not found)
Expand Down
3 changes: 2 additions & 1 deletion arq/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,9 @@ async def enqueue_job(
pipe.unwatch()
pipe.watch(job_key)
job_exists = pipe.exists(job_key)
job_result_exists = pipe.exists(result_key_prefix + job_id)
await pipe.execute()
if await job_exists:
if await job_exists or await job_result_exists:
return

enqueue_time_ms = timestamp_ms()
Expand Down
14 changes: 14 additions & 0 deletions tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from arq.connections import ArqRedis
from arq.constants import default_queue_name, health_check_key_suffix, job_key_prefix
from arq.jobs import Job, JobStatus
from arq.worker import FailedJobs, JobExecutionFailed, Retry, Worker, async_check_health, check_health, func, run_worker


Expand Down Expand Up @@ -403,3 +404,16 @@ async def test_many_jobs_expire(arq_redis: ArqRedis, worker, caplog):
log = '\n'.join(r.message for r in caplog.records)
assert 'job testing-0 expired' in log
assert log.count(' expired') == 100


async def test_repeat_job_result(arq_redis: ArqRedis, worker):
j1 = await arq_redis.enqueue_job('foobar', _job_id='job_id')
assert isinstance(j1, Job)
assert await j1.status() == JobStatus.queued

assert await arq_redis.enqueue_job('foobar', _job_id='job_id') is None

await worker(functions=[foobar]).run_check()
assert await j1.status() == JobStatus.complete

assert await arq_redis.enqueue_job('foobar', _job_id='job_id') is None

0 comments on commit 06c05d5

Please sign in to comment.