# DEAP GP Spambase example + ADF.
# Source: GitHub gist cmd-ntrf/4158432, created November 28, 2012 01:25.
#    This file is part of EAP.
#
#    EAP is free software: you can redistribute it and/or modify
#    it under the terms of the GNU Lesser General Public License as
#    published by the Free Software Foundation, either version 3 of
#    the License, or (at your option) any later version.
#
#    EAP is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#    GNU Lesser General Public License for more details.
#
#    You should have received a copy of the GNU Lesser General Public
#    License along with EAP. If not, see <http://www.gnu.org/licenses/>.
import csv
import itertools
import operator
import random

from deap import algorithms
from deap import base
from deap import creator
from deap import gp
from deap import tools
# Read the spam list features and put them in a list of lists of floats.
# The dataset is from http://archive.ics.uci.edu/ml/datasets/Spambase
# This example is a copy of the OpenBEAGLE example :
# http://beagle.gel.ulaval.ca/refmanual/beagle/html/d2/dbe/group__Spambase.html
# The file is opened in a `with` block so the handle is closed as soon as
# every row has been parsed instead of leaking until interpreter exit.
with open("spambase.csv") as spam_file:
    spamReader = csv.reader(spam_file)
    spam = [[float(elem) for elem in row] for row in spamReader]
# Primitive set of the automatically defined function (ADF): it takes two
# "float" inputs (argument prefix "IN") and returns a "bool".
adfset1 = gp.PrimitiveSetTyped("ADF1", ["float", "float"], "bool", "IN")

# boolean operators
adfset1.addPrimitive(operator.and_, ["bool", "bool"], "bool")
adfset1.addPrimitive(operator.or_, ["bool", "bool"], "bool")
adfset1.addPrimitive(operator.not_, ["bool"], "bool")

# floating point operators
adfset1.addPrimitive(operator.add, ["float", "float"], "float")
adfset1.addPrimitive(operator.sub, ["float", "float"], "float")
adfset1.addPrimitive(operator.mul, ["float", "float"], "float")

# terminals: a random float constant in [0, 100) and the two boolean values
adfset1.addEphemeralConstant(lambda: random.random() * 100, "float")
adfset1.addTerminal(0, "bool")
adfset1.addTerminal(1, "bool")
# Define a new primitive set for strongly typed GP: the main program takes
# 57 "float" inputs (argument prefix "IN") and returns a "bool".
pset = gp.PrimitiveSetTyped("MAIN", list(itertools.repeat("float", 57)),
                            "bool", "IN")

# boolean operators
pset.addPrimitive(operator.and_, ["bool", "bool"], "bool")
pset.addPrimitive(operator.or_, ["bool", "bool"], "bool")
pset.addPrimitive(operator.not_, ["bool"], "bool")

# floating point operators
# Define a safe division function
def safeDiv(left, right):
    """Return ``left / right``, or 0 when the division raises ZeroDivisionError.

    Evolved expressions may divide by zero; returning 0 keeps their
    evaluation from aborting.
    """
    try:
        quotient = left / right
    except ZeroDivisionError:
        quotient = 0
    return quotient
pset.addPrimitive(operator.add, ["float", "float"], "float")
pset.addPrimitive(operator.sub, ["float", "float"], "float")
pset.addPrimitive(operator.mul, ["float", "float"], "float")
pset.addPrimitive(safeDiv, ["float", "float"], "float")

# Register the ADF primitive set with the main primitive set.
pset.addADF(adfset1)

# logic operators
# Define a new if-then-else function
def if_then_else(input, output1, output2):
    """Return ``output1`` when ``input`` is truthy, otherwise ``output2``.

    Both branch values are already evaluated by the time this is called;
    the function only selects between them.
    """
    return output1 if input else output2
# Comparisons turn two floats into a bool; if_then_else selects a float.
pset.addPrimitive(operator.lt, ["float", "float"], "bool")
pset.addPrimitive(operator.eq, ["float", "float"], "bool")
pset.addPrimitive(if_then_else, ["bool", "float", "float"], "float")

# terminals: a random float constant in [0, 100) and the two boolean values
pset.addEphemeralConstant(lambda: random.random() * 100, "float")
pset.addTerminal(0, "bool")
pset.addTerminal(1, "bool")
# Single-objective fitness with weight 1.0 (the score is maximised).
creator.create("FitnessMax", base.Fitness, weights=(1.0,))
# An individual is a list of trees that carries one fitness.
creator.create("Individual", list, fitness=creator.FitnessMax)
# One tree type per primitive set.
creator.create("ADF1", gp.PrimitiveTree, pset=adfset1)
creator.create("MAIN", gp.PrimitiveTree, pset=pset)

toolbox = base.Toolbox()

# Expression generators for the ADF tree and the main tree.
toolbox.register('adf_expr1', gp.genFull, pset=adfset1, min_=1, max_=2,
                 type_=adfset1.ret)
toolbox.register('main_expr', gp.genRamped, pset=pset, min_=1, max_=2,
                 type_=pset.ret)

# Tree initialisers built on top of the expression generators.
toolbox.register('ADF1', tools.initIterate, creator.ADF1, toolbox.adf_expr1)
toolbox.register('MAIN', tools.initIterate, creator.MAIN, toolbox.main_expr)

# The main tree is listed first, the ADF tree second.
func_cycle = [toolbox.MAIN, toolbox.ADF1]

toolbox.register('individual', tools.initCycle, creator.Individual, func_cycle)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)
toolbox.register("lambdify", gp.lambdifyADF)
def evalSpambase(individual):
    """Score ``individual`` on a random sample of 400 rows of ``spam``.

    Args:
        individual: the individual handed to ``toolbox.lambdify`` to obtain
            a callable classifier.

    Returns:
        A one-element tuple holding the number of sampled rows for which the
        truth value of the classifier output equals the truth value of the
        row's label (column 57).
    """
    # Transform the tree expression in a callable function
    func = toolbox.lambdify(expr=individual)
    # Randomly sample 400 mails in the spam database
    spam_samp = random.sample(spam, 400)
    # Count the correctly classified mails: the first 57 columns are the
    # classifier inputs, column 57 is the expected answer.  Equality (not
    # identity) is the right comparison between the two booleans.
    result = sum(bool(func(*mail[:57])) == bool(mail[57])
                 for mail in spam_samp)
    return result,
# Evolutionary operators used by main().
toolbox.register("evaluate", evalSpambase)
toolbox.register("select", tools.selTournament, tournsize=3)
toolbox.register("mate", gp.cxOnePoint)
# Generator of the expressions handed to the mutation operator.
toolbox.register("expr_mut", gp.genFull, min_=0, max_=2)
toolbox.register("mutate", gp.mutUniform, expr=toolbox.expr_mut)
def main():
    """Evolve a population of classifiers and print the best individual.

    Seeds ``random`` with 1024, builds a population of 100 individuals,
    evaluates it, then for generations 1 to NGEN - 1 selects, clones,
    crosses over and mutates the trees of each individual, re-evaluating
    only the individuals whose fitness was invalidated.

    Returns:
        tuple: ``(pop, stats, hof)`` -- the final population, the statistics
        object and the hall of fame.
    """
    random.seed(1024)

    # NOTE(review): the result of this call is never used.  The call is kept
    # because building an individual presumably draws from `random`, so
    # dropping it would change the seeded run -- confirm before removing.
    toolbox.individual()

    pop = toolbox.population(n=100)
    hof = tools.HallOfFame(1)
    stats = tools.Statistics(lambda ind: ind.fitness.values)
    stats.register("avg", tools.mean)
    stats.register("std", tools.std)
    stats.register("min", min)
    stats.register("max", max)

    # list(...) keeps the concatenation valid when keys() returns a view
    # (Python 3) rather than a list (Python 2).
    logger = tools.EvolutionLogger(
        ["gen", "evals"] + list(stats.functions.keys()))
    logger.logHeader()

    CXPB, MUTPB, NGEN = 0.5, 0.2, 40

    # Evaluate the entire population
    for ind in pop:
        ind.fitness.values = toolbox.evaluate(ind)

    hof.update(pop)
    stats.update(pop)
    logger.logGeneration(gen=0, evals=len(pop), stats=stats)

    # Generation 0 is the initial population evaluated above.
    for g in range(1, NGEN):
        # Select the offspring
        offspring = toolbox.select(pop, len(pop))
        # Clone the offspring
        offspring = [toolbox.clone(ind) for ind in offspring]

        # Apply crossover tree by tree on consecutive pairs of individuals;
        # any mated tree invalidates the fitness of both parents.
        for ind1, ind2 in zip(offspring[::2], offspring[1::2]):
            for tree1, tree2 in zip(ind1, ind2):
                if random.random() < CXPB:
                    toolbox.mate(tree1, tree2)
                    del ind1.fitness.values
                    del ind2.fitness.values

        # Apply mutation tree by tree; a mutated tree invalidates the
        # fitness of its individual.
        for ind in offspring:
            for tree in ind:
                if random.random() < MUTPB:
                    toolbox.mutate(tree)
                    del ind.fitness.values

        # Evaluate the individuals with an invalid fitness
        invalids = [ind for ind in offspring if not ind.fitness.valid]
        for ind in invalids:
            ind.fitness.values = toolbox.evaluate(ind)

        # Replacement of the population by the offspring
        pop = offspring
        hof.update(pop)
        stats.update(pop)
        logger.logGeneration(gen=g, evals=len(invalids), stats=stats)

    # A single pre-formatted argument prints the same text under the Python 2
    # print statement and the Python 3 print function; the double space
    # reproduces the output of the former comma-separated print.
    print("Best individual :  %s %s"
          % (gp.stringify(hof[0][0]), hof[0].fitness))

    return pop, stats, hof
# Run the evolution only when the file is executed as a script.
if __name__ == "__main__":
    main()
# (GitHub page footer: sign up or sign in on GitHub to comment on this gist.)