Created
May 29, 2023 17:54
-
-
Save dot1mav/8563411b3085f7825925ea14a18e4ded to your computer and use it in GitHub Desktop.
network flow
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
25 | |
1 0 2 20 3 20 4 20 5 20 6 20 | |
2 1 7 10 8 8 | |
3 1 7 6 8 5 9 4 | |
4 1 8 5 9 6 10 10 | |
5 1 9 10 10 4 11 10 | |
6 1 10 10 11 10 | |
7 1 13 15 | |
8 1 12 15 14 15 | |
9 1 13 15 15 15 | |
10 1 14 15 16 15 | |
11 1 15 15 | |
12 1 17 20 | |
13 1 17 20 18 20 | |
14 1 18 20 19 20 | |
15 1 19 20 20 20 | |
16 1 20 20 21 20 | |
17 1 18 8 22 25 | |
18 1 19 8 22 25 23 20 | |
19 1 20 8 23 10 24 25 | |
20 1 23 25 24 25 | |
21 1 23 25 | |
22 1 -1 30 | |
23 1 -1 30 | |
24 1 -1 30 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import List, Tuple | |
from pathlib import Path | |
from tqdm import tqdm | |
from time import strftime, gmtime, time | |
import random | |
import numpy as np | |
import networkx as nx | |
import matplotlib.pyplot as plt | |
def is_degenerated(x: list) -> bool: | |
return False if len(x) % 2 == 0 else True | |
def build_node_idx(n_idx: int, sp_node: dict) -> int: | |
return n_idx - 1 if n_idx not in list(sp_node.keys()) else sp_node[n_idx] | |
def read_txt_file(p: Path) -> List[List[int]]: | |
lines = [] | |
with open(str(p), 'r', encoding="utf-8") as f: | |
for line in f.readlines(): | |
line = line.strip("\n") | |
line = [int(l) for l in line.split(" ") if l != ""] | |
lines.append(line) | |
return lines | |
def make_graph(file_path='./graph.txt') -> nx.Graph: | |
lines = read_txt_file(file_path) | |
n_nodes = lines[0][0] | |
source_idx = 0 | |
sink_idx = n_nodes - 1 | |
special_nodes_idx = {0: source_idx, -1: sink_idx} | |
capacities = np.zeros((n_nodes, n_nodes), dtype=np.int32) | |
all_edges = [] | |
for line in lines[1:]: | |
n_idx = build_node_idx(line[0], special_nodes_idx) | |
n_edges = line[2:] | |
if is_degenerated(n_edges): | |
raise Exception("this formatted file is degenerated!!") | |
edges = [] | |
for idx in range(0, len(n_edges), 2): | |
e_idx = build_node_idx(n_edges[idx], special_nodes_idx) | |
e_cap = n_edges[idx + 1] | |
edges.append((n_idx, e_idx, e_cap)) | |
capacities[n_idx, e_idx] = e_cap | |
all_edges += edges | |
graph = nx.Graph() | |
labels = {} | |
for s, e, c in all_edges: | |
graph.add_edge(s, e, capacity=c) | |
labels[(s, e)] = c | |
pos = nx.spring_layout(graph) | |
nx.draw(graph, pos, with_labels=True) | |
nx.draw_networkx_edge_labels(graph, pos, edge_labels=labels) | |
plt.savefig('graph.png') | |
return graph | |
class Genetic: | |
def __init__(self, population_size: int, max_iterations: int, mutation_rate: float, crossover_rate: float): | |
self._population_size: int = population_size | |
self._max_iterations: int = max_iterations | |
self._mutation_rate: float = mutation_rate | |
self._crossover_rate: float = crossover_rate | |
self._graph: nx.Graph = make_graph() | |
self._max_fit: float = self._fitness(self._graph) | |
self._min_fit: float = self._max_fit / 4 | |
print("Max fitness: {}".format(self._max_fit)) | |
print("Min fitness: {}".format(self._min_fit)) | |
self._population_list: List[Tuple[nx.Graph, float]] = [] | |
self._new_population_list: List[Tuple[nx.Graph, float]] = [] | |
self._best_generation: List[Tuple[nx.Graph, float, int]] = [] | |
def _update_list(self, generation: int): | |
if self._new_population_list: | |
self._new_population_list.sort(key=lambda x: x[1], reverse=True) | |
self._best_generation.append((self._new_population_list[0][0], self._new_population_list[0][1], generation)) | |
self._population_list += self._new_population_list.copy() | |
self._new_population_list.clear() | |
def _selection(self, clear_: bool = True): | |
self._population_list.sort(key=lambda x: x[1], reverse=True) | |
if clear_: | |
for index, val in enumerate(self._population_list): | |
if val[1] <= self._min_fit: | |
self._population_list.pop(index) | |
def _create_population(self): | |
progress = tqdm(range(self._population_size), desc="Creating population", unit="chromosome", | |
total=self._population_size, ncols=100, position=0, leave=None) | |
for _ in range(self._population_size): | |
graph: nx.Graph = self._graph.copy() | |
for node in graph.nodes: | |
for neighbor in graph.neighbors(node): | |
capacity = random.randint(1, graph[node][neighbor]['capacity']) | |
graph[node][neighbor]['capacity'] = capacity | |
self._population_list.append((graph, self._fitness(graph))) | |
progress.update(1) | |
progress.disable = True | |
progress.close() | |
def _fitness(self, graph: nx.Graph) -> float: | |
return nx.maximum_flow_value(graph, 0, graph.number_of_nodes() - 1) | |
def _crossover(self): | |
selected_list: List[Tuple[nx.Graph, float]] = [] | |
progress = tqdm(range(self._population_size // 2), desc="Crossover", unit="chromosome", leave=None, | |
position=0) | |
for _ in range(len(self._population_list) // 2): | |
if random.random() / 10 <= self._crossover_rate: | |
while True: | |
parent_list = random.sample(self._population_list, 2) | |
if parent_list not in selected_list: | |
selected_list.extend(parent_list) | |
break | |
mid = random.choice(list(self._graph.nodes)) | |
new_graph: nx.Graph = nx.Graph() | |
new_graph.add_nodes_from(self._graph.nodes) | |
for node in range(mid): | |
for target in parent_list[0][0][node]: | |
new_graph.add_edge(node, target, capacity=parent_list[0][0][node][target]['capacity']) | |
for node in range(mid, self._graph.number_of_nodes()): | |
for target in parent_list[1][0][node]: | |
new_graph.add_edge(node, target, capacity=parent_list[1][0][node][target]['capacity']) | |
self._new_population_list.append((new_graph, self._fitness(new_graph))) | |
progress.update(1) | |
progress.disable = True | |
progress.close() | |
def _mutation(self): | |
progress = tqdm(range(self._population_size), desc="Mutation", unit="chromosome", leave=None, | |
position=0) | |
for index, value in enumerate(self._new_population_list): | |
if random.random() / 10 <= self._mutation_rate: | |
graph: nx.Graph = value[0] | |
nodes = list(graph.nodes) | |
for _ in range(len(nodes) // 3): | |
node = random.choice(nodes) | |
nodes.remove(node) | |
neighbor = random.choice(list(graph.neighbors(node))) | |
new_capacity = random.randint(graph[node][neighbor]['capacity'], | |
self._graph[node][neighbor]['capacity']) | |
graph[node][neighbor]['capacity'] = new_capacity | |
fitness = self._fitness(graph) | |
self._new_population_list[index] = (graph, fitness) | |
progress.update(1) | |
progress.disable = True | |
progress.close() | |
def run(self): | |
self._create_population() | |
self._selection(False) | |
self._best_generation.append((self._population_list[0][0], self._population_list[0][1], 0)) | |
for generation in tqdm(range(1, self._max_iterations + 1), desc='Generation', unit='Generation', | |
total=self._max_iterations, ncols=100, position=1): | |
if self._population_list[0][1] == self._max_fit: | |
break | |
self._crossover() | |
self._mutation() | |
self._update_list(generation) | |
self._selection() | |
print("Best fitness: ", self._population_list[0][1]) | |
time_run = strftime('%H%M%S', gmtime(time())) | |
plt.figure(f'Best-Generation_{time_run}') | |
plt.plot([x[2] for x in self._best_generation], [x[1] for x in self._best_generation], label='Best Generation') | |
plt.savefig(f'Best-Generation_{time_run}.png') | |
start = time() | |
Genetic(population_size=1000, max_iterations=100, mutation_rate=0.005, crossover_rate=0.06).run() | |
end = time() | |
print(f'Time: {end - start}') |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
numpy~=1.21.5 | |
networkx~=2.6.3 | |
matplotlib~=3.5.0 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment