Skip to content

Commit

Permalink
adding coverage for Drain
Browse files Browse the repository at this point in the history
  • Loading branch information
samuelcolvin committed May 22, 2017
1 parent bdc8d64 commit 9ff28a7
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 48 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ isort:
lint:
python setup.py check -rms
flake8 arq/ tests/
pytest arq -p no:sugar -q --cache-clear
pytest arq -p no:sugar -q
mypy --ignore-missing-imports --follow-imports=skip arq/

.PHONY: test
Expand Down
49 changes: 24 additions & 25 deletions arq/drain.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ def __init__(self, *,
self.loop = redis_pool._loop
self.re_queue = re_queue
self.max_concurrent_tasks = max_concurrent_tasks
self.shutdown_delay = max(shutdown_delay, 0.5)
self.shutdown_delay = max(shutdown_delay, 0.1)
self.timeout_seconds = timeout_seconds
self.job_class = job_class
self._pending_tasks = set() # type: Set[asyncio.futures.Future]
self.pending_tasks = set() # type: Set[asyncio.futures.Future]
self.task_exception = None # type: Exception

self.jobs_complete, self.jobs_failed, self.jobs_timed_out = 0, 0, 0
Expand Down Expand Up @@ -67,9 +67,8 @@ async def iter(self, *raw_queues: bytes, pop_timeout=1):
:param pop_timeout: how long to wait on each blpop before yielding None.
:yield: tuple: (queue_name, data) or (None, None) if all queues are empty
"""
work_logger.debug('starting main blpop loop, queues: %s', raw_queues)
if not self.running:
raise RuntimeError('cannot get jobs from the queues when processor is not running')
work_logger.debug('starting main blpop loop')
assert self.running, 'cannot get jobs from the queues when processor is not running'
while self.running:
msg = await self.redis.blpop(*raw_queues, timeout=pop_timeout)
if msg is None:
Expand All @@ -80,38 +79,38 @@ async def iter(self, *raw_queues: bytes, pop_timeout=1):
await self.wait()

def add(self, coro, job):
if self.running:
work_logger.debug('scheduling job from queue %s', job.queue)
task = self.loop.create_task(coro(job))
task.job = job
work_logger.debug('scheduling job from queue %s', job.queue)
task = self.loop.create_task(coro(job))
task.job = job

task.add_done_callback(self._job_callback)
self.loop.call_later(self.timeout_seconds, self._cancel_job, task, job)
self._pending_tasks.add(task)
else:
work_logger.warning('Job added when processor is not running, no task started.')
task.add_done_callback(self._job_callback)
self.loop.call_later(self.timeout_seconds, self._cancel_job, task, job)
self.pending_tasks.add(task)

async def wait(self):
pt_cnt = len(self._pending_tasks)
pt_cnt = len(self.pending_tasks)
while True:
if pt_cnt < self.max_concurrent_tasks:
return
work_logger.info('%d pending tasks, waiting for one to finish', pt_cnt)
_, self._pending_tasks = await asyncio.wait(self._pending_tasks, loop=self.loop,
return_when=asyncio.FIRST_COMPLETED)
pt_cnt = len(self._pending_tasks)
_, self.pending_tasks = await asyncio.wait(self.pending_tasks, loop=self.loop,
return_when=asyncio.FIRST_COMPLETED)
pt_cnt = len(self.pending_tasks)

async def finish(self, timeout=None):
timeout = timeout or self.shutdown_delay
if self._pending_tasks:
if self.pending_tasks:
with await self._finish_lock:
work_logger.info('processor waiting %0.1fs for %d tasks to finish', timeout, len(self._pending_tasks))
_, pending = await asyncio.wait(self._pending_tasks, timeout=timeout, loop=self.loop)
if pending and self.re_queue:
pipe = self.redis.pipeline()
work_logger.info('processor waiting %0.1fs for %d tasks to finish', timeout, len(self.pending_tasks))
_, pending = await asyncio.wait(self.pending_tasks, timeout=timeout, loop=self.loop)
if pending:
if self.re_queue:
pipe = self.redis.pipeline()
for task in pending:
pipe.rpush(task.job.raw_queue, task.job.raw_data)
await pipe.execute()
for task in pending:
pipe.rpush(task.job.raw_queue, task.job.raw_data)
await pipe.execute()
task.cancel()

def _job_callback(self, task):
self.jobs_complete += 1
Expand Down
7 changes: 4 additions & 3 deletions arq/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,16 @@ class Actor(RedisMixin, metaclass=ActorMeta):
LOW_QUEUE,
)

def __init__(self, *args, is_shadow=False, concurrency_enabled=True, **kwargs):
def __init__(self, *args, worker=None, concurrency_enabled=True, **kwargs):
"""
:param is_shadow: whether the actor should be in shadow mode, this should only be set by the worker
:param worker: reference to the worker which is managing this actor in shadow mode
:param concurrency_enabled: **For testing only** if set to False methods are called directly not queued
:param kwargs: other keyword arguments, see :class:`arq.utils.RedisMixin` for all available options
"""
self.queue_lookup = {q: self.QUEUE_PREFIX + q.encode() for q in self.queues}
self.name = self.name or self.__class__.__name__
self.is_shadow = is_shadow
self.worker = worker
self.is_shadow = bool(worker)
self._bind_concurrent()
self._concurrency_enabled = concurrency_enabled
super().__init__(*args, **kwargs)
Expand Down
24 changes: 16 additions & 8 deletions arq/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,25 +76,30 @@ class BaseWorker(RedisMixin):

drain_class = Drain

#: Whether or not to re-queue jobs if the worker quits before the job has time to finish.
re_queue = False

def __init__(self, *,
burst: bool=False,
shadows: list=None,
queues: list=None,
timeout_seconds: int=None,
re_queue: bool=None,
**kwargs) -> None:
"""
:param burst: if true the worker will close as soon as no new jobs are found in the queue lists
:param shadows: list of :class:`arq.main.Actor` classes for the worker to run,
overrides shadows already defined in the class definition
:param queues: list of queue names for the worker to listen on, if None queues is taken from the shadows
:param timeout_seconds: maximum duration of a job, after that the job will be cancelled by the event loop
:param re_queue: Whether or not to re-queue jobs if the worker quits before the job has time to finish.
:param kwargs: other keyword arguments, see :class:`arq.utils.RedisMixin` for all available options
"""
self._burst_mode = burst
self.shadows = shadows or self.shadows
self.queues = queues
self.timeout_seconds = timeout_seconds or self.timeout_seconds
self._pending_tasks = set() # type: Set[asyncio.futures.Future]
self.re_queue = self.re_queue if re_queue is None else re_queue

self._shadow_lookup = {} # type: Dict[str, Actor]
self.start = None # type: float
Expand Down Expand Up @@ -129,7 +134,7 @@ async def shadow_kwargs(self):
"""
return dict(
redis_settings=self.redis_settings,
is_shadow=True,
worker=self,
loop=self.loop,
existing_pool=await self.get_redis_pool(),
)
Expand Down Expand Up @@ -195,7 +200,7 @@ async def run(self):

self.drain = self.drain_class(
redis_pool=await self.get_redis_pool(),
re_queue=False,
re_queue=self.re_queue,
max_concurrent_tasks=self.max_concurrent_tasks,
shutdown_delay=self.shutdown_delay - 1,
timeout_seconds=self.timeout_seconds,
Expand Down Expand Up @@ -258,6 +263,11 @@ def jobs_timed_out(self):
def running(self):
return self.drain and self.drain.running

@running.setter
def running(self, v):
    """Forward the ``running`` flag to the drain; do nothing while no drain is set."""
    drain = self.drain
    if not drain:
        return
    drain.running = v

async def record_health(self, redis, redis_queues, queue_lookup):
now_ts = timestamp()
if (now_ts - self.last_health_check) < self.health_check_interval:
Expand All @@ -271,7 +281,7 @@ async def record_health(self, redis, redis_queues, queue_lookup):
jobs_complete=self.jobs_complete,
jobs_failed=self.jobs_failed,
jobs_timed_out=self.jobs_timed_out,
pending_tasks=sum(not t.done() for t in self._pending_tasks),
pending_tasks=sum(not t.done() for t in self.drain.pending_tasks),
)
for redis_queue in redis_queues:
info += ' q_{}={}'.format(queue_lookup[redis_queue], await redis.llen(redis_queue))
Expand Down Expand Up @@ -371,8 +381,7 @@ async def close(self):
self._closed = True

def handle_proxy_signal(self, signum, frame):
if self.drain:
self.drain.running = False
self.running = False
work_logger.info('pid=%d, got signal proxied from main process, stopping...', os.getpid())
signal.signal(signal.SIGINT, self.handle_sig_force)
signal.signal(signal.SIGTERM, self.handle_sig_force)
Expand All @@ -381,8 +390,7 @@ def handle_proxy_signal(self, signum, frame):
raise HandledExit()

def handle_sig(self, signum, frame):
if self.drain:
self.drain.running = False
self.running = False
work_logger.info('pid=%d, got signal: %s, stopping...', os.getpid(), Signals(signum).name)
signal.signal(SIG_PROXY, signal.SIG_IGN)
signal.signal(signal.SIGINT, self.handle_sig_force)
Expand Down
5 changes: 5 additions & 0 deletions tests/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ class StartupWorker(BaseWorker):
shadows = [DemoActor, StartupActor]


class FastShutdownWorker(BaseWorker):
    """Worker fixture shadowing ``DemoActor`` with ``shutdown_delay`` set to 0.1."""

    shutdown_delay = 0.1
    shadows = [DemoActor]


class DrainQuit2(Drain):
def _job_callback(self, task):
super()._job_callback(task)
Expand Down
19 changes: 14 additions & 5 deletions tests/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
import msgpack
import pytest

from arq import Actor, BaseWorker, concurrent
from arq import Actor, BaseWorker, Job, concurrent
from arq.jobs import ArqError
from arq.testing import MockRedisWorker as MockRedisBaseWorker

from .fixtures import (ChildActor, DemoActor, FoobarActor, MockRedisDemoActor, MockRedisWorker, ParentActor,
Expand Down Expand Up @@ -64,6 +65,7 @@ async def test_dispatch_work(tmpworkdir, loop, caplog, redis_conn):
log = re.sub("QUIT-.*", "QUIT-<random>", log)
log = re.sub(r'\d{4}-\d+-\d+ \d+:\d+:\d+', '<date time>', log)
log = re.sub(r'\w{3}-\d+ \d+:\d+:\d+', '<date time2>', log)
print(log)
assert ('MockRedisDemoActor.add_numbers ▶ dft\n'
'MockRedisDemoActor.high_add_numbers ▶ high\n'
'Initialising work manager, burst mode: True\n'
Expand All @@ -72,11 +74,11 @@ async def test_dispatch_work(tmpworkdir, loop, caplog, redis_conn):
'shadows: MockRedisDemoActor | queues: high, dft, low\n'
'populating quit queue to prompt exit: QUIT-<random>\n'
'starting main blpop loop\n'
'recording health: <date time2> j_complete=0 j_failed=0 j_timedout=0 j_ongoing=0 q_high=1 q_dft=1 q_low=0\n'
'recording health: <date time2> j_complete=0 j_failed=0 j_timedout=0 j_ongoing=0 q_high=0 q_dft=1 q_low=0\n'
'scheduling job from queue high\n'
'scheduling job from queue dft\n'
'got job from the quit queue, stopping\n'
'shutting down worker, waiting for 2 jobs to finish\n'
'processor waiting 5.0s for 2 tasks to finish\n'
'high queued 0.0XXs → MockRedisDemoActor.high_add_numbers(3, 4, c=5)\n'
'high ran in 0.0XXs ← MockRedisDemoActor.high_add_numbers ● 12\n'
'dft queued 0.0XXs → MockRedisDemoActor.add_numbers(1, 2)\n'
Expand All @@ -99,11 +101,12 @@ async def test_handle_exception(loop, caplog):
log = re.sub('"/.*?/(\w+/\w+)\.py"', r'"/path/to/\1.py"', log)
log = re.sub(r'\d{4}-\d+-\d+ \d+:\d+:\d+', '<date time>', log)
log = re.sub(r'\w{3}-\d+ \d+:\d+:\d+', '<date time2>', log)
print(log)
assert ('Initialising work manager, burst mode: True\n'
'Running worker with 1 shadow listening to 3 queues\n'
'shadows: MockRedisDemoActor | queues: high, dft, low\n'
'recording health: <date time2> j_complete=0 j_failed=0 j_timedout=0 j_ongoing=0 q_high=0 q_dft=1 q_low=0\n'
'shutting down worker, waiting for 1 jobs to finish\n'
'recording health: <date time2> j_complete=0 j_failed=0 j_timedout=0 j_ongoing=0 q_high=0 q_dft=0 q_low=0\n'
'processor waiting 5.0s for 1 tasks to finish\n'
'dft queued 0.0XXs → MockRedisDemoActor.boom()\n'
'dft ran in 0.0XXs ! MockRedisDemoActor.boom(): RuntimeError\n'
'Traceback (most recent call last):\n'
Expand Down Expand Up @@ -242,3 +245,9 @@ async def test_bind_replication(tmpdir, loop):
await worker.run()
assert file1.read() == 'Parent'
assert file2.read() == 'Child'


def test_job_no_queue():
    """Creating a ``Job`` from raw data alone must raise ``ArqError``."""
    with pytest.raises(ArqError) as exc_info:
        Job(b'foo')
    # Check the raised exception itself (``exc_info.value``): the string form of
    # the ExceptionInfo wrapper differs between pytest versions and is not
    # guaranteed to contain the exception message.
    assert 'either queue_name or raw_queue are required' in str(exc_info.value)
32 changes: 26 additions & 6 deletions tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
from arq.worker import import_string, start_worker

from .example import ActorTest
from .fixtures import (EXAMPLE_FILE, DemoActor, FoobarActor, MockRedisDemoActor, MockRedisWorker, MockRedisWorkerQuit,
StartupActor, StartupWorker, Worker, WorkerFail, WorkerQuit, kill_parent)
from .fixtures import (EXAMPLE_FILE, DemoActor, FastShutdownWorker, FoobarActor, MockRedisDemoActor, MockRedisWorker,
MockRedisWorkerQuit, StartupActor, StartupWorker, Worker, WorkerFail, WorkerQuit, kill_parent)


async def test_run_job_burst(tmpworkdir, redis_conn, actor):
Expand Down Expand Up @@ -264,8 +264,6 @@ async def test_job_timeout(loop, caplog):
await worker.run()
log = caplog(
('(\d.\d\d)\d', r'\1X'),
(', line \d+,', ', line <no>,'),
('"/.*?/(\w+/\w+)\.py"', r'"/path/to/\1.py"'),
)
print(log)
assert ('arq.jobs: dft queued 0.00Xs → MockRedisDemoActor.sleeper(0.2)\n'
Expand Down Expand Up @@ -345,8 +343,8 @@ async def test_startup_shutdown(tmpworkdir, redis_conn, loop):
assert worker.jobs_failed == 0
finally:
await actor.close(True)
assert tmpworkdir.join('events').read() == ('startup[True],concurrent_func[foobar],'
'shutdown[True],shutdown[False],')
assert tmpworkdir.join('events').read() == ('startup[True],concurrent_func[foobar],'
'shutdown[True],shutdown[False],')


def test_check_successful(redis_conn, loop):
Expand All @@ -365,3 +363,25 @@ async def test_check_successful_real_value(redis_conn, loop):
await worker.run()
assert 1 == await redis_conn.exists(b'arq:health-check')
assert 0 == await Worker(loop=loop)._check_health()


async def test_does_re_enqueue_job(loop, redis_conn):
    """With ``re_queue=True`` one entry is left in ``arq:q:dft`` after the worker run."""
    burst_worker = FastShutdownWorker(burst=True, loop=loop, re_queue=True)

    # Enqueue a single sleeper job, then release the actor's resources.
    demo = DemoActor(loop=loop)
    await demo.sleeper(0.2)
    await demo.close(True)

    await burst_worker.run()
    queue_length = await redis_conn.llen(b'arq:q:dft')
    assert queue_length == 1


async def test_doesnt_re_enqueue_job(loop, redis_conn):
    """With ``re_queue=False`` nothing is left in ``arq:q:dft`` after the worker run."""
    burst_worker = FastShutdownWorker(burst=True, loop=loop, re_queue=False)

    # Enqueue a single sleeper job, then release the actor's resources.
    demo = DemoActor(loop=loop)
    await demo.sleeper(0.2)
    await demo.close(True)

    await burst_worker.run()
    queue_length = await redis_conn.llen(b'arq:q:dft')
    assert queue_length == 0

0 comments on commit 9ff28a7

Please sign in to comment.