-
-
Save TheBojda/5f02f03a6c2aa28dcbd60a22b1d4ce7d to your computer and use it in GitHub Desktop.
import time | |
import gym | |
import numpy as np | |
import pygad.torchga | |
import pygad | |
import torch | |
import torch.nn as nn | |
from multiprocessing import Pool | |
def fitness_func(solution, sol_idx):
    """Score one candidate weight vector by playing a single episode.

    The flat ``solution`` vector is loaded into the module-level ``model``,
    which then picks the action with the highest network output at every
    step of the module-level ``env`` until the episode ends or the summed
    reward reaches 1000.

    Args:
        solution: Flat weight vector for ``model``.
        sol_idx: Index of the solution in the population (not used here).

    Returns:
        The total reward collected during the episode.
    """
    model_weights_dict = pygad.torchga.model_weights_as_dict(
        model=model, weights_vector=solution)
    model.load_state_dict(model_weights_dict)

    # Play one game. This uses the gym API where reset() returns the
    # observation and step() returns a 4-tuple.
    observation = env.reset()
    sum_reward = 0
    done = False
    while (not done) and (sum_reward < 1000):
        # torch.tensor() copies its input, so no explicit copy is needed.
        ob_tensor = torch.tensor(observation, dtype=torch.float)
        q_values = model(ob_tensor)
        # Take the argmax in torch and hand gym a plain Python int instead of
        # relying on NumPy dispatching onto a tensor.
        action = torch.argmax(q_values).item()
        observation, reward, done, _ = env.step(action)
        sum_reward += reward
    return sum_reward
def callback_generation(ga_instance):
    """Print the generation counter and the best fitness found so far.

    Intended to be passed as the ``on_generation`` callback of the GA.

    Args:
        ga_instance: The running GA object. It must expose
            ``generations_completed`` and ``best_solution()``.
    """
    print("Generation = {generation}".format(generation=ga_instance.generations_completed))
    # Calling best_solution() with no fitness values makes PyGAD evaluate the
    # whole population again just for this log line. Reuse the values from the
    # generation that has just finished when the GA object provides them, and
    # fall back to the plain call otherwise.
    pop_fitness = getattr(ga_instance, "last_generation_fitness", None)
    if pop_fitness is None:
        best = ga_instance.best_solution()
    else:
        best = ga_instance.best_solution(pop_fitness=pop_fitness)
    print("Fitness = {fitness}".format(fitness=best[1]))
def fitness_wrapper(solution):
    """Single-argument adapter around ``fitness_func``.

    ``fitness_func`` takes a solution and an index; this adapter supplies a
    constant index of 0 so the function can be mapped over a population of
    solutions one argument at a time.

    Args:
        solution: Weight vector to evaluate.

    Returns:
        Whatever ``fitness_func`` returns for ``solution``.
    """
    placeholder_index = 0
    return fitness_func(solution, placeholder_index)
class PooledGA(pygad.GA):
    """``pygad.GA`` subclass that scores the population through a pool."""

    def cal_pop_fitness(self):
        """Return the fitness of every solution in the current population.

        Each solution is passed to ``fitness_wrapper`` via the module-level
        ``pool``. The raw list of scores is printed, then returned as a
        NumPy array.
        """
        scores = pool.map(fitness_wrapper, self.population)
        print(scores)
        return np.array(scores)
# Only inference is performed on the network, so gradients are switched off
# globally.
torch.set_grad_enabled(False)

# The environment and the network are module-level names because
# fitness_func reads them as globals.
env = gym.make("CartPole-v1")
observation_space_size = env.observation_space.shape[0]
action_space_size = env.action_space.n

# Two hidden layers of equal width; one output per available action.
hidden_units = 16
model = nn.Sequential(
    nn.Linear(observation_space_size, hidden_units),
    nn.ReLU(),
    nn.Linear(hidden_units, hidden_units),
    nn.ReLU(),
    nn.Linear(hidden_units, action_space_size),
)
# Run the GA only when this file is executed as a script. With the "spawn"
# start method, multiprocessing workers import the main module again; without
# this guard each worker would try to start the whole training itself.
if __name__ == "__main__":
    torch_ga = pygad.torchga.TorchGA(model=model, num_solutions=10)

    # Prepare the PyGAD parameters. Check the documentation for more information: https://pygad.readthedocs.io/en/latest/README_pygad_ReadTheDocs.html#pygad-ga-class
    num_generations = 50  # Number of generations.
    num_parents_mating = 5  # Number of solutions to be selected as parents in the mating pool.
    initial_population = torch_ga.population_weights  # Initial population of network weights.
    parent_selection_type = "sss"  # Type of parent selection.
    crossover_type = "single_point"  # Type of the crossover operator.
    mutation_type = "random"  # Type of the mutation operator.
    mutation_percent_genes = 10  # Percentage of genes to mutate. Has no effect if mutation_num_genes is given.
    keep_parents = -1  # Number of parents to keep in the next population. -1 keeps all parents, 0 keeps none.

    start_time = time.time()

    # PooledGA takes the same arguments as pygad.GA; construct pygad.GA
    # instead (and call run() without a pool) to evaluate serially.
    ga_instance = PooledGA(num_generations=num_generations,
                           num_parents_mating=num_parents_mating,
                           initial_population=initial_population,
                           fitness_func=fitness_func,
                           parent_selection_type=parent_selection_type,
                           crossover_type=crossover_type,
                           mutation_type=mutation_type,
                           mutation_percent_genes=mutation_percent_genes,
                           keep_parents=keep_parents,
                           on_generation=callback_generation)

    # PooledGA.cal_pop_fitness looks up the module-level name `pool`, so the
    # pool has to be bound under exactly that name. best_solution() also goes
    # through cal_pop_fitness, so it must run while the pool is still open.
    with Pool(processes=10) as pool:
        ga_instance.run()
        solution, solution_fitness, solution_idx = ga_instance.best_solution()

    print("Fitness value of the best solution = {solution_fitness}".format(solution_fitness=solution_fitness))
    print("Index of the best solution : {solution_idx}".format(solution_idx=solution_idx))
    print("--- %s seconds ---" % (time.time() - start_time))

    # Load the best weights and replay one rendered episode with them.
    model_weights_dict = pygad.torchga.model_weights_as_dict(model=model, weights_vector=solution)
    model.load_state_dict(model_weights_dict)

    observation = env.reset()
    sum_reward = 0
    done = False
    while not done:
        env.render()
        ob_tensor = torch.tensor(observation, dtype=torch.float)
        q_values = model(ob_tensor)
        # Take the argmax in torch and pass gym a plain Python int.
        action = torch.argmax(q_values).item()
        observation, reward, done, _ = env.step(action)
        sum_reward += reward
    print("Sum reward: " + str(sum_reward))

    # After the generations complete, plot how the fitness evolved over the generations.
    ga_instance.plot_result(title="PyGAD & PyTorch - Iteration vs. Fitness", linewidth=4)
I'm happy if this snippet helped you. :)
By the way, there is a Medium article about this snippet. Take a look at it if you're interested.
I think I saw a comment by the PyGAD author somewhere about parallelism, saying that he wasn't exactly happy with the results and so didn't implement it. I don't know. My fitness function has to be computed in order, so I can't just split a chromosome into chunks and evaluate them separately. On the other hand, it is perfectly possible to evaluate the population in parallel.
Hi, thank you for making this code. I got here from the PyGAD documentation since I want to calculate population fitness in parallel.
I have some questions regarding solution_idx, in line 39:
return fitness_func(solution, 0)
you set it to 0. Is there any way to make it change according to the actual index of a solution? If I understand this correctly, the idx will be 0 for every solution.
@Dvorsek as you can see, sol_idx is not used in this example code.
If I needed to track the indices of solutions, I would add some code to the PooledGA.cal_pop_fitness function. On the other hand, I think they are already being tracked somewhere in the PyGAD object.
What are you trying to do?
@mpasternak Okay, I see.
For my fitness function I have to run each evaluation in a different directory; otherwise the files will get overwritten by other runs that are executing in parallel. So my idea was to move each run into its own directory based on solution_idx, like this:
def fitness_func(solution, solution_idx):
    """Switch into a per-solution working directory before evaluating.

    Args:
        solution: The candidate solution to evaluate.
        solution_idx: Index of the solution; its string form names the
            directory.
    """
    # Working directory. os.mkdir() rejects an integer and os.chdir() treats
    # one as a file descriptor, so convert the index to a path string first.
    run_dir = str(solution_idx)
    os.makedirs(run_dir, exist_ok=True)
    # NOTE: the path is relative and chdir is process-wide, so a second call
    # in the same process creates its directory inside the previous one.
    os.chdir(run_dir)
    # Rest of fitness_func
The files can get overwritten with each generation just not with runs from other solutions in the same generation.
If you have any ideas on how I could proceed they would be much appreciated.
class PooledGA(pygad.GA):
    """``pygad.GA`` subclass that passes a population counter to the fitness function."""

    # Number of cal_pop_fitness calls completed so far; sent to the fitness
    # function as its index argument.
    population_index = 0

    def cal_pop_fitness(self):
        """Evaluate the current population in the module-level ``pool``.

        Returns:
            A NumPy array with one fitness value per solution.
        """
        # The third positional argument of pool.map() is `chunksize`, so the
        # index cannot be passed that way. Pair it with every solution and
        # let starmap() unpack each pair into fitness_func(solution, index).
        # fitness_func is called directly because fitness_wrapper accepts
        # only the solution.
        tasks = [(solution, self.population_index) for solution in self.population]
        pop_fitness = pool.starmap(fitness_func, tasks)
        self.population_index += 1
        return np.array(pop_fitness)
If you need a separate directory for every population, you can do what @mpasternak wrote.
If you need a separate directory for every solution, you can generate a UUID for them.
If you need a separate directory for every process, you can use the process ID using os.getpid() .
@mpasternak @TheBojda Thank you both for your help.
I think I will make use of os.getpid()
I'd love to see both this example and examples of separation per process, per population, etc. in the PyGAD docs.
@Dvorsek would you like to write it and submit it to https://github.com/ahmedfgad/GeneticAlgorithmPython repo as a PR?
@mpasternak Sure, I can do it. I have now solved this per-process issue with Value from multiprocessing.
I came to see your code snippet looking on some ways to make my PyGAD-using code parallel.
I am not entirely sure that moving fitness calculation into another process is doing this code any good. For me, it looks like in this code snippet once CPU starts calculating fitness_function, it will, in fact, use another CPU (as in another process - deferring calculations to multiprocessing.Pool) - but the first CPU, calling that function, will, in fact, have to wait for the function to complete.
So it looks like you do use 2 CPUs but you do this in a sequential manner and not in a parallel one.
Am I wrong?
EDIT: of course I was wrong. It looks like what's being parallelized is the fitness calculation for every single element of self.population. And it looks like I can use it in my code -- even if I need to calculate fitness in my model sequentially, I can easily calculate the population fitness in parallel. So analyzing this snippet actually helped a lot. Thanks for posting this!