@TheBojda
Created January 30, 2021 09:35
Reinforcement learning on multiple CPUs with Genetic Algorithm using PyGAD, PyTorch, Open AI Gym (CartPole) and multiprocessing.Pool
import time
import gym
import numpy as np
import pygad.torchga
import pygad
import torch
import torch.nn as nn
from multiprocessing import Pool


def fitness_func(solution, sol_idx):
    # Evaluate one set of network weights by playing a single CartPole episode.
    global model, observation_space_size, env

    model_weights_dict = pygad.torchga.model_weights_as_dict(model=model, weights_vector=solution)
    model.load_state_dict(model_weights_dict)

    # play game
    observation = env.reset()
    sum_reward = 0
    done = False
    while (not done) and (sum_reward < 1000):
        # env.render()
        ob_tensor = torch.tensor(observation.copy(), dtype=torch.float)
        q_values = model(ob_tensor)
        action = np.argmax(q_values).numpy()
        observation_next, reward, done, info = env.step(action)
        observation = observation_next
        sum_reward += reward

    return sum_reward


def callback_generation(ga_instance):
    print("Generation = {generation}".format(generation=ga_instance.generations_completed))
    print("Fitness = {fitness}".format(fitness=ga_instance.best_solution()[1]))


def fitness_wrapper(solution):
    return fitness_func(solution, 0)


class PooledGA(pygad.GA):
    # Override cal_pop_fitness so that the fitness of every solution in the
    # population is computed in parallel by the worker pool.

    def cal_pop_fitness(self):
        global pool
        pop_fitness = pool.map(fitness_wrapper, self.population)
        print(pop_fitness)
        pop_fitness = np.array(pop_fitness)
        return pop_fitness


env = gym.make("CartPole-v1")
observation_space_size = env.observation_space.shape[0]
action_space_size = env.action_space.n

# No gradients are needed: the weights are evolved by the genetic algorithm.
torch.set_grad_enabled(False)

model = nn.Sequential(
    nn.Linear(observation_space_size, 16),
    nn.ReLU(),
    nn.Linear(16, 16),
    nn.ReLU(),
    nn.Linear(16, action_space_size)
)

torch_ga = pygad.torchga.TorchGA(model=model, num_solutions=10)

# Prepare the PyGAD parameters. Check the documentation for more information: https://pygad.readthedocs.io/en/latest/README_pygad_ReadTheDocs.html#pygad-ga-class
num_generations = 50  # Number of generations.
num_parents_mating = 5  # Number of solutions to be selected as parents in the mating pool.
initial_population = torch_ga.population_weights  # Initial population of network weights.
parent_selection_type = "sss"  # Type of parent selection.
crossover_type = "single_point"  # Type of the crossover operator.
mutation_type = "random"  # Type of the mutation operator.
mutation_percent_genes = 10  # Percentage of genes to mutate. This parameter has no action if the parameter mutation_num_genes exists.
keep_parents = -1  # Number of parents to keep in the next population. -1 means keep all parents and 0 means keep nothing.

start_time = time.time()

# The single-process version, for comparison:
# ga_instance = pygad.GA(num_generations=num_generations,
#                        num_parents_mating=num_parents_mating,
#                        initial_population=initial_population,
#                        fitness_func=fitness_func,
#                        parent_selection_type=parent_selection_type,
#                        crossover_type=crossover_type,
#                        mutation_type=mutation_type,
#                        mutation_percent_genes=mutation_percent_genes,
#                        keep_parents=keep_parents,
#                        on_generation=callback_generation)
#
# ga_instance.run()

ga_instance = PooledGA(num_generations=num_generations,
                       num_parents_mating=num_parents_mating,
                       initial_population=initial_population,
                       fitness_func=fitness_func,
                       parent_selection_type=parent_selection_type,
                       crossover_type=crossover_type,
                       mutation_type=mutation_type,
                       mutation_percent_genes=mutation_percent_genes,
                       keep_parents=keep_parents,
                       on_generation=callback_generation)

# Everything that needs the worker pool stays inside the with block.
with Pool(processes=10) as pool:
    ga_instance.run()

    solution, solution_fitness, solution_idx = ga_instance.best_solution()
    print("Fitness value of the best solution = {solution_fitness}".format(solution_fitness=solution_fitness))
    print("Index of the best solution : {solution_idx}".format(solution_idx=solution_idx))

print("--- %s seconds ---" % (time.time() - start_time))

# Load the best evolved weights back into the model and play one rendered episode.
model_weights_dict = pygad.torchga.model_weights_as_dict(model=model, weights_vector=solution)
model.load_state_dict(model_weights_dict)

# play game
observation = env.reset()
sum_reward = 0
done = False
while not done:
    env.render()
    ob_tensor = torch.tensor(observation.copy(), dtype=torch.float)
    q_values = model(ob_tensor)
    action = np.argmax(q_values).numpy()
    observation_next, reward, done, info = env.step(action)
    observation = observation_next
    sum_reward += reward

print("Sum reward: " + str(sum_reward))
# After the generations complete, a plot is shown that summarizes how the fitness value evolved over the generations.
ga_instance.plot_result(title="PyGAD & PyTorch - Iteration vs. Fitness", linewidth=4)
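
A note on the start method (an addition, not part of the original gist): the script relies on the fork start method, where the worker processes inherit the global model and env. On platforms where multiprocessing uses spawn by default (Windows, newer macOS), the pool creation and ga_instance.run() would additionally need to live under an entry-point guard, roughly like this:

if __name__ == "__main__":
    with Pool(processes=10) as pool:
        ga_instance.run()
        solution, solution_fitness, solution_idx = ga_instance.best_solution()
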
@mpasternak

mpasternak commented Jan 21, 2022

I came across your code snippet while looking for ways to make my PyGAD-based code parallel.

I am not entirely sure that moving the fitness calculation into another process does this code any good. The way it looks to me, once a CPU starts calculating fitness_func, it will indeed use another CPU (as in another process, deferring the calculation to multiprocessing.Pool), but the first CPU, the one calling that function, will still have to wait for the function to complete.

So it looks like you do use two CPUs, but in a sequential manner rather than a parallel one.

Am I wrong?

EDIT: of course I was wrong. What is being parallelized is the fitness calculation for every single element of self.population. And it looks like I can use this in my code: even if I need to calculate the fitness of a single model sequentially, I can easily calculate the fitness of the whole population in parallel. So analyzing this snippet actually helped a lot. Thanks for posting it!
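
To make that concrete, here is a minimal, self-contained sketch of the idea; the placeholder fitness below just stands in for the CartPole rollout and is not part of the original gist:

import numpy as np
from multiprocessing import Pool

def fitness_wrapper(solution):
    # placeholder fitness; in the gist this is a full CartPole episode
    return float(np.sum(solution))

if __name__ == "__main__":
    population = np.random.rand(10, 5)  # 10 chromosomes with 5 genes each
    with Pool(processes=4) as pool:
        # each chromosome is evaluated by a separate worker process
        pop_fitness = pool.map(fitness_wrapper, list(population))
    print(pop_fitness)
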

@TheBojda
Author

I'm happy if this snippet helped you. :)

Btw, there is a Medium article about the snippet. Take a look at it if you like.

https://thebojda.medium.com/how-genetic-algorithms-can-compete-with-gradient-descent-and-backprop-30b59d5b1ac0

@mpasternak

I think I saw a comment from the PyGAD author somewhere about parallelism, saying that he wasn't exactly happy with the results, so he didn't implement it. IDK. My fitness function has to be calculated in order, so I can't just split a chromosome into chunks and calculate them separately. On the other hand, it is perfectly possible to calculate the population in a parallel way.

@Dvorsek

Dvorsek commented May 20, 2022

Hi, thank you for making this code. I got here from the PyGAD documentation since I want to calculate population fitness in parallel.

I have a question regarding solution_idx. In line 39:

return fitness_func(solution, 0)

you set it to 0. Is there any way to keep it in sync with the actual index of a solution? Because if I understand this correctly, the idx will be 0 for every solution.

@mpasternak

@Dvorsek as you can see, sol_idx is not used in this example code.

If I needed to track the indices of the solutions, I would add some code to the PooledGA.cal_pop_fitness method, something like the sketch below. On the other hand, I think it is already being tracked somewhere in the PyGAD object.
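
A rough sketch (not from the gist; it assumes fitness_wrapper is changed to accept the index as a second argument):

def fitness_wrapper(solution, sol_idx):
    return fitness_func(solution, sol_idx)

class PooledGA(pygad.GA):
    def cal_pop_fitness(self):
        global pool
        # pair every solution with its index so each worker knows which one it got
        pop_fitness = pool.starmap(
            fitness_wrapper,
            [(solution, idx) for idx, solution in enumerate(self.population)],
        )
        return np.array(pop_fitness)
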

What are you trying to do?

@Dvorsek

Dvorsek commented May 20, 2022

@mpasternak Okay, I see.

For my fitness function, I have to run each evaluation in a different directory, otherwise the files will get overwritten by the other runs executing in parallel. So my thought was to navigate each run into its own directory based on solution_idx, like this:

import os

def fitness_func(solution, solution_idx):
    # Working directory, one per solution index
    try:
        os.mkdir(str(solution_idx))
    except FileExistsError:
        pass
    os.chdir(str(solution_idx))

    # Rest of fitness_func

The files can get overwritten with each generation, just not by the runs of other solutions within the same generation.

If you have any ideas on how I could proceed, they would be much appreciated.

@mpasternak

class PooledGA(pygad.GA):

    population_index = 0  # counts how many populations have been evaluated so far

    def cal_pop_fitness(self):
        global pool

        # pass the current population index to every worker; this assumes
        # fitness_wrapper is changed to accept it as a second argument
        pop_fitness = pool.starmap(
            fitness_wrapper,
            [(solution, self.population_index) for solution in self.population],
        )
        self.population_index += 1
        pop_fitness = np.array(pop_fitness)
        return pop_fitness


@TheBojda
Author

If you need a separate directory for every population, you can do what @mpasternak wrote.
If you need a separate directory for every solution, you can generate a UUID for them.
If you need a separate directory for every process, you can use the process ID from os.getpid(), as in the sketch below.
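
A minimal sketch of the per-process variant (the run_ directory prefix is only illustrative, not from this thread):

import os

# base directory captured at import time, before any worker changes its cwd
BASE_DIR = os.getcwd()

def fitness_func(solution, sol_idx):
    # every worker process writes into its own directory, named after its PID
    workdir = os.path.join(BASE_DIR, "run_{}".format(os.getpid()))
    os.makedirs(workdir, exist_ok=True)
    os.chdir(workdir)
    # ... rest of the fitness calculation, writing its files into workdir ...
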

@Dvorsek

Dvorsek commented May 23, 2022

@mpasternak @TheBojda Thank you both for your help.
I think I will make use of os.getpid().

@mpasternak

I'd love to see both this example and the examples of separation per process/population/etc. in the PyGAD docs.

@Dvorsek would you like to write it and submit it to https://github.com/ahmedfgad/GeneticAlgorithmPython repo as a PR?

@Dvorsek

Dvorsek commented May 24, 2022

@mpasternak Sure, I can do it. I have now solved this per-process issue with Value from multiprocessing.
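
One possible shape for such a Value-based counter (a sketch only; the names _counter, _init_worker, and worker_slot are illustrative and not from this thread):

import os
from multiprocessing import Pool, Value

_counter = Value('i', 0)  # shared counter handing every worker a unique slot number
worker_slot = None        # set once per worker process by the pool initializer

def _init_worker(counter):
    global worker_slot
    with counter.get_lock():
        worker_slot = counter.value
        counter.value += 1
    # each worker can now use its slot, e.g. as a working directory name
    os.makedirs("slot_{}".format(worker_slot), exist_ok=True)

# usage:
# with Pool(processes=10, initializer=_init_worker, initargs=(_counter,)) as pool:
#     ga_instance.run()
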
