Skip to content

Commit

Permalink
move JobSerialisationError raise to jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
samuelcolvin committed Sep 18, 2016
1 parent c9ac80f commit efb64c8
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 9 deletions.
10 changes: 7 additions & 3 deletions arq/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from .utils import ellipsis, from_unix_ms, timestamp, to_unix_ms

__all__ = ['JobSerialisationError', 'Job']
__all__ = ['JobSerialisationError', 'Job', 'DatetimeJob']


class JobSerialisationError(Exception):
Expand Down Expand Up @@ -55,7 +55,10 @@ def encode(cls, *, queued_at: int=None, class_name: str, func_name: str,
:param kwargs: key word arguments to pass to the function
"""
queued_at = queued_at or int(timestamp() * 1000)
return cls._encode([queued_at, class_name, func_name, args, kwargs])
try:
return cls._encode([queued_at, class_name, func_name, args, kwargs])
except TypeError as e:
raise JobSerialisationError(str(e)) from e

@classmethod
def _encode(cls, data) -> bytes:
Expand Down Expand Up @@ -99,7 +102,8 @@ def msgpack_encoder(obj):
if tz is not None:
result[TIMEZONE] = tz
return result
return obj
else:
return obj

@staticmethod
def msgpack_object_hook(obj):
Expand Down
9 changes: 3 additions & 6 deletions arq/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import logging
from functools import wraps

from .jobs import Job, JobSerialisationError
from .jobs import Job
from .utils import RedisMixin

__all__ = ['Actor', 'concurrent']
Expand Down Expand Up @@ -91,11 +91,8 @@ async def enqueue_job(self, func_name: str, *args, queue: str=None, **kwargs):
:param kwargs: key word arguments to pass to the function
"""
queue = queue or self.DEFAULT_QUEUE
try:
data = self.job_class.encode(class_name=self.name, func_name=func_name, # type: ignore
args=args, kwargs=kwargs) # type: ignore
except TypeError as e:
raise JobSerialisationError(str(e)) from e
data = self.job_class.encode(class_name=self.name, func_name=func_name, # type: ignore
args=args, kwargs=kwargs) # type: ignore
main_logger.debug('%s.%s ▶ %s', self.name, func_name, queue)

# use the pool directly rather than get_redis_conn to avoid one extra await
Expand Down

0 comments on commit efb64c8

Please sign in to comment.