Skip to content

Instantly share code, notes, and snippets.

@nickludlam
Last active January 31, 2021 02:37
Show Gist options
  • Save nickludlam/4855f400a28c9bc7ff1daedd63434dd6 to your computer and use it in GitHub Desktop.
Save nickludlam/4855f400a28c9bc7ff1daedd63434dd6 to your computer and use it in GitHub Desktop.
Looking at how to optimise the Travelling Trucker problem for The Hutton Orbital Truckers group
import sys
import time
import json
import random
import operator
from enum import Enum
import vector
import networkx as nx
from networkx.algorithms.tournament import hamiltonian_path
from networkx.algorithms import tournament
import matplotlib.pyplot as plt
from nose.tools import assert_equal
from nose.tools import assert_false
from nose.tools import assert_true
class StarSystem:
def __init__(self, system_name, position_dict):
self.name = system_name
self.position = vector.Vector3(position_dict['x'], position_dict['y'], position_dict['z'])
self.distances = {}
def add_distance(self, star_system, distance):
self.distances[star_system] = distance
def add_distances(self, distances_dictionary):
self.distances = distances_dictionary
def distance_to(self, target_system):
return self.distances[target_system]
def ordered_distances(self):
#print(f"ordered_distances has {len(self.distances)} elements")
sorted_pairs = sorted(self.distances.items(), key=operator.itemgetter(1))
#print(sorted_pairs)
return sorted_pairs
with open('source.json') as f:
data = json.load(f)
# We use a global because this is a hacky script that I made up as I went
all_systems = []
for name in data:
position_dict = data[name]
new_star_system = StarSystem(name, position_dict)
all_systems.append(new_star_system)
all_systems_len = len(all_systems)
for idx, system in enumerate(all_systems):
print(f"{idx} : {system.name}")
# Test vector maths
#s0 = all_systems[0]
#s1 = all_systems[1]
#relative_position = s1.position - s0.position
# print(f"relative_position.magnitude={relative_position.magnitude()}")
# Generate our distance cache inside the all_systems data structure
for start_star_system in all_systems:
#print(f"----- {start_star_system.name} -----")
for end_star_system in all_systems:
#print(f"Evaluating: {start_star_system.name} -> {end_star_system.name}")
if end_star_system != start_star_system: # different systems
distance = (end_star_system.position - start_star_system.position).magnitude()
start_star_system.add_distance(end_star_system, distance)
print(f"Total systems imported: {len(all_systems)}")
# next_system = None
# best_distance = 1000000
# for end_star_system in all_systems:
# if end_star_system != starter_system:
# # existing_edge = G[end_star_system][start_star_system]
# # print(existing_edge)
# print(f"checking {starter_system.name} for distance to {end_star_system.name}")
# inter_system_distance = starter_system.distance_to(end_star_system)
# if inter_system_distance < best_distance:
# next_system = end_star_system
# best_distance = inter_system_distance
# print(f"Best distance from {starter_system} -> {next_system} is {best_distance}")
# return next_system
# This first class encodes the concept of a route, which is an array of system indices
# stored in `visited_systems_indices` and a dictionary `visited_systems` which was used
# initially to boostrap the naive routes generated in `simple_plot_route()`
class RouteFinder(object):
def load_route_indices(self, route_indices):
if (len(route_indices) != all_systems_len):
print(f"Incorrect number of systems: {len(route_indices)} given but {all_systems_len} required")
sys.exit(1)
self.visited_systems_indices = route_indices.copy()
def best_destination_from(self, starter_system):
for (system, distance) in starter_system.ordered_distances():
if system not in self.visited_systems:
return system
else:
pass
#print(f"Already visited {system.name}")
return None
# Dumb initial route generation approach. Start somewhere, and pick the closest nearby
# systems you've not yet visited, and repeat
def simple_plot_route(self, starting_index=0):
self.visited_systems_indices = []
self.visited_systems = {}
self.add_system(all_systems[starting_index])
while len(self.visited_systems_indices) < all_systems_len:
best_destination = self.best_destination_from(self.current_system)
if best_destination == None:
print(f"Next system for {self.current_system.name} is none")
break
else:
self.add_system(best_destination)
#print("Finished basic route plot!")
@property
def current_system(self):
return all_systems[self.visited_systems_indices[-1]]
def add_system(self, star_system):
self.visited_systems[star_system] = True
star_system_index = all_systems.index(star_system)
self.visited_systems_indices.append(star_system_index)
def longest_hop(self):
(longest_hop_distance, longest_hop_index) = self.longest_hop_with_index()
return longest_hop_distance
def longest_hop_index(self):
(longest_hop_distance, longest_hop_index) = self.longest_hop_with_index()
return longest_hop_index
def longest_hop_with_index(self):
longest_hop_index = -1
longest_hop = 0
my_current_system = all_systems[self.visited_systems_indices[0]]
for index in self.visited_systems_indices[1:]:
my_next_system = all_systems[index]
distance = my_current_system.distance_to(my_next_system)
if distance > longest_hop:
longest_hop = distance
longest_hop_index = index
return (longest_hop, longest_hop_index)
def distance_for_current_path(self):
total_distance = 0
my_current_system = all_systems[self.visited_systems_indices[0]]
for index in self.visited_systems_indices[1:]:
my_next_system = all_systems[index]
total_distance += my_current_system.distance_to(my_next_system)
#print(f" {system_count} {my_current_system.name} -> {my_next_system.name}")
my_current_system = my_next_system
#print(f"distance for {len(self.visited_systems_indices)} systems is {total_distance}")
return total_distance
def print_route(self):
total_distance = 0
longest_hop = 0
my_current_system = all_systems[self.visited_systems_indices[0]]
for index in self.visited_systems_indices[1:]:
my_next_system = all_systems[index]
distance = my_current_system.distance_to(my_next_system)
print(f"{index:02d} {my_current_system.name} -> {my_next_system.name} ({distance:.1f}ly)")
total_distance += distance
if distance > longest_hop:
longest_hop = distance
my_current_system = my_next_system
print(f"\nTotal distance {total_distance:.1f} ly")
print(f"Longest hop distance {longest_hop:.1f} ly")
print(f"Compact route {self.visited_systems_indices}")
def init_from_route_finder(self, other):
self.visited_systems_indices = other.visited_systems_indices.copy()
#self.visited_systems = other.visited_systems.copy()
# Simple RouteFinder usage tests
# route_finder = RouteFinder()
# route_finder.simple_plot_route()
# print("distance", route_finder.distance_for_current_path())
# Don't know why I can't have this inside RouteComparator
class RouteComparatorFunction(Enum):
FITNESS_SELECTOR_SHORTEST_TOTAL = 0
FITNESS_SELECTOR_SHORTEST_HOP = 1 # NOTE THAT THIS COMPARATOR IS BROKEN RIGHT NOW!
# A class to mutate and check two routes against each other. The backbone of our GA
class RouteComparator(object):
def __init__(self, skip_duplicate_mutations=False, fitness_selector=RouteComparatorFunction.FITNESS_SELECTOR_SHORTEST_TOTAL):
self.total_mutation_count = 0
self.fitness_selector = fitness_selector
if self.fitness_selector == RouteComparatorFunction.FITNESS_SELECTOR_SHORTEST_HOP:
print("Shortest hop optimisation is currently unsupported. It needs a better way to target the source of the largest hop and ensure that each mutation tries to address it")
sys.exit(1)
self.skip_duplicate_mutations = skip_duplicate_mutations
self.existing_mutations_set = set()
self.successful_mutations = 0
def basic_init(self):
current_best_route = RouteFinder()
current_best_route.simple_plot_route()
current_best_route_distance = current_best_route.distance_for_current_path()
starting_index = 1
while starting_index < all_systems_len:
alternative_route = RouteFinder()
alternative_route.simple_plot_route(starting_index)
alternative_route_distance = alternative_route.distance_for_current_path()
#print(f"comparing {alternative_route_distance} with {current_best_route_distance}")
if alternative_route_distance < current_best_route_distance:
current_best_route = alternative_route
current_best_route_distance = alternative_route_distance
print(f"New starter system gave a best route distance of {alternative_route_distance} for {current_best_route.visited_systems_indices}")
starting_index += 1
self.assign_best_route(current_best_route)
def load_route_indices(self, route_indices):
route = RouteFinder()
route.load_route_indices(route_indices)
self.assign_best_route(route)
def assign_best_route(self, best_route):
self.best_route = best_route
if self.fitness_selector == RouteComparatorFunction.FITNESS_SELECTOR_SHORTEST_TOTAL:
self.best_route_fitness_value = best_route.distance_for_current_path()
elif self.fitness_selector == RouteComparatorFunction.FITNESS_SELECTOR_SHORTEST_HOP:
self.best_route_fitness_value = best_route.longest_hop()
self.existing_mutations_set = set()
def do_mutations(self, desired_count, required_mutation_count=1, allowed_mutation_distance=1):
print(f"Doing {desired_count} mutations")
mutation_count = 0
self.successful_mutations = 0
prior_mutation_cache_hits = 0
last_printout_time = time.time()
while mutation_count < desired_count:
#print(f"On mutation {mutation_count} and {self.successful_mutations} have been successful")
#random_mutation_count = random.randint(1, 3)
if self.mutate_route_random(required_mutation_count, allowed_mutation_distance):
mutation_count += 1
self.total_mutation_count += 1
if self.total_mutation_count % 10000 == 0:
current_printout_time = time.time()
rate = 10000 / (current_printout_time - last_printout_time)
percent_complete = (self.total_mutation_count / desired_count) * 100.0
print(f"On mutation {self.total_mutation_count} ({percent_complete:.1f}%) at {rate:.0f} tests/s. {prior_mutation_cache_hits} prior mutations skipped, {self.successful_mutations} successful")
last_printout_time = current_printout_time
else:
prior_mutation_cache_hits += 1
print(f"Finished doing {mutation_count} mutations and {self.successful_mutations} were successful")
#print("\n\nNew best route:")
#self.best_route.print_route()
#print(f"Distance: {self.best_route.distance_for_current_path()}")
def fitness_score_for_route(self, route):
if self.fitness_selector == RouteComparatorFunction.FITNESS_SELECTOR_SHORTEST_TOTAL:
return route.distance_for_current_path()
elif self.fitness_selector == RouteComparatorFunction.FITNESS_SELECTOR_SHORTEST_HOP:
return route.longest_hop()
def new_route_is_fitter(self, new_route):
if self.fitness_selector == RouteComparatorFunction.FITNESS_SELECTOR_SHORTEST_TOTAL:
return self.new_route_is_fitter_by_total_distance(new_route)
elif self.fitness_selector == RouteComparatorFunction.FITNESS_SELECTOR_SHORTEST_HOP:
return self.new_route_is_fitter_by_longest_hop_distance(new_route)
def new_route_is_fitter_by_longest_hop_distance(self, new_route):
existing_route_longest_hop = self.best_route_fitness_value
new_route_longest_hop = new_route.longest_hop()
#print(f"Existing longest hop {existing_route_longest_hop:.1f} vs new longest hop {new_route_longest_hop:.1f}")
return new_route_longest_hop < existing_route_longest_hop
def new_route_is_fitter_by_total_distance(self, new_route):
existing_route_distance = self.best_route_fitness_value
new_route_distance = new_route.distance_for_current_path()
#print(f"Existing route distance {existing_route_distance:.1f} vs new route distance {new_route_distance:.1f}")
return new_route_distance < existing_route_distance
def mutate_route_random(self, max_mutation_count=1, max_mutation_distance=1):
mutated_route = RouteFinder()
mutated_route.init_from_route_finder(self.best_route)
mutated_route_distance = mutated_route.distance_for_current_path()
required_mutation_count = random.randint(1, max_mutation_count)
mutation_count = 0
mutation_string = ":"
while mutation_count < required_mutation_count:
random_index_source = random.randint(0, all_systems_len - 1)
swap_direction = random.choice([1, -1])
swap_distance = 1
swap_distance = random.randint(1, max_mutation_distance)
random_index_dest = random_index_source + (swap_direction * swap_distance)
# clamp so we don't try to swap outside the range
random_index_dest = max(0, min(random_index_dest, all_systems_len-1))
# Prevent a no-op
if (random_index_dest == random_index_source):
continue
mutated_route.visited_systems_indices[random_index_source], mutated_route.visited_systems_indices[random_index_dest] = mutated_route.visited_systems_indices[random_index_dest], mutated_route.visited_systems_indices[random_index_source]
mutation_string += f"{random_index_source},{random_index_dest}:"
#print(f" Mutating iteration {self.total_mutation_count} by swapping index {random_index_source} and {random_index_dest}")
mutation_count += 1
#print(f"Dist: {self.fitness_score_for_route(mutated_route)} / Mutations: {mutation_string}")
if self.skip_duplicate_mutations and mutation_string in self.existing_mutations_set:
#print(f"Skipping {mutation_string} as we've tried it already")
return False
else:
if self.new_route_is_fitter(mutated_route):
mutated_route.print_route()
self.assign_best_route(mutated_route)
self.successful_mutations += 1
#print(f"Inserting mutation_string {mutation_string}")
if self.skip_duplicate_mutations:
self.existing_mutations_set.add(mutation_string)
return True
def mutate_route(self, required_mutation_count=1, allowed_mutation_distance=1):
mutated_route = RouteFinder()
mutated_route.init_from_route_finder(self.best_route)
mutated_route_distance = mutated_route.distance_for_current_path()
#print(f"Before mutation, dist = {mutated_route_distance}")
#print(f"Before mutation, route = {mutated_route.visited_systems_indices}")
mutation_count = 0
while mutation_count < required_mutation_count:
# randint has both args inclusive in the output
random_index_source = random.randint(0, all_systems_len - 1)
# Focus on the longest hop if we're selecting for that
if self.fitness_selector == RouteComparatorFunction.FITNESS_SELECTOR_SHORTEST_HOP:
random_index_source = mutated_route.longest_hop_index()
# do we swap up or down the list?
swap_direction = random.choice([1, -1])
swap_distance = 1
if (allowed_mutation_distance > 1):
swap_distance = random.randint(1, allowed_mutation_distance)
random_index_dest = random_index_source + (swap_direction * swap_distance)
# clamp so we don't try to swap outside the range
random_index_dest = max(0, min(random_index_dest, all_systems_len-1))
if (random_index_dest == random_index_source):
continue
print(f"Testing mutation {self.total_mutation_count} by swapping index {random_index_source} and {random_index_dest}")
# Classic swap
mutated_route.visited_systems_indices[random_index_source], mutated_route.visited_systems_indices[random_index_dest] = mutated_route.visited_systems_indices[random_index_dest], mutated_route.visited_systems_indices[random_index_source]
mutation_count += 1
if self.new_route_is_fitter(mutated_route):
self.assign_best_route(mutated_route)
return True
else:
#print(f"Mutated distance {mutated_route_distance} not better than best {self.best_route_distance}")
return False
if __name__ == "__main__":
start_time = time.time()
seed = int(sys.argv[1]) if len(sys.argv) >= 2 else 1
iteration_count = int(sys.argv[2]) if len(sys.argv) >= 3 else 100000
random.seed(seed) # Required for repeatability
route_comparator = RouteComparator(skip_duplicate_mutations=False)
# Use this if you're not loading an existing route like we are below
#route_comparator.basic_init()
# Start from the beginning
#dumb_route = range(len(all_systems))
#route_comparator.load_route_indices([*dumb_route])
# Current route
route_comparator.load_route_indices([3, 0, 5, 2, 6, 7, 8, 1, 10, 4, 13, 11, 23, 9, 20, 12, 19, 22, 24, 17, 25, 21, 15, 14, 16, 18])
# Load in the previous best route I found from a previous iteration
#route_comparator.load_route_indices([7,14,15,8,16,18,25,6,10,1,21,24,17,22,13,19,12,20,2,4,0,5,9,23,11,3])
route_comparator.best_route.print_route()
print("Starting mutations")
#route_comparator.do_mutations(1000000)
#route_comparator.do_mutations(100000, 1, 2)
#route_comparator.do_mutations(100000, 2, 2)
# I'm just playing with these numbers currently
route_comparator.do_mutations(iteration_count, 4, 4)
print("\n-------------\nFinal route:\n")
route_comparator.best_route.print_route()
elapsed_time = time.time() - start_time
print(f"Duration: {elapsed_time}")
sys.exit(0)
#############################################################
# unused graph work - not worth it in the end
# # https://gist.github.com/mikkelam/ab7966e7ab1c441f947b
# def some_random_hamilton(G):
# F = [(G,[list(G.nodes())[0]])]
# n = G.number_of_nodes()
# while F:
# graph,path = F.pop()
# confs = []
# neighbors = (node for node in graph.neighbors(path[-1])
# if node != path[-1]) #exclude self loops
# for neighbor in neighbors:
# conf_p = path[:]
# conf_p.append(neighbor)
# conf_g = nx.Graph(graph)
# conf_g.remove_node(path[-1])
# confs.append((conf_g,conf_p))
# for g,p in confs:
# if len(p)==n:
# return p
# else:
# F.append((g,p))
# return None
# G = nx.DiGraph()
# G.add_nodes_from(all_systems)
# flip_weights = False
# for start_star_system in all_systems:
# for end_star_system in all_systems:
# if end_star_system != start_star_system: # different systems
# distance = (end_star_system.position - start_star_system.position).magnitude()
# try:
# exising_edge = G[end_star_system][start_star_system]
# # Do nothing if we have the link
# #print("... Skipped (existing)")
# except KeyError:
# #wt_i = - distance if flip_weights else distance
# G.add_edge(start_star_system, end_star_system, attr_dict={'distance': distance})
# #print(f"... Added!")
# else:
# pass
# #print(f"... Skipped (same systems)")
# all_names_map = {s:s.name for s in all_systems}
# nx.relabel_nodes(G, all_names_map, copy=False)
# nx.draw(G, None, with_labels=True, arrows=True, node_size=700)
# plt.savefig("path_graph1.png")
# plt.show()
# sys.exit(0)
# # We should have (total-1) connections for each node
# print("------")
# print('Number of nodes: {}'.format(len(G.nodes())))
# print('Number of edges: {}'.format(len(G.edges())))
# print()
# for node in G.nodes():
# print(node.name, G.degree(node))
# print("------")
# total_distance = 0
# assert_true(tournament.is_tournament(G))
# path = hamiltonian_path(G)
# assert_equal(len(path), 26)
# assert_true(all(v in G[u] for u, v in zip(path, path[1:])))
# current_system = path[0]
# for next_system in path[1:]:
# distance = (next_system.position - current_system.position).magnitude()
# print(current_system.name, " -> ", next_system.name, " @ ", distance, "ly")
# current_system = next_system
# total_distance += distance
# print("Total distance", total_distance)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment