Skip to content

Commit

Permalink
use pipeline rather than gather
Browse files Browse the repository at this point in the history
  • Loading branch information
samuelcolvin committed Apr 17, 2019
1 parent 2105f14 commit 5d67dc4
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 4 deletions.
1 change: 1 addition & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ History
v0.16.0a4 (unreleased)
......................
* add ``Worker.run_check``, fix #115
* use ``pipeline`` in ``enqueue_job``

v0.16.0a3 (2010-03-12)
......................
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ install:
pip install -r requirements.txt
pip install -e .[watch]

.PHONY: isort
.PHONY: format
format:
$(isort)
$(black)
Expand Down
10 changes: 7 additions & 3 deletions arq/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,12 @@ async def enqueue_job(
expires_ms = to_ms(_expires)

with await self as conn:
_, _, job_exists = await asyncio.gather(conn.unwatch(), conn.watch(job_key), conn.exists(job_key))
if job_exists:
pipe = conn.pipeline()
pipe.unwatch()
pipe.watch(job_key)
job_exists = pipe.exists(job_key)
await pipe.execute()
if await job_exists:
return

enqueue_time_ms = timestamp_ms()
Expand All @@ -99,7 +103,7 @@ async def enqueue_job(
try:
await tr.execute()
except MultiExecError:
# job got enqueued since we got 'job_exists'
# job got enqueued since we checked 'job_exists'
return
return Job(job_id, self)

Expand Down

0 comments on commit 5d67dc4

Please sign in to comment.