-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathutils.py
261 lines (204 loc) · 8.64 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
"""Various high level definition for client"""
import os
import tempfile
import functools
from clyngor import parsing
try:
import clingo
except ImportError:
clingo = None
class ASPSyntaxError(SyntaxError):
"""This is a SyntaxError, but without the filename at the end of the
string representation, and with a payload attached.
Used by solving module to indicate problems in generated files.
"""
def __init__(self, *args, payload:dict, **kwargs):
super().__init__(*args, **kwargs)
self.payload = payload
def __str__(self):
return self.msg
class ASPWarning(ValueError):
"""This is a ValueError, with a payload attached to it"""
def __init__(self, msg:str, payload:dict):
super().__init__(msg)
self.payload = payload
self.atom = payload['atom']
def parse_clingo_output(clingo_output:[str]):
"Yield answer sets found in given clingo output"
yield from (answer for anstype, answer
in parsing.Parser().parse_clasp_output(clingo_output)
if anstype == 'answer')
def make_hashable(val):
"""Convert lists and sets into tuples and frozensets
>>> make_hashable(2)
2
>>> make_hashable('2')
'2'
>>> make_hashable([1, ({2, 3}, [4, 5], {(6, 7): [8, 9]})])
(1, (frozenset({2, 3}), (4, 5), {(6, 7): (8, 9)}))
"""
if isinstance(val, (tuple, list)):
return tuple(map(make_hashable, val))
elif isinstance(val, (frozenset, set)):
return frozenset(map(make_hashable, val))
elif isinstance(val, dict):
return {make_hashable(k): make_hashable(v) for k, v in val.items()}
return val
def remove_arguments_quotes(arguments:str):
"""Remove quotes at the beginning and at the end
of the given arguments in ASP format.
This function is used by the weak parser, and therefore
does not need to meet robustness criterion.
>>> remove_arguments_quotes('a,b')
'a,b'
>>> remove_arguments_quotes('"a","b"')
'a,b'
>>> remove_arguments_quotes('"a",b')
'a,b'
"""
def is_quoted(arg:str) -> bool:
return (isinstance(arg, str) and len(arg) >= 2
and arg[0] == '"' and arg[-1] == '"' and arg[-2] != '\\')
return ','.join(arg[1:-1] if is_quoted(arg) else arg
for arg in arguments.split(','))
def clingo_value_to_python(value:object) -> int or str or tuple:
"""Convert a clingo.Symbol object to the python equivalent"""
if isinstance(value, (int, str)):
return value
elif isinstance(value, (tuple, list)):
return tuple(map(clingo_value_to_python, value))
elif type(value).__name__ == 'Symbol':
try:
typename = str(value.type).lower()
if typename == 'function':
if value.arguments:
pyvalue = (value.name, tuple(map(clingo_value_to_python, value.arguments)))
else:
pyvalue = value.name
else:
pyvalue = getattr(value, typename)
except AttributeError as err: # inf or sup
if value.type == value.type.Infimum:
return -math.inf
elif value.type == value.type.Supremum:
return math.inf
else:
raise err
if typename == 'string':
pyvalue = '"' + pyvalue.replace('"', '\\"') + '"'
return pyvalue
raise TypeError("Can't handle values like {} of type {}."
"".format(value, type(value)))
def answer_set_to_str(answer_set:iter, atom_end:str='', atom_sep:str=' ') -> str:
"""Returns the string representation of given answer set.
answer_set -- iterable of tuple (predicate, args)
atom_sep -- string joining the atoms
"""
return atom_sep.join(generate_answer_set_as_str(answer_set, atom_end=atom_end))
def generate_answer_set_as_str(answer_set:iter, atom_end:str='') -> iter:
"""Yield segment of string describing given answer set.
answer_set -- iterable of tuple (predicate, args), or dict {predicate: [args]}
atom_end -- string added to the end of each given atom.
>>> '.'.join(generate_answer_set_as_str((('a', (1, 2)), ('b', ()))))
'a(1,2).b'
>>> ''.join(generate_answer_set_as_str((('a', (1, 2)), ('b', ())), atom_end='.'))
'a(1,2).b.'
>>> '1'.join(generate_answer_set_as_str((('a', (1, 2)), ('b', ())), atom_end='2'))
'a(1,2)21b2'
"""
if isinstance(answer_set, dict): # have been created using by_predicate, probably
answer_set = (
(predicate, args)
for predicate, all_args in answer_set.items()
for args in all_args
)
template = '{}({})' + str(atom_end)
for predicate, args in answer_set:
if args:
yield template.format(predicate, ','.join(map(str, args)))
else:
yield predicate + str(atom_end)
def answer_set_from_str(line:str, collapse_atoms:bool=False,
collapse_args:bool=True, parse_integer:bool=True) -> iter:
"""Yield atoms found in given string.
line -- parsable string containing an answer set
collapse_atoms -- whole atoms are left unparsed
collapse_args -- atoms args are left unparsed
parse_integer -- integers are returned as python int objects
>>> tuple(sorted(answer_set_from_str('a b c d', True)))
('a', 'b', 'c', 'd')
>>> tuple(sorted((answer_set_from_str('a b(a) c("text") d', True))))
('a', 'b(a)', 'c("text")', 'd')
"""
yield from parsing.Parser(
collapse_atoms=collapse_atoms,
collapse_args=collapse_args,
parse_integer=parse_integer
).parse_terms(line)
def save_answers_in_file(answers:iter, filename:str or None=None,
atom_separator:str=' ',
answer_separator:str='\n', end:str='') -> str:
"""Return the name of the file in which the answer sets are written.
answers -- iterable of answer set to write
filename -- file to write ; if None, a temporary file will be created
atom_separator -- string placed between each atom
answer_separator -- string placed between each answer set
end -- string written at the end
"""
if not filename:
with tempfile.NamedTemporaryFile('w', delete=False) as ofd:
filename = ofd.name
with open(filename, 'w') as ofd:
ofd.write(answer_separator.join(answer_set_to_str(answer, atom_sep=atom_separator)
for answer in answers) + end)
return filename
def load_answers_from_file(filename:str, answer_set_builder:type=frozenset) -> iter:
"""Yield answer set found in each line of given file"""
with open(filename) as ifd:
yield from (
answer_set_builder(answer_set_from_str(line))
for line in ifd
)
def cleaned_path(path:str, error_if_invalid:bool=True) -> str:
"""Return the same path, but cleaned with user expension and absolute"""
path = os.path.abspath(os.path.expanduser(path))
if error_if_invalid and not os.path.exists(path):
open(path) # will raise FileExistsError
return path
def with_clingo_bin(clingo_bin:str) -> callable:
"""Generate a wrapper where, during func call, the CLINGO_BIN_PATH clyngor
variable is set of *clingo_bin*.
Example:
@with_clingo_bin('~/bin/clingo-3.2.1')
def solve_problem(...):
clyngor.solve(...)
Not thread safe.
"""
def wrapper(func):
@functools.wraps(func)
def wrapped(*args, **kwargs):
import clyngor
regular_bin, clyngor.CLINGO_BIN_PATH = clyngor.CLINGO_BIN_PATH, clingo_bin
ret = func(*args, **kwargs)
clyngor.CLINGO_BIN_PATH = regular_bin
return ret
return wrapped
return wrapper
def opt_models_from_clyngor_answers(answers:iter, *, repeated_optimal:bool=True):
"""Return tuple of optimal models found by clingor.solve from answers.
This function assumes that:
- Option '--opt-mode=optN' have been given to clingo.
- that models are yielded by clingo in increasing optimization value,
therefore there is no need to verify that a model B succeeding a model A
is better if they have different optimization value.
- that the first found optimal model will be given a second time, unless option repeated_optimal is set to False.
"""
current_opt, models = None, []
for model, opt in answers.with_optimization:
if opt != current_opt:
current_opt, models = opt, []
if not repeated_optimal: # the first optimal model will not be given again as last model
models.append(model)
else:
models.append(model)
return tuple(models)