Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preserving of optimisation results implemented #37

Merged
merged 5 commits into from
Aug 18, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file modified cases/synthetic/__init__.py
100755 → 100644
Empty file.
79 changes: 42 additions & 37 deletions gefest/core/opt/operators/crossover.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,46 +17,51 @@ def crossover_worker(args):
Polygons are exchanged between structures
"""

s1, s2, domain = args[0], args[1], args[2]
try:
s1, s2, domain = args[0], args[1], args[2]

new_structure = copy.deepcopy(s1)
s1 = copy.deepcopy(s1)
s2 = copy.deepcopy(s2)

# Checking if at least one Structure does not have any polygons
if not all([len(s1.polygons), len(s2.polygons)]):
# All polygons are shuffling between Structures in random way
s1, s2 = shuffle_structures(s1, s2)

crossover_point = random.randint(1, min(len(s1.polygons), len(s2.polygons))) # Choosing crossover point randomly

# Crossover conversion
part_1 = s1.polygons[:crossover_point]
if not isinstance(part_1, list):
part_1 = [part_1]
part_2 = s2.polygons[crossover_point:]
if not isinstance(part_2, list):
part_2 = [part_2]

result = copy.deepcopy(part_1)
result.extend(copy.deepcopy(part_2))

new_structure.polygons = result

# Postprocessing for new structure
new_structure = postprocess(new_structure, domain)
constraints = check_constraints(structure=new_structure, domain=domain)
max_attempts = 3 # Number of postprocessing attempts
while not constraints:
new_structure = copy.deepcopy(s1)
s1 = copy.deepcopy(s1)
s2 = copy.deepcopy(s2)

# Checking if at least one Structure does not have any polygons
if not all([len(s1.polygons), len(s2.polygons)]):
# All polygons are shuffling between Structures in random way
s1, s2 = shuffle_structures(s1, s2)

# Choosing crossover point randomly
crossover_point = random.randint(1, min(len(s1.polygons), len(s2.polygons)))

# Crossover conversion
part_1 = s1.polygons[:crossover_point]
if not isinstance(part_1, list):
part_1 = [part_1]
part_2 = s2.polygons[crossover_point:]
if not isinstance(part_2, list):
part_2 = [part_2]

result = copy.deepcopy(part_1)
result.extend(copy.deepcopy(part_2))

new_structure.polygons = result

# Postprocessing for new structure
new_structure = postprocess(new_structure, domain)
constraints = check_constraints(structure=new_structure, domain=domain)
max_attempts -= 1
if max_attempts == 0:
# If the number of attempts is over,
# the transformation is considered unsuccessful
# and one of the structures is returned
return s1
return new_structure
max_attempts = 3 # Number of postprocessing attempts
while not constraints:
new_structure = postprocess(new_structure, domain)
constraints = check_constraints(structure=new_structure, domain=domain)
max_attempts -= 1
if max_attempts == 0:
# If the number of attempts is over,
# the transformation is considered unsuccessful
# and one of the structures is returned
return s1
return new_structure
except Exception as ex:
print(ex)
return s1


def crossover(s1: Structure, s2: Structure, domain: Domain, rate: float = 0.4) -> Structure:
Expand Down
8 changes: 6 additions & 2 deletions gefest/core/opt/optimize.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
from gefest.core.opt.GA.GA import GA
from gefest.core.opt.objectives import calculate_objectives
from gefest.core.opt.operators.operators import default_operators
from gefest.core.opt.result import Result
from gefest.core.opt.setup import Setup


def optimize(task_setup: Setup, objective_function: Callable, max_gens: int, pop_size: int) -> list:
def optimize(task_setup: Setup, objective_function: Callable, max_gens: int, pop_size: int) -> Result:
"""The wrapper object for searching optimal solution by given arguments

Args:
Expand All @@ -30,4 +31,7 @@ def optimize(task_setup: Setup, objective_function: Callable, max_gens: int, pop
calculate_objectives=partial(calculate_objectives, model_func=objective_function),
evolutionary_operators=operators, task_setup=task_setup).solution(verbose=False)

return best.genotype
return Result(name='result', best_structure=best.genotype,
metadata={'max_gens': max_gens,
'pop_size': pop_size},
fitness=best.fitness)
29 changes: 29 additions & 0 deletions gefest/core/opt/result.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import json
import os
from dataclasses import dataclass
from typing import Union, List, Optional

from gefest.core.serialization.serializer import Serializer
from gefest.core.structure.structure import Structure


@dataclass
class Result:
name: str
best_structure: Union[Structure, List[Structure]]
fitness: float
metadata: dict

def save(self, json_file_path: os.PathLike = None) -> Optional[str]:
if json_file_path is None:
return json.dumps(self, indent=4, cls=Serializer)
with open(json_file_path, mode='w') as json_fp:
json.dump(self, json_fp, indent=4, cls=Serializer)

@staticmethod
def load(json_str_or_file_path: Union[str, os.PathLike] = None) -> 'Result':
try:
return json.loads(json_str_or_file_path, cls=Serializer)
except json.JSONDecodeError as exc:
with open(json_str_or_file_path, mode='r') as json_fp:
return json.load(json_fp, cls=Serializer)
Empty file.
32 changes: 32 additions & 0 deletions gefest/core/serialization/any.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from copy import deepcopy
from inspect import signature
from typing import Any, Dict, Type

from gefest.core.serialization.serializer import Serializer, INSTANCE_OR_CALLABLE


def any_to_json(obj: INSTANCE_OR_CALLABLE) -> Dict[str, Any]:
return {
**{k: v for k, v in vars(obj).items() if k != 'log'},
**Serializer.dump_path_to_obj(obj)
}


def any_from_json(cls: Type[INSTANCE_OR_CALLABLE], json_obj: Dict[str, Any]) -> INSTANCE_OR_CALLABLE:
cls_parameters = signature(cls.__init__).parameters
if 'kwargs' not in cls_parameters:
init_data = {
k: v
for k, v in json_obj.items()
if k in cls_parameters
}
obj = cls(**init_data)
vars(obj).update({
k: json_obj[k]
for k in json_obj.keys() ^ init_data.keys()
})
else:
init_data = deepcopy(json_obj)
obj = cls(**init_data)
vars(obj).update(json_obj)
return obj
141 changes: 141 additions & 0 deletions gefest/core/serialization/serializer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
from importlib import import_module
from inspect import isclass, isfunction, ismethod, signature
from json import JSONDecoder, JSONEncoder
from typing import Any, Callable, Dict, Type, TypeVar, Union

MODULE_X_NAME_DELIMITER = '/'
INSTANCE_OR_CALLABLE = TypeVar('INSTANCE_OR_CALLABLE', object, Callable)
CLASS_PATH_KEY = '_class_path'


class Serializer(JSONEncoder, JSONDecoder):
_to_json = 'to_json'
_from_json = 'from_json'

CODERS_BY_TYPE = {}

def __init__(self, *args, **kwargs):
for base_class, coder_name in [(JSONEncoder, 'default'), (JSONDecoder, 'object_hook')]:
base_kwargs = {k: kwargs[k] for k in kwargs.keys() & signature(base_class.__init__).parameters}
base_kwargs[coder_name] = getattr(self, coder_name)
base_class.__init__(self, **base_kwargs)

if not Serializer.CODERS_BY_TYPE:
from gefest.core.opt.result import Result
from gefest.core.structure.structure import Structure
from gefest.core.structure.point import Point
from gefest.core.structure.polygon import Polygon

from .any import (
any_from_json,
any_to_json,
)

_to_json = Serializer._to_json
_from_json = Serializer._from_json
basic_serialization = {_to_json: any_to_json, _from_json: any_from_json}
Serializer.CODERS_BY_TYPE = {
Result: basic_serialization,
Structure: basic_serialization,
Polygon: basic_serialization,
Point: basic_serialization

}

@staticmethod
def _get_field_checker(obj: Union[INSTANCE_OR_CALLABLE, Type[INSTANCE_OR_CALLABLE]]) -> Callable[..., bool]:
if isclass(obj):
return issubclass
return isinstance

@staticmethod
def _get_base_type(obj: Union[INSTANCE_OR_CALLABLE, Type[INSTANCE_OR_CALLABLE]]) -> int:
contains = Serializer._get_field_checker(obj)
for k_type in Serializer.CODERS_BY_TYPE:
if contains(obj, k_type):
return k_type
return None

@staticmethod
def _get_coder_by_type(coder_type: Type, coder_aim: str):
return Serializer.CODERS_BY_TYPE[coder_type][coder_aim]

@staticmethod
def dump_path_to_obj(obj: INSTANCE_OR_CALLABLE) -> Dict[str, str]:
"""Dumps the full path (module + name) to the input object into the dict

Args:
obj: object which path should be resolved (class, function or method)

Returns:
dictionary with path to the object
"""

if isclass(obj) or isfunction(obj) or ismethod(obj):
obj_name = obj.__qualname__
else:
obj_name = obj.__class__.__qualname__

if getattr(obj, '__module__', None) is not None:
obj_module = obj.__module__
else:
obj_module = obj.__class__.__module__
return {
CLASS_PATH_KEY: f'{obj_module}{MODULE_X_NAME_DELIMITER}{obj_name}'
}

def default(self, obj: INSTANCE_OR_CALLABLE) -> Dict[str, Any]:
"""Tries to encode objects that are not simply json-encodable to JSON-object

Args:
obj: object to be encoded (class, function or method)

Returns:
json object
"""
if isfunction(obj) or ismethod(obj):
return Serializer.dump_path_to_obj(obj)
base_type = Serializer._get_base_type(obj)
if base_type is not None:
return Serializer._get_coder_by_type(base_type, Serializer._to_json)(obj)

return JSONEncoder.default(self, obj)

@staticmethod
def _get_class(class_path: str) -> Type[INSTANCE_OR_CALLABLE]:
"""Gets the object type from the class_path
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

этот метод скрытый, зачем его документировать докстрингами?

если им можно пользоваться, то почему он не может быть обычным?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Документировать не обязательно, но лишним не будет. В таком виде взял из FEDOT-а.


Args:
class_path: full path (module + name) of the class

Returns:
class, function or method type
"""

module_name, class_name = class_path.split(MODULE_X_NAME_DELIMITER)
obj_cls = import_module(module_name)
for sub in class_name.split('.'):
obj_cls = getattr(obj_cls, sub)
return obj_cls

def object_hook(self, json_obj: Dict[str, Any]) -> Union[INSTANCE_OR_CALLABLE, dict]:
"""Decodes every JSON-object to python class/func object or just returns dict

Args:
json_obj: dict[str, Any] to be decoded into Python class, function or
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

типы объектов принималось обозначать через type, то есть dict[str, Any]
в таком случае этот объект будет в симпатишном квадратике

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

при этом сами объекты, например Point, Domain и прочее, вот так -> :obj:Domain

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Пока просто убрал дублирующую type hint-ы типизацию.

method object only if it has some special fields

Returns:
Python class, function or method object OR input if it's just a regular dict
"""

if CLASS_PATH_KEY in json_obj:
obj_cls = Serializer._get_class(json_obj[CLASS_PATH_KEY])
del json_obj[CLASS_PATH_KEY]
base_type = Serializer._get_base_type(obj_cls)
if isclass(obj_cls) and base_type is not None:
return Serializer._get_coder_by_type(base_type, Serializer._from_json)(obj_cls, json_obj)
elif isfunction(obj_cls) or ismethod(obj_cls):
return obj_cls
raise TypeError(f'Parsed obj_cls={obj_cls} is not serializable, but should be')
return json_obj
7 changes: 5 additions & 2 deletions gefest/core/structure/polygon.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import List
from typing import List, Optional
from uuid import uuid4

from gefest.core.structure.point import Point

Expand Down Expand Up @@ -31,6 +32,8 @@ class Polygon:
Polygon: ``Polygon(List[Point])``
"""

def __init__(self, polygon_id: str, points: List[Point]):
def __init__(self, polygon_id: str = str(uuid4()), points: Optional[List[Point]] = None):
if points is None:
points = []
DenisSidoren marked this conversation as resolved.
Show resolved Hide resolved
self.id = polygon_id
self.points = points
3 changes: 1 addition & 2 deletions gefest/core/viz/struct_vizualizer.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from cProfile import label
import matplotlib.pyplot as plt

from gefest.core.structure.domain import Domain
Expand Down Expand Up @@ -57,7 +56,7 @@ def plot_poly(self, poly: Polygon, info: str) -> plt.plot:
>>> from gefest.core.structure.structure import get_random_poly
>>> struct = get_random_structure(domain)
>>> poly = struct.polygons[0]
>>> viz.plot_poly(poly, 'random genreated polygon')
>>> viz.plot_poly(poly, 'random generated polygon')

Returns:
|viz_poly|
Expand Down
12 changes: 9 additions & 3 deletions test/test_circle.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,23 @@

from gefest.core.geometry.geometry_2d import Geometry2D
from gefest.core.opt.optimize import optimize
from gefest.core.opt.result import Result
from gefest.core.opt.setup import Setup
from gefest.core.structure.domain import Domain
from gefest.core.structure.structure import Structure


"""
Test for synthetic case with isoperimetric task
"""


def test_fast():
geometry = Geometry2D(is_closed=True)

def area_length_ratio(struct: Structure):
if len(struct.polygons) == 0:
return None

poly = struct.polygons[0]
area = geometry.get_square(poly)
length = geometry.get_length(poly)
Expand All @@ -32,17 +36,19 @@ def area_length_ratio(struct: Structure):
(100, 0),
(0, 0)],
geometry=geometry,
max_poly_num=1,
max_poly_num=100,
min_poly_num=1,
max_points_num=30,
min_points_num=20,
is_closed=True)

task_setup = Setup(domain=domain)

optimized_structure = optimize(task_setup=task_setup,
optimization_result = optimize(task_setup=task_setup,
objective_function=area_length_ratio,
pop_size=20,
max_gens=1)

optimized_structure = optimization_result.best_structure
assert type(optimized_structure) == Structure
assert type(optimization_result) == Result
Loading