forked from python-arq/arq
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_jobs.py
177 lines (143 loc) · 5.81 KB
/
test_jobs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
import asyncio
import pickle
import pytest
from dirty_equals import IsNow
from arq import Worker, func
from arq.connections import ArqRedis, RedisSettings, create_pool
from arq.constants import default_queue_name, in_progress_key_prefix, job_key_prefix, result_key_prefix
from arq.jobs import DeserializationError, Job, JobResult, JobStatus, deserialize_job_raw, serialize_result
async def test_job_in_progress(arq_redis: ArqRedis):
    """A job whose in-progress key exists reports the in_progress status."""
    await arq_redis.set(in_progress_key_prefix + 'foobar', b'1')
    job = Job('foobar', arq_redis)
    status = await job.status()
    assert status == JobStatus.in_progress
    assert str(job) == '<arq job foobar>'
async def test_unknown(arq_redis: ArqRedis):
    """A job id that was never enqueued is not_found and has no info."""
    job = Job('foobar', arq_redis)
    status = await job.status()
    assert status == JobStatus.not_found
    assert await job.info() is None
async def test_result_timeout(arq_redis: ArqRedis):
    """Waiting for a result that never arrives raises asyncio.TimeoutError."""
    job = Job('foobar', arq_redis)
    with pytest.raises(asyncio.TimeoutError):
        await job.result(0.1, poll_delay=0)
async def test_enqueue_job(arq_redis: ArqRedis, worker, queue_name=default_queue_name):
    """Enqueue a job, run it through a worker, and verify result, status and metadata."""

    async def foobar(ctx, *args, **kwargs):
        return 42

    job = await arq_redis.enqueue_job('foobar', 1, 2, c=3, _queue_name=queue_name)
    assert isinstance(job, Job)
    assert await job.status() == JobStatus.queued

    w: Worker = worker(functions=[func(foobar, name='foobar')], queue_name=queue_name)
    await w.main()

    assert await job.result(poll_delay=0) == 42
    assert await job.status() == JobStatus.complete

    # queue_name=None means enqueue_job fell back to the pool's default queue
    expected_queue_name = queue_name or arq_redis.default_queue_name

    def expected_fields():
        # fresh IsNow instances for each comparison
        return dict(
            function='foobar',
            args=(1, 2),
            kwargs={'c': 3},
            job_try=1,
            enqueue_time=IsNow(tz='utc'),
            success=True,
            result=42,
            start_time=IsNow(tz='utc'),
            finish_time=IsNow(tz='utc'),
            score=None,
            queue_name=expected_queue_name,
        )

    assert await job.info() == JobResult(**expected_fields())
    # all_job_results additionally carries the job id
    assert await arq_redis.all_job_results() == [JobResult(job_id=job.job_id, **expected_fields())]
async def test_enqueue_job_alt_queue(arq_redis: ArqRedis, worker):
    """Run the full enqueue/execute round-trip on a non-default queue name."""
    await test_enqueue_job(arq_redis, worker, queue_name='custom_queue')
async def test_enqueue_job_nondefault_queue(worker):
    """Test initializing arq_redis with a queue name, and the worker using it."""
    arq_redis = await create_pool(RedisSettings(), default_queue_name='test_queue')

    def worker_factory(functions, **_):
        # worker inherits the queue from the pool, so queue_name stays None here
        return worker(functions=functions, arq_redis=arq_redis, queue_name=None)

    await test_enqueue_job(arq_redis, worker_factory, queue_name=None)
async def test_cant_unpickle_at_all():
    """serialize_result copes with unpicklable values, returning None only when even the fallback fails."""

    class Foobar:
        def __getstate__(self):
            raise TypeError("this doesn't pickle")

    # unpicklable result value: serialization still yields bytes (presumably via a fallback payload)
    serialized = serialize_result('foobar', (1,), {}, 1, 123, True, Foobar(), 123, 123, 'testing', 'test-queue')
    assert isinstance(serialized, bytes)
    # unpicklable args as well: nothing can be serialized, so None comes back
    failed = serialize_result('foobar', (Foobar(),), {}, 1, 123, True, Foobar(), 123, 123, 'testing', 'test-queue')
    assert failed is None
async def test_custom_serializer():
    """A user-supplied serializer is used even for objects pickle cannot handle."""

    class Foobar:
        def __getstate__(self):
            raise TypeError("this doesn't pickle")

    def custom_serializer(x):
        return b'0123456789'

    # both a picklable and an unpicklable args tuple go through the custom serializer
    for args in ((1,), (Foobar(),)):
        serialized = serialize_result(
            'foobar',
            args,
            {},
            1,
            123,
            True,
            Foobar(),
            123,
            123,
            'testing',
            'test-queue',
            serializer=custom_serializer,
        )
        assert serialized == b'0123456789'
async def test_deserialize_result(arq_redis: ArqRedis, worker):
    """A result deserializes repeatedly; corrupt stored bytes raise DeserializationError."""

    async def foobar(ctx, a, b):
        return a + b

    job = await arq_redis.enqueue_job('foobar', 1, 2)
    assert await job.status() == JobStatus.queued
    await worker(functions=[func(foobar, name='foobar')]).run_check()

    # the result can be fetched more than once
    for _ in range(2):
        assert await job.result(poll_delay=0) == 3
    assert (await job.info()).args == (1, 2)

    # overwrite the stored result with bytes that cannot be deserialized
    await arq_redis.set(result_key_prefix + job.job_id, b'invalid pickle data')
    with pytest.raises(DeserializationError, match='unable to deserialize job result'):
        assert await job.result(poll_delay=0) == 3
async def test_deserialize_info(arq_redis: ArqRedis):
    """Corrupt stored job bytes make info() raise DeserializationError."""
    job = await arq_redis.enqueue_job('foobar', 1, 2)
    assert await job.status() == JobStatus.queued
    # replace the queued job payload with bytes that cannot be deserialized
    await arq_redis.set(job_key_prefix + job.job_id, b'invalid pickle data')
    with pytest.raises(DeserializationError, match='unable to deserialize job'):
        assert await job.info()
async def test_deserialize_job_raw():
    """deserialize_job_raw unpacks the pickled field dict; junk bytes raise."""
    payload = pickle.dumps({'f': 1, 'a': 2, 'k': 3, 't': 4, 'et': 5})
    assert deserialize_job_raw(payload) == (1, 2, 3, 4, 5)
    with pytest.raises(DeserializationError, match='unable to deserialize job'):
        deserialize_job_raw(b'123')
async def test_get_job_result(arq_redis: ArqRedis):
    """Fetching the result of an unknown job id raises KeyError."""
    with pytest.raises(KeyError, match='job "foobar" not found'):
        await arq_redis._get_job_result(b'foobar')
async def test_result_pole_delay_dep(arq_redis: ArqRedis):
    """The misspelled legacy 'pole_delay' kwarg still works but emits a DeprecationWarning."""
    job = Job('foobar', arq_redis)
    raw = serialize_result('foobar', (1,), {}, 1, 123, True, 42, 123, 123, 'testing', 'test-queue')
    await arq_redis.set(result_key_prefix + job.job_id, raw)
    expected_warning = '"pole_delay" is deprecated, use the correct spelling "poll_delay" instead'
    with pytest.warns(DeprecationWarning, match=expected_warning):
        assert await job.result(pole_delay=0) == 42