DQN cartpole with numpy only
import gym
import numpy as np
from collections import deque
from gym import wrappers
# Create the Cart-Pole game environment
env = gym.make('CartPole-v0')
env = wrappers.Monitor(env, '/tmp/cartpole-experiment-1')
def relu(x):
    return np.maximum(0, x)

def relu_grad(x):
    # ReLU gradient: 1 where the forward input was positive, 0 elsewhere
    grad = np.zeros_like(x)
    grad[x > 0] = 1
    return grad

def softmax(x):
    if x.ndim == 2:
        x = x.T
        x = x - np.max(x, axis=0)  # subtract the max for numerical stability
        y = np.exp(x) / np.sum(np.exp(x), axis=0)
        return y.T
    x = x - np.max(x)  # subtract the max for numerical stability
    return np.exp(x) / np.sum(np.exp(x))

def mean_squared_error(y, t):
    return 0.5 * np.sum((y - t)**2)
class SGD:
    """Stochastic Gradient Descent (SGD)."""
    def __init__(self, lr=0.01):
        self.lr = lr

    def update(self, params, grads):
        for key in params.keys():
            params[key] -= self.lr * grads[key]

class Adam:
    """Adam (http://arxiv.org/abs/1412.6980v8)"""
    def __init__(self, lr=0.001, beta1=0.9, beta2=0.999):
        self.lr = lr
        self.beta1 = beta1
        self.beta2 = beta2
        self.iter = 0
        self.m = None
        self.v = None

    def update(self, params, grads):
        if self.m is None:
            self.m, self.v = {}, {}
            for key, val in params.items():
                self.m[key] = np.zeros_like(val)
                self.v[key] = np.zeros_like(val)
        self.iter += 1
        lr_t = self.lr * np.sqrt(1.0 - self.beta2 ** self.iter) / (1.0 - self.beta1 ** self.iter)
        for key in params.keys():
            self.m[key] += (1 - self.beta1) * (grads[key] - self.m[key])
            self.v[key] += (1 - self.beta2) * (grads[key] ** 2 - self.v[key])
            params[key] -= lr_t * self.m[key] / (np.sqrt(self.v[key]) + 1e-7)
class QNetwork:
    """Two-layer fully connected Q-network (no biases), trained with Adam."""
    def __init__(self, input_size, hidden_size, output_size, weight_init_std=0.01, lr=0.001):
        # Initialize the weights
        self.params = {}
        self.params['W1'] = weight_init_std * np.random.randn(input_size, hidden_size)
        #self.params['b1'] = np.zeros(hidden_size)
        self.params['W2'] = weight_init_std * np.random.randn(hidden_size, output_size)
        #self.params['b2'] = np.zeros(output_size)
        # Create the optimizer once so Adam's moment estimates persist across updates
        self.optimizer = Adam(lr=lr)

    def predict(self, x):
        W1, W2 = self.params['W1'], self.params['W2']
        #b1, b2 = self.params['b1'], self.params['b2']
        a1 = np.dot(x, W1)  # + b1
        z1 = relu(a1)
        a2 = np.dot(z1, W2)  # + b2
        y = softmax(a2)
        return y

    def fit(self, x, t):
        W1, W2 = self.params['W1'], self.params['W2']
        #b1, b2 = self.params['b1'], self.params['b2']
        grads = {}
        batch_num = x.shape[0]
        # forward
        a1 = np.dot(x, W1)  # + b1
        z1 = relu(a1)
        a2 = np.dot(z1, W2)  # + b2
        y = softmax(a2)
        # backward
        dy = (y - t) / batch_num
        grads['W2'] = np.dot(z1.T, dy)
        #grads['b2'] = np.sum(dy, axis=0)
        da1 = np.dot(dy, W2.T)
        dz1 = relu_grad(a1) * da1
        grads['W1'] = np.dot(x.T, dz1)
        #grads['b1'] = np.sum(dz1, axis=0)
        # update the parameters
        self.optimizer.update(self.params, grads)

    def loss(self, x, t):
        y = self.predict(x)
        return mean_squared_error(y, t)
class Memory():
    def __init__(self, max_size=1000):
        self.buffer = deque(maxlen=max_size)

    def add(self, experience):
        self.buffer.append(experience)

    def sample(self, batch_size):
        idx = np.random.choice(np.arange(len(self.buffer)),
                               size=batch_size,
                               replace=False)
        return [self.buffer[ii] for ii in idx]
train_episodes = 1000 # max number of episodes to learn from
max_steps = 200 # max steps in an episode
gamma = 0.99 # future reward discount
# Exploration parameters
explore_start = 1.0 # exploration probability at start
explore_stop = 0.01 # minimum exploration probability
decay_rate = 0.0001 # exponential decay rate for exploration prob
# Network parameters
hidden_size = 16 # number of units in each Q-network hidden layer
learning_rate = 1e-4 # Q-network learning rate
# Memory parameters
memory_size = 10000 # memory capacity
batch_size = 32 # experience mini-batch size
pretrain_length = batch_size # number of experiences to pretrain the memory
mainQN = QNetwork(input_size=4, hidden_size=hidden_size, output_size=2, weight_init_std=1.0, lr=learning_rate)
###################################
## Populate the experience memory
###################################
# Initialize the simulation
env.reset()
# Take one random step to get the pole and cart moving
state, reward, done, _ = env.step(env.action_space.sample())
state = np.reshape(state, [1, 4])
memory = Memory(max_size=memory_size)
# Make a bunch of random actions and store the experiences
for ii in range(pretrain_length):
    # Uncomment the line below to watch the simulation
    # env.render()
    # Make a random action
    action = env.action_space.sample()
    next_state, reward, done, _ = env.step(action)
    next_state = np.reshape(next_state, [1, 4])
    if done:
        # The simulation fails so no next state
        next_state = np.zeros(state.shape)
        # Add experience to memory
        memory.add((state, action, reward, next_state))
        # Start new episode
        env.reset()
        # Take one random step to get the pole and cart moving
        state, reward, done, _ = env.step(env.action_space.sample())
        state = np.reshape(state, [1, 4])
    else:
        # Add experience to memory
        memory.add((state, action, reward, next_state))
        state = next_state
#############
## Training
#############
step = 0
for ep in range(1, train_episodes):
    total_reward = 0
    t = 0
    while t < max_steps:
        # Replay: train on a mini-batch sampled from the experience memory
        inputs = np.zeros((batch_size, 4))
        targets = np.zeros((batch_size, 2))
        minibatch = memory.sample(batch_size)
        for i, (state_b, action_b, reward_b, next_state_b) in enumerate(minibatch):
            inputs[i:i+1] = state_b
            target = reward_b
            if not (next_state_b == np.zeros(state_b.shape)).all(axis=1):
                # Non-terminal transition: bootstrap from the predicted next-state Q-values
                target_Q = mainQN.predict(next_state_b)[0]
                target = reward_b + gamma * np.amax(target_Q)
            targets[i] = mainQN.predict(state_b)
            targets[i][action_b] = target
        mainQN.fit(inputs, targets)

        step += 1
        # Explore or Exploit
        explore_p = explore_stop + (explore_start - explore_stop) * np.exp(-decay_rate * step)
        if explore_p > np.random.rand():
            # Make a random action
            action = env.action_space.sample()
        else:
            # Get action from Q-network
            Qs = mainQN.predict(state)[0]
            action = np.argmax(Qs)

        # Take action, get new state and reward
        next_state, reward, done, _ = env.step(action)
        next_state = np.reshape(next_state, [1, 4])
        total_reward += reward

        if done:
            # the episode ends so no next state
            next_state = np.zeros(state.shape)
            t = max_steps
            print('Episode: {}'.format(ep),
                  'Total reward: {}'.format(total_reward),
                  'Loss: {:.6f}'.format(mainQN.loss(inputs, targets)),
                  'Explore P: {:.4f}'.format(explore_p))
            # Add experience to memory
            memory.add((state, action, reward, next_state))
            # Start new episode
            env.reset()
            # Take one random step to get the pole and cart moving
            state, reward, done, _ = env.step(env.action_space.sample())
            state = np.reshape(state, [1, 4])
        else:
            # Add experience to memory
            memory.add((state, action, reward, next_state))
            state = next_state
            t += 1
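
# A minimal evaluation sketch, assuming the trained mainQN, the wrapped env,
# and the old gym reset/step API used above: run one greedy episode with no
# exploration and report its total reward.
state = np.reshape(env.reset(), [1, 4])
total_reward = 0
for _ in range(max_steps):
    action = np.argmax(mainQN.predict(state)[0])  # greedy action from the Q-network
    next_state, reward, done, _ = env.step(action)
    total_reward += reward
    state = np.reshape(next_state, [1, 4])
    if done:
        break
print('Greedy evaluation reward: {}'.format(total_reward))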