-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathanswers.py
349 lines (292 loc) · 13.2 KB
/
answers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
"""The Answers object"""
import re
from collections import defaultdict
import clyngor
from clyngor import as_pyasp, parsing, utils
def naive_parsing_of_answer_set(answer_set:str, *, discard_quotes:bool=False, parse_int:bool=True, parse_args:bool=True) -> [(str, tuple)]:
"""Yield (pred, args), naively parsed from given answer set encoded as clingo output string.
Some atoms may be missing. Some others may be poorly parsed."""
# print('NAIVE_PARSING_OF_ANSWER_SET:', answer_set, f'\t keep_quotes={keep_quotes}, parse_int={parse_int}, parse_args={parse_args}')
REG_ANSWER_SET = re.compile(r'([a-z_-][a-zA-Z0-9_]*|[0-9]+|"[^"]*")(\([^)]+\))?')
for match in REG_ANSWER_SET.finditer(answer_set):
pred, args = match.groups()
assert args is None or (args.startswith('(') and args.endswith(')'))
if args and parse_args:
args = args[1:-1] # remove surrounding parens
if discard_quotes:
args = utils.remove_arguments_quotes(args)
args = tuple(
(int(arg) if parse_int and
(arg[1:] if arg.startswith('-') else arg).isnumeric() else arg)
for arg in args.split(',')
) if parse_args else args
elif args: # args should not be parsed
args = args
pred = int(pred) if pred.isnumeric() else pred # handle
yield pred, args or ()
# print('\t>', pred, args)
class Answers:
"""Proxy to the solver, generated by solving methods like solve.solve
or inline.ASP.
Iterable on the answer sets generated by the solver.
Also expose some answer set formatting tunning.
"""
def __init__(self, answers:iter, command:str='', statistics:dict={},
*, decoders:iter=(), with_optimization:bool=False, on_end:callable=None, **kwargs):
"""Answer sets must be iterable of (predicate, args).
decoders -- iterable of decoders to apply on ASP (see clyngor.decoder).
with_optimization -- answers are read as ((predicate, args), optimization, optimality)
allowing to retrieve optimization data of the answers.
See also Answers.with_optimization property.
on_end -- if callable, called when all answer sets are exhausted.
"""
if not with_optimization:
# emulate optimization with None value
# TODO: verify that in absence of optimization, all answers are ALWAYS enumerated from 1 to n.
answers = ((answer, None, False, idx) for idx, answer in enumerate(answers, start=1))
self._answers = iter(answers)
self._command = str(command or '')
self._statistics = statistics # will be updated by reading method
self._decoders = tuple(decoders)
self._first_arg_only = False
self._group_atoms = False
self._group_atoms_with_arity = False
self._as_pyasp = False
self._sorted = False
self._keep_quotes = True
# TODO: break api to add keep quotes instead of discard quotes.
self._careful_parsing = False
self._collapse_atoms= False
self._collapse_args = True
self._parse_int = True
self._ignore_args = False
self._with_optimization = False
self._with_optimality = False
self._with_answer_number = False
self.__on_end = on_end or (lambda: None)
for k, v in kwargs.items():
if isinstance(getattr(self, '_' + k, None), bool):
setattr(self, '_' + k, bool(value))
elif k == 'discard_quotes':
setattr(self, '_keep_quotes', not bool(value))
def __del__(self):
"""Call the on_end function, avoiding a too-many-file-open error.
That error happened in a recent project where clyngor
was called thousands of times, without accessing all models,
which prevented __iter__ method to ever call on_end function.
Hence the use of __del__.
"""
self.clean_resources()
@property
def command(self) -> str: return self._command
@property
def with_optimization(self):
"""Yield (model, optimization) instead of just model"""
self._with_optimization = True
return self
@property
def with_optimality(self):
"""Yield (model, optimization, optimality) instead of just model"""
self._with_optimization = True
self._with_optimality = True
return self
@property
def with_answer_number(self):
"""Yield (model, optimization, optimality, answer_number) instead of just model"""
self._with_optimization = True
self._with_optimality = True
self._with_answer_number = True
return self
@property
def first_arg_only(self):
"""Keep only the first argument, and do not enclose it in a tuple."""
self._first_arg_only = True
return self
@property
def by_predicate(self):
"""Group atoms by predicate. Answer sets are then dict with predicate
as keys and collection of args as value."""
self._group_atoms = True
return self
@property
def by_arity(self):
"""Group atoms by predicate and arity. Answer sets are then dict with
predicate/arity as keys and collection of args as value."""
self._group_atoms = True
self._group_atoms_with_arity = True
return self
@property
def as_pyasp(self):
"""Return Term and TermSet object offering a pyasp-like interface"""
self._as_pyasp = True
return self
@property
def sorted(self):
"""Sort the atom (or the args when grouped)"""
self._sorted = True
return self
@property
def keep_quotes(self):
"""Keep the quotes of the string"""
self._keep_quotes = True
return self
@property
def discard_quotes(self):
"""Discard the quotes of the string"""
self._keep_quotes = False
return self
@property
def careful_parsing(self):
"""Use robust parser"""
self._careful_parsing = True
return self
@property
def atoms_as_string(self):
"""All atoms are encoded as ASP strings, left unparsed."""
self._collapse_atoms = True
self._collapse_args = True
return self
@property
def int_not_parsed(self):
"""Do not parse the integer arguments, so if an atom have integers
as arguments, they will be returned as string, not integers.
"""
self._parse_int = False
return self
@property
def parse_args(self):
"""Parse the arguments as well, so if an atom is argument of another
one, it will be parsed as any atom instead of being understood
as a string.
Will use the robust parser.
"""
self._careful_parsing = True # needed to implement the collapse
self._collapse_atoms = False
self._collapse_args = False
return self
@property
def no_arg(self):
"""Do not parse arguments, and discard/ignore them.
"""
self._ignore_args = True
return self
def __next__(self):
return next(iter(self))
def __iter__(self):
"""Yield answer sets"""
for answer_set, optimization, optimality, answer_number in self._answers:
answer_set = tuple(self._parse_answer(answer_set))
parsed = self._format(answer_set)
if self._with_answer_number:
yield parsed, optimization, optimality, answer_number
elif self._with_optimality:
yield parsed, optimization, optimality
elif self._with_optimization:
yield parsed, optimization
else:
yield parsed
self.clean_resources()
def clean_resources(self):
self.__on_end()
self.__on_end = lambda: None # don't call it again
def _parse_answer(self, answer_set:str) -> iter:
"""Yield atoms as (pred, args) from given answer set"""
careful_parsing = self._careful_parsing or parsing.careful_parsing_required(answer_set)
# keep_quotes = self._keep_quotes or not self._collapse_atoms or not self._as_pyasp
discard_quotes = not (self._keep_quotes or self._collapse_atoms or self._as_pyasp)
if isinstance(answer_set, str) and careful_parsing:
# print('CAREFUL PARSING:', answer_set)
# _keep_quotes is incompatible with atoms_as_string and as_pyasp.
# atom_as_string: remove the quotes delimiting arguments.
# as_pyasp: remove the quotes for the arguments.
yield from parsing.Parser(
self._collapse_atoms, self._collapse_args,
discard_quotes,
self._first_arg_only,
parse_integer=self._parse_int
).parse_terms(answer_set)
elif isinstance(answer_set, str): # the good ol' split
# print('THE GOOD OLD SPLIT:', f"keep_quotes={self._keep_quotes} collapse_atoms={self._collapse_atoms}")
yield from self.__finish_parsing(naive_parsing_of_answer_set(answer_set, discard_quotes=discard_quotes, parse_int=self._parse_int, parse_args=self._collapse_args or self._first_arg_only))
elif isinstance(answer_set, (set, tuple)) and all(isinstance(atom, (str, int, tuple)) for atom in answer_set): # already parsed
# print('FROM SET OR TUPLE')
if not self._parse_int:
answer_set = utils.integers_to_string_atoms(answer_set)
yield from self.__finish_parsing(answer_set)
else: # unknown format
raise ValueError("unknow answer set format: {}, {}".format(type(answer_set), answer_set))
def __finish_parsing(self, answer_set:[(str, tuple)]) -> [(str, tuple) or str]:
"""Modify (pred, args) atoms according to parsing options"""
for pred, args in answer_set:
if self._first_arg_only:
args = [args[0]] if args else args
if self._atoms_as_string:
args = ('(' + ','.join(map(str, args)) + ')') if args else ''
yield pred + args
else:
yield pred, args
def _format(self, answer_set) -> dict or frozenset:
"""Perform the formatting of the answer set according to
formatting options.
answer_set -- iterable of (pred, args)
"""
sorted_tuple = lambda it: tuple(sorted(it))
builder = sorted_tuple if self._sorted else frozenset
if self._atoms_as_string: # special case
return builder(answer_set)
elif self._ignore_args:
answer_set = (pred for pred, _ in answer_set)
if self._group_atoms:
return {pred: frozenset() for pred in answer_set}
if self._as_pyasp:
return as_pyasp.TermSet(as_pyasp.Atom.from_tuple_repr((pred, ())) for pred in answer_set)
return builder(answer_set)
elif self._first_arg_only:
answer_set = builder((pred, args[0] if args else ())
for pred, args in answer_set)
else:
answer_set = builder((pred, tuple(args))
for pred, args in answer_set)
# NB: as_pyasp flag behave differently if group_atoms is activated
if self._group_atoms:
mapping = defaultdict(set)
for pred, args in answer_set:
if self._as_pyasp:
args = as_pyasp.Atom.from_tuple_repr((pred, args))
mapping[pred].add(args)
if self._group_atoms_with_arity:
mapping[pred, len(args)].add(args)
mapping['{}/{}'.format(pred, str(len(args)))].add(args)
if self._as_pyasp:
return {pred: as_pyasp.TermSet(args) for pred, args in mapping.items()}
else:
return {pred: builder(args) for pred, args in mapping.items()}
elif self._as_pyasp:
return as_pyasp.TermSet(as_pyasp.Atom.from_tuple_repr(atom) for atom in answer_set)
return answer_set
@property
def _atoms_as_string(self) -> bool:
"""Shortcut"""
return self._collapse_atoms and self._collapse_args
@property
def statistics(self) -> dict:
return dict(self._statistics)
class ClingoAnswers(Answers):
"""Proxy to the solver as called through the python clingo module.
"""
def __init__(self, solver, statistics:callable=(lambda: {})):
assert clyngor.have_clingo_module()
super().__init__(self.__compute_answers(), with_optimization=True, command='[clingo module call]')
self._solver = solver
self._statistics = lambda s=solver: s.statistics
assert callable(self._statistics)
def __compute_answers(self):
kwargs = {'yield_': True, 'async_': True} # compat with 3.7
with self._solver.solve(**kwargs) as models:
for model in models:
answer_set = set(utils.clingo_symbol_as_python_value(a)
for a in model.symbols(shown=True))
yield answer_set, model.cost, model.optimality_proven, model.number
@property
def statistics(self) -> dict:
return self._statistics()