watch mode (#114)
* watch mode

* tests and docs

* tweak worker

* improve cli
samuelcolvin authored Mar 6, 2019
1 parent 98982eb commit f911cf3
Showing 13 changed files with 107 additions and 44 deletions.
2 changes: 1 addition & 1 deletion .gitignore
@@ -9,7 +9,7 @@ __pycache__/
/htmlcov/
/build
/dist
/test.py
/demo.py
*.egg-info
/docs/_build/
/.mypy_cache/
1 change: 1 addition & 0 deletions HISTORY.rst
@@ -6,6 +6,7 @@ History
v0.16.0a2 (2019-03-06)
......................
* add ``job_try`` argument to ``enqueue_job``, #113
* adding ``--watch`` mode to the worker (requires ``watchgod``), #114

v0.16.0a1 (2019-03-05)
......................
2 changes: 1 addition & 1 deletion Makefile
@@ -6,7 +6,7 @@ black = black -S -l 120 --py36 arq tests
install:
pip install -U pip setuptools
pip install -r requirements.txt
pip install -e .
pip install -e .[watch]

.PHONY: isort
format:
35 changes: 31 additions & 4 deletions arq/cli.py
@@ -1,14 +1,17 @@
import asyncio
import logging.config
from signal import Signals

import click
from pydantic.utils import import_string

from .logs import default_log_config
from .version import VERSION
from .worker import check_health, run_worker
from .worker import check_health, create_worker, run_worker

burst_help = 'Batch mode: exit once no jobs are found in any queue.'
health_check_help = 'Health Check: run a health check and exit'
health_check_help = 'Health Check: run a health check and exit.'
watch_help = 'Watch a directory and reload the worker upon changes.'
verbose_help = 'Enable verbose output.'


@@ -17,8 +20,9 @@
@click.argument('worker-settings', type=str, required=True)
@click.option('--burst/--no-burst', default=None, help=burst_help)
@click.option('--check', is_flag=True, help=health_check_help)
@click.option('--watch', type=click.Path(exists=True, dir_okay=True, file_okay=False), help=watch_help)
@click.option('-v', '--verbose', is_flag=True, help=verbose_help)
def cli(*, worker_settings, burst, check, verbose):
def cli(*, worker_settings, burst, check, watch, verbose):
"""
Job queues in python with asyncio and redis.
@@ -31,4 +35,27 @@ def cli(*, worker_settings, burst, check, verbose):
exit(check_health(worker_settings))
else:
kwargs = {} if burst is None else {'burst': burst}
run_worker(worker_settings, **kwargs)
if watch:
loop = asyncio.get_event_loop()
loop.run_until_complete(watch_reload(watch, worker_settings, loop))
else:
run_worker(worker_settings, **kwargs)


async def watch_reload(path, worker_settings, loop):
try:
from watchgod import awatch
except ImportError as e: # pragma: no cover
raise ImportError('watchgod not installed, use `pip install watchgod`') from e

stop_event = asyncio.Event()
worker = create_worker(worker_settings)
try:
worker.on_stop = lambda s: s != Signals.SIGUSR1 and stop_event.set()
loop.create_task(worker.async_run())
async for _ in awatch(path, stop_event=stop_event):
print('\nfiles changed, reloading arq worker...')
worker.handle_sig(Signals.SIGUSR1)
loop.create_task(worker.async_run())
finally:
await worker.close()
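
For reference, the reload loop above leans on watchgod's ``awatch`` async iterator: it yields one set of changes per batch of filesystem events and exits once the ``stop_event`` passed to it is set, which is how a real shutdown breaks out of watch mode. A minimal standalone sketch of that pattern, assuming ``watchgod`` is installed (the path and print message are illustrative, not part of this commit):

    import asyncio
    from watchgod import awatch

    async def reload_on_change(path: str) -> None:
        stop_event = asyncio.Event()  # set by a shutdown signal in the real worker
        async for changes in awatch(path, stop_event=stop_event):
            # `changes` is a set of (Change, path) tuples for one batch of events
            print(f'{len(changes)} file(s) changed, reloading...')

    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        loop.run_until_complete(reload_on_change('arq'))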
2 changes: 1 addition & 1 deletion arq/version.py
@@ -2,4 +2,4 @@

__all__ = ['VERSION']

VERSION = StrictVersion('0.16a1')
VERSION = StrictVersion('0.16a2')
31 changes: 22 additions & 9 deletions arq/worker.py
@@ -169,11 +169,12 @@ def __init__(
self.jobs_failed = 0
self._last_health_check = 0
self._last_health_check_log = None

def run(self):
self._add_signal_handler(signal.SIGINT, self.handle_sig)
self._add_signal_handler(signal.SIGTERM, self.handle_sig)
self.main_task = self.loop.create_task(self.arun())
self.on_stop = None

def run(self):
self.main_task = self.loop.create_task(self.main())
try:
self.loop.run_until_complete(self.main_task)
except asyncio.CancelledError:
@@ -182,7 +183,15 @@ def run(self):
finally:
self.loop.run_until_complete(self.close())

async def arun(self):
async def async_run(self):
self.main_task = self.loop.create_task(self.main())
try:
await self.main_task
except asyncio.CancelledError:
# happens on shutdown, fine
raise

async def main(self):
if self.pool is None:
self.pool = await create_pool(self.redis_settings)

@@ -409,9 +418,10 @@ def _add_signal_handler(self, signal, handler):
self.loop.add_signal_handler(signal, partial(handler, signal))

def handle_sig(self, signum):
sig = Signals(signum)
logger.info(
'shutdown on %s ◆ %d jobs complete ◆ %d failed ◆ %d retries ◆ %d ongoing to cancel',
Signals(signum).name,
sig.name,
self.jobs_complete,
self.jobs_failed,
self.jobs_retried,
@@ -421,6 +431,7 @@ def handle_sig(self, signum):
if not t.done():
t.cancel()
self.main_task and self.main_task.cancel()
self.on_stop and self.on_stop(sig)

async def close(self):
if not self.pool:
@@ -446,10 +457,12 @@ def get_kwargs(settings_cls):
return {k: v for k, v in d.items() if k in worker_args}


def run_worker(settings_cls, **kwargs):
kwargs_ = get_kwargs(settings_cls)
kwargs_.update(kwargs)
worker = Worker(**kwargs_)
def create_worker(settings_cls, **kwargs) -> Worker:
return Worker(**{**get_kwargs(settings_cls), **kwargs})


def run_worker(settings_cls, **kwargs) -> Worker:
worker = create_worker(settings_cls, **kwargs)
worker.run()
return worker

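Splitting ``run_worker`` into ``create_worker`` plus ``run`` is what lets the CLI's watch path build a worker, schedule ``async_run()`` as a task, and restart it on file changes without blocking. A hypothetical direct use of the new helper (the settings class, job function, and Redis settings below are illustrative only, not part of this commit):

    from arq.connections import RedisSettings
    from arq.worker import create_worker

    async def download(ctx, url):
        return f'downloaded {url}'

    class WorkerSettings:
        functions = [download]
        redis_settings = RedisSettings()

    # keyword arguments override attributes picked up from WorkerSettings
    worker = create_worker(WorkerSettings, burst=True)
    worker.run()  # blocks; the --watch code path awaits worker.async_run() instead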
9 changes: 8 additions & 1 deletion docs/index.rst
@@ -98,9 +98,15 @@ To execute the jobs, either after running ``demo.py`` or before/during::

arq demo.WorkerSettings

Append (``--burst``) to stop the worker once all jobs have finished. See :class:`arq.worker.Worker` for more available
Append ``--burst`` to stop the worker once all jobs have finished. See :class:`arq.worker.Worker` for more available
properties of ``WorkerSettings``.

You can also watch for changes and reload the worker when the source changes::

arq demo.WorkerSettings --watch path/to/src

This requires watchgod_ to be installed (``pip install watchgod``).

For details on the *arq* CLI::

arq --help
@@ -222,4 +228,5 @@ Reference
.. |license| image:: https://img.shields.io/pypi/l/arq.svg
:target: https://github.com/samuelcolvin/arq
.. _asyncio: https://docs.python.org/3/library/asyncio.html
.. _watchgod: https://pypi.org/project/watchgod/
.. _rq: http://python-rq.org/
3 changes: 3 additions & 0 deletions setup.py
@@ -55,4 +55,7 @@
'click>=6.7',
'pydantic>=0.20',
],
extras_require={
'watch': ['watchgod>=0.4'],
}
)
12 changes: 12 additions & 0 deletions tests/test_cli.py
@@ -31,3 +31,15 @@ def test_check():
result = runner.invoke(cli, ['tests.test_cli.WorkerSettings', '--check'])
assert result.exit_code == 1
assert 'Health check failed: no health check sentinel value found' in result.output


async def mock_awatch():
yield [1]


def test_run_watch(mocker):
mocker.patch('watchgod.awatch', return_value=mock_awatch())
runner = CliRunner()
result = runner.invoke(cli, ['tests.test_cli.WorkerSettings', '--watch', 'tests'])
assert result.exit_code == 0
assert 'files changed, reloading arq worker...' in result.output
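
The mock above works because ``watch_reload`` imports ``awatch`` lazily at call time, so patching ``watchgod.awatch`` with a callable that returns a prepared async generator is enough to drive the ``async for`` loop exactly once. A rough standalone illustration of the same trick, assuming ``watchgod`` is importable (names here are illustrative):

    import asyncio
    from unittest import mock

    async def fake_awatch():
        yield {('modified', 'demo.py')}  # one fake batch of changes

    async def consume(path):
        from watchgod import awatch  # resolved at call time, so the patch is visible
        async for changes in awatch(path):
            print(f'{len(changes)} file(s) changed')

    with mock.patch('watchgod.awatch', return_value=fake_awatch()):
        asyncio.get_event_loop().run_until_complete(consume('.'))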
4 changes: 2 additions & 2 deletions tests/test_cron.py
@@ -92,7 +92,7 @@ async def foobar(ctx):
async def test_job_successful(worker, caplog):
caplog.set_level(logging.INFO)
worker: Worker = worker(cron_jobs=[cron(foobar, hour=1, run_at_startup=True)])
await worker.arun()
await worker.main()
assert worker.jobs_complete == 1
assert worker.jobs_failed == 0
assert worker.jobs_retried == 0
@@ -104,7 +104,7 @@ async def test_job_successful(worker, caplog):
async def test_not_run(worker, caplog):
caplog.set_level(logging.INFO)
worker: Worker = worker(cron_jobs=[cron(foobar, hour=1, run_at_startup=False)])
await worker.arun()
await worker.main()
assert worker.jobs_complete == 0
assert worker.jobs_failed == 0
assert worker.jobs_retried == 0
2 changes: 1 addition & 1 deletion tests/test_jobs.py
@@ -37,7 +37,7 @@ async def foobar(ctx, *args, **kwargs):
assert isinstance(j, Job)
assert JobStatus.queued == await j.status()
worker: Worker = worker(functions=[func(foobar, name='foobar')])
await worker.arun()
await worker.main()
r = await j.result(pole_delay=0)
assert r == 42
assert JobStatus.complete == await j.status()
16 changes: 8 additions & 8 deletions tests/test_main.py
@@ -11,7 +11,7 @@
from arq.constants import queue_name
from arq.jobs import Job
from arq.utils import timestamp_ms
from arq.worker import Worker, func, Retry
from arq.worker import Retry, Worker, func


async def test_enqueue_job(arq_redis: ArqRedis, worker):
@@ -20,9 +20,9 @@ async def foobar(ctx):

j = await arq_redis.enqueue_job('foobar')
worker: Worker = worker(functions=[func(foobar, name='foobar')])
await worker.arun()
await worker.main()
r = await j.result(pole_delay=0)
assert r == 42
assert r == 42 # 1


async def test_job_error(arq_redis: ArqRedis, worker):
@@ -31,7 +31,7 @@ async def foobar(ctx):

j = await arq_redis.enqueue_job('foobar')
worker: Worker = worker(functions=[func(foobar, name='foobar')])
await worker.arun()
await worker.main()

with pytest.raises(RuntimeError, match='foobar error'):
await j.result(pole_delay=0)
@@ -93,7 +93,7 @@ async def count(ctx, v):
await asyncio.gather(*tasks)

worker: Worker = worker(functions=[func(count, name='count')])
await worker.arun()
await worker.main()
assert counter.most_common(1)[0][1] == 1  # no job gets enqueued twice


@@ -103,12 +103,12 @@ async def foobar(ctx):

j1 = await arq_redis.enqueue_job('foobar')
w: Worker = worker(functions=[func(foobar, name='foobar')])
await w.arun()
await w.main()
r = await j1.result(pole_delay=0)
assert r == 1

j2 = await arq_redis.enqueue_job('foobar', _job_try=3)
await w.arun()
await w.main()
r = await j2.result(pole_delay=0)
assert r == 3

@@ -121,6 +121,6 @@ async def foobar(ctx):

j1 = await arq_redis.enqueue_job('foobar', _job_try=3)
w: Worker = worker(functions=[func(foobar, name='foobar')])
await w.arun()
await w.main()
r = await j1.result(pole_delay=0)
assert r == 4