-
Notifications
You must be signed in to change notification settings - Fork 151
/
Copy pathstrategies.py
87 lines (67 loc) · 2.66 KB
/
strategies.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import asyncio
from copy import copy
from sqlalchemy.engine import url
from sqlalchemy import util
from sqlalchemy.engine.strategies import EngineStrategy
from .engine import GinoEngine
class GinoStrategy(EngineStrategy):
"""A SQLAlchemy engine strategy for GINO.
This strategy is initialized automatically as :mod:`gino` is imported.
If :func:`sqlalchemy.create_engine` uses ``strategy="gino"``, it will return a
:class:`~collections.abc.Coroutine`, and treat URL prefix ``postgresql://`` or
``postgres://`` as ``postgresql+asyncpg://``.
"""
name = "gino"
engine_cls = GinoEngine
async def create(self, name_or_url, loop=None, **kwargs):
engine_cls = self.engine_cls
u = url.make_url(name_or_url)
if loop is None:
loop = asyncio.get_event_loop()
if u.drivername in {"postgresql", "postgres"}:
u = copy(u)
u.drivername = "postgresql+asyncpg"
elif u.drivername in {"mysql"}:
u = copy(u)
u.drivername = "mysql+aiomysql"
dialect_cls = u.get_dialect()
pop_kwarg = kwargs.pop
dialect_args = {}
# consume dialect arguments from kwargs
for k in util.get_cls_kwargs(dialect_cls).union(
getattr(dialect_cls, "init_kwargs", set())
):
if k in kwargs:
dialect_args[k] = pop_kwarg(k)
kwargs.pop("module", None) # unused
dbapi_args = {}
for k in util.get_func_kwargs(dialect_cls.dbapi):
if k in kwargs:
dbapi_args[k] = pop_kwarg(k)
dbapi = dialect_cls.dbapi(**dbapi_args)
dialect_args["dbapi"] = dbapi
dialect = dialect_cls(**dialect_args)
pool_class = kwargs.pop("pool_class", None)
pool = await dialect.init_pool(u, loop, pool_class=pool_class)
engine_args = dict(loop=loop)
for k in util.get_cls_kwargs(engine_cls):
if k in kwargs:
engine_args[k] = pop_kwarg(k)
# all kwargs should be consumed
if kwargs:
await pool.close()
raise TypeError(
"Invalid argument(s) %s sent to create_engine(), "
"using configuration %s/%s. Please check that the "
"keyword arguments are appropriate for this combination "
"of components."
% (
",".join("'%s'" % k for k in kwargs),
dialect_cls.__name__,
engine_cls.__name__,
)
)
engine = engine_cls(dialect, pool, **engine_args)
dialect_cls.engine_created(engine)
return engine
GinoStrategy()