adding async-timeout
samuelcolvin committed Jun 5, 2017
1 parent 13e50d4 commit a15ee0e
Showing 8 changed files with 27 additions and 15 deletions.
5 changes: 5 additions & 0 deletions HISTORY.rst
@@ -3,6 +3,11 @@
History
-------

+v0.8.0 (TBC)
+............
+* add ``async-timeout`` dependency and use async timeout around ``shadow_factory``
+* change logger name for control process log messages

v0.7.0 (2017-06-01)
...................
* implementing reusable ``Drain`` which takes tasks from a redis list and allows them to be executed asynchronously.
2 changes: 2 additions & 0 deletions Makefile
@@ -1,3 +1,5 @@
+.DEFAULT_GOAL := all

.PHONY: install
install:
pip install -U pip setuptools
2 changes: 1 addition & 1 deletion arq/version.py
@@ -2,4 +2,4 @@

__all__ = ['VERSION']

-VERSION = StrictVersion('0.7.0')
+VERSION = StrictVersion('0.8.0')
24 changes: 14 additions & 10 deletions arq/worker.py
@@ -16,6 +16,8 @@
from signal import Signals
from typing import Dict, List, Type # noqa

+from async_timeout import timeout

from .drain import Drain
from .jobs import ArqError, Job
from .logs import default_log_config
@@ -25,6 +27,7 @@
__all__ = ['BaseWorker', 'RunWorkerProcess', 'StopJob', 'import_string']

work_logger = logging.getLogger('arq.work')
+ctrl_logger = logging.getLogger('arq.control')
jobs_logger = logging.getLogger('arq.jobs')


@@ -173,9 +176,10 @@ async def run(self):
perform jobs.
"""
self._stopped = False
-work_logger.info('Initialising work manager, burst mode: %s', self._burst_mode)
+work_logger.info('Initialising work manager, burst mode: %s, creating shadows...', self._burst_mode)

-shadows = await self.shadow_factory()
+with timeout(10):
+    shadows = await self.shadow_factory()
assert isinstance(shadows, list), 'shadow_factory should return a list not %s' % type(shadows)
self.job_class = shadows[0].job_class
work_logger.debug('Using first shadows job class "%s"', self.job_class.__name__)
@@ -416,8 +420,8 @@ def start_worker(worker_path: str, worker_class: str, burst: bool, loop: asyncio
:param loop: asyncio loop to use, or None
"""
worker_cls = import_string(worker_path, worker_class)
-worker = worker_cls(burst=burst, loop=loop)
work_logger.info('Starting "%s" on pid=%d', worker_cls.__name__, os.getpid())
+worker = worker_cls(burst=burst, loop=loop)
try:
worker.run_until_complete()
except HandledExit:
@@ -444,32 +448,32 @@ def __init__(self, worker_path, worker_class, burst=False):

def run_worker(self, worker_path, worker_class, burst):
name = 'WorkProcess'
-work_logger.info('starting work process "%s"', name)
+ctrl_logger.info('starting work process "%s"', name)
self.process = Process(target=start_worker, args=(worker_path, worker_class, burst), name=name)
self.process.start()
self.process.join()
if self.process.exitcode == 0:
-work_logger.info('worker process exited ok')
+ctrl_logger.info('worker process exited ok')
return
-work_logger.critical('worker process %s exited badly with exit code %s',
+ctrl_logger.critical('worker process %s exited badly with exit code %s',
self.process.pid, self.process.exitcode)
sys.exit(3)
# could restart worker here, but better to leave it up to the real manager eg. docker restart: always

def handle_sig(self, signum, frame):
signal.signal(signal.SIGINT, self.handle_sig_force)
signal.signal(signal.SIGTERM, self.handle_sig_force)
-work_logger.info('got signal: %s, waiting for worker pid=%s to finish...', Signals(signum).name,
+ctrl_logger.info('got signal: %s, waiting for worker pid=%s to finish...', Signals(signum).name,
self.process and self.process.pid)
# sleep to make sure worker.handle_sig above has executed if it's going to and detached handle_proxy_signal
time.sleep(0.01)
if self.process and self.process.is_alive():
-work_logger.debug("sending custom shutdown signal to worker in case it didn't receive the signal")
+ctrl_logger.debug("sending custom shutdown signal to worker in case it didn't receive the signal")
os.kill(self.process.pid, SIG_PROXY)

def handle_sig_force(self, signum, frame):
-work_logger.warning('got signal: %s again, forcing exit', Signals(signum).name)
+ctrl_logger.warning('got signal: %s again, forcing exit', Signals(signum).name)
if self.process and self.process.is_alive():
-work_logger.error('sending worker %d SIGTERM', self.process.pid)
+ctrl_logger.error('sending worker %d SIGTERM', self.process.pid)
os.kill(self.process.pid, signal.SIGTERM)
raise ImmediateExit('force exit')
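
For context on the worker.py change above: `async_timeout.timeout` cancels the enclosed await and raises `asyncio.TimeoutError` if the block does not finish within the given number of seconds. Below is a minimal, self-contained sketch of the pattern; `slow_shadow_factory` and the timings are illustrative only, not part of arq, and the plain `with` form is what the async-timeout 1.x release pinned in this commit supports inside a coroutine (newer releases require `async with`).

    import asyncio
    from async_timeout import timeout

    async def slow_shadow_factory():
        # stand-in for a shadow_factory that talks to redis and might hang
        await asyncio.sleep(5)
        return ['shadow-1']

    async def main():
        try:
            # same shape as the change in BaseWorker.run(): bound the factory call
            with timeout(0.1):
                shadows = await slow_shadow_factory()
        except asyncio.TimeoutError:
            print('shadow_factory took too long to create shadows')
        else:
            print('created shadows:', shadows)

    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

The effect is that a factory call which never returns surfaces as a TimeoutError at startup instead of blocking the worker indefinitely.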
1 change: 1 addition & 0 deletions setup.py
@@ -45,6 +45,7 @@
arq=arq.cli:cli
""",
install_requires=[
+'async-timeout==1.1.0',
'aioredis>=0.2.9',
'click>=6.6',
'msgpack-python>=0.4.8',
2 changes: 1 addition & 1 deletion tests/conftest.py
@@ -70,5 +70,5 @@ def mock_actor_worker(mock_actor):

@pytest.fixture
def caplog(caplog):
-caplog.set_loggers(log_names=('arq.main', 'arq.work', 'arq.jobs'), fmt='%(name)s: %(message)s')
+caplog.set_loggers(log_names=('arq.control', 'arq.main', 'arq.work', 'arq.jobs'), fmt='%(name)s: %(message)s')
return caplog
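
The conftest change above reflects what any downstream logging configuration now needs: control-process messages come from `arq.control` rather than `arq.work`, so that logger name has to be wired up as well. A hypothetical dictConfig sketch for illustration only, not arq's bundled `default_log_config`:

    import logging.config

    LOGGING = {
        'version': 1,
        'disable_existing_loggers': False,
        'formatters': {
            'simple': {'format': '%(name)s: %(message)s'},
        },
        'handlers': {
            'console': {'class': 'logging.StreamHandler', 'formatter': 'simple'},
        },
        'loggers': {
            # 'arq.control' is the logger added in this commit; the others already existed
            name: {'handlers': ['console'], 'level': 'INFO'}
            for name in ('arq.control', 'arq.main', 'arq.work', 'arq.jobs')
        },
    }

    logging.config.dictConfig(LOGGING)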
4 changes: 2 additions & 2 deletions tests/test_main.py
@@ -68,7 +68,7 @@ async def test_dispatch_work(tmpworkdir, loop, caplog, redis_conn):
print(log)
assert ('MockRedisDemoActor.add_numbers ▶ dft\n'
'MockRedisDemoActor.high_add_numbers ▶ high\n'
-'Initialising work manager, burst mode: True\n'
+'Initialising work manager, burst mode: True, creating shadows...\n'
'Using first shadows job class "Job"\n'
'Running worker with 1 shadow listening to 3 queues\n'
'shadows: MockRedisDemoActor | queues: high, dft, low\n'
@@ -102,7 +102,7 @@ async def test_handle_exception(loop, caplog):
log = re.sub(r'\d{4}-\d+-\d+ \d+:\d+:\d+', '<date time>', log)
log = re.sub(r'\w{3}-\d+ \d+:\d+:\d+', '<date time2>', log)
print(log)
-assert ('Initialising work manager, burst mode: True\n'
+assert ('Initialising work manager, burst mode: True, creating shadows...\n'
'Running worker with 1 shadow listening to 3 queues\n'
'shadows: MockRedisDemoActor | queues: high, dft, low\n'
'recording health: <date time2> j_complete=0 j_failed=0 j_timedout=0 j_ongoing=0 q_high=0 q_dft=1 q_low=0\n'
2 changes: 1 addition & 1 deletion tests/test_worker.py
@@ -74,7 +74,7 @@ async def test_separate_log_levels(mock_actor_worker, caplog):
await actor.concat(a='1', b='2')
await worker.run()
log = caplog(('0.0\d\ds', '0.0XXs'))
-assert ('arq.work: Initialising work manager, burst mode: True\n'
+assert ('arq.work: Initialising work manager, burst mode: True, creating shadows...\n'
'arq.work: Running worker with 1 shadow listening to 3 queues\n'
'arq.work: shadows: MockRedisDemoActor | queues: high, dft, low\n'
'arq.work: drain waiting 5.0s for 1 tasks to finish\n'
