Skip to content

Instantly share code, notes, and snippets.

@jerrylususu
Last active December 25, 2022 07:18
Show Gist options
  • Save jerrylususu/d95e4999d68c9657f4f803ff7edcfcc3 to your computer and use it in GitHub Desktop.
Save jerrylususu/d95e4999d68c9657f4f803ff7edcfcc3 to your computer and use it in GitHub Desktop.
metro cycles
import json
with open("mapping.json","r",encoding="u8") as f:
mapping = json.load(f)
with open("numbered_edges.json","r",encoding="u8") as f:
edges = json.load(f)
with open("lp_6.json","r") as f:
loops = json.load(f)
def recreate_link(edge_list):
pool = list(edge_list)
link = []
current = pool.pop(0)
while len(pool) > 0:
target = None
for idx, (a, b) in enumerate(pool):
if a == current[1]:
target = (a, b)
pool.pop(idx)
break
elif b == current[1]:
target = (b, a)
pool.pop(idx)
break
if target == None:
print("current", current)
print("remain pool:", pool)
raise Exception("Failed")
link.append(current)
current = target
link.append(current)
return link
# 站名转换
def t(val):
return mapping["idx_to_node"][str(val)]
loops = [recreate_link(i) for i in loops]
for i in range(len(loops[0])):
if loops[0][i][0] != loops[1][i][0] or loops[1][i][0]!=loops[2][i][0] or loops[0][i][0]!=loops[2][i][0]:
print("!", i, t(loops[0][i][0]), t(loops[1][i][0]), t(loops[2][i][0]))
else:
print(i, mapping["idx_to_node"][str(loops[0][i][0])])
print([t(i[0]) for i in loops[0]])
import geatpy as ea
import numpy as np
import json
arr = np.load("metro_arr.npy")
with open("numbered_edges.json","r",encoding="u8") as f:
edges = json.load(f)
edges_to_nums = {}
num_to_edges = {}
for idx,[i,j] in enumerate(edges):
edges_to_nums[f"{i}_{j}"] = idx
edges_to_nums[f"{j}_{i}"] = idx
num_to_edges[idx] = (i, j)
def deserialize_loop(loop_np_arr):
idx_li = np.where(loop_np_arr==1)[0].tolist()
return [num_to_edges[i] for i in idx_li]
def recreate_link(edge_list):
pool = list(edge_list)
link = []
current = pool.pop(0)
while len(pool) > 0:
target = None
for idx, (a, b) in enumerate(pool):
if a == current[1]:
target = (a, b)
pool.pop(idx)
break
elif b == current[1]:
target = (b, a)
pool.pop(idx)
break
if target == None:
return False
link.append(current)
current = target
link.append(current)
return True
def check_node_only_used_twice(base):
egde_idx_li = np.where(base==True)[0].tolist()
used_map = {}
for edge_idx in egde_idx_li:
a, b = num_to_edges[edge_idx]
if a not in used_map:
used_map[a] = 0
if b not in used_map:
used_map[b] = 0
used_map[a] += 1
used_map[b] += 1
if used_map[a] > 2 or used_map[b] > 2:
return False
return True
def check(base):
if not check_node_only_used_twice(base):
return False
loop = deserialize_loop(base)
if not recreate_link(loop):
return False
return True
def calc(idx_li):
if len(idx_li) == 0:
return -1
base = np.copy(arr[idx_li[0]])
for i in idx_li[1:]:
if np.count_nonzero( np.logical_and(base, arr[i]) ) > 0:
# ok, can continue
base = np.logical_xor(base, arr[i])
valid = check(base)
if not valid:
return -1
else:
# fail, return 0
return 0
return np.count_nonzero(base)
def run(edge_line_arr):
idx_li = np.where(edge_line_arr==True)[0].tolist()
vals = []
for i in range(len(idx_li)):
vals.append(calc(idx_li[i:]))
vals.append(calc(idx_li[:i]))
li2 = list(idx_li)
del li2[i]
vals.append(calc(li2))
return max(vals)
# 构建问题
r = 1 # 目标函数需要用到的额外数据
@ea.Problem.single
def evalVars(Vars): # 定义目标函数(含约束)
f = run(Vars)
return f
problem = ea.Problem(name='metro loop',
M=1, # 目标维数
maxormins=[-1], # 目标最小最大化标记列表,1:最小化该目标;-1:最大化该目标
Dim=48, # 决策变量维数
varTypes=[1 for i in range(48)], # 决策变量的类型列表,0:实数;1:整数
lb=[0 for i in range(48)], # 决策变量下界
ub=[1 for i in range(48)], # 决策变量上界
lbin=[1 for i in range(48)],
ubin=[1 for i in range(48)],
evalVars=evalVars)
# 构建算法
algorithm = ea.soea_SEGA_templet(problem,
ea.Population(Encoding='RI', NIND=50),
MAXGEN=500, # 最大进化代数。
logTras=1, # 表示每隔多少代记录一次日志信息,0表示不记录。
trappedValue=1e-6, # 单目标优化陷入停滞的判断阈值。
maxTrappedCount=50) # 进化停滞计数器最大上限值。
# 求解
for i in range(500):
print("seed=",i)
res = ea.optimize(algorithm, seed=i, verbose=True, drawing=1, outputMsg=True, drawLog=False, saveFlag=True, dirName=f'result/seed_{i}')
print("i=", i, "res=", res["ObjV"][0][0])
with open("final_evo.txt","a",encoding="u8") as f:
f.write(f'seed={i}, val={res["ObjV"][0][0]}\n')
import json
from graphillion import GraphSet
def recreate_link(edge_list):
pool = list(edge_list)
link = []
current = pool.pop(0)
while len(pool) > 0:
target = None
for idx, (a, b) in enumerate(pool):
if a == current[1]:
target = (a, b)
pool.pop(idx)
break
elif b == current[1]:
target = (b, a)
pool.pop(idx)
break
if target == None:
print("current", current)
print("remain pool:", pool)
raise Exception("Failed")
link.append(current)
current = target
link.append(current)
print("remain pool:", pool)
return link
with open("numbered_edges.json","r",encoding="u8") as f:
edges = json.load(f)
universe = list(set([(i[0],i[1]) for i in edges]))
GraphSet.set_universe(universe)
# gs = GraphSet([universe])
# print(gs.len())
loops = GraphSet.cycles(is_hamilton=False)
print(loops.len()) # 19314368669
# for i in range(150,170):
# lar = loops.larger(i)
# print(i, lar.len())
# result:
# 150 723300
# 151 456628
# 152 280951
# 153 167383
# 154 95999
# 155 53104
# 156 28306
# 157 14292
# 158 6718
# 159 2962
# 160 1227
# 161 462
# 162 128
# 163 23
# 164 3
# 165 0
# 166 0
# 167 0
# 168 0
# 169 0
lar = loops.larger(164)
print(164, lar.len())
final_loop_li = []
for idx, lp in enumerate(iter(lar)):
print("idx", idx, "len", len(lp))
final_loop_li.append(lp)
link = recreate_link(lp)
print(link)
with open("lp_6.json","w") as f:
json.dump(final_loop_li, f)
# 这段代码有两个部分
# 上半部分:构建 metro_arr.npy 文件,用于遗传算法的数据输入
# 下半部分:从遗传算法的结果,返回出环路上的站点列表
# 使用时可以先注释掉下半部分
import json
import numpy as np
with open("mapping.json","r",encoding="u8") as f:
mapping = json.load(f)
with open("numbered_edges.json","r",encoding="u8") as f:
edges = json.load(f)
# build bi-directional mapping
edges_to_nums = {}
num_to_edges = {}
for idx,[i,j] in enumerate(edges):
edges_to_nums[f"{i}_{j}"] = idx
edges_to_nums[f"{j}_{i}"] = idx
num_to_edges[idx] = (i, j)
with open("cycle_basics.json","r",encoding="u8") as f:
cycle_basics = json.load(f)
print(len(edges))
print(cycle_basics[0])
def deserialize_loop(loop_np_arr):
idx_li = np.where(loop_np_arr==1)[0].tolist()
return [num_to_edges[i] for i in idx_li]
def recreate_link(edge_list):
pool = list(edge_list)
link = []
current = pool.pop(0)
while len(pool) > 0:
target = None
for idx, (a, b) in enumerate(pool):
if a == current[1]:
target = (a, b)
pool.pop(idx)
break
elif b == current[1]:
target = (b, a)
pool.pop(idx)
break
if target == None:
print("current", current)
print("remain pool:", pool)
raise Exception("Failed")
link.append(current)
current = target
link.append(current)
print("remain pool:", pool)
return link
arr = np.full((48,330),False,dtype=bool)
for cycle_idx, cycle_basic in enumerate(cycle_basics):
# print(cycle_idx, cycle_basic)
for idx in range(len(cycle_basic) - 1):
edge_str = f"{cycle_basic[idx]}_{cycle_basic[idx+1]}"
edge_num = edges_to_nums[edge_str]
arr[cycle_idx][edge_num] = True
edge_str = f"{cycle_basic[len(cycle_basic) - 1]}_{cycle_basic[0]}"
edge_num = edges_to_nums[edge_str]
arr[cycle_idx][edge_num] = True
np.save("metro_arr.npy", arr)
print("saved arr")
### 上半部分结束
### 下半部分开始
def calc(idx_li):
if len(idx_li) == 0:
return -1
base = np.copy(arr[idx_li[0]])
for i in idx_li[1:]:
if np.count_nonzero( np.logical_and(base, arr[i]) ) > 0:
# ok, can continue
base = np.logical_xor(base, arr[i])
valid = check_node_only_used_twice(base)
if not valid:
return 0
else:
# fail, return 0
return 0
return np.count_nonzero(base)
def build(idx_li):
if len(idx_li) == 0:
return -1
base = np.copy(arr[idx_li[0]])
for i in idx_li[1:]:
if np.count_nonzero( np.logical_and(base, arr[i]) ) > 0:
# ok, can continue
base = np.logical_xor(base, arr[i])
valid = check_node_only_used_twice(base)
if not valid:
return 0
else:
# fail, return 0
return 0
valid = check_node_only_used_twice(base, show=True)
return base
def run(edge_line_arr):
idx_li = np.where(edge_line_arr==1)[0].tolist()
vals = []
counter = 0
inputs = []
for i in range(len(idx_li)):
vals.append(calc(idx_li[i:]))
counter += 1
inputs.append(idx_li[i:])
vals.append(calc(idx_li[:i]))
counter += 1
inputs.append(idx_li[:i])
li2 = list(idx_li)
del li2[i]
vals.append(calc(li2))
counter += 1
inputs.append(li2)
print(vals)
m, m_idx = -1, -1
for idx, val in enumerate(vals):
if val > m:
m = val
m_idx = idx
print("max_idx", m_idx, "max", m)
return max(vals), build(inputs[m_idx])
best = np.array([0, 0, 0, 0, 1, 1, 0, 1, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 1, 1, 1, 0, 1, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 0, 0, 0, 1,
0, 0, 1, 0])
print(run(best))
# best_len = run(best)
best_len, best_loop = run(best)
print("result:", best_len)
edge_list = deserialize_loop(best_loop)
print("edge list len", len(edge_list))
print(edge_list)
link = recreate_link(edge_list)
print("link", link)
def rebuild_mapping(link):
new_link = []
for a,b in link:
new_link.append((mapping["idx_to_node"][str(a)], mapping["idx_to_node"][str(b)]))
return new_link
rebuild_link = rebuild_mapping(link)
print("rebuild link", [i[0] for i in rebuild_link])
原帖:https://bgm.tv/group/topic/375970
TLDR:本 gist 试图找出深圳地铁图上的最长环
新算法运行说明
1. 把 `metroStationsList.js` 里的站点信息单独存储成一个 `all_lines.json`
2. 运行 `prepare_graph.py`
3. 运行 `find_graphillion.py`
4. 运行 `decode_result.py`
--- (以下废弃)
旧算法运行说明
1. 把 `metroStationsList.js` 里的站点信息单独存储成一个 `all_lines.json`
2. 运行 `prepare_graph.py`
3. 运行 `prepare_cycle.py`
4. 注释掉 `meta.py` 的下半部分,然后运行 `meta.py`
5. 运行 `find_evo.py` 进行计算
(seed=353 时得到了长度为 135 的结果)
6. 编辑 `meta.py` 把遗传算法的结果(seed_n 中的 `result info.txt` 文件中的 `Vars`)贴到下半部分的 `best` 变量中,得到最终的路径。
# 从所有边 `graph.json` 得到数字编号的边 `numbered_edges.json` 和 cycle bases `cycle_basics.json`
# 用数字编号是因为这样可能处理起来比字符串稍微快一点?
import networkx as nx
import json
G = nx.Graph()
with open("graph.json","r",encoding="u8") as f:
edges = json.load(f)
idx_to_node = {}
node_to_idx = {}
idx = 0
for n1, n2 in edges:
if n1 not in node_to_idx:
idx+=1
node_to_idx[n1] = idx
idx_to_node[idx] = n1
if n2 not in node_to_idx:
idx+=1
node_to_idx[n2] = idx
idx_to_node[idx] = n2
G.add_nodes_from(idx_to_node.keys())
with open("mapping.json","w",encoding="u8") as f:
json.dump({
"node_to_idx": node_to_idx,
"idx_to_node": idx_to_node
},f,ensure_ascii=False, indent=2)
numbered_edges = []
for n1, n2 in edges:
G.add_edge(node_to_idx[n1], node_to_idx[n2])
numbered_edges.append([node_to_idx[n1], node_to_idx[n2]])
with open("numbered_edges.json","w",encoding="u8") as f:
json.dump(numbered_edges, f)
print(G)
with open("cycle_basics.json","w",encoding="u8") as f:
li = list(nx.cycle_basis(G))
json.dump(li, f)
print(f"cycle bases: len={len(li)}")
# 从站点 JSON 得到图上所有边 `graph.json`
import json
with open("all_lines.json", "r", encoding="u8") as f:
lines_arr = json.load(f)
print(lines_arr[0].keys())
print(lines_arr[0]["stationList"][0])
print(lines_arr[0]["stationList"][0].keys())
line_no_to_station_list = {}
for line in lines_arr:
stations_in_line = []
for station in line["stationList"]:
# build initial list
stations_in_line.append([station["stationName"], [i["NO"] for i in station["includesLine"]]])
line_no_to_station_list[line["line"]] = stations_in_line
print(line_no_to_station_list)
graph = []
for line, stations in line_no_to_station_list.items():
for idx, (station, includesLineList) in enumerate(stations):
if idx-1>=0:
graph.append([stations[idx-1][0], station])
print(graph)
with open("graph.json","w",encoding="u8") as f:
json.dump(graph, f, ensure_ascii=False, indent=2)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment