Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Traveling Salesman Problem with Genetic Algorithm. Execute this code on EC2 with proper IAM Role.
import math
import random
import json
import csv
import datetime
import boto3
import os
population = 100
generation = 10000
elite_rate = 0.1
mutation_rate = 0.16
print_lines = 30
working_directory = "/home/ec2-user/traveling_salesman/"
city_info_file = "city_info.json"
city_info_path = working_directory + city_info_file
execution_id = datetime.datetime.now().strftime("%Y%m%d%H%M%S%f")
s3 = boto3.client("s3")
bucket = "[S3 Bucket Name]"
s3directory = "traveling_salesman/"
def read_json(file_path):
with open(file_path, "r") as file:
return_json = json.load(file)
return return_json
def write_json(output_dictionary, file_path):
with open(file_path, "w") as file:
json.dump(output_dictionary, file, sort_keys=True, indent=2)
def write_csv(outputs, file_path):
csvform = []
header_title = ["city_info_file", "population", "generation", "genotype_digits", "elite_rate", "mutation_rate"]
header_content = [outputs["city_info_file"], outputs["population"], outputs["generation"], outputs["genotype_digits"], outputs["elite_rate"], outputs["mutation_rate"]]
csvform.append(header_title)
csvform.append(header_content)
log_title = ["generation_id", "best_individual_id", "fitness", "distance"]
csvform.append(log_title)
for generation_id in range(outputs["generation"]):
best_individual_id = outputs["evolution_log"][generation_id]["best_individual_id"]
fitness = outputs["evolution_log"][generation_id]["individuals"][0]["fitness"]
distance = 1/fitness
log_content = [generation_id, best_individual_id, fitness, distance]
csvform.append(log_content)
writecsv = csv.writer(file(file_path, "w"), lineterminator=",\n")
writecsv.writerows(csvform)
def preparation(city_info_file, population, generation, genotype_digits, elite_rate, mutation_rate):
outputs = {}
outputs["calculation_id"] = 0
outputs["city_info_file"] = city_info_file
outputs["population"] = population
outputs["generation"] = generation
outputs["genotype_digits"] = genotype_digits
outputs["elite_rate"] = elite_rate
outputs["mutation_rate"] = mutation_rate
outputs["start_time"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
outputs["evolution_log"] = []
return outputs
def generate_genotype(genotype_digits):
genotype = range(genotype_digits)
random.shuffle(genotype)
return genotype
def initial_generation(population, genotype_digits):
individuals = []
for individual_id in range(population):
individual = {}
individual["individual_id"] = individual_id
individual["genotype"] = generate_genotype(genotype_digits)
individuals.append(individual)
return individuals
def evaluation_function(cities, genotype):
distance = 0
for city_number in range(len(cities)-1):
origin_city = filter(lambda x: x["id"]==genotype[city_number],cities)[0]
destination_city = filter(lambda x: x["id"]==genotype[city_number+1],cities)[0]
distance += math.hypot(destination_city["x"]-origin_city["x"], destination_city["y"] - origin_city["y"])
origin_city = filter(lambda x: x["id"]==genotype[len(cities)-1],cities)[0]
destination_city = filter(lambda x: x["id"]==genotype[0],cities)[0]
distance += math.hypot(destination_city["x"]-origin_city["x"], destination_city["y"] - origin_city["y"])
return 1/distance
def roulette(individuals, population):
random_value = random.random()
sum = 0
for individual in individuals:
sum += individual["importance"]
if random_value<sum:
return individual["individual_id"]
return population - 1
def crossover(parent1_genotype, parent2_genotype, child1_genotype, child2_genotype, genotype_digits):
switch_digit = int(random.random()*genotype_digits)
parent1_rest_gene = []
parent2_rest_gene = []
for digit in range(genotype_digits):
if digit < switch_digit:
child1_genotype[digit] = parent1_genotype[digit]
child2_genotype[digit] = parent2_genotype[digit]
else:
parent1_rest_gene.append(parent1_genotype[digit])
parent2_rest_gene.append(parent2_genotype[digit])
child1_digit = switch_digit
child2_digit = switch_digit
for parent_digit in range(genotype_digits):
for rest_digit in range(len(parent1_rest_gene)):
if parent2_genotype[parent_digit]==parent1_rest_gene[rest_digit]:
child1_genotype[child1_digit] = parent2_genotype[parent_digit]
child1_digit += 1
if parent1_genotype[parent_digit]==parent2_rest_gene[rest_digit]:
child2_genotype[child2_digit] = parent1_genotype[parent_digit]
child2_digit += 1
def generate_new(parents, population, genotype_digits):
children = [{} for digit in range(population)]
count = 0
while count < population:
child1 = children[count]
child2 = children[count + 1]
child1["individual_id"] = count
child2["individual_id"] = count + 1
if count < elite_rate*population: #elite
child1["genotype"] = parents[child1["individual_id"]]["genotype"]
child2["genotype"] = parents[child2["individual_id"]]["genotype"]
elif count < (elite_rate + mutation_rate)*population: #mutation
child1["genotype"] = generate_genotype(genotype_digits)
child2["genotype"] = generate_genotype(genotype_digits)
else: #crossover
parent1_id = roulette(parents, population)
# print("Roulette returns " + str(parent1_id) + " as first parent ID")
parent2_id = roulette(parents, population)
# print("Roulette returns " + str(parent2_id) + " as second parent ID")
parent1 = filter(lambda x: x["individual_id"]==parent1_id,parents)[0]
parent2 = filter(lambda x: x["individual_id"]==parent2_id,parents)[0]
child1["genotype"] = [0 for digit in range(genotype_digits)]
child2["genotype"] = [0 for digit in range(genotype_digits)]
crossover(parent1["genotype"], parent2["genotype"], child1["genotype"], child2["genotype"], genotype_digits)
count += 2
return children
def evaluation(cities, population, individuals):
unnormalized_importance = [0 for digit in range(population)]
for individual in individuals:
individual["fitness"] = evaluation_function(cities, individual["genotype"])
max_fitness = max(individuals, key=(lambda x: x["fitness"]))["fitness"]
min_fitness = min(individuals, key=(lambda x: x["fitness"]))["fitness"]
sum = 0
for individual in individuals:
unnormalized_importance[individual["individual_id"]] = (individual["fitness"]-min_fitness)/(max_fitness-min_fitness)
sum += unnormalized_importance[individual["individual_id"]]
for individual in individuals:
individual["importance"] = unnormalized_importance[individual["individual_id"]]/sum
individuals.sort(key=lambda x: x["importance"], reverse=True)
def summarize_generation(generation_id, individuals, best_individual_id):
generation_summary = {}
generation_summary["generation_id"] = generation_id
generation_summary["individuals"] = individuals
generation_summary["best_individual_id"] = best_individual_id
return generation_summary
def summarize_all(outputs, best_individual):
outputs["end_time"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
outputs["best_individual"] = best_individual
if __name__ == "__main__":
cities = read_json(city_info_path)
genotype_digits = len(cities)
outputs = preparation(city_info_file, population, generation, genotype_digits, elite_rate, mutation_rate)
individuals = initial_generation(population, genotype_digits)
evaluation(cities, population, individuals)
best_individual_id = max(individuals, key=(lambda x: x["fitness"]))["individual_id"]
outputs["evolution_log"].append(summarize_generation(0, individuals, best_individual_id))
for generation_id in range(1, generation):
if generation_id%(generation//print_lines) == 0:
print("[%s]Generation: %d/%d"%(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"), generation_id,generation))
new_individuals = generate_new(individuals, population, genotype_digits)
evaluation(cities, population, new_individuals)
individuals = new_individuals
best_individual_id = max(individuals, key=(lambda x: x["fitness"]))["individual_id"]
outputs["evolution_log"].append(summarize_generation(generation_id, individuals, best_individual_id))
best_individual = {}
best_individual = max(individuals, key=(lambda x: x["fitness"]))
summarize_all(outputs, best_individual)
filename = "traveling_salesman_result_" + execution_id + ".json"
small_filename = "traveling_salesman_result_" + execution_id + ".csv"
filepath = working_directory + filename
small_filepath = working_directory + small_filename
write_json(outputs, filepath)
write_csv(outputs, small_filepath)
s3.upload_file(filepath, bucket, s3directory + filename)
s3.upload_file(small_filepath, bucket, s3directory + small_filename)
print("Calculation result was saved as %s on S3 Bucket %s." % (s3directory + filename, bucket))
os.remove(filepath)
os.remove(small_filepath)
print("Done!!")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment