From 5d67dc422727e63ae4cb1221e9251adbae46b026 Mon Sep 17 00:00:00 2001 From: Samuel Colvin Date: Wed, 17 Apr 2019 19:43:18 +0100 Subject: [PATCH] use pipeline rather than gather --- HISTORY.rst | 1 + Makefile | 2 +- arq/connections.py | 10 +++++++--- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/HISTORY.rst b/HISTORY.rst index 7e8e7fd8..6c4eb977 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -6,6 +6,7 @@ History v0.16.0a4 (unreleased) ...................... * add ``Worker.run_check``, fix #115 +* use ``pipeline`` in ``enqueue_job`` v0.16.0a3 (2010-03-12) ...................... diff --git a/Makefile b/Makefile index c617a697..a869ca45 100644 --- a/Makefile +++ b/Makefile @@ -8,7 +8,7 @@ install: pip install -r requirements.txt pip install -e .[watch] -.PHONY: isort +.PHONY: format format: $(isort) $(black) diff --git a/arq/connections.py b/arq/connections.py index 5b46004f..4e4411cb 100644 --- a/arq/connections.py +++ b/arq/connections.py @@ -78,8 +78,12 @@ async def enqueue_job( expires_ms = to_ms(_expires) with await self as conn: - _, _, job_exists = await asyncio.gather(conn.unwatch(), conn.watch(job_key), conn.exists(job_key)) - if job_exists: + pipe = conn.pipeline() + pipe.unwatch() + pipe.watch(job_key) + job_exists = pipe.exists(job_key) + await pipe.execute() + if await job_exists: return enqueue_time_ms = timestamp_ms() @@ -99,7 +103,7 @@ async def enqueue_job( try: await tr.execute() except MultiExecError: - # job got enqueued since we got 'job_exists' + # job got enqueued since we checked 'job_exists' return return Job(job_id, self)