forked from python-arq/arq
-
Notifications
You must be signed in to change notification settings - Fork 0
/
conftest.py
116 lines (83 loc) · 2.57 KB
/
conftest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import asyncio
import functools
import os
import sys
import msgpack
import pytest
from arq.connections import ArqRedis, create_pool
from arq.worker import Worker
@pytest.fixture(name='loop')
def _fix_loop(event_loop):
return event_loop
@pytest.fixture
async def arq_redis(loop):
redis_ = ArqRedis(
host='localhost',
port=6379,
encoding='utf-8',
)
await redis_.flushall()
yield redis_
await redis_.close(close_connection_pool=True)
@pytest.fixture
async def arq_redis_msgpack(loop):
redis_ = ArqRedis(
host='localhost',
port=6379,
encoding='utf-8',
job_serializer=msgpack.packb,
job_deserializer=functools.partial(msgpack.unpackb, raw=False),
)
await redis_.flushall()
yield redis_
await redis_.close(close_connection_pool=True)
@pytest.fixture
async def worker(arq_redis):
worker_: Worker = None
def create(functions=[], burst=True, poll_delay=0, max_jobs=10, arq_redis=arq_redis, **kwargs):
nonlocal worker_
worker_ = Worker(
functions=functions, redis_pool=arq_redis, burst=burst, poll_delay=poll_delay, max_jobs=max_jobs, **kwargs
)
return worker_
yield create
if worker_:
await worker_.close()
@pytest.fixture(name='create_pool')
async def fix_create_pool(loop):
pools = []
async def create_pool_(settings, *args, **kwargs):
pool = await create_pool(settings, *args, **kwargs)
pools.append(pool)
return pool
yield create_pool_
await asyncio.gather(*[p.close(close_connection_pool=True) for p in pools])
@pytest.fixture(name='cancel_remaining_task')
def fix_cancel_remaining_task(loop):
async def cancel_remaining_task():
tasks = asyncio.all_tasks(loop)
cancelled = []
for task in tasks:
# in repr works in 3.7 where get_coro() is not available
if 'cancel_remaining_task()' not in repr(task):
cancelled.append(task)
task.cancel()
if cancelled:
print(f'Cancelled {len(cancelled)} ongoing tasks', file=sys.stderr)
await asyncio.gather(*cancelled, return_exceptions=True)
yield
loop.run_until_complete(cancel_remaining_task())
class SetEnv:
def __init__(self):
self.envars = set()
def set(self, name, value):
self.envars.add(name)
os.environ[name] = value
def clear(self):
for n in self.envars:
os.environ.pop(n)
@pytest.fixture
def env():
setenv = SetEnv()
yield setenv
setenv.clear()