Skip to content

Commit

Permalink
scheduled tasks (#50)
Browse files Browse the repository at this point in the history
* splitting Concurrent to add Bindable class

* working on next_datetime

* next_cron working

* add weekday to next_cron

* cleaning up next_cron

* add microseconds to next_cron

* adding cron decorator

* adding cron to actors

* cron working

* cron tests with mocked now

* adding unique option to cron and tests

* configure isort and uprev
  • Loading branch information
samuelcolvin authored Jun 22, 2017
1 parent 3c0d7f0 commit 5cc3bf5
Show file tree
Hide file tree
Showing 10 changed files with 489 additions and 43 deletions.
1 change: 1 addition & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ History
v0.9.0 (2017-XX-XX)
...................
* allow set encoding in msgpack for jobs #49
* cron tasks allowing scheduling of functions in the future #50

v0.8.1 (2017-06-05)
...................
Expand Down
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ lint:
test:
TZ=Asia/Singapore pytest --cov=arq && coverage combine

.PHONY: .test-build-cov
.test-build-cov:
.PHONY: testcov
testcov:
TZ=Asia/Singapore pytest --cov=arq && (echo "building coverage html"; coverage combine; coverage html)

.PHONY: all
all: .test-build-cov lint
all: testcov lint

.PHONY: clean
clean:
Expand Down
195 changes: 163 additions & 32 deletions arq/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,19 @@
===========
Defines the main ``Actor`` class and ``@concurrent`` decorator for using arq from within your code.
Also defines the ``@cron`` decorator for declaring cron job functions.
"""
import asyncio
import inspect
import logging
from datetime import datetime
from typing import Any, Callable, Dict, List, Union # noqa

from .jobs import Job
from .utils import RedisMixin
from .utils import RedisMixin, next_cron, to_unix_ms

__all__ = ['Actor', 'concurrent']
__all__ = ['Actor', 'concurrent', 'cron']

main_logger = logging.getLogger('arq.main')

Expand Down Expand Up @@ -45,6 +50,8 @@ class Actor(RedisMixin, metaclass=ActorMeta):
#: prefix prepended to all queue names to create the list keys in redis
QUEUE_PREFIX = b'arq:q:'

CRON_SENTINEL_PREFIX = b'arq:cron:'

#: if not None this name is used instead of the class name when encoding and referencing jobs,
#: if None the class's name is used
name: str = None
Expand Down Expand Up @@ -72,7 +79,7 @@ def __init__(self, *args, worker=None, concurrency_enabled=True, **kwargs):
self.name = self.name or self.__class__.__name__
self.worker = worker
self.is_shadow = bool(worker)
self._bind_concurrent()
self.con_jobs: List[CronJob] = list(self._bind_decorators())
self._concurrency_enabled = concurrency_enabled
super().__init__(*args, **kwargs)

Expand All @@ -88,10 +95,13 @@ async def shutdown(self):
"""
pass

def _bind_concurrent(self):
def _bind_decorators(self):
for attr_name in dir(self.__class__):
v = getattr(self.__class__, attr_name)
isinstance(v, Concurrent) and v.bind(self)
if isinstance(v, Bindable):
new_v = v.bind(self)
if isinstance(v, CronJob):
yield new_v

async def enqueue_job(self, func_name: str, *args, queue: str=None, **kwargs):
"""
Expand All @@ -107,19 +117,59 @@ async def enqueue_job(self, func_name: str, *args, queue: str=None, **kwargs):
:param kwargs: key word arguments to pass to the function
"""
queue = queue or self.DEFAULT_QUEUE
data = self.job_class.encode(class_name=self.name, func_name=func_name, args=args, kwargs=kwargs)
main_logger.debug('%s.%s ▶ %s', self.name, func_name, queue)

if self._concurrency_enabled:
queue_list = self.queue_lookup[queue]
# use the pool directly rather than get_redis_conn to avoid one extra await
pool = self._redis_pool or await self.get_redis_pool()
main_logger.debug('%s.%s ▶ %s', self.name, func_name, queue)
async with pool.get() as redis:
await redis.rpush(queue_list, data)
await self.job_future(redis, queue, func_name, *args, **kwargs)
else:
main_logger.debug('%s.%s ▶ %s (called directly)', self.name, func_name, queue)
data = self.job_class.encode(class_name=self.name, func_name=func_name, args=args, kwargs=kwargs)
j = self.job_class(data, queue_name=queue)
await getattr(self, j.func_name).direct(*j.args, **j.kwargs)

def job_future(self, redis, queue: str, func_name: str, *args, **kwargs):
return redis.rpush(
self.queue_lookup[queue],
self.job_class.encode(class_name=self.name, func_name=func_name, args=args, kwargs=kwargs),
)

def _now(self):
# allow easier mocking
return datetime.now()

async def run_cron(self):
n = self._now()
pool = self._redis_pool or await self.get_redis_pool()
to_run = set()

for cron_job in self.con_jobs:
if n >= cron_job.next_run:
to_run.add((cron_job, cron_job.next_run))
cron_job.set_next(n)

if not to_run:
return

main_logger.debug('cron, %d jobs to run', len(to_run))
async with pool.get() as redis:
job_futures = set()
for cron_job, run_at in to_run:
if cron_job.unique:
sentinel_key = self.CRON_SENTINEL_PREFIX + f'{self.name}.{cron_job.__name__}'.encode()
sentinel_value = str(to_unix_ms(run_at)[0]).encode()
v, _ = await asyncio.gather(
redis.getset(sentinel_key, sentinel_value),
redis.expire(sentinel_key, 3600),
)
if v == sentinel_value:
# if v is equal to sentinel value, another worker has already set it and is doing this cron run
continue
job_futures.add(self.job_future(redis, cron_job.dft_queue or self.DEFAULT_QUEUE, cron_job.__name__))

job_futures and await asyncio.gather(*job_futures)

async def close(self, shutdown=False):
"""
Close down the actor, eg. close the associated redis pool, optionally also calling shutdown.
Expand All @@ -135,24 +185,15 @@ def __repr__(self):
return f'<{self.__class__.__name__}({self.name}) at 0x{id(self):02x}>'


class Concurrent:
"""
Class used to describe a concurrent function. This is what the ``@concurrent`` decorator returns.
You shouldn't have to use this directly, but instead apply the ``@concurrent`` decorator
"""
__slots__ = ['_func', '_dft_queue', '_self_obj']

def __init__(self, *, func, dft_queue=None, self_obj=None):
self._self_obj = self_obj
class Bindable:
def __init__(self, *, func, self_obj=None, **kwargs):
self._self_obj: Actor = self_obj
# if we're already bound we assume func is of the correct type and skip repeat logging
if not self.bound:
if not inspect.iscoroutinefunction(func):
raise TypeError(f'{func.__qualname__} is not a coroutine function')

main_logger.debug('registering concurrent function %s', func.__qualname__)
self._func = func
self._dft_queue = dft_queue
if not self.bound and not inspect.iscoroutinefunction(func):
raise TypeError(f'{func.__qualname__} is not a coroutine function')
self._func: Callable = func
self._dft_queue: str = kwargs['dft_queue']
self._kwargs: Dict[str, Any] = kwargs

def bind(self, obj: object):
"""
Expand All @@ -161,13 +202,18 @@ def bind(self, obj: object):
:param obj: object to bind the function to eg. "self" in the eyes of func.
"""
new_inst = Concurrent(func=self._func, dft_queue=self._dft_queue, self_obj=obj)
new_inst = self.__class__(func=self._func, self_obj=obj, **self._kwargs)
setattr(obj, self._func.__name__, new_inst)
return new_inst

@property
def bound(self):
return self._self_obj is not None

@property
def dft_queue(self):
return self._dft_queue

async def __call__(self, *args, **kwargs):
return await self.defer(*args, **kwargs)

Expand All @@ -177,14 +223,28 @@ async def defer(self, *args, queue_name=None, **kwargs):
async def direct(self, *args, **kwargs):
return await self._func(self._self_obj, *args, **kwargs)

@property
def __doc__(self):
return self._func.__doc__

@property
def __name__(self):
return self._func.__name__


class Concurrent(Bindable):
"""
Class used to describe a concurrent function. This is what the ``@concurrent`` decorator returns.
You shouldn't have to use this directly, but instead apply the ``@concurrent`` decorator
"""
__slots__ = '_func', '_dft_queue', '_self_obj', '_kwargs'

def __init__(self, *, func, self_obj=None, dft_queue=None):
super().__init__(func=func, self_obj=self_obj, dft_queue=dft_queue)
if not self.bound:
main_logger.debug('registering concurrent function %s', func.__qualname__)

@property
def __doc__(self):
return self._func.__doc__

def __repr__(self):
return f'<concurrent function {self._func.__qualname__} of {self._self_obj!r}>'

Expand All @@ -201,3 +261,74 @@ def concurrent(func_or_queue):
return lambda f: Concurrent(func=f, dft_queue=func_or_queue)
else:
return Concurrent(func=func_or_queue)


class CronJob(Bindable):
__slots__ = '_func', '_dft_queue', '_self_obj', '_kwargs', 'run_at_startup', 'unique', 'cron_kwargs', 'next_run'

def __init__(self, *, func, self_obj=None, **kwargs):
super().__init__(func=func, self_obj=self_obj, **kwargs)
if not self.bound:
main_logger.debug('registering cron function %s', func.__qualname__)
kwargs2 = kwargs.copy()
self.run_at_startup = kwargs2.pop('run_at_startup')
self.unique = kwargs2.pop('unique')
kwargs2.pop('dft_queue')
self.cron_kwargs = kwargs2
self.next_run = None
if self.bound:
now = self._self_obj._now()
if self.run_at_startup:
self.next_run = now
else:
self.set_next(now)

def set_next(self, dt: datetime):
self.next_run = next_cron(dt, **self.cron_kwargs)

def __repr__(self):
return f'<cron function {self._func.__qualname__} of {self._self_obj!r}>'


def cron(*,
dft_queue=None,
run_at_startup=False,
unique=True,
month: Union[None, set, int]=None,
day: Union[None, set, int]=None,
weekday: Union[None, set, int, str]=None,
hour: Union[None, set, int]=None,
minute: Union[None, set, int]=None,
second: Union[None, set, int]=0,
microsecond: int=123456):
"""
Decorator which defines a functions as a cron job, eg. it should be executed at specific times...
If you wish to call the function directly you can access the original function at ``<func>.direct``.
:param dft_queue: default queue to use
:param run_at_startup: whether to run as worker starts
:param unique: whether the job should be only be executed on one worker.
:param month: month(s) to run the job on, 1 - 12
:param day: day(s) to run the job on, 1 - 31
:param weekday: week day(s) to run the job on, 0 - 6 or mon - sun
:param hour: hour(s) to run the job on, 0 - 23
:param minute: minute(s) to run the job on, 0 - 59
:param second: second(s) to run the job on, 0 - 59
:param microsecond: microsecond(s) to run the job on,
defaults to 123456 as the world is busier at the top of a second 0 - 1e6
"""

return lambda f: CronJob(
func=f,
dft_queue=dft_queue,
run_at_startup=run_at_startup,
unique=unique,
month=month,
day=day,
weekday=weekday,
hour=hour,
minute=minute,
second=second,
microsecond=microsecond)
81 changes: 80 additions & 1 deletion arq/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from aioredis.pool import RedisPool
from async_timeout import timeout

__all__ = ['RedisSettings', 'RedisMixin']
__all__ = ['RedisSettings', 'RedisMixin', 'next_cron']
logger = logging.getLogger('arq.utils')


Expand Down Expand Up @@ -186,3 +186,82 @@ def ellipsis(s: str, length: int=DEFAULT_CURTAIL) -> str:
if len(s) > length:
s = s[:length - 1] + '…'
return s


_dt_fields = [
'month',
'day',
'weekday',
'hour',
'minute',
'second',
'microsecond',
]


def _get_next_dt(dt_, options): # noqa: C901
for field in _dt_fields:
v = options[field]
if v is None:
continue
if field == 'weekday':
next_v = dt_.weekday()
else:
next_v = getattr(dt_, field)
if isinstance(v, int):
mismatch = next_v != v
else:
assert isinstance(v, (set, list, tuple))
mismatch = next_v not in v
# print(field, v, next_v, mismatch)
if mismatch:
micro = max(dt_.microsecond - options['microsecond'], 0)
if field == 'month':
if dt_.month == 12:
return datetime(dt_.year + 1, 1, 1)
else:
return datetime(dt_.year, dt_.month + 1, 1)
elif field in ('day', 'weekday'):
return dt_ + timedelta(days=1) - timedelta(hours=dt_.hour, minutes=dt_.minute, seconds=dt_.second,
microseconds=micro)
elif field == 'hour':
return dt_ + timedelta(hours=1) - timedelta(minutes=dt_.minute, seconds=dt_.second, microseconds=micro)
elif field == 'minute':
return dt_ + timedelta(minutes=1) - timedelta(seconds=dt_.second, microseconds=micro)
elif field == 'second':
return dt_ + timedelta(seconds=1) - timedelta(microseconds=micro)
else:
assert field == 'microsecond'
return dt_ + timedelta(microseconds=options['microsecond'] - dt_.microsecond)


def next_cron(preview_dt: datetime, *,
month: Union[None, set, int]=None,
day: Union[None, set, int]=None,
weekday: Union[None, set, int, str]=None,
hour: Union[None, set, int]=None,
minute: Union[None, set, int]=None,
second: Union[None, set, int]=0,
microsecond: int=123456):
"""
Find the next datetime matching the given parameters.
"""
dt = preview_dt + timedelta(seconds=1)
if isinstance(weekday, str):
weekday = ['mon', 'tues', 'wed', 'thurs', 'fri', 'sat', 'sun'].index(weekday.lower())
options = dict(
month=month,
day=day,
weekday=weekday,
hour=hour,
minute=minute,
second=second,
microsecond=microsecond,
)

while True:
next_dt = _get_next_dt(dt, options)
# print(dt, next_dt)
if next_dt is None:
return dt
dt = next_dt
Loading

0 comments on commit 5cc3bf5

Please sign in to comment.