Skip to content

Commit

Permalink
Added MPSO and SPSO algorithmsfor dynamic function optimization
Browse files Browse the repository at this point in the history
--HG--
branch : dev
  • Loading branch information
fmder committed Aug 29, 2012
1 parent b7fc552 commit 886f1b4
Show file tree
Hide file tree
Showing 2 changed files with 388 additions and 0 deletions.
219 changes: 219 additions & 0 deletions examples/pso/pso_multiswarm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
# This file is part of DEAP.
#
# DEAP is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 3 of
# the License, or (at your option) any later version.
#
# DEAP is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with DEAP. If not, see <http://www.gnu.org/licenses/>.

"""Implementation of the Multiswarm Particle Swarm Optimization algorithm as
presented in *Blackwell, Branke, and Li, 2008, Particle Swarms for Dynamic
Optimization Problems.*
"""

import itertools
import math
import operator
import random

from deap import base
from deap.benchmarks import movingpeaks
from deap import creator
from deap import tools

scenario = movingpeaks.SCENARIO_2

NDIM = 5
BOUNDS = [scenario["min_coord"], scenario["max_coord"]]

mpb = movingpeaks.MovingPeaks(dim=NDIM, **scenario)

creator.create("FitnessMax", base.Fitness, weights=(1.0,))
creator.create("Particle", list, fitness=creator.FitnessMax, speed=list,
best=None, bestfit=creator.FitnessMax)
creator.create("Swarm", list, best=None, bestfit=creator.FitnessMax)

def generate(pclass, dim, pmin, pmax, smin, smax):
part = pclass(random.uniform(pmin, pmax) for _ in range(dim))
part.speed = [random.uniform(smin, smax) for _ in range(dim)]
return part

def convert_quantum(swarm, rcloud, centre):
dim = len(swarm[0])
for part in swarm:
position = [random.gauss(0, 1) for _ in range(dim)]
dist = math.sqrt(sum(x**2 for x in position))

# Gaussian distribution
# u = abs(random.gauss(0, 1.0/3.0))
# part[:] = [(rcloud * x * u**(1.0/dim) / dist) + c for x, c in zip(position, centre)]

# UVD distribution
# u = random.random()
# part[:] = [(rcloud * x * u**(1.0/dim) / dist) + c for x, c in zip(position, centre)]

# NUVD distribution
u = abs(random.gauss(0, 1.0/3.0))
part[:] = [(rcloud * x * u / dist) + c for x, c in zip(position, centre)]

del part.fitness.values
del part.bestfit.values
part.best = None

return swarm

def updateParticle(part, best, chi, c):
ce1 = (c * random.uniform(0, 1) for _ in range(len(part)))
ce2 = (c * random.uniform(0, 1) for _ in range(len(part)))
ce1_p = itertools.imap(operator.mul, ce1, itertools.imap(operator.sub, best, part))
ce2_g = itertools.imap(operator.mul, ce2, itertools.imap(operator.sub, part.best, part))
a = itertools.imap(operator.sub,
itertools.imap(operator.mul,
itertools.repeat(chi),
itertools.imap(operator.add, ce1_p, ce2_g)),
itertools.imap(operator.mul,
itertools.repeat(1 - chi),
part.speed))
part.speed = map(operator.add, part.speed, a)
part[:] = map(operator.add, part, part.speed)

toolbox = base.Toolbox()
toolbox.register("particle", generate, creator.Particle, dim=NDIM,
pmin=BOUNDS[0], pmax=BOUNDS[1], smin=-(BOUNDS[1] - BOUNDS[0])/2.0,
smax=(BOUNDS[1] - BOUNDS[0])/2.0)
toolbox.register("swarm", tools.initRepeat, creator.Swarm, toolbox.particle)
toolbox.register("update", updateParticle, chi=0.729843788, c=2.05)
toolbox.register("convert", convert_quantum)
toolbox.register("evaluate", mpb)

def main(verbose=True):
NSWARMS = 1
NPARTICLES = 5
NEXCESS = 3
RCLOUD = 0.5 # 0.5 times the move severity

stats = tools.Statistics(lambda ind: ind.fitness.values)
stats.register("avg", tools.mean)
stats.register("std", tools.std)
stats.register("min", min)
stats.register("max", max)

# Generate the initial population
population = [toolbox.swarm(n=NPARTICLES) for _ in range(NSWARMS)]

# Evaluate each particle
for swarm in population:
for part in swarm:
part.fitness.values = toolbox.evaluate(part)

# Update swarm's attractors personal best and global best
if not part.best or part.fitness > part.bestfit:
part.best = toolbox.clone(part[:]) # Get the position
part.bestfit.values = part.fitness.values # Get the fitness
if not swarm.best or part.fitness > swarm.bestfit:
swarm.best = toolbox.clone(part[:]) # Get the position
swarm.bestfit.values = part.fitness.values # Get the fitness

stats.update(itertools.chain(*population))

if verbose:
logger = tools.EvolutionLogger(["gen", "evals", "nswarm", "error", "offline_error"] + stats.functions.keys())
logger.logHeader()
logger.logGeneration(gen=0, evals=mpb.nevals, nswarm=len(population), error=mpb.currentError(), offline_error=mpb.offlineError(), stats=stats)

generation = 1
while mpb.nevals < 5e5:
# Check for convergence
rexcl = (BOUNDS[1] - BOUNDS[0]) / (2 * len(population)**(1.0/NDIM))

not_converged = 0
worst_swarm_idx = None
worst_swarm = None
for i, swarm in enumerate(population):
# Compute the diameter of the swarm
for p1, p2 in itertools.combinations(swarm, 2):
d = math.sqrt(sum((x1 - x2)**2. for x1, x2 in zip(p1, p2)))
if d > 2*rexcl:
not_converged += 1
# Search for the worst swarm according to its global best
if not worst_swarm or swarm.bestfit < worst_swarm.bestfit:
worst_swarm_idx = i
worst_swarm = swarm
break

# If all swarms have converged, add a swarm
if not_converged == 0:
population.append(toolbox.swarm(n=NPARTICLES))
# If too many swarms are roaming, remove the worst swarm
elif not_converged > NEXCESS:
population.pop(worst_swarm_idx)

# Update and evaluate the swarm
for swarm in population:
# Check for change
if swarm.best and toolbox.evaluate(swarm.best) != swarm.bestfit.values:
# Convert particles to quantum particles
swarm[:] = toolbox.convert(swarm, rcloud=RCLOUD, centre=swarm.best)
swarm.best = None
del swarm.bestfit.values

for part in swarm:
# Not necessary to update if it is a new swarm
# or a swarm just converted to quantum
if swarm.best and part.best:
toolbox.update(part, swarm.best)
part.fitness.values = toolbox.evaluate(part)

# Update swarm's attractors personal best and global best
if not part.best or part.fitness > part.bestfit:
part.best = toolbox.clone(part[:])
part.bestfit.values = part.fitness.values
if not swarm.best or part.fitness > swarm.bestfit:
swarm.best = toolbox.clone(part[:])
swarm.bestfit.values = part.fitness.values

stats.update(itertools.chain(*population))

if verbose:
logger.logGeneration(gen=generation, evals=mpb.nevals, nswarm=len(population), error=mpb.currentError(), offline_error=mpb.offlineError(), stats=stats)

# Apply exclusion
reinit_swarms = set()
for s1, s2 in itertools.combinations(range(len(population)), 2):
# Swarms must have a best and not already be set to reinitialize
if population[s1].best and population[s2].best and not (s1 in reinit_swarms or s2 in reinit_swarms):
dist = 0
for x1, x2 in zip(population[s1].best, population[s2].best):
dist += (x1 - x2)**2.
dist = math.sqrt(dist)
if dist < rexcl:
if population[s1].bestfit <= population[s2].bestfit:
reinit_swarms.add(s1)
else:
reinit_swarms.add(s2)

# Reinitialize and evaluate swarms
for s in reinit_swarms:
population[s] = toolbox.swarm(n=NPARTICLES)
for part in population[s]:
part.fitness.values = toolbox.evaluate(part)

# Update swarm's attractors personal best and global best
if not part.best or part.fitness > part.bestfit:
part.best = toolbox.clone(part[:])
part.bestfit.values = part.fitness.values
if not population[s].best or part.fitness > population[s].bestfit:
population[s].best = toolbox.clone(part[:])
population[s].bestfit.values = part.fitness.values
generation += 1

if __name__ == "__main__":
main()

169 changes: 169 additions & 0 deletions examples/pso/pso_speciation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
# This file is part of DEAP.
#
# DEAP is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 3 of
# the License, or (at your option) any later version.
#
# DEAP is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with DEAP. If not, see <http://www.gnu.org/licenses/>.

"""Implementation of the Speciation Particle Swarm Optimization algorithm as
presented in *Li, Blackwell, and Branke, 2006, Particle Swarm with Speciation
and Adaptation in a Dynamic Environment.*
"""

import itertools
import math
import operator
import random

from deap import base
from deap.benchmarks import movingpeaks
from deap import creator
from deap import tools

scenario = movingpeaks.SCENARIO_2

NDIM = 5
BOUNDS = [scenario["min_coord"], scenario["max_coord"]]

mpb = movingpeaks.MovingPeaks(dim=NDIM, **scenario)

creator.create("FitnessMax", base.Fitness, weights=(1.0,))
creator.create("Particle", list, fitness=creator.FitnessMax, speed=list,
best=None, bestfit=creator.FitnessMax)

def generate(pclass, dim, pmin, pmax, smin, smax):
part = pclass(random.uniform(pmin, pmax) for _ in xrange(dim))
part.speed = [random.uniform(smin, smax) for _ in xrange(dim)]
return part

def convert_quantum(swarm, rcloud, centre):
dim = len(swarm[0])
for part in swarm:
position = [random.gauss(0, 1) for _ in range(dim)]
dist = math.sqrt(sum(x**2 for x in position))

# Gaussian distribution
# u = abs(random.gauss(0, 1.0/3.0))
# part[:] = [(rcloud * x * u**(1.0/dim) / dist) + c for x, c in zip(position, centre)]

# UVD distribution
# u = random.random()
# part[:] = [(rcloud * x * u**(1.0/dim) / dist) + c for x, c in zip(position, centre)]

# NUVD distribution
u = abs(random.gauss(0, 1.0/3.0))
part[:] = [(rcloud * x * u / dist) + c for x, c in zip(position, centre)]

del part.fitness.values
del part.bestfit.values
part.best = None

return swarm

def updateParticle(part, best, chi, c):
ce1 = (c*random.uniform(0, 1) for _ in range(len(part)))
ce2 = (c*random.uniform(0, 1) for _ in range(len(part)))
ce1_p = itertools.imap(operator.mul, ce1, itertools.imap(operator.sub, best, part))
ce2_g = itertools.imap(operator.mul, ce2, itertools.imap(operator.sub, part.best, part))
a = itertools.imap(operator.sub,
itertools.imap(operator.mul,
itertools.repeat(chi),
itertools.imap(operator.add, ce1_p, ce2_g)),
itertools.imap(operator.mul,
itertools.repeat(1-chi),
part.speed))
part.speed = map(operator.add, part.speed, a)
part[:] = map(operator.add, part, part.speed)

toolbox = base.Toolbox()
toolbox.register("particle", generate, creator.Particle, dim=NDIM,
pmin=BOUNDS[0], pmax=BOUNDS[1], smin=-(BOUNDS[1] - BOUNDS[0])/2.0,
smax=(BOUNDS[1] - BOUNDS[0])/2.0)
toolbox.register("swarm", tools.initRepeat, list, toolbox.particle)
toolbox.register("update", updateParticle, chi=0.729843788, c=2.05)
toolbox.register("convert", convert_quantum)
toolbox.register("evaluate", mpb)

def main(verbose=True):
NPARTICLES = 100
RS = (BOUNDS[1] - BOUNDS[0]) / (50**(1.0/NDIM)) # between 1/20 and 1/10 of the domain's range
PMAX = 10
RCLOUD = 1.0 # 0.5 times the move severity

stats = tools.Statistics(lambda ind: ind.fitness.values)
stats.register("avg", tools.mean)
stats.register("std", tools.std)
stats.register("min", min)
stats.register("max", max)

swarm = toolbox.swarm(n=NPARTICLES)

if verbose:
logger = tools.EvolutionLogger(["gen", "evals", "nspecies", "error", "offline_error"] + stats.functions.keys())
logger.logHeader()

generation = 0
while mpb.nevals < 5e5:
# Evaluate each particle in the swarm
for part in swarm:
part.fitness.values = toolbox.evaluate(part)
if not part.best or part.bestfit < part.fitness:
part.best = toolbox.clone(part[:]) # Get the position
part.bestfit.values = part.fitness.values # Get the fitness

stats.update(swarm)

# Sort swarm into species, best individual comes first
sorted_swarm = sorted(swarm, key=lambda ind: ind.bestfit, reverse=True)
species = []
while sorted_swarm:
found = False
for s in species:
dist = math.sqrt(sum((x1 - x2)**2 for x1, x2 in zip(sorted_swarm[0].best, s[0].best)))
if dist <= RS:
found = True
s.append(sorted_swarm[0])
break
if not found:
species.append([sorted_swarm[0]])
sorted_swarm.pop(0)

if verbose:
logger.logGeneration(gen=generation, evals=mpb.nevals, nspecies=len(species), error=mpb.currentError(), offline_error=mpb.offlineError(), stats=stats)

# Detect change
if any(s[0].bestfit.values != toolbox.evaluate(s[0].best) for s in species):
# Convert particles to quantum particles
for s in species:
s[:] = toolbox.convert(s, rcloud=RCLOUD, centre=s[0].best)

else:
# Replace exceeding particles in a species with new particles
for s in species:
if len(s) > PMAX:
n = len(s) - PMAX
del s[PMAX:]
s.extend(toolbox.swarm(n=n))

# Update particles that have not been reinitialized
for s in species[:-1]:
for part in s[:PMAX]:
toolbox.update(part, s[0].best)
del part.fitness.values

# Return all but the worst species' updated particles to the swarm
# The worst species is replaced by new particles
swarm = list(itertools.chain(toolbox.swarm(n=len(species[-1])), *species[:-1]))
generation += 1

if __name__ == '__main__':
main()

0 comments on commit 886f1b4

Please sign in to comment.