@Madhivarman
Created May 8, 2018 13:27
A Neural Network model that learns to play a PONG game from the image RAW pixels.
import gym
import numpy as np
env = gym.make("Pong-v0")
observation = env.reset()
#hyperparameters
episode_number = 0
batch_size = 10  # how many episodes to play before updating the weights
gamma = 0.99 #discount factor for reward
decay_rate = 0.99
hidden_layer_neurons = 200
input_dimensions = 80*80
learning_rate = 1e-4
reward_sum = 0
running_reward=None
prev_processed_observations=None
"""
Preprocess each OpenAI Gym frame before it is fed to the neural network:
1. Crop the image
2. Downsample the image
3. Convert the image to black and white
4. Remove the background
5. Flatten the 80x80 matrix of values into a 6400x1 vector
6. Store the difference between the current frame and the previous frame
"""
def downsample(img):
    return img[::2, ::2, :]

def remove_color(img):
    # convert all color into black and white
    return img[:, :, 0]  # RGB is the third dimension

def remove_background(img):
    img[img == 144] = 0
    img[img == 109] = 0
    return img

def relu(vector):
    vector[vector < 0] = 0
    return vector

def sigmoid(x):
    return 1.0 / (1.0 + np.exp(-x))

def apply_neural_nets(observation_matrix, weights):
    """
    Based on the observation_matrix and weights, compute the hidden layer values
    and the probability of moving the paddle up.
    """
    hidden_layer_values = np.dot(weights['1'], observation_matrix)
    hidden_layer_values = relu(hidden_layer_values)
    output_layer_values = np.dot(hidden_layer_values, weights['2'])
    output_layer_values = sigmoid(output_layer_values)
    return hidden_layer_values, output_layer_values
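# Shape check for the forward pass above: weights['1'] is (200, 6400) and the
# observation vector is (6400,), so hidden_layer_values is (200,); weights['2']
# is (200,), so output_layer_values is a single scalar -- the probability of
# moving the paddle up.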

def preprocess_observation(input_observation, prev_processed_observation, input_dimensions):
    processed_observation = input_observation[35:195]  # crop the image
    processed_observation = downsample(processed_observation)
    processed_observation = remove_color(processed_observation)
    processed_observation = remove_background(processed_observation)
    processed_observation[processed_observation != 0] = 1  # everything else (paddles, ball) set to 1
    processed_observation = processed_observation.astype(float).ravel()  # flatten 80x80 into a 6400x1 vector
    # subtract the previous frame from the current one
    if prev_processed_observation is not None:
        input_observation = processed_observation - prev_processed_observation
    else:
        input_observation = np.zeros(input_dimensions)
    # store the current frame so we can subtract it from the next one
    prev_processed_observations = processed_observation
    return input_observation, prev_processed_observations
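# Feeding the network frame differences rather than single frames gives it
# motion information (the direction of the ball) that a single static frame
# cannot provide; the first frame of an episode has no predecessor, so it is
# replaced by a zero vector.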

def choose_action(probability):
    random_value = np.random.uniform()
    if random_value < probability:
        # signifies "up" in OpenAI Gym
        return 2
    else:
        # signifies "down" in OpenAI Gym
        return 3
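# Sampling the action against up_probability (instead of always taking the most
# likely action) keeps the policy stochastic, which is what lets the policy
# gradient explore different plays.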

def compute_gradient(gradient_log_p, hidden_layer_values, observation_values, weights):
    delta_L = gradient_log_p
    dC_dw2 = np.dot(hidden_layer_values.T, delta_L).ravel()
    delta_l2 = np.outer(delta_L, weights['2'])
    delta_l2 = relu(delta_l2)
    dC_dw1 = np.dot(delta_l2.T, observation_values)
    return {
        '1': dC_dw1,
        '2': dC_dw2
    }
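# Backpropagation through the two-layer network: dC_dw2 is the gradient for the
# hidden-to-output weights; for dC_dw1 the output error is propagated back
# through weights['2'], negative entries are zeroed with relu, and the result is
# projected onto the observations to get the gradient for the input-to-hidden
# weights.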

def discount_rewards(rewards, gamma):
    discounted_rewards = np.zeros_like(rewards)
    running_add = 0
    for t in reversed(range(0, rewards.size)):
        if rewards[t] != 0:
            running_add = 0  # a non-zero reward marks the end of a Pong round, so reset the running sum
        running_add = running_add * gamma + rewards[t]
        discounted_rewards[t] = running_add
    return discounted_rewards
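# Illustrative example (not part of the training loop): with gamma = 0.99 and a
# round whose rewards are [0, 0, 1], the discounted rewards come out as
# [0.9801, 0.99, 1.0] -- actions taken closer to the point that was scored
# receive more of the credit.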

def discount_with_rewards(gradient_log_p, episode_rewards, gamma):
    # discount the gradient with the normalized rewards
    discounted_episode_rewards = discount_rewards(episode_rewards, gamma)
    # standardize the rewards
    discounted_episode_rewards -= np.mean(discounted_episode_rewards)
    discounted_episode_rewards /= np.std(discounted_episode_rewards)
    return gradient_log_p * discounted_episode_rewards
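# Subtracting the mean and dividing by the standard deviation centres the
# discounted returns, so roughly half of the actions in each batch are
# encouraged and half are discouraged; this acts as a simple baseline and keeps
# the gradient magnitudes stable.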

def update_weights(weights, expectation_g_squared, g_dict, decay_rate, learning_rate):
    epsilon = 1e-5
    for layer_name in weights.keys():
        g = g_dict[layer_name]
        expectation_g_squared[layer_name] = decay_rate * expectation_g_squared[layer_name] + (1 - decay_rate) * g**2
        weights[layer_name] += (learning_rate * g) / (np.sqrt(expectation_g_squared[layer_name] + epsilon))
        g_dict[layer_name] = np.zeros_like(weights[layer_name])  # reset the batch gradient buffer
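# RMSProp update: keep a running average of the squared gradients,
#   E[g^2] <- decay_rate * E[g^2] + (1 - decay_rate) * g^2
# and scale each weight update by 1 / sqrt(E[g^2] + epsilon), so parameters with
# consistently large gradients take smaller effective steps.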
"""
Layer 1: Neural Network of input matrix 200x6400
Layer 2: 200x1 represents the output of the matrix
"""
weights = {
'1': np.random.randn(hidden_layer_neurons,input_dimensions)/np.sqrt(input_dimensions),
'2': np.random.randn(hidden_layer_neurons)/np.sqrt(hidden_layer_neurons)
}
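# Dividing the random weights by sqrt(number of inputs to the layer) keeps the
# variance of each layer's pre-activations roughly constant at initialisation
# (Xavier-style scaling), which helps the ReLU and sigmoid units start in a
# useful range.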
# buffers used by the RMSProp (RMS Propagation) algorithm
expectation_g_squared = {}
g_dict = {}
for layer_name in weights.keys():
    expectation_g_squared[layer_name] = np.zeros_like(weights[layer_name])
    g_dict[layer_name] = np.zeros_like(weights[layer_name])
episode_hidden_layer, episode_observation, episode_gradient_log_ps, episode_rewards = [], [], [], []

while True:
    env.render()
    processed_observations, prev_processed_observations = preprocess_observation(observation, prev_processed_observations, input_dimensions)
    """
    The image is preprocessed; now send the observation through the neural network.
    """
    hidden_layer, up_probability = apply_neural_nets(processed_observations, weights)
    episode_observation.append(processed_observations)
    episode_hidden_layer.append(hidden_layer)
    # now choose an action
    action = choose_action(up_probability)
    # carry out the chosen action
    observation, reward, done, info = env.step(action)
    reward_sum += reward
    episode_rewards.append(reward)
    """
    After a single action we don't yet know whether it was the right one;
    the reward only arrives at the end of a round.
    """
    fake_label = 1 if action == 2 else 0
    loss_function_gradient = fake_label - up_probability
    episode_gradient_log_ps.append(loss_function_gradient)
    """
    By the policy gradient the model figures out how to play: if it won the
    round, make actions like these more likely; if it lost, make them less likely.
    """
    # an episode has finished
    if done:
        episode_number += 1
        # combine the values collected during the episode
        episode_hidden_layer = np.vstack(episode_hidden_layer)
        episode_observation = np.vstack(episode_observation)
        episode_gradient_log_ps = np.vstack(episode_gradient_log_ps)
        episode_rewards = np.vstack(episode_rewards)
        # tweak the gradient of the log_ps based on the discounted rewards
        episode_gradient_log_ps_discounted = discount_with_rewards(episode_gradient_log_ps, episode_rewards, gamma)
        gradient = compute_gradient(
            episode_gradient_log_ps_discounted,
            episode_hidden_layer,
            episode_observation,
            weights
        )
        # accumulate the gradient; the weights are only updated once every batch_size episodes
        for layer_name in gradient:
            g_dict[layer_name] += gradient[layer_name]
        if episode_number % batch_size == 0:
            update_weights(weights, expectation_g_squared, g_dict, decay_rate, learning_rate)
        episode_hidden_layer, episode_observation, episode_gradient_log_ps, episode_rewards = [], [], [], []  # reset the buffers
        observation = env.reset()
        running_reward = reward_sum if running_reward is None else running_reward * 0.99 + reward_sum * 0.01
        print("Resetting Env. Episode reward total was {} running mean: {}".format(reward_sum, running_reward))
        reward_sum = 0
        prev_processed_observations = None
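
# Note: this script assumes the older OpenAI Gym API -- a "Pong-v0" environment
# whose env.step() returns (observation, reward, done, info) and whose
# env.render() opens a window. Atari environments typically also require gym's
# Atari extras to be installed; adjust for your gym/gymnasium version if needed.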