Skip to content

Instantly share code, notes, and snippets.

@sp4ghet
Created December 14, 2017 16:13
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sp4ghet/b3cab24a749ad1f2088efce367446b00 to your computer and use it in GitHub Desktop.
Save sp4ghet/b3cab24a749ad1f2088efce367446b00 to your computer and use it in GitHub Desktop.
import numpy as np
from typing import Generator, Tuple, Callable
from abc import ABCMeta, abstractmethod
class IProblem(metaclass=ABCMeta):
@staticmethod
@abstractmethod
def fitness(X: np.ndarray) -> Tuple[np.ndarray, ...]:
pass
@staticmethod
@abstractmethod
def perfect_pareto_front() -> np.ndarray:
raise NotImplementedError
class MOPSO:
@staticmethod
def call(problem: IProblem, epochs: int, agent_count: int) -> Generator[Tuple[np.ndarray, np.ndarray], None, None]:
swarm = np.array( [[MOPSO._new_design(problem.bounds), # 0 Design Vector
None, # 1 Objective
None, # 2 Personal Best Design Vector
0, # 3 domination count
None, # 4 dominating_set S
np.zeros(len(problem.bounds)) # 5 velocity
] for _ in range(agent_count)])
# Initialize objective vector
for agent in swarm:
agent[1] = np.array(problem.fitness(agent[0]))
swarm[:, 2] = swarm[:, 0]
external_archive = MOPSO._update_archive(swarm, swarm, agent_count)
for i in range(1, epochs+1):
for agent in swarm:
# select design vector of random leader
leader = MOPSO._select_leader(external_archive)
# Update position of agent
agent[0], agent[5] = MOPSO._flight(agent, leader, problem.bounds)
# Conduct mutation
agent[0] = MOPSO._mutate(agent[0], problem.bounds)
# Evaluate fitness and update pbest if necessary
new_objective = np.array(problem.fitness(agent[0]))
if MOPSO._dominates(new_objective, agent[1]):
agent[2] = agent[0]
agent[1] = new_objective
external_archive = MOPSO._update_archive(external_archive, swarm, agent_count)
designs = np.array(external_archive[:, 0].tolist())
objectives = np.array(external_archive[:, 1].tolist())
yield (designs, objectives)
@staticmethod
def _update_archive(external_archive: np.ndarray, swarm: np.ndarray, size: int) -> np.ndarray:
"""
Select the top candidate solutions and return them as a new external archive
"""
new_archive = []
aggregate_swarm = []
aggregate_swarm.extend(external_archive.tolist())
aggregate_swarm.extend(swarm.tolist())
aggregate_swarm = np.array(aggregate_swarm)
fronts = MOPSO.nondominated_sort(aggregate_swarm)
# Create new nondominated archive, with some overflow due to front size
i = 0
while len(new_archive) + len(fronts[i]) < size:
new_archive.extend(fronts[i])
i += 1
if new_archive:
new_archive = np.array(new_archive)
else:
new_archive = np.ndarray(shape=(0, 6))
# Calculate density(crowding distance) of new archive
front = np.array(fronts[i])
densities = enumerate(MOPSO._density_estimate(front)) # we want the indices
densities = sorted(densities, key=lambda x: -x[1]) # sort indices based on density
densities = np.fromiter(map(lambda x: x[0], densities), dtype=int) # new ndarray of indices
new_archive = np.concatenate((new_archive, front[densities]))
return new_archive[:size] # return new archive
@staticmethod
def nondominated_sort(swarm: np.ndarray) -> np.ndarray:
fronts = [[]]
for agent in swarm:
agent[4] = []
agent[3] = 0
for other_agent in swarm:
if MOPSO._dominates(agent[1], other_agent[1]):
agent[4].append(other_agent)
elif MOPSO._dominates(other_agent[1], agent[1]):
agent[3] += 1
if agent[3] == 0:
fronts[0].append(agent)
i = 0
while len(fronts[-1]) > 0:
h = []
for agent in fronts[i]:
for other_agent in agent[4]:
other_agent[3] -= 1
if other_agent[3] == 0:
h.append(other_agent)
i += 1
fronts.append(h)
return np.array(fronts)
@staticmethod
def _new_design(bounds: np.ndarray) -> np.ndarray:
return np.fromiter((np.random.uniform(bounds[i, 0], bounds[i, 1]) for i in range(bounds.shape[0])),
dtype=np.float64)
@staticmethod
def _select_leader(non_dominated_set: np.ndarray) -> np.ndarray:
"""
Selects a random leader from the current non-dominated set and returns its design vector
non_dominated_set.shape = agent_count, len(bounds)
return.shape = len(bounds)
"""
return np.random.choice(non_dominated_set[:, 0])
@staticmethod
def _flight(agent: np.ndarray, leader: np.ndarray, bounds: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""
Update the position(design vector) of the agent particle.
Returns the new position.
agent, leader : shape = len(bounds)
return = new_position, velocity : shape = len(bounds), len(bounds)
"""
inertia = 1
cognitive_c1 = 1
social_c2 = 1
v = inertia * agent[5]
v += cognitive_c1 * np.random.rand() * (agent[2] - agent[0])
v += social_c2 * np.random.rand() * (leader - agent[0])
new_position = (agent[0] + v)
for i, bound in enumerate(bounds):
if new_position[i] < bound[0]:
new_position[i] = bound[0]
if new_position[i] > bound[1]:
new_position[i] = bound[1]
return new_position, v
@staticmethod
def _mutate(design_vector: np.ndarray, bounds: np.ndarray) -> np.ndarray:
"""
Add 'craziness' to design vector
Variable-wise polynomial mutation operator with n_m = 20
http://www.inderscienceonline.com/doi/pdf/10.1504/IJAISC.2014.059280
"""
n_m = 20 # n_m = [20,100] usually works
def delta_l(u : float) -> float:
return (2*u) ** (1/(1 + n_m)) - 1
def delta_r(u : float) -> float:
return 1 - (2*(1-u)) ** (1/(1+n_m))
new_design_vector = np.zeros(design_vector.shape[0])
for i, p in enumerate(design_vector):
u = np.random.rand()
x_lower = bounds[i][0]
x_upper = bounds[i][1]
if u <= 0.5:
new_design_vector[i] = p + delta_l(u)*(p - x_lower)
else:
new_design_vector[i] = p + delta_r(u)*(x_upper - p)
return new_design_vector
@staticmethod
def _density_estimate(swarm: np.ndarray) -> np.ndarray:
"""
nearest neighbor density estimate of all agents in swarm
(based off the crowding_distance from NSGA-II)
a.shape = (agent_count, m)
where: m = objective_count
return.shape = (agent_count)
"""
agent_count = swarm.shape[0]
distances = np.zeros(agent_count)
for m in range(swarm[0, 1].shape[0]):
a_ = enumerate(swarm)
a_ = sorted(a_, key=lambda x: x[1][1][m])
distances[a_[0][0]] = float('inf')
distances[a_[-1][0]] = float('inf')
for i in range(1, agent_count - 1):
actual_index = a_[i][0]
distances[actual_index] += a_[i+1][1][1][m] - a_[i-1][1][1][m]
return distances
@staticmethod
def _dominates(a: np.ndarray, b: np.ndarray) -> bool:
"""
Takes 2 objective space vectors and evaluates whether a dominates b.
"""
return any([a_i < b_i for a_i, b_i in zip(a, b)]) and all([a_i <= b_i for a_i, b_i in zip(a, b)])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment