"""The Answers object""" import re from collections import defaultdict import clyngor from clyngor import as_pyasp, parsing, utils def naive_parsing_of_answer_set(answer_set:str, *, discard_quotes:bool=False, parse_int:bool=True, parse_args:bool=True) -> [(str, tuple)]: """Yield (pred, args), naively parsed from given answer set encoded as clingo output string. Some atoms may be missing. Some others may be poorly parsed.""" # print('NAIVE_PARSING_OF_ANSWER_SET:', answer_set, f'\t keep_quotes={keep_quotes}, parse_int={parse_int}, parse_args={parse_args}') REG_ANSWER_SET = re.compile(r'([a-z_-][a-zA-Z0-9_]*|[0-9]+|"[^"]*")(\([^)]+\))?') for match in REG_ANSWER_SET.finditer(answer_set): pred, args = match.groups() assert args is None or (args.startswith('(') and args.endswith(')')) if args and parse_args: args = args[1:-1] # remove surrounding parens if discard_quotes: args = utils.remove_arguments_quotes(args) args = tuple( (int(arg) if parse_int and (arg[1:] if arg.startswith('-') else arg).isnumeric() else arg) for arg in args.split(',') ) if parse_args else args elif args: # args should not be parsed args = args pred = int(pred) if pred.isnumeric() else pred # handle yield pred, args or () # print('\t>', pred, args) class Answers: """Proxy to the solver, generated by solving methods like solve.solve or inline.ASP. Iterable on the answer sets generated by the solver. Also expose some answer set formatting tunning. """ def __init__(self, answers:iter, command:str='', statistics:dict={}, *, decoders:iter=(), with_optimization:bool=False, on_end:callable=None, **kwargs): """Answer sets must be iterable of (predicate, args). decoders -- iterable of decoders to apply on ASP (see clyngor.decoder). with_optimization -- answers are read as ((predicate, args), optimization, optimality) allowing to retrieve optimization data of the answers. See also Answers.with_optimization property. on_end -- if callable, called when all answer sets are exhausted. """ if not with_optimization: # emulate optimization with None value # TODO: verify that in absence of optimization, all answers are ALWAYS enumerated from 1 to n. answers = ((answer, None, False, idx) for idx, answer in enumerate(answers, start=1)) self._answers = iter(answers) self._command = str(command or '') self._statistics = statistics # will be updated by reading method self._decoders = tuple(decoders) self._first_arg_only = False self._group_atoms = False self._group_atoms_with_arity = False self._as_pyasp = False self._sorted = False self._keep_quotes = True # TODO: break api to add keep quotes instead of discard quotes. self._careful_parsing = False self._collapse_atoms= False self._collapse_args = True self._parse_int = True self._ignore_args = False self._with_optimization = False self._with_optimality = False self._with_answer_number = False self.__on_end = on_end or (lambda: None) for k, v in kwargs.items(): if isinstance(getattr(self, '_' + k, None), bool): setattr(self, '_' + k, bool(value)) elif k == 'discard_quotes': setattr(self, '_keep_quotes', not bool(value)) def __del__(self): """Call the on_end function, avoiding a too-many-file-open error. That error happened in a recent project where clyngor was called thousands of times, without accessing all models, which prevented __iter__ method to ever call on_end function. Hence the use of __del__. """ self.clean_resources() @property def command(self) -> str: return self._command @property def with_optimization(self): """Yield (model, optimization) instead of just model""" self._with_optimization = True return self @property def with_optimality(self): """Yield (model, optimization, optimality) instead of just model""" self._with_optimization = True self._with_optimality = True return self @property def with_answer_number(self): """Yield (model, optimization, optimality, answer_number) instead of just model""" self._with_optimization = True self._with_optimality = True self._with_answer_number = True return self @property def first_arg_only(self): """Keep only the first argument, and do not enclose it in a tuple.""" self._first_arg_only = True return self @property def by_predicate(self): """Group atoms by predicate. Answer sets are then dict with predicate as keys and collection of args as value.""" self._group_atoms = True return self @property def by_arity(self): """Group atoms by predicate and arity. Answer sets are then dict with predicate/arity as keys and collection of args as value.""" self._group_atoms = True self._group_atoms_with_arity = True return self @property def as_pyasp(self): """Return Term and TermSet object offering a pyasp-like interface""" self._as_pyasp = True return self @property def sorted(self): """Sort the atom (or the args when grouped)""" self._sorted = True return self @property def keep_quotes(self): """Keep the quotes of the string""" self._keep_quotes = True return self @property def discard_quotes(self): """Discard the quotes of the string""" self._keep_quotes = False return self @property def careful_parsing(self): """Use robust parser""" self._careful_parsing = True return self @property def atoms_as_string(self): """All atoms are encoded as ASP strings, left unparsed.""" self._collapse_atoms = True self._collapse_args = True return self @property def int_not_parsed(self): """Do not parse the integer arguments, so if an atom have integers as arguments, they will be returned as string, not integers. """ self._parse_int = False return self @property def parse_args(self): """Parse the arguments as well, so if an atom is argument of another one, it will be parsed as any atom instead of being understood as a string. Will use the robust parser. """ self._careful_parsing = True # needed to implement the collapse self._collapse_atoms = False self._collapse_args = False return self @property def no_arg(self): """Do not parse arguments, and discard/ignore them. """ self._ignore_args = True return self def __next__(self): return next(iter(self)) def __iter__(self): """Yield answer sets""" for answer_set, optimization, optimality, answer_number in self._answers: answer_set = tuple(self._parse_answer(answer_set)) parsed = self._format(answer_set) if self._with_answer_number: yield parsed, optimization, optimality, answer_number elif self._with_optimality: yield parsed, optimization, optimality elif self._with_optimization: yield parsed, optimization else: yield parsed self.clean_resources() def clean_resources(self): self.__on_end() self.__on_end = lambda: None # don't call it again def _parse_answer(self, answer_set:str) -> iter: """Yield atoms as (pred, args) from given answer set""" careful_parsing = self._careful_parsing or parsing.careful_parsing_required(answer_set) # keep_quotes = self._keep_quotes or not self._collapse_atoms or not self._as_pyasp discard_quotes = not (self._keep_quotes or self._collapse_atoms or self._as_pyasp) if isinstance(answer_set, str) and careful_parsing: # print('CAREFUL PARSING:', answer_set) # _keep_quotes is incompatible with atoms_as_string and as_pyasp. # atom_as_string: remove the quotes delimiting arguments. # as_pyasp: remove the quotes for the arguments. yield from parsing.Parser( self._collapse_atoms, self._collapse_args, discard_quotes, self._first_arg_only, parse_integer=self._parse_int ).parse_terms(answer_set) elif isinstance(answer_set, str): # the good ol' split # print('THE GOOD OLD SPLIT:', f"keep_quotes={self._keep_quotes} collapse_atoms={self._collapse_atoms}") yield from self.__finish_parsing(naive_parsing_of_answer_set(answer_set, discard_quotes=discard_quotes, parse_int=self._parse_int, parse_args=self._collapse_args or self._first_arg_only)) elif isinstance(answer_set, (set, tuple)) and all(isinstance(atom, (str, int, tuple)) for atom in answer_set): # already parsed # print('FROM SET OR TUPLE') if not self._parse_int: answer_set = utils.integers_to_string_atoms(answer_set) yield from self.__finish_parsing(answer_set) else: # unknown format raise ValueError("unknow answer set format: {}, {}".format(type(answer_set), answer_set)) def __finish_parsing(self, answer_set:[(str, tuple)]) -> [(str, tuple) or str]: """Modify (pred, args) atoms according to parsing options""" for pred, args in answer_set: if self._first_arg_only: args = [args[0]] if args else args if self._atoms_as_string: args = ('(' + ','.join(map(str, args)) + ')') if args else '' yield pred + args else: yield pred, args def _format(self, answer_set) -> dict or frozenset: """Perform the formatting of the answer set according to formatting options. answer_set -- iterable of (pred, args) """ sorted_tuple = lambda it: tuple(sorted(it)) builder = sorted_tuple if self._sorted else frozenset if self._atoms_as_string: # special case return builder(answer_set) elif self._ignore_args: answer_set = (pred for pred, _ in answer_set) if self._group_atoms: return {pred: frozenset() for pred in answer_set} if self._as_pyasp: return as_pyasp.TermSet(as_pyasp.Atom.from_tuple_repr((pred, ())) for pred in answer_set) return builder(answer_set) elif self._first_arg_only: answer_set = builder((pred, args[0] if args else ()) for pred, args in answer_set) else: answer_set = builder((pred, tuple(args)) for pred, args in answer_set) # NB: as_pyasp flag behave differently if group_atoms is activated if self._group_atoms: mapping = defaultdict(set) for pred, args in answer_set: if self._as_pyasp: args = as_pyasp.Atom.from_tuple_repr((pred, args)) mapping[pred].add(args) if self._group_atoms_with_arity: mapping[pred, len(args)].add(args) mapping['{}/{}'.format(pred, str(len(args)))].add(args) if self._as_pyasp: return {pred: as_pyasp.TermSet(args) for pred, args in mapping.items()} else: return {pred: builder(args) for pred, args in mapping.items()} elif self._as_pyasp: return as_pyasp.TermSet(as_pyasp.Atom.from_tuple_repr(atom) for atom in answer_set) return answer_set @property def _atoms_as_string(self) -> bool: """Shortcut""" return self._collapse_atoms and self._collapse_args @property def statistics(self) -> dict: return dict(self._statistics) class ClingoAnswers(Answers): """Proxy to the solver as called through the python clingo module. """ def __init__(self, solver, statistics:callable=(lambda: {})): assert clyngor.have_clingo_module() super().__init__(self.__compute_answers(), with_optimization=True, command='[clingo module call]') self._solver = solver self._statistics = lambda s=solver: s.statistics assert callable(self._statistics) def __compute_answers(self): kwargs = {'yield_': True, 'async_': True} # compat with 3.7 with self._solver.solve(**kwargs) as models: for model in models: answer_set = set(utils.clingo_symbol_as_python_value(a) for a in model.symbols(shown=True)) yield answer_set, model.cost, model.optimality_proven, model.number @property def statistics(self) -> dict: return self._statistics()