forked from locationlabs/mockredis
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathscript.py
151 lines (135 loc) · 5.08 KB
/
script.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
class Script(object):
    """
    An executable LUA script object returned by ``MockRedis.register_script``.

    Calling the instance runs the script text through the ``lua`` Python
    binding, exposing ``KEYS``, ``ARGV`` and ``redis.call`` as Lua globals.
    """

    # Integer types accepted by the converters. ``long`` only exists on
    # Python 2; on Python 3 it was merged into ``int``.
    try:
        _INTEGER_TYPES = (int, long)  # noqa: F821
    except NameError:
        _INTEGER_TYPES = (int,)

    def __init__(self, registered_client, script):
        """
        Remember the client and script text and register the script.

        :param registered_client: client the script is bound to; must provide
            ``script_load``, ``script_exists`` and ``call``.
        :param script: the Lua source text.
        """
        self.registered_client = registered_client
        self.script = script
        self.sha = registered_client.script_load(script)

    def __call__(self, keys=None, args=None, client=None):
        """
        Execute the script, passing any required ``keys`` and ``args``.

        :param keys: values exposed to the script as ``KEYS`` (default: none).
        :param args: values exposed to the script as ``ARGV`` (default: none).
        :param client: client to run against; defaults to the client the
            script was registered with.
        :returns: the script's result converted to Python objects.
        """
        # ``None`` defaults avoid sharing one mutable list across all calls.
        keys = [] if keys is None else keys
        args = [] if args is None else args
        client = client or self.registered_client

        # Re-register the script if this client does not know its sha.
        if not client.script_exists(self.sha)[0]:
            self.sha = client.script_load(self.script)

        return self._execute_lua(keys, args, client)

    def _execute_lua(self, keys, args, client):
        """
        Set KEYS and ARGV along with the redis.call() function in the Lua
        globals and execute the Lua redis script.

        :returns: the value produced by the script, converted to Python.
        """
        lua, lua_globals = Script._import_lua()
        lua_globals.KEYS = self._python_to_lua(keys)
        lua_globals.ARGV = self._python_to_lua(args)

        def _call(*call_args):
            # Forward ``redis.call(...)`` to the client and hand the reply
            # back to Lua in a Lua-compatible form.
            response = client.call(*call_args)
            return self._python_to_lua(response)

        lua_globals.redis = {"call": _call}
        return self._lua_to_python(lua.execute(self.script))

    @staticmethod
    def _import_lua():
        """
        Import lua and dependencies.

        :returns: a ``(lua, lua_globals)`` tuple.
        :raises: RuntimeError if LUA is not available
        """
        try:
            import lua
        except ImportError:
            raise RuntimeError("LUA not installed")

        lua_globals = lua.globals()
        Script._import_lua_dependencies(lua, lua_globals)
        return lua, lua_globals

    @staticmethod
    def _import_lua_dependencies(lua, lua_globals):
        """
        Imports lua dependencies that are supported by redis lua scripts.

        Included:
            - cjson lib.
        Pending:
            - base lib.
            - table lib.
            - string lib.
            - math lib.
            - debug lib.
            - cmsgpack lib.

        :raises: RuntimeError if the cjson Lua module cannot be required
        """
        import ctypes

        # Load the Lua shared library with RTLD_GLOBAL before requiring cjson.
        # NOTE(review): presumably so the cjson C module can resolve the Lua
        # API symbols -- confirm.
        ctypes.CDLL('liblua5.2.so', mode=ctypes.RTLD_GLOBAL)

        try:
            lua_globals.cjson = lua.eval('require "cjson"')
        except RuntimeError:
            raise RuntimeError("cjson not installed")

    @staticmethod
    def _lua_to_python(lval):
        """
        Convert Lua object(s) into Python object(s), as at times Lua object(s)
        are not compatible with Python functions.

        :param lval: value returned from Lua.
        :returns: ``None``, a bool, a number, a string, or a (nested) list.
        :raises: RuntimeError if the value has an unsupported Lua type
        """
        if lval is None:
            # Lua None --> Python None
            return None

        # Values that already arrived as native Python scalars need no Lua
        # type lookup. ``bool`` is tested first because it subclasses ``int``.
        if isinstance(lval, bool):
            # Lua boolean --> Python bool
            return lval
        if isinstance(lval, Script._INTEGER_TYPES):
            # Lua number --> Python int/long
            return lval
        if isinstance(lval, float):
            # Lua number --> Python float
            return lval
        if isinstance(lval, str):
            # Lua string --> Python string
            return lval

        # Everything else is classified by Lua's own ``type`` function.
        import lua
        lua_type = lua.globals().type(lval)

        if lua_type == "table":
            # Lua table --> Python list
            return [Script._lua_to_python(lval[i]) for i in lval]
        if lua_type == "userdata":
            # Lua userdata --> Python string
            return str(lval)
        if lua_type == "string":
            # Lua string --> Python string
            return lval
        if lua_type == "boolean":
            # Lua boolean --> Python bool
            return bool(lval)

        raise RuntimeError("Invalid Lua type: " + str(lua_type))

    @staticmethod
    def _python_to_lua(pval):
        """
        Convert Python object(s) into Lua object(s), as at times Python
        object(s) are not compatible with Lua functions.

        :param pval: ``None``, a list/tuple/set, a dict, a ``str``, a bool or
            a number.
        :returns: the Lua-side representation of ``pval``.
        :raises: RuntimeError if the Python type is not supported
        """
        import lua

        if pval is None:
            # Python None --> Lua None
            return lua.eval("")

        if isinstance(pval, (list, tuple, set)):
            # Python list --> Lua table
            # e.g.: in lrange
            #     in Python returns: [v1, v2, v3]
            #     in Lua returns: {v1, v2, v3}
            lua_list = lua.eval("{}")
            lua_table = lua.eval("table")
            for item in pval:
                lua_table.insert(lua_list, Script._python_to_lua(item))
            return lua_list

        if isinstance(pval, dict):
            # Python dict --> Lua dict
            # e.g.: in hgetall
            #     in Python returns: {k1:v1, k2:v2, k3:v3}
            #     in Lua returns: {k1, v1, k2, v2, k3, v3}
            lua_dict = lua.eval("{}")
            lua_table = lua.eval("table")
            # ``items()`` exists on both Python 2 and 3 (``iteritems`` is
            # Python 2 only).
            for key, value in pval.items():
                lua_table.insert(lua_dict, Script._python_to_lua(key))
                lua_table.insert(lua_dict, Script._python_to_lua(value))
            return lua_dict

        if isinstance(pval, str):
            # Python string is handed to Lua unchanged
            return pval

        # ``bool`` must be tested before the numeric types: it subclasses int.
        if isinstance(pval, bool):
            # Python bool --> Lua boolean
            return lua.eval(str(pval).lower())

        if isinstance(pval, Script._INTEGER_TYPES + (float,)):
            # Python int --> Lua number
            return lua.globals().tonumber(str(pval))

        raise RuntimeError("Invalid Python type: " + str(type(pval)))