fixing worker semaphore (#128)
* fixing worker semaphore

* add tests for many expired jobs

* tweak history
samuelcolvin authored May 14, 2019
1 parent 26135d3 commit c12c83d
Showing 5 changed files with 28 additions and 5 deletions.
4 changes: 4 additions & 0 deletions HISTORY.rst
@@ -3,6 +3,10 @@
 History
 -------
 
+v0.16.0b3 (2019-05-14)
+......................
+* fix semaphore on worker with many expired jobs
+
 v0.16.0b2 (2019-05-14)
 ......................
 * add support for different queues, #127 thanks @tsutsarin
2 changes: 1 addition & 1 deletion Makefile
@@ -1,6 +1,6 @@
 .DEFAULT_GOAL := all
 isort = isort -rc arq tests
-black = black -S -l 120 --py36 arq tests
+black = black -S -l 120 --target-version py37 arq tests
 
 .PHONY: install
 install:
2 changes: 1 addition & 1 deletion arq/version.py
@@ -2,4 +2,4 @@
 
 __all__ = ['VERSION']
 
-VERSION = StrictVersion('0.16b2')
+VERSION = StrictVersion('0.16b3')
7 changes: 4 additions & 3 deletions arq/worker.py
@@ -244,7 +244,7 @@ async def main(self):
             await self.on_startup(self.ctx)
 
         async for _ in poll(self.poll_delay_s):  # noqa F841
-            async with self.sem:  # don't both with zrangebyscore until we have "space" to run the jobs
+            async with self.sem:  # don't bother with zrangebyscore until we have "space" to run the jobs
                 now = timestamp_ms()
                 job_ids = await self.pool.zrangebyscore(self.queue_name, max=now)
             await self.run_jobs(job_ids)
@@ -286,7 +286,9 @@ async def run_jobs(self, job_ids):
                     # job already started elsewhere since we got 'existing'
                     self.sem.release()
                 else:
-                    self.tasks.append(self.loop.create_task(self.run_job(job_id, score)))
+                    t = self.loop.create_task(self.run_job(job_id, score))
+                    t.add_done_callback(lambda _: self.sem.release())
+                    self.tasks.append(t)
 
     async def run_job(self, job_id, score):  # noqa: C901
         v, job_try, _ = await asyncio.gather(
@@ -405,7 +407,6 @@ async def finish_job(self, job_id, finish, result_data, result_timeout_s, incr_s
                 tr.zincrby(self.queue_name, incr_score, job_id)
             tr.delete(*delete_keys)
             await tr.execute()
-        self.sem.release()
 
     async def abort_job(self, job_id):
         with await self.pool as conn:
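
The substantive change is in run_jobs: the semaphore slot acquired for each job is now released by a done callback on the job's task instead of inside finish_job, so the slot is freed however run_job exits, including the expired-job path exercised by the new test, which previously left the slot unreleased. A minimal, self-contained sketch of that pattern follows; MAX_JOBS and fake_job are illustrative assumptions, not arq's API.

import asyncio

# Sketch of the acquire-then-release-on-done pattern adopted above; this is
# not arq's code, MAX_JOBS and fake_job are stand-ins.

MAX_JOBS = 10

async def fake_job(i: int) -> None:
    await asyncio.sleep(0.01)
    if i % 2:
        raise RuntimeError('simulated failure')  # the slot must still be freed

async def main() -> None:
    sem = asyncio.Semaphore(MAX_JOBS)
    tasks = []
    for i in range(100):
        await sem.acquire()  # wait until one of the MAX_JOBS slots is free
        t = asyncio.create_task(fake_job(i))
        t.add_done_callback(lambda _: sem.release())  # runs once per task, on any outcome
        tasks.append(t)
    # return_exceptions so the simulated failures don't propagate out of gather
    await asyncio.gather(*tasks, return_exceptions=True)

asyncio.run(main())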
18 changes: 18 additions & 0 deletions tests/test_worker.py
@@ -345,3 +345,21 @@ async def test_error_success(arq_redis: ArqRedis, worker):
     assert (worker.jobs_complete, worker.jobs_failed, worker.jobs_retried) == (0, 1, 0)
     info = await j.result_info()
     assert info.success is False
+
+
+async def test_many_jobs_expire(arq_redis: ArqRedis, worker, caplog):
+    caplog.set_level(logging.INFO)
+    await arq_redis.enqueue_job('foobar')
+    await asyncio.gather(*[arq_redis.zadd(default_queue_name, 1, f'testing-{i}') for i in range(100)])
+    worker: Worker = worker(functions=[foobar])
+    assert worker.jobs_complete == 0
+    assert worker.jobs_failed == 0
+    assert worker.jobs_retried == 0
+    await worker.main()
+    assert worker.jobs_complete == 1
+    assert worker.jobs_failed == 100
+    assert worker.jobs_retried == 0
+
+    log = '\n'.join(r.message for r in caplog.records)
+    assert 'job testing-0 expired' in log
+    assert log.count(' expired') == 100
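
One note on the new test's shape: each queued entry costs a semaphore slot in run_jobs, and before this commit the expired-job path never returned it, so a worker with a max_jobs limit below 100 would exhaust its slots and stall instead of failing all 100 entries. A rough sketch of that leak, with old_style_poll and MAX_JOBS as assumed illustrative names rather than arq's code:

import asyncio

# Illustration of the pre-fix slot leak; old_style_poll and MAX_JOBS are
# assumptions for the sketch, not arq internals.

MAX_JOBS = 10

async def old_style_poll(sem: asyncio.Semaphore, expired_job_ids: list) -> list:
    handled = []
    for job_id in expired_job_ids:
        if sem.locked():
            # the real worker would block on acquire() here forever, because
            # the expired path never released its slot
            break
        await sem.acquire()
        handled.append(job_id)  # job is failed as "expired", slot never released
    return handled

async def main() -> None:
    sem = asyncio.Semaphore(MAX_JOBS)
    done = await old_style_poll(sem, [f'testing-{i}' for i in range(100)])
    print(len(done))  # 10, not 100: the worker runs out of slots and stalls

asyncio.run(main())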