Skip to content

Instantly share code, notes, and snippets.

@tk42
Created July 5, 2022 21:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tk42/2fa28c64c088bbd5608bde11cfd8e3d9 to your computer and use it in GitHub Desktop.
Save tk42/2fa28c64c088bbd5608bde11cfd8e3d9 to your computer and use it in GitHub Desktop.
import random
import numpy as np
params = []
import multiprocessing
from deap import base
from deap import creator
from deap import tools
#--------------------------------------------------------------------
#パラメータ値をランダムに決定する関数(個体生成関数の源泉)
def shuffle(container):
global params
shuffled = []
for x in params:
shuffled.append(random.choice(x))
return container(shuffled)
#list内のパラメータ値をランダムに変更する関数(突然変異関数の源泉)
def mutShuffle(individual, indpb):
global params
for i in range(len(individual)):
if random.random() < indpb:
individual[i] = random.choice(params[i])
return individual,
#--------------------------------------------------------------------
### main logic
def main(toolbox):
#random.seed(1024)
#個体をランダムにn個生成し、初期世代を生成
pop = toolbox.population(n=100) #n:世代の個体数
CXPB, MUTPB, NGEN = 0.5, 0.2, 40 #交叉確率、突然変異確率、ループ回数
print("Start of evolution")
#初期世代の全個体の適応度を目的関数により評価
fitnesses = list(map(toolbox.evaluate, pop))
for ind, fit in zip(pop, fitnesses):
ind.fitness.values = fit
print(" Evaluated %i individuals" % len(pop))
#ループ開始
for g in range(NGEN):
print("-- Generation %i --" % g)
#現行世代から個体を選択し次世代に追加
offspring = toolbox.select(pop, len(pop))
offspring = list(map(toolbox.clone, offspring))
#選択した個体に交叉を適応
for child1, child2 in zip(offspring[::2], offspring[1::2]):
if random.random() < CXPB:
toolbox.mate(child1, child2)
del child1.fitness.values
del child2.fitness.values
#選択した個体に突然変異を適応
for mutant in offspring:
if random.random() < MUTPB:
toolbox.mutate(mutant)
del mutant.fitness.values
#適応度が計算されていない個体を集めて適応度を計算
invalid_ind = [ind for ind in offspring if not ind.fitness.valid]
fitnesses = map(toolbox.evaluate, invalid_ind)
for ind, fit in zip(invalid_ind, fitnesses):
ind.fitness.values = fit
print(" Evaluated %i individuals" % len(invalid_ind))
#次世代を現行世代にコピー
pop[:] = offspring
#全個体の適応度をlistに格納
fits = [ind.fitness.values[0] for ind in pop]
#適応度の最大値、最小値、平均値、標準偏差を計算
length = len(pop)
# mean = sum(fits) / length
mean = np.mean(fits)
std = np.std(fits)
# sum2 = sum(x*x for x in fits)
# std = abs(sum2 / length - mean**2)**0.5
print(" Min %s" % min(fits))
print(" Max %s" % max(fits))
print(" Avg %s" % mean)
print(" Std %s" % std)
#最後の世代の中で最も適応度の高い個体のもつパラメータを準最適解として出力
best_ind = tools.selBest(pop, 1)[0]
print("Best parameter is %s, %s" % (best_ind, best_ind.fitness.values))
print("-- End of (successful) evolution --")
def calc_ga(p, eval_backtest):
global params
params = p
pool = multiprocessing.Pool()
#適合度クラスを作成
creator.create("FitnessMax", base.Fitness, weights=(1.0,))
creator.create("Individual", list, fitness=creator.FitnessMax)
toolbox = base.Toolbox()
toolbox.register("map", pool.map)
#個体生成関数,世代生成関数を定義
toolbox.register("individual", shuffle, creator.Individual)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)
#評価関数,交叉関数,突然変異関数,選択関数を定義
toolbox.register("evaluate", eval_backtest)
toolbox.register("mate", tools.cxTwoPoint)
toolbox.register("mutate", mutShuffle, indpb=0.05)
toolbox.register("select", tools.selTournament, tournsize=3)
main(toolbox)
pool.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment