Skip to content

Instantly share code, notes, and snippets.

@gabesmed
Created April 23, 2010 00:17

Revisions

  1. gabesmed revised this gist Apr 23, 2010. 1 changed file with 14 additions and 196 deletions.
    210 changes: 14 additions & 196 deletions gistfile1.pyw
    Original file line number Diff line number Diff line change
    @@ -13,6 +13,11 @@ from scenicroute.utils.geography.voronoi import computeDelaunayTriangulation
    from scenicroute.utils.geography.distance import great_circle_distance as gcd
    from scenicroute.utils.geography.coordinates import to_tuple

    #
    # This is a partial sample for explanatory purposes.
    # For the full code, shoot me an email at gabe@scenicroute.info.
    #

    class EvolutionSettings:
    num_generations = 10 # number of generations to run for
    spawn = 100 # number of new children to make each generation
    @@ -76,18 +81,12 @@ def evolve_candidates(points, edges, desired_duration):
    # and return the final population
    return evolution_chamber.pop

    def dot(a, b):
    alen = math.hypot(*a)
    blen = math.hypot(*b)
    a = map(lambda x: x / alen, a)
    b = map(lambda x: x / blen, b)
    return reduce(operator.add, map(operator.mul, a, b))

    class FitnessTester(object):

    LETHAL = -99999999

    def __init__(self, points, desired_duration):
    "Initialize preference constants."
    self.points = points
    self.desired_duration = desired_duration
    self.min_step_duration = 2 # no step should be less than 3 minutes
    @@ -97,7 +96,9 @@ class FitnessTester(object):
    self._fitness_cache = {}

    def fitness(self, individual):

    """
    Calculate the fitness of an individual path.
    """
    if tuple(individual) in self._fitness_cache:
    return self._fitness_cache[tuple(individual)]

    @@ -124,16 +125,16 @@ class FitnessTester(object):

    for point in all_points[1:]:

    # calculate distances
    distance = gcd(last_point, point)
    distance_to_end = gcd(point, all_points[-1])

    duration = duration_for_distance(distance)

    # and figure out if we're headed backwards
    total_duration += duration
    if distance_to_end > last_distance_to_end:
    total_backwards += duration_for_distance(
    distance_to_end - last_distance_to_end)

    # enumerate steps that are less than or more than ideal distance
    if duration < self.min_step_duration:
    total_below_min += (self.min_step_duration - duration)
    if duration > self.max_step_duration:
    @@ -143,8 +144,8 @@ class FitnessTester(object):
    last_distance_to_end = distance_to_end

    num_switchbacks = 0

    if len(all_points) > 2:
    # for all points, figure out if it's switching back and penalize appropriately.
    for i in xrange(1, len(all_points) - 1):
    ax, ay = all_points[i-1].coords
    ox, oy = all_points[i].coords
    @@ -187,187 +188,4 @@ class FitnessTester(object):
    self._fitness_cache[tuple(individual)] = fitness

    return fitness

    class EvolutionChamber(object):
    def __init__(self, tester, mutator, generations, cull, spawn,
    preserve, mutate=0.5):
    self.tester = tester
    self.mutator = mutator
    self.generations = generations
    self.cull = cull
    self.spawn = spawn
    self.preserve = preserve
    self.mutate = mutate
    self.pop = None

    def evolve(self, initial):
    "Start the evolution and return the best result."

    self.pop = initial

    for i in xrange(self.generations):
    self.pop = self.gen(self.pop)

    return self.pop[0] # return most fit.

    def gen(self, pop):
    "Run a generation on this population and return the next one"
    # sort by fitness, best first

    pop = sorted(pop, key=self.tester.fitness, reverse=True)

    popcount2 = len(pop) ** 2

    children = []

    # build children
    for i in xrange(int(self.spawn / 2)):

    # pick parents, preferring earlier (more fit)
    index1 = index2 = int(math.sqrt(random.randrange(popcount2)))
    parent1 = pop[-index1]
    while index1 == index2:
    index2 = int(math.sqrt(random.randrange(popcount2)))
    parent2 = pop[-index2]

    # crossover the two parents and mutate
    child1, child2 = self.crossover(parent1, parent2)

    child1 = self.mutator.mutate(child1)
    child2 = self.mutator.mutate(child2)

    children.extend([child1, child2])

    # add the top n parents to this generation
    if self.preserve:
    children.extend(pop[:self.preserve])

    # sort by fitness
    children = sorted(children, key=self.tester.fitness, reverse=True)

    # and return the top n
    return children[:self.cull]

    def crossover(self, parent1, parent2):
    "Cross over the two choices"

    # can't cross over empty list
    if len(parent1) == 0 or len(parent2) == 0:
    return parent1, parent2

    xpoint1 = random.randrange(len(parent1))
    xpoint2 = random.randrange(len(parent2))

    child1 = parent1[:xpoint1] + parent2[xpoint2:]
    child2 = parent2[:xpoint2] + parent1[xpoint1:]

    return child1, child2

    class Mutator(object):
    def __init__(self, edges):
    self.edges = edges

    def mutate(self, individual):
    mutations = [
    (self.move_point, 1),
    (self.swap_points, 2),
    (self.delete_point, 1),
    (self.add_middle, 1),
    (self.add_start, 0),
    (self.add_end, 0)]

    available = filter(lambda m: len(individual) >= m[1], mutations)
    choice = random.randrange(len(available))

    mutated = available[choice][0](individual)
    cleaned = self.cleanup(mutated)

    # print '%s %s -> %s' % (individual,
    # available[choice][0].__name__, cleaned)
    #
    return cleaned

    def cleanup(self, individual):
    return list(no_dupes(individual))

    def move_point(self, individual):
    "Choose a random node and replace it with an adjacent node"
    if len(individual) == 0:
    return individual

    index = random.randrange(len(individual))
    try:
    individual[index] = self._get_adjacent(individual, individual[index])
    except NoAdjacentException:
    pass
    return individual

    def swap_points(self, individual):
    "Take two points in the route and swap their order"
    if len(individual) < 2:
    return individual

    i1 = i2 = random.randrange(len(individual))
    while i1 == i2:
    i2 = random.randrange(len(individual))

    individual[i1], individual[i2] = individual[i2], individual[i1]
    return individual

    def delete_point(self, individual):
    "Remove a point from the list"
    if len(individual) == 0:
    return individual

    index = random.randrange(len(individual))
    return individual[:index] + individual[index + 1:]

    def add_middle(self, individual):
    "Add a point to the list anywhere in the middle"

    index = random.randrange(len(individual))

    try:
    if random.randrange(2):
    # insert before
    individual.insert(index, self._get_adjacent(individual, individual[index]))
    else:
    # insert after
    individual.insert(index + 1, self._get_adjacent(individual, individual[index]))
    except NoAdjacentException:
    pass

    return individual

    def add_start(self, individual):
    try:
    individual.insert(0, self._get_adjacent(individual, START_INDEX))
    except NoAdjacentException:
    pass
    return individual

    def add_end(self, individual):
    try:
    individual.insert(len(individual) + 1,
    self._get_adjacent(individual, END_INDEX))
    except NoAdjacentException:
    pass
    return individual

    def _get_adjacent(self, individual, point):

    neighbors = []
    for index in self.edges[point]:
    if index in [0, 1]:
    continue
    if index in individual:
    continue
    neighbors.append(index)

    if len(neighbors) == 0:
    raise NoAdjacentException

    neighbor_index = random.randrange(len(neighbors))
    return neighbors[neighbor_index]

    class NoAdjacentException(Exception): pass

  2. @invalid-email-address Anonymous created this gist Apr 23, 2010.
    373 changes: 373 additions & 0 deletions gistfile1.pyw
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,373 @@
    import math
    import random
    import operator

    from django.contrib.gis.geos import LineString, Point

    from scenicroute.apps.hunts.routing.triangulation import (
    build_triangulation, START_INDEX, END_INDEX)
    from scenicroute.apps.hunts.routing.culling import no_dupes
    from scenicroute.apps.hunts.routing.estimation import (duration_for_distance,
    distance_in_duration)
    from scenicroute.utils.geography.voronoi import computeDelaunayTriangulation
    from scenicroute.utils.geography.distance import great_circle_distance as gcd
    from scenicroute.utils.geography.coordinates import to_tuple

    class EvolutionSettings:
    num_generations = 10 # number of generations to run for
    spawn = 100 # number of new children to make each generation
    preserve = 5 # preserve the top N children each generation unchanged
    cull = 20 # number of top children to keep each generation
    choose_from_top = 10 # and choose randomly from the top N final results

    def build_route(hunt, messages, desired_duration):
    """
    Build a candidate route, return a list of messages
    """
    if len(messages) == 0:
    # Return an empty list
    return []

    # triangulate the route
    points, messages_lists, edges = build_triangulation(hunt, messages)
    # points is a list of Point objects [START, END, P0, P1...]
    # messages is a list of lists of messages ['start', 'end', [], [], ...]
    # edges is a dict is list indexes, { i0: set(i1), i1: set(i2, i3), ... }

    if len(points) == 2:
    # only start and end points
    return []

    final_population = evolve_candidates(points, edges, desired_duration)

    # and choose a random of the top five
    # if the solution is highly convergent, then there will be only one
    # and the random choice won't matter. if it's highly divergent,
    # then we get a little spice in the choice
    final_choice = random.randrange(EvolutionSettings.choose_from_top)
    index_sequence = final_population[final_choice]

    # choose among the messages for each location in the sequence
    final_route = []
    for index in index_sequence:
    message_list = messages_lists[index]
    choice = random.randrange(len(message_list))
    message = message_list[choice]
    final_route.append(message)

    return final_route

    def evolve_candidates(points, edges, desired_duration):

    # seed and run the genetic algorithm
    initial = [[
    random.randrange(2, len(points)),
    random.randrange(2, len(points))] for i in xrange(40)]
    tester = FitnessTester(points, desired_duration)
    mutator = Mutator(edges)
    evolution_chamber = EvolutionChamber(tester, mutator,
    generations=EvolutionSettings.num_generations,
    cull=EvolutionSettings.cull,
    spawn=EvolutionSettings.spawn,
    preserve=EvolutionSettings.preserve)

    evolution_chamber.evolve(initial)

    # and return the final population
    return evolution_chamber.pop

    def dot(a, b):
    alen = math.hypot(*a)
    blen = math.hypot(*b)
    a = map(lambda x: x / alen, a)
    b = map(lambda x: x / blen, b)
    return reduce(operator.add, map(operator.mul, a, b))

    class FitnessTester(object):

    LETHAL = -99999999

    def __init__(self, points, desired_duration):
    self.points = points
    self.desired_duration = desired_duration
    self.min_step_duration = 2 # no step should be less than 3 minutes
    self.max_step_duration = 6 # no step should be greater than 10 minutes
    self.max_switchback = -0.90 # no angle should switch back more than this much
    self.stop_duration = 1 # add one minute to total duration for each stop
    self._fitness_cache = {}

    def fitness(self, individual):

    if tuple(individual) in self._fitness_cache:
    return self._fitness_cache[tuple(individual)]

    if 0 in individual or 1 in individual:
    return self.LETHAL
    if isinstance(individual, tuple):
    individual = list(individual)

    duplicates = len(individual) - len(list(set(individual)))
    if duplicates:
    return self.LETHAL

    # total duration includes 5 minutes per stop
    total_duration = len(individual) * self.stop_duration
    total_backwards = 0
    total_below_min = 0
    total_above_max = 0

    all_points = [self.points[i]
    for i in [START_INDEX] + individual + [END_INDEX]]

    last_point = all_points[0]
    last_distance_to_end = gcd(last_point, all_points[-1])

    for point in all_points[1:]:

    distance = gcd(last_point, point)
    distance_to_end = gcd(point, all_points[-1])

    duration = duration_for_distance(distance)

    total_duration += duration
    if distance_to_end > last_distance_to_end:
    total_backwards += duration_for_distance(
    distance_to_end - last_distance_to_end)

    if duration < self.min_step_duration:
    total_below_min += (self.min_step_duration - duration)
    if duration > self.max_step_duration:
    total_above_max += (duration - self.max_step_duration)

    last_point = point
    last_distance_to_end = distance_to_end

    num_switchbacks = 0

    if len(all_points) > 2:
    for i in xrange(1, len(all_points) - 1):
    ax, ay = all_points[i-1].coords
    ox, oy = all_points[i].coords
    bx, by = all_points[i+1].coords
    ao = (ox - ax, oy - ay)
    ob = (bx - ox, by - oy)
    try:
    product = dot(ao, ob)
    if product < self.max_switchback:
    num_switchbacks += 1
    except ZeroDivisionError:
    pass


    # lose a point for every minute away from desired total distance
    duration_penalty = abs((self.desired_duration - total_duration))

    # lose a point for every minute you go backwards
    backwards_penalty = 1.0 * total_backwards

    # lose five points for every minute below the penalty
    below_min_penalty = 5.0 * total_below_min
    # lose two points for every minutes above max
    above_max_penalty = 2.0 * total_above_max

    # lose five points for every major switchback
    switchback_penalty = 5.0 * num_switchbacks

    fitness = 0
    fitness -= duration_penalty
    fitness -= backwards_penalty
    fitness -= switchback_penalty
    fitness -= below_min_penalty
    fitness -= above_max_penalty

    # print 'total duration: %.1f, switchbacks %d, below_min %.1f, above_max %.1f, fitness %.5f - %s' % (
    # total_duration, num_switchbacks, total_below_min, total_above_max, fitness,
    # individual)

    self._fitness_cache[tuple(individual)] = fitness

    return fitness

    class EvolutionChamber(object):
    def __init__(self, tester, mutator, generations, cull, spawn,
    preserve, mutate=0.5):
    self.tester = tester
    self.mutator = mutator
    self.generations = generations
    self.cull = cull
    self.spawn = spawn
    self.preserve = preserve
    self.mutate = mutate
    self.pop = None

    def evolve(self, initial):
    "Start the evolution and return the best result."

    self.pop = initial

    for i in xrange(self.generations):
    self.pop = self.gen(self.pop)

    return self.pop[0] # return most fit.

    def gen(self, pop):
    "Run a generation on this population and return the next one"
    # sort by fitness, best first

    pop = sorted(pop, key=self.tester.fitness, reverse=True)

    popcount2 = len(pop) ** 2

    children = []

    # build children
    for i in xrange(int(self.spawn / 2)):

    # pick parents, preferring earlier (more fit)
    index1 = index2 = int(math.sqrt(random.randrange(popcount2)))
    parent1 = pop[-index1]
    while index1 == index2:
    index2 = int(math.sqrt(random.randrange(popcount2)))
    parent2 = pop[-index2]

    # crossover the two parents and mutate
    child1, child2 = self.crossover(parent1, parent2)

    child1 = self.mutator.mutate(child1)
    child2 = self.mutator.mutate(child2)

    children.extend([child1, child2])

    # add the top n parents to this generation
    if self.preserve:
    children.extend(pop[:self.preserve])

    # sort by fitness
    children = sorted(children, key=self.tester.fitness, reverse=True)

    # and return the top n
    return children[:self.cull]

    def crossover(self, parent1, parent2):
    "Cross over the two choices"

    # can't cross over empty list
    if len(parent1) == 0 or len(parent2) == 0:
    return parent1, parent2

    xpoint1 = random.randrange(len(parent1))
    xpoint2 = random.randrange(len(parent2))

    child1 = parent1[:xpoint1] + parent2[xpoint2:]
    child2 = parent2[:xpoint2] + parent1[xpoint1:]

    return child1, child2

    class Mutator(object):
    def __init__(self, edges):
    self.edges = edges

    def mutate(self, individual):
    mutations = [
    (self.move_point, 1),
    (self.swap_points, 2),
    (self.delete_point, 1),
    (self.add_middle, 1),
    (self.add_start, 0),
    (self.add_end, 0)]

    available = filter(lambda m: len(individual) >= m[1], mutations)
    choice = random.randrange(len(available))

    mutated = available[choice][0](individual)
    cleaned = self.cleanup(mutated)

    # print '%s %s -> %s' % (individual,
    # available[choice][0].__name__, cleaned)
    #
    return cleaned

    def cleanup(self, individual):
    return list(no_dupes(individual))

    def move_point(self, individual):
    "Choose a random node and replace it with an adjacent node"
    if len(individual) == 0:
    return individual

    index = random.randrange(len(individual))
    try:
    individual[index] = self._get_adjacent(individual, individual[index])
    except NoAdjacentException:
    pass
    return individual

    def swap_points(self, individual):
    "Take two points in the route and swap their order"
    if len(individual) < 2:
    return individual

    i1 = i2 = random.randrange(len(individual))
    while i1 == i2:
    i2 = random.randrange(len(individual))

    individual[i1], individual[i2] = individual[i2], individual[i1]
    return individual

    def delete_point(self, individual):
    "Remove a point from the list"
    if len(individual) == 0:
    return individual

    index = random.randrange(len(individual))
    return individual[:index] + individual[index + 1:]

    def add_middle(self, individual):
    "Add a point to the list anywhere in the middle"

    index = random.randrange(len(individual))

    try:
    if random.randrange(2):
    # insert before
    individual.insert(index, self._get_adjacent(individual, individual[index]))
    else:
    # insert after
    individual.insert(index + 1, self._get_adjacent(individual, individual[index]))
    except NoAdjacentException:
    pass

    return individual

    def add_start(self, individual):
    try:
    individual.insert(0, self._get_adjacent(individual, START_INDEX))
    except NoAdjacentException:
    pass
    return individual

    def add_end(self, individual):
    try:
    individual.insert(len(individual) + 1,
    self._get_adjacent(individual, END_INDEX))
    except NoAdjacentException:
    pass
    return individual

    def _get_adjacent(self, individual, point):

    neighbors = []
    for index in self.edges[point]:
    if index in [0, 1]:
    continue
    if index in individual:
    continue
    neighbors.append(index)

    if len(neighbors) == 0:
    raise NoAdjacentException

    neighbor_index = random.randrange(len(neighbors))
    return neighbors[neighbor_index]

    class NoAdjacentException(Exception): pass