# We first import the necessary libraries and define the hyperparameters.
import gym
import random
import numpy as np
import tflearn
from tflearn.layers.core import input_data, dropout, fully_connected
from tflearn.layers.estimator import regression
from statistics import median, mean
from collections import Counter

LR = 2.33e-4
env = gym.make("CartPole-v0")
observation = env.reset()
goal_steps = 500          # maximum number of steps per episode
score_requirement = 50    # minimum score an episode needs to be kept as training data
initial_games = 10000     # number of random episodes used to build the training set
# Now we will define a function to generate the training data.
def initial_population():
    # [OBS, MOVES]
    training_data = []
    # all scores:
    scores = []
    # scores above our threshold:
    accepted_scores = []
    # number of episodes
    for _ in range(initial_games):
        score = 0
        # moves specifically from this episode:
        episode_memory = []
        # previous observation that we saw
        prev_observation = []
        for _ in range(goal_steps):
            # choose a random action, left or right i.e. (0 or 1)
            action = random.randrange(0, 2)
            observation, reward, done, info = env.step(action)
            # since the observation is returned FROM the action,
            # we store the previous observation with the corresponding action
            if len(prev_observation) > 0:
                episode_memory.append([prev_observation, action])
            prev_observation = observation
            score += reward
            if done: break
        # The reinforcement step: if the episode's score reached our threshold,
        # we keep it. All we are doing is reinforcing the score; we are not trying
        # to influence the model in any way as to HOW that score was reached.
        if score >= score_requirement:
            accepted_scores.append(score)
            for data in episode_memory:
                # convert the action to one-hot (this is the output layer of our neural network)
                if data[1] == 1:
                    output = [0, 1]
                elif data[1] == 0:
                    output = [1, 0]
                # saving our training data
                training_data.append([data[0], output])

        # reset the env to play again
        env.reset()
        # save the overall score
        scores.append(score)

    return training_data
# Now, using tflearn, we will define our neural network.
def neural_network_model(input_size):
    network = input_data(shape=[None, input_size, 1], name='input')

    network = fully_connected(network, 128, activation='relu')
    network = dropout(network, 0.8)

    network = fully_connected(network, 256, activation='relu')
    network = dropout(network, 0.8)

    network = fully_connected(network, 512, activation='relu')
    network = dropout(network, 0.8)

    network = fully_connected(network, 256, activation='relu')
    network = dropout(network, 0.8)

    network = fully_connected(network, 128, activation='relu')
    network = dropout(network, 0.8)

    network = fully_connected(network, 2, activation='softmax')
    network = regression(network, optimizer='adam', learning_rate=LR,
                         loss='categorical_crossentropy', name='targets')

    model = tflearn.DNN(network, tensorboard_dir='log')
    return model
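
# (Usage note, not part of the original gist) CartPole-v0 observations are
# 4-dimensional (cart position, cart velocity, pole angle, pole angular velocity),
# so train_model below ends up building this network with input_size=4, e.g.:
#   model = neural_network_model(input_size=4)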
# It is time to train the model now.
def train_model(training_data, model=False):
    X = np.array([i[0] for i in training_data]).reshape(-1, len(training_data[0][0]), 1)
    y = [i[1] for i in training_data]

    if not model:
        model = neural_network_model(input_size=len(X[0]))

    model.fit({'input': X}, {'targets': y}, n_epoch=5, snapshot_step=500,
              show_metric=True, run_id='openai_CartPole')
    return model
training_data = initial_population()
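
# (Sanity-check sketch, not part of the original gist) Counter is imported above
# but otherwise unused; it gives a quick look at how balanced the collected
# left/right labels are before training.
print('Training examples collected:', len(training_data))
print('Label distribution:', Counter(tuple(sample[1]) for sample in training_data))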
model = train_model(training_data)
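
# (Optional sketch, not part of the original gist) tflearn's DNN class provides
# save()/load(), so the trained weights could be persisted and restored later:
#   model.save('cartpole.model')
#   # ...in a fresh session, rebuild the same graph and reload the weights:
#   model = neural_network_model(input_size=len(training_data[0][0]))
#   model.load('cartpole.model')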
# Training is complete; now we play the game to see what the output looks like.
scores = []
choices = []
for each_game in range(10):
    score = 0
    game_memory = []
    prev_obs = []
    env.reset()
    for _ in range(goal_steps):
        env.render()
        # on the very first step there is no previous observation, so act randomly;
        # afterwards, let the trained model pick the action
        if len(prev_obs) == 0:
            action = random.randrange(0, 2)
        else:
            action = np.argmax(model.predict(prev_obs.reshape(-1, len(prev_obs), 1))[0])
        choices.append(action)

        new_observation, reward, done, info = env.step(action)
        prev_obs = new_observation
        game_memory.append([new_observation, action])
        score += reward
        if done: break
    scores.append(score)

print('Average Score:', sum(scores) / len(scores))
print('choice 1: {}%  choice 0: {}%'.format(100.0 * choices.count(1) / len(choices),
                                            100.0 * choices.count(0) / len(choices)))
print('Score requirement:', score_requirement)
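
# (Not part of the original gist) Close the environment to release the window
# opened by env.render() above.
env.close()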