Created
April 23, 2010 00:17
Revisions
-
gabesmed revised this gist
Apr 23, 2010 . 1 changed file with 14 additions and 196 deletions.There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -13,6 +13,11 @@ from scenicroute.utils.geography.voronoi import computeDelaunayTriangulation from scenicroute.utils.geography.distance import great_circle_distance as gcd from scenicroute.utils.geography.coordinates import to_tuple # # This is a partial sample for explanatory purposes. # For the full code, shoot me an email at gabe@scenicroute.info. # class EvolutionSettings: num_generations = 10 # number of generations to run for spawn = 100 # number of new children to make each generation @@ -76,18 +81,12 @@ def evolve_candidates(points, edges, desired_duration): # and return the final population return evolution_chamber.pop class FitnessTester(object): LETHAL = -99999999 def __init__(self, points, desired_duration): "Initialize preference constants." self.points = points self.desired_duration = desired_duration self.min_step_duration = 2 # no step should be less than 3 minutes @@ -97,7 +96,9 @@ class FitnessTester(object): self._fitness_cache = {} def fitness(self, individual): """ Calculate the fitness of an individual path. """ if tuple(individual) in self._fitness_cache: return self._fitness_cache[tuple(individual)] @@ -124,16 +125,16 @@ class FitnessTester(object): for point in all_points[1:]: # calculate distances distance = gcd(last_point, point) distance_to_end = gcd(point, all_points[-1]) duration = duration_for_distance(distance) # and figure out if we're headed backwards total_duration += duration if distance_to_end > last_distance_to_end: total_backwards += duration_for_distance( distance_to_end - last_distance_to_end) # enumerate steps that are less than or more than ideal distance if duration < self.min_step_duration: total_below_min += (self.min_step_duration - duration) if duration > self.max_step_duration: @@ -143,8 +144,8 @@ class FitnessTester(object): last_distance_to_end = distance_to_end num_switchbacks = 0 if len(all_points) > 2: # for all points, figure out if it's switching back and penalize appropriately. for i in xrange(1, len(all_points) - 1): ax, ay = all_points[i-1].coords ox, oy = all_points[i].coords @@ -187,187 +188,4 @@ class FitnessTester(object): self._fitness_cache[tuple(individual)] = fitness return fitness
-
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,373 @@ import math import random import operator from django.contrib.gis.geos import LineString, Point from scenicroute.apps.hunts.routing.triangulation import ( build_triangulation, START_INDEX, END_INDEX) from scenicroute.apps.hunts.routing.culling import no_dupes from scenicroute.apps.hunts.routing.estimation import (duration_for_distance, distance_in_duration) from scenicroute.utils.geography.voronoi import computeDelaunayTriangulation from scenicroute.utils.geography.distance import great_circle_distance as gcd from scenicroute.utils.geography.coordinates import to_tuple class EvolutionSettings: num_generations = 10 # number of generations to run for spawn = 100 # number of new children to make each generation preserve = 5 # preserve the top N children each generation unchanged cull = 20 # number of top children to keep each generation choose_from_top = 10 # and choose randomly from the top N final results def build_route(hunt, messages, desired_duration): """ Build a candidate route, return a list of messages """ if len(messages) == 0: # Return an empty list return [] # triangulate the route points, messages_lists, edges = build_triangulation(hunt, messages) # points is a list of Point objects [START, END, P0, P1...] # messages is a list of lists of messages ['start', 'end', [], [], ...] # edges is a dict is list indexes, { i0: set(i1), i1: set(i2, i3), ... } if len(points) == 2: # only start and end points return [] final_population = evolve_candidates(points, edges, desired_duration) # and choose a random of the top five # if the solution is highly convergent, then there will be only one # and the random choice won't matter. if it's highly divergent, # then we get a little spice in the choice final_choice = random.randrange(EvolutionSettings.choose_from_top) index_sequence = final_population[final_choice] # choose among the messages for each location in the sequence final_route = [] for index in index_sequence: message_list = messages_lists[index] choice = random.randrange(len(message_list)) message = message_list[choice] final_route.append(message) return final_route def evolve_candidates(points, edges, desired_duration): # seed and run the genetic algorithm initial = [[ random.randrange(2, len(points)), random.randrange(2, len(points))] for i in xrange(40)] tester = FitnessTester(points, desired_duration) mutator = Mutator(edges) evolution_chamber = EvolutionChamber(tester, mutator, generations=EvolutionSettings.num_generations, cull=EvolutionSettings.cull, spawn=EvolutionSettings.spawn, preserve=EvolutionSettings.preserve) evolution_chamber.evolve(initial) # and return the final population return evolution_chamber.pop def dot(a, b): alen = math.hypot(*a) blen = math.hypot(*b) a = map(lambda x: x / alen, a) b = map(lambda x: x / blen, b) return reduce(operator.add, map(operator.mul, a, b)) class FitnessTester(object): LETHAL = -99999999 def __init__(self, points, desired_duration): self.points = points self.desired_duration = desired_duration self.min_step_duration = 2 # no step should be less than 3 minutes self.max_step_duration = 6 # no step should be greater than 10 minutes self.max_switchback = -0.90 # no angle should switch back more than this much self.stop_duration = 1 # add one minute to total duration for each stop self._fitness_cache = {} def fitness(self, individual): if tuple(individual) in self._fitness_cache: return self._fitness_cache[tuple(individual)] if 0 in individual or 1 in individual: return self.LETHAL if isinstance(individual, tuple): individual = list(individual) duplicates = len(individual) - len(list(set(individual))) if duplicates: return self.LETHAL # total duration includes 5 minutes per stop total_duration = len(individual) * self.stop_duration total_backwards = 0 total_below_min = 0 total_above_max = 0 all_points = [self.points[i] for i in [START_INDEX] + individual + [END_INDEX]] last_point = all_points[0] last_distance_to_end = gcd(last_point, all_points[-1]) for point in all_points[1:]: distance = gcd(last_point, point) distance_to_end = gcd(point, all_points[-1]) duration = duration_for_distance(distance) total_duration += duration if distance_to_end > last_distance_to_end: total_backwards += duration_for_distance( distance_to_end - last_distance_to_end) if duration < self.min_step_duration: total_below_min += (self.min_step_duration - duration) if duration > self.max_step_duration: total_above_max += (duration - self.max_step_duration) last_point = point last_distance_to_end = distance_to_end num_switchbacks = 0 if len(all_points) > 2: for i in xrange(1, len(all_points) - 1): ax, ay = all_points[i-1].coords ox, oy = all_points[i].coords bx, by = all_points[i+1].coords ao = (ox - ax, oy - ay) ob = (bx - ox, by - oy) try: product = dot(ao, ob) if product < self.max_switchback: num_switchbacks += 1 except ZeroDivisionError: pass # lose a point for every minute away from desired total distance duration_penalty = abs((self.desired_duration - total_duration)) # lose a point for every minute you go backwards backwards_penalty = 1.0 * total_backwards # lose five points for every minute below the penalty below_min_penalty = 5.0 * total_below_min # lose two points for every minutes above max above_max_penalty = 2.0 * total_above_max # lose five points for every major switchback switchback_penalty = 5.0 * num_switchbacks fitness = 0 fitness -= duration_penalty fitness -= backwards_penalty fitness -= switchback_penalty fitness -= below_min_penalty fitness -= above_max_penalty # print 'total duration: %.1f, switchbacks %d, below_min %.1f, above_max %.1f, fitness %.5f - %s' % ( # total_duration, num_switchbacks, total_below_min, total_above_max, fitness, # individual) self._fitness_cache[tuple(individual)] = fitness return fitness class EvolutionChamber(object): def __init__(self, tester, mutator, generations, cull, spawn, preserve, mutate=0.5): self.tester = tester self.mutator = mutator self.generations = generations self.cull = cull self.spawn = spawn self.preserve = preserve self.mutate = mutate self.pop = None def evolve(self, initial): "Start the evolution and return the best result." self.pop = initial for i in xrange(self.generations): self.pop = self.gen(self.pop) return self.pop[0] # return most fit. def gen(self, pop): "Run a generation on this population and return the next one" # sort by fitness, best first pop = sorted(pop, key=self.tester.fitness, reverse=True) popcount2 = len(pop) ** 2 children = [] # build children for i in xrange(int(self.spawn / 2)): # pick parents, preferring earlier (more fit) index1 = index2 = int(math.sqrt(random.randrange(popcount2))) parent1 = pop[-index1] while index1 == index2: index2 = int(math.sqrt(random.randrange(popcount2))) parent2 = pop[-index2] # crossover the two parents and mutate child1, child2 = self.crossover(parent1, parent2) child1 = self.mutator.mutate(child1) child2 = self.mutator.mutate(child2) children.extend([child1, child2]) # add the top n parents to this generation if self.preserve: children.extend(pop[:self.preserve]) # sort by fitness children = sorted(children, key=self.tester.fitness, reverse=True) # and return the top n return children[:self.cull] def crossover(self, parent1, parent2): "Cross over the two choices" # can't cross over empty list if len(parent1) == 0 or len(parent2) == 0: return parent1, parent2 xpoint1 = random.randrange(len(parent1)) xpoint2 = random.randrange(len(parent2)) child1 = parent1[:xpoint1] + parent2[xpoint2:] child2 = parent2[:xpoint2] + parent1[xpoint1:] return child1, child2 class Mutator(object): def __init__(self, edges): self.edges = edges def mutate(self, individual): mutations = [ (self.move_point, 1), (self.swap_points, 2), (self.delete_point, 1), (self.add_middle, 1), (self.add_start, 0), (self.add_end, 0)] available = filter(lambda m: len(individual) >= m[1], mutations) choice = random.randrange(len(available)) mutated = available[choice][0](individual) cleaned = self.cleanup(mutated) # print '%s %s -> %s' % (individual, # available[choice][0].__name__, cleaned) # return cleaned def cleanup(self, individual): return list(no_dupes(individual)) def move_point(self, individual): "Choose a random node and replace it with an adjacent node" if len(individual) == 0: return individual index = random.randrange(len(individual)) try: individual[index] = self._get_adjacent(individual, individual[index]) except NoAdjacentException: pass return individual def swap_points(self, individual): "Take two points in the route and swap their order" if len(individual) < 2: return individual i1 = i2 = random.randrange(len(individual)) while i1 == i2: i2 = random.randrange(len(individual)) individual[i1], individual[i2] = individual[i2], individual[i1] return individual def delete_point(self, individual): "Remove a point from the list" if len(individual) == 0: return individual index = random.randrange(len(individual)) return individual[:index] + individual[index + 1:] def add_middle(self, individual): "Add a point to the list anywhere in the middle" index = random.randrange(len(individual)) try: if random.randrange(2): # insert before individual.insert(index, self._get_adjacent(individual, individual[index])) else: # insert after individual.insert(index + 1, self._get_adjacent(individual, individual[index])) except NoAdjacentException: pass return individual def add_start(self, individual): try: individual.insert(0, self._get_adjacent(individual, START_INDEX)) except NoAdjacentException: pass return individual def add_end(self, individual): try: individual.insert(len(individual) + 1, self._get_adjacent(individual, END_INDEX)) except NoAdjacentException: pass return individual def _get_adjacent(self, individual, point): neighbors = [] for index in self.edges[point]: if index in [0, 1]: continue if index in individual: continue neighbors.append(index) if len(neighbors) == 0: raise NoAdjacentException neighbor_index = random.randrange(len(neighbors)) return neighbors[neighbor_index] class NoAdjacentException(Exception): pass