Created
December 14, 2017 16:13
-
-
Save sp4ghet/b3cab24a749ad1f2088efce367446b00 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
from typing import Generator, Tuple, Callable | |
from abc import ABCMeta, abstractmethod | |
class IProblem(metaclass=ABCMeta): | |
@staticmethod | |
@abstractmethod | |
def fitness(X: np.ndarray) -> Tuple[np.ndarray, ...]: | |
pass | |
@staticmethod | |
@abstractmethod | |
def perfect_pareto_front() -> np.ndarray: | |
raise NotImplementedError | |
class MOPSO: | |
@staticmethod | |
def call(problem: IProblem, epochs: int, agent_count: int) -> Generator[Tuple[np.ndarray, np.ndarray], None, None]: | |
swarm = np.array( [[MOPSO._new_design(problem.bounds), # 0 Design Vector | |
None, # 1 Objective | |
None, # 2 Personal Best Design Vector | |
0, # 3 domination count | |
None, # 4 dominating_set S | |
np.zeros(len(problem.bounds)) # 5 velocity | |
] for _ in range(agent_count)]) | |
# Initialize objective vector | |
for agent in swarm: | |
agent[1] = np.array(problem.fitness(agent[0])) | |
swarm[:, 2] = swarm[:, 0] | |
external_archive = MOPSO._update_archive(swarm, swarm, agent_count) | |
for i in range(1, epochs+1): | |
for agent in swarm: | |
# select design vector of random leader | |
leader = MOPSO._select_leader(external_archive) | |
# Update position of agent | |
agent[0], agent[5] = MOPSO._flight(agent, leader, problem.bounds) | |
# Conduct mutation | |
agent[0] = MOPSO._mutate(agent[0], problem.bounds) | |
# Evaluate fitness and update pbest if necessary | |
new_objective = np.array(problem.fitness(agent[0])) | |
if MOPSO._dominates(new_objective, agent[1]): | |
agent[2] = agent[0] | |
agent[1] = new_objective | |
external_archive = MOPSO._update_archive(external_archive, swarm, agent_count) | |
designs = np.array(external_archive[:, 0].tolist()) | |
objectives = np.array(external_archive[:, 1].tolist()) | |
yield (designs, objectives) | |
@staticmethod | |
def _update_archive(external_archive: np.ndarray, swarm: np.ndarray, size: int) -> np.ndarray: | |
""" | |
Select the top candidate solutions and return them as a new external archive | |
""" | |
new_archive = [] | |
aggregate_swarm = [] | |
aggregate_swarm.extend(external_archive.tolist()) | |
aggregate_swarm.extend(swarm.tolist()) | |
aggregate_swarm = np.array(aggregate_swarm) | |
fronts = MOPSO.nondominated_sort(aggregate_swarm) | |
# Create new nondominated archive, with some overflow due to front size | |
i = 0 | |
while len(new_archive) + len(fronts[i]) < size: | |
new_archive.extend(fronts[i]) | |
i += 1 | |
if new_archive: | |
new_archive = np.array(new_archive) | |
else: | |
new_archive = np.ndarray(shape=(0, 6)) | |
# Calculate density(crowding distance) of new archive | |
front = np.array(fronts[i]) | |
densities = enumerate(MOPSO._density_estimate(front)) # we want the indices | |
densities = sorted(densities, key=lambda x: -x[1]) # sort indices based on density | |
densities = np.fromiter(map(lambda x: x[0], densities), dtype=int) # new ndarray of indices | |
new_archive = np.concatenate((new_archive, front[densities])) | |
return new_archive[:size] # return new archive | |
@staticmethod | |
def nondominated_sort(swarm: np.ndarray) -> np.ndarray: | |
fronts = [[]] | |
for agent in swarm: | |
agent[4] = [] | |
agent[3] = 0 | |
for other_agent in swarm: | |
if MOPSO._dominates(agent[1], other_agent[1]): | |
agent[4].append(other_agent) | |
elif MOPSO._dominates(other_agent[1], agent[1]): | |
agent[3] += 1 | |
if agent[3] == 0: | |
fronts[0].append(agent) | |
i = 0 | |
while len(fronts[-1]) > 0: | |
h = [] | |
for agent in fronts[i]: | |
for other_agent in agent[4]: | |
other_agent[3] -= 1 | |
if other_agent[3] == 0: | |
h.append(other_agent) | |
i += 1 | |
fronts.append(h) | |
return np.array(fronts) | |
@staticmethod | |
def _new_design(bounds: np.ndarray) -> np.ndarray: | |
return np.fromiter((np.random.uniform(bounds[i, 0], bounds[i, 1]) for i in range(bounds.shape[0])), | |
dtype=np.float64) | |
@staticmethod | |
def _select_leader(non_dominated_set: np.ndarray) -> np.ndarray: | |
""" | |
Selects a random leader from the current non-dominated set and returns its design vector | |
non_dominated_set.shape = agent_count, len(bounds) | |
return.shape = len(bounds) | |
""" | |
return np.random.choice(non_dominated_set[:, 0]) | |
@staticmethod | |
def _flight(agent: np.ndarray, leader: np.ndarray, bounds: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: | |
""" | |
Update the position(design vector) of the agent particle. | |
Returns the new position. | |
agent, leader : shape = len(bounds) | |
return = new_position, velocity : shape = len(bounds), len(bounds) | |
""" | |
inertia = 1 | |
cognitive_c1 = 1 | |
social_c2 = 1 | |
v = inertia * agent[5] | |
v += cognitive_c1 * np.random.rand() * (agent[2] - agent[0]) | |
v += social_c2 * np.random.rand() * (leader - agent[0]) | |
new_position = (agent[0] + v) | |
for i, bound in enumerate(bounds): | |
if new_position[i] < bound[0]: | |
new_position[i] = bound[0] | |
if new_position[i] > bound[1]: | |
new_position[i] = bound[1] | |
return new_position, v | |
@staticmethod | |
def _mutate(design_vector: np.ndarray, bounds: np.ndarray) -> np.ndarray: | |
""" | |
Add 'craziness' to design vector | |
Variable-wise polynomial mutation operator with n_m = 20 | |
http://www.inderscienceonline.com/doi/pdf/10.1504/IJAISC.2014.059280 | |
""" | |
n_m = 20 # n_m = [20,100] usually works | |
def delta_l(u : float) -> float: | |
return (2*u) ** (1/(1 + n_m)) - 1 | |
def delta_r(u : float) -> float: | |
return 1 - (2*(1-u)) ** (1/(1+n_m)) | |
new_design_vector = np.zeros(design_vector.shape[0]) | |
for i, p in enumerate(design_vector): | |
u = np.random.rand() | |
x_lower = bounds[i][0] | |
x_upper = bounds[i][1] | |
if u <= 0.5: | |
new_design_vector[i] = p + delta_l(u)*(p - x_lower) | |
else: | |
new_design_vector[i] = p + delta_r(u)*(x_upper - p) | |
return new_design_vector | |
@staticmethod | |
def _density_estimate(swarm: np.ndarray) -> np.ndarray: | |
""" | |
nearest neighbor density estimate of all agents in swarm | |
(based off the crowding_distance from NSGA-II) | |
a.shape = (agent_count, m) | |
where: m = objective_count | |
return.shape = (agent_count) | |
""" | |
agent_count = swarm.shape[0] | |
distances = np.zeros(agent_count) | |
for m in range(swarm[0, 1].shape[0]): | |
a_ = enumerate(swarm) | |
a_ = sorted(a_, key=lambda x: x[1][1][m]) | |
distances[a_[0][0]] = float('inf') | |
distances[a_[-1][0]] = float('inf') | |
for i in range(1, agent_count - 1): | |
actual_index = a_[i][0] | |
distances[actual_index] += a_[i+1][1][1][m] - a_[i-1][1][1][m] | |
return distances | |
@staticmethod | |
def _dominates(a: np.ndarray, b: np.ndarray) -> bool: | |
""" | |
Takes 2 objective space vectors and evaluates whether a dominates b. | |
""" | |
return any([a_i < b_i for a_i, b_i in zip(a, b)]) and all([a_i <= b_i for a_i, b_i in zip(a, b)]) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment