Skip to content

Instantly share code, notes, and snippets.

@dot1mav
Created May 29, 2023 17:54
Show Gist options
  • Save dot1mav/8563411b3085f7825925ea14a18e4ded to your computer and use it in GitHub Desktop.
Save dot1mav/8563411b3085f7825925ea14a18e4ded to your computer and use it in GitHub Desktop.
network flow
25
1 0 2 20 3 20 4 20 5 20 6 20
2 1 7 10 8 8
3 1 7 6 8 5 9 4
4 1 8 5 9 6 10 10
5 1 9 10 10 4 11 10
6 1 10 10 11 10
7 1 13 15
8 1 12 15 14 15
9 1 13 15 15 15
10 1 14 15 16 15
11 1 15 15
12 1 17 20
13 1 17 20 18 20
14 1 18 20 19 20
15 1 19 20 20 20
16 1 20 20 21 20
17 1 18 8 22 25
18 1 19 8 22 25 23 20
19 1 20 8 23 10 24 25
20 1 23 25 24 25
21 1 23 25
22 1 -1 30
23 1 -1 30
24 1 -1 30
from typing import List, Tuple
from pathlib import Path
from tqdm import tqdm
from time import strftime, gmtime, time
import random
import numpy as np
import networkx as nx
import matplotlib.pyplot as plt
def is_degenerated(x: list) -> bool:
    """Report whether ``x`` holds an odd number of items.

    ``make_graph`` passes the flattened ``target capacity target capacity ...``
    fields of one input line; an odd count means some target has no capacity.

    Args:
        x: Sequence of fields to check.

    Returns:
        ``True`` when ``len(x)`` is odd, ``False`` when it is even (including
        the empty list).
    """
    return len(x) % 2 != 0
def build_node_idx(n_idx: int, sp_node: dict) -> int:
    """Translate a node id from the input file into a graph index.

    Args:
        n_idx: Node id as written in the file.
        sp_node: Overrides for special ids, mapping file id -> graph index.

    Returns:
        ``sp_node[n_idx]`` when the id has an override, otherwise ``n_idx - 1``
        (ordinary ids are shifted down by one).
    """
    # dict.get does the membership test and the lookup in one hashed access,
    # instead of building a list of keys on every call.
    return sp_node.get(n_idx, n_idx - 1)
def read_txt_file(p: Path) -> List[List[int]]:
    """Parse a text file of whitespace-separated integers.

    Args:
        p: Location of the file. It is passed through ``str()`` before
            opening, so a ``Path`` or a plain string both work.

    Returns:
        One list of ints per line, in file order. A blank (or whitespace-only)
        line yields an empty list so that line positions are preserved.

    Raises:
        OSError: If the file cannot be opened.
        ValueError: If a field is not a valid integer literal.
    """
    with open(str(p), 'r', encoding="utf-8") as f:
        # str.split() with no argument splits on any run of whitespace and
        # ignores the trailing newline, so tabs and repeated spaces are
        # accepted; iterating the handle avoids loading every line up front.
        return [[int(field) for field in line.split()] for line in f]
def make_graph(file_path='./graph.txt') -> nx.Graph:
    """Load the capacity graph from a text file and save a picture of it.

    File layout, as consumed here:

    * the first integer of the first line is the number of nodes;
    * every following line is ``node_id <ignored> target cap target cap ...``
      (the second field is not read by this function).

    Node ids are mapped with ``build_node_idx``: id ``0`` becomes index 0,
    id ``-1`` becomes the last index (``n_nodes - 1``), and any other id
    ``k`` becomes ``k - 1``.

    Args:
        file_path: Path of the graph description file.

    Returns:
        A graph whose edges carry a ``capacity`` attribute.

    Raises:
        Exception: If the file has no header value or a line has a target
            without a capacity.

    Side effects:
        Writes the rendered graph to ``graph.png`` in the working directory.
    """
    lines = read_txt_file(file_path)
    if not lines or not lines[0]:
        raise Exception("this formatted file is degenerated!!")

    n_nodes = lines[0][0]
    special_nodes_idx = {0: 0, -1: n_nodes - 1}

    # NOTE(review): nx.Graph is undirected, so every capacity is shared by
    # both directions of an edge — confirm that this is intended.
    graph = nx.Graph()
    labels = {}
    for line in lines[1:]:
        if not line:
            # Blank lines (e.g. a trailing newline) carry no node data.
            continue
        n_idx = build_node_idx(line[0], special_nodes_idx)
        n_edges = line[2:]
        if is_degenerated(n_edges):
            raise Exception("this formatted file is degenerated!!")
        # Even positions are target ids, odd positions their capacities.
        for target, e_cap in zip(n_edges[::2], n_edges[1::2]):
            e_idx = build_node_idx(target, special_nodes_idx)
            graph.add_edge(n_idx, e_idx, capacity=e_cap)
            labels[(n_idx, e_idx)] = e_cap

    # Draw on a dedicated figure so repeated calls do not paint over an
    # existing plot, and release it once the image is written.
    fig = plt.figure()
    try:
        pos = nx.spring_layout(graph)
        nx.draw(graph, pos, with_labels=True)
        nx.draw_networkx_edge_labels(graph, pos, edge_labels=labels)
        plt.savefig('graph.png')
    finally:
        plt.close(fig)
    return graph
class Genetic:
    """Genetic search over the edge capacities of the graph from ``make_graph``.

    A chromosome is a copy of the base graph with (possibly lowered) edge
    capacities; its fitness is the maximum-flow value from node ``0`` to the
    highest-numbered node. The population is stored as ``(graph, fitness)``
    tuples.

    NOTE(review): both rate checks compare ``random.random() / 10`` with the
    rate. That value is uniform on [0, 0.1), so the effective probability is
    ``min(1, 10 * rate)``. Kept as is because the script's parameters appear
    to be tuned against it — confirm.

    NOTE(review): the population is never cut back to ``population_size``;
    offspring are appended each generation and only chromosomes at or below
    the minimum fitness are removed.
    """

    def __init__(self, population_size: int, max_iterations: int, mutation_rate: float, crossover_rate: float):
        """Load the base graph and derive the fitness bounds.

        Args:
            population_size: Number of chromosomes created initially.
            max_iterations: Upper bound on the number of generations.
            mutation_rate: Threshold used by ``_mutation`` (see class note).
            crossover_rate: Threshold used by ``_crossover`` (see class note).
        """
        self._population_size: int = population_size
        self._max_iterations: int = max_iterations
        self._mutation_rate: float = mutation_rate
        self._crossover_rate: float = crossover_rate
        self._graph: nx.Graph = make_graph()
        # Fitness of the untouched graph is the best any chromosome can reach;
        # a quarter of it is the survival threshold used by _selection.
        self._max_fit: float = self._fitness(self._graph)
        self._min_fit: float = self._max_fit / 4
        print("Max fitness: {}".format(self._max_fit))
        print("Min fitness: {}".format(self._min_fit))
        self._population_list: List[Tuple[nx.Graph, float]] = []
        self._new_population_list: List[Tuple[nx.Graph, float]] = []
        self._best_generation: List[Tuple[nx.Graph, float, int]] = []

    @staticmethod
    def _finish_progress(progress) -> None:
        """Tear down a progress bar using the script's disable-then-close sequence."""
        progress.disable = True
        progress.close()

    def _update_list(self, generation: int):
        """Merge the current offspring into the population.

        Records the fittest offspring of ``generation`` in
        ``_best_generation`` and empties the offspring list. Does nothing when
        there are no offspring.
        """
        if not self._new_population_list:
            return
        self._new_population_list.sort(key=lambda x: x[1], reverse=True)
        best_graph, best_fit = self._new_population_list[0]
        self._best_generation.append((best_graph, best_fit, generation))
        self._population_list.extend(self._new_population_list)
        self._new_population_list.clear()

    def _selection(self, clear_: bool = True):
        """Sort the population by descending fitness and optionally cull it.

        Args:
            clear_: When true, drop every chromosome whose fitness is at or
                below ``_min_fit``.
        """
        self._population_list.sort(key=lambda x: x[1], reverse=True)
        if clear_:
            # Rebuild instead of popping while iterating: popping shifts the
            # remaining items and makes the loop skip the element after each
            # removal, letting weak chromosomes survive.
            self._population_list[:] = [member for member in self._population_list
                                        if member[1] > self._min_fit]

    def _create_population(self):
        """Append ``population_size`` randomised copies of the base graph."""
        progress = tqdm(total=self._population_size, desc="Creating population", unit="chromosome",
                        ncols=100, position=0, leave=None)
        for _ in range(self._population_size):
            graph: nx.Graph = self._graph.copy()
            # Visit each edge exactly once. Walking node -> neighbours on an
            # undirected graph reaches every edge from both ends and would
            # shrink its capacity twice.
            for u, v in graph.edges:
                graph[u][v]['capacity'] = random.randint(1, graph[u][v]['capacity'])
            self._population_list.append((graph, self._fitness(graph)))
            progress.update(1)
        self._finish_progress(progress)

    def _fitness(self, graph: "nx.Graph") -> float:
        """Return the maximum-flow value from node 0 to node ``number_of_nodes() - 1``."""
        return nx.maximum_flow_value(graph, 0, graph.number_of_nodes() - 1)

    def _crossover(self):
        """Create offspring by splicing the edges of two random parents.

        Makes ``len(population) // 2`` attempts. On each successful rate check
        a split node ``mid`` is drawn; edges incident to nodes ``< mid`` are
        copied from the first parent and edges incident to nodes ``>= mid``
        from the second. An edge touching both ranges ends up with the second
        parent's capacity because it is added last. Node labels are assumed to
        be the integers ``0 .. n-1``.
        """
        attempts = len(self._population_list) // 2
        n_nodes = self._graph.number_of_nodes()
        progress = tqdm(total=attempts, desc="Crossover", unit="chromosome", leave=None,
                        position=0)
        for _ in range(attempts):
            if random.random() / 10 <= self._crossover_rate:
                # NOTE(review): the original wrapped this draw in a "pair not
                # already selected" loop whose membership test compared a list
                # with tuples and therefore never matched; the effective
                # behaviour (one unconstrained draw) is kept.
                first, second = random.sample(self._population_list, 2)
                mid = random.choice(list(self._graph.nodes))
                new_graph: nx.Graph = nx.Graph()
                new_graph.add_nodes_from(self._graph.nodes)
                for node in range(mid):
                    for target, attrs in first[0][node].items():
                        new_graph.add_edge(node, target, capacity=attrs['capacity'])
                for node in range(mid, n_nodes):
                    for target, attrs in second[0][node].items():
                        new_graph.add_edge(node, target, capacity=attrs['capacity'])
                self._new_population_list.append((new_graph, self._fitness(new_graph)))
            progress.update(1)
        self._finish_progress(progress)

    def _mutation(self):
        """Randomly raise some edge capacities of the current offspring.

        For each offspring that passes the rate check, a third of its nodes
        are picked without repetition; for each, one incident edge gets a new
        capacity between its current value and the base graph's capacity. The
        offspring's fitness is then recomputed.
        """
        offspring = self._new_population_list
        progress = tqdm(total=len(offspring), desc="Mutation", unit="chromosome", leave=None,
                        position=0)
        for index, (graph, _) in enumerate(offspring):
            if random.random() / 10 <= self._mutation_rate:
                nodes = list(graph.nodes)
                for node in random.sample(nodes, len(nodes) // 3):
                    neighbors = list(graph.neighbors(node))
                    if not neighbors:
                        # An isolated node has no edge to mutate.
                        continue
                    neighbor = random.choice(neighbors)
                    current = graph[node][neighbor]['capacity']
                    ceiling = self._graph[node][neighbor]['capacity']
                    graph[node][neighbor]['capacity'] = random.randint(current, ceiling)
                offspring[index] = (graph, self._fitness(graph))
            progress.update(1)
        self._finish_progress(progress)

    def run(self):
        """Evolve the population and plot the best offspring fitness per generation.

        Stops early when the fittest chromosome reaches the base graph's
        fitness or when selection leaves no chromosome alive. Saves the plot
        as ``Best-Generation_<HHMMSS>.png``.

        Raises:
            ValueError: If the initial population is empty.
        """
        self._create_population()
        self._selection(False)
        if not self._population_list:
            raise ValueError("population_size must be at least 1")
        self._best_generation.append((self._population_list[0][0], self._population_list[0][1], 0))
        for generation in tqdm(range(1, self._max_iterations + 1), desc='Generation', unit='Generation',
                               total=self._max_iterations, ncols=100, position=1):
            if self._population_list[0][1] == self._max_fit:
                break
            self._crossover()
            self._mutation()
            self._update_list(generation)
            self._selection()
            if not self._population_list:
                # Every chromosome fell to or below the minimum fitness.
                break
        if self._population_list:
            print("Best fitness: ", self._population_list[0][1])
        else:
            print("Population died out: no chromosome exceeded the minimum fitness")
        time_run = strftime('%H%M%S', gmtime(time()))
        plt.figure(f'Best-Generation_{time_run}')
        plt.plot([x[2] for x in self._best_generation], [x[1] for x in self._best_generation], label='Best Generation')
        plt.savefig(f'Best-Generation_{time_run}.png')
# Time one complete run of the genetic search with the script's fixed settings.
started_at = time()
solver = Genetic(population_size=1000, max_iterations=100, mutation_rate=0.005, crossover_rate=0.06)
solver.run()
elapsed = time() - started_at
print(f'Time: {elapsed}')
numpy~=1.21.5
networkx~=2.6.3
matplotlib~=3.5.0
tqdm
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment