Skip to content

Instantly share code, notes, and snippets.

@tsu-nera
Created July 9, 2017 08:42
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save tsu-nera/6250f01a8f1a5ae37bdda84ebb424e5f to your computer and use it in GitHub Desktop.
DQN cartpole with numpy only
import gym
import numpy as np
from collections import deque
from gym import wrappers
# Build the CartPole-v0 environment and wrap it in a Monitor that is given
# the directory /tmp/cartpole-experiment-1.
env = wrappers.Monitor(gym.make('CartPole-v0'), '/tmp/cartpole-experiment-1')
def relu(x):
    """Rectified linear unit: element-wise ``max(0, x)``."""
    rectified = np.maximum(0, x)
    return rectified
def relu_grad(dout):
    """Return the element-wise derivative of ReLU evaluated at ``dout``.

    The result is 1 where the input is strictly positive and 0 elsewhere,
    with the same shape and dtype as the input, so it can be multiplied
    into an upstream gradient.

    The previous implementation zeroed the non-positive entries of its
    argument *in place* and returned the remaining values (i.e. ``relu(x)``)
    rather than a 0/1 mask; callers therefore scaled their gradient by the
    activation and had their pre-activation array silently overwritten.

    Args:
        dout: pre-activation values (array-like) that were fed to ``relu``.

    Returns:
        A new array holding the 0/1 derivative mask; ``dout`` is untouched.
    """
    values = np.asarray(dout)
    return (values > 0).astype(values.dtype)
def softmax(x):
    """Numerically stable softmax.

    A 2-D input is normalised independently along each row; any other
    input is normalised over all of its elements at once.
    """
    if x.ndim == 2:
        # Subtract each row's maximum before exponentiating to avoid overflow.
        shifted = x - np.max(x, axis=1, keepdims=True)
        exps = np.exp(shifted)
        return exps / np.sum(exps, axis=1, keepdims=True)

    # Overflow guard: shift by the global maximum.
    shifted = x - np.max(x)
    exps = np.exp(shifted)
    return exps / np.sum(exps)
def mean_squared_error(y, t):
    """Return half the sum of squared differences between ``y`` and ``t``.

    Note that the squared errors are summed, not averaged.
    """
    residual = y - t
    return 0.5 * np.sum(residual ** 2)
class SGD:
    """Stochastic Gradient Descent optimizer."""

    def __init__(self, lr=0.01):
        # Step size applied to every gradient.
        self.lr = lr

    def update(self, params, grads):
        """Subtract ``lr * grads[name]`` from every entry of ``params``."""
        for name in params:
            step = self.lr * grads[name]
            params[name] -= step
class Adam:
    """Adam optimizer (http://arxiv.org/abs/1412.6980v8)."""

    def __init__(self, lr=0.001, beta1=0.9, beta2=0.999):
        self.lr = lr
        self.beta1 = beta1
        self.beta2 = beta2
        # Number of update() calls so far; drives the bias correction.
        self.iter = 0
        # First / second moment estimates, allocated on the first update().
        self.m = None
        self.v = None

    def update(self, params, grads):
        """Apply one Adam step to every entry of ``params``."""
        if self.m is None:
            # Lazily create zero-filled moment buffers matching each parameter.
            self.m = {}
            self.v = {}
            for name, value in params.items():
                self.m[name] = np.zeros_like(value)
                self.v[name] = np.zeros_like(value)

        self.iter += 1
        # Fold both bias-correction factors into the step size.
        first_correction = 1.0 - self.beta1 ** self.iter
        second_correction = 1.0 - self.beta2 ** self.iter
        lr_t = self.lr * np.sqrt(second_correction) / first_correction

        for name in params:
            grad = grads[name]
            # Exponential moving averages of the gradient and its square.
            self.m[name] += (1 - self.beta1) * (grad - self.m[name])
            self.v[name] += (1 - self.beta2) * (grad ** 2 - self.v[name])
            params[name] -= lr_t * self.m[name] / (np.sqrt(self.v[name]) + 1e-7)
class QNetwork:
    """Two-layer, bias-free fully connected network trained with Adam.

    The forward pass is ``softmax(relu(x @ W1) @ W2)``.

    Fixes relative to the previous version:
      * ``fit`` used to build a brand-new ``Adam`` on every call, which
        threw away the moment estimates and step counter each time. The
        optimizer is now created once and reused across calls.
      * The backward pass multiplied by ``relu_grad(a1)``, which returned
        the clipped activations (and overwrote ``a1``) instead of the 0/1
        ReLU derivative. The mask ``a1 > 0`` is now used directly.

    NOTE(review): ``fit`` back-propagates ``(y - t) / batch`` through the
    softmax output while ``loss`` reports a squared error; the two do not
    describe the same objective -- confirm which one is intended.

    Args:
        input_size: number of input features.
        hidden_size: number of hidden units.
        output_size: number of outputs.
        weight_init_std: scale applied to the standard-normal initial weights.
        lr: optional Adam learning rate. When ``None`` (the default) the
            module-level ``learning_rate`` is read the first time ``fit``
            runs, as before.
    """

    def __init__(self, input_size, hidden_size, output_size,
                 weight_init_std=0.01, lr=None):
        # Weight initialisation (W1 is drawn before W2, as before).
        self.params = {}
        self.params['W1'] = weight_init_std * np.random.randn(input_size, hidden_size)
        self.params['W2'] = weight_init_std * np.random.randn(hidden_size, output_size)
        self.lr = lr
        # Built lazily in fit() so the optimizer state survives across calls.
        self._optimizer = None

    def _forward(self, x):
        """Run the forward pass; return ``(a1, z1, y)``."""
        a1 = np.dot(x, self.params['W1'])
        z1 = relu(a1)
        a2 = np.dot(z1, self.params['W2'])
        return a1, z1, softmax(a2)

    def predict(self, x):
        """Return the network output for the batch ``x``."""
        return self._forward(x)[2]

    def fit(self, x, t):
        """Take one optimizer step moving the output for ``x`` towards ``t``.

        Args:
            x: input batch, one sample per row.
            t: target batch with the same shape as the network output.
        """
        batch_num = x.shape[0]
        a1, z1, y = self._forward(x)

        # Backward pass.
        grads = {}
        d_a2 = (y - t) / batch_num
        grads['W2'] = np.dot(z1.T, d_a2)
        d_z1 = np.dot(d_a2, self.params['W2'].T)
        # ReLU passes gradient only where its input was positive.
        d_a1 = d_z1 * (a1 > 0)
        grads['W1'] = np.dot(x.T, d_a1)

        # Update, reusing one optimizer so Adam's running moments accumulate.
        if self._optimizer is None:
            step_size = learning_rate if self.lr is None else self.lr
            self._optimizer = Adam(lr=step_size)
        self._optimizer.update(self.params, grads)

    def loss(self, x, t):
        """Return ``mean_squared_error`` between the prediction for ``x`` and ``t``."""
        y = self.predict(x)
        return mean_squared_error(y, t)
class Memory():
    """Fixed-capacity experience buffer backed by a ``deque``."""

    def __init__(self, max_size=1000):
        # Once max_size items are stored, appending drops the oldest one.
        self.buffer = deque(maxlen=max_size)

    def add(self, experience):
        """Append one experience to the buffer."""
        self.buffer.append(experience)

    def sample(self, batch_size):
        """Return ``batch_size`` distinct stored experiences, chosen at random."""
        positions = np.arange(len(self.buffer))
        chosen = np.random.choice(positions, size=batch_size, replace=False)
        return [self.buffer[pos] for pos in chosen]
# Training-loop parameters
train_episodes = 1000  # upper bound of the episode counter in the training loop
max_steps = 200  # maximum number of steps taken within one episode
gamma = 0.99  # discount factor applied to the estimated future reward
# Exploration parameters: the probability of a random action decays
# exponentially from explore_start towards explore_stop as steps accumulate.
explore_start = 1.0  # exploration probability at start
explore_stop = 0.01  # minimum exploration probability
decay_rate = 0.0001  # exponential decay rate for exploration prob
# Network parameters
hidden_size = 16  # number of units in the Q-network hidden layer
learning_rate = 1e-4  # learning rate handed to the Adam optimizer in QNetwork.fit
# Memory parameters
memory_size = 10000  # capacity of the replay memory
batch_size = 32  # number of experiences sampled per training step
pretrain_length = batch_size  # number of random experiences stored before training
# Q-network with 4 inputs and 2 outputs; weights start as unscaled
# standard-normal samples (weight_init_std=1.0).
mainQN = QNetwork(input_size=4, hidden_size=hidden_size, output_size=2, weight_init_std=1.0)
###################################
## Populate the experience memory
###################################
# Reset the simulation and take one random step so that an initial
# observation is available, stored as a (1, 4) row vector.
env.reset()
state, reward, done, _ = env.step(env.action_space.sample())
state = np.reshape(state, [1, 4])

memory = Memory(max_size=memory_size)

# Fill the memory with experiences gathered from purely random actions.
for warmup_step in range(pretrain_length):
    # Uncomment the line below to watch the simulation
    # env.render()
    action = env.action_space.sample()
    next_state, reward, done, _ = env.step(action)
    next_state = np.reshape(next_state, [1, 4])

    if done:
        # The episode is over: an all-zero next state marks the transition
        # as terminal.
        next_state = np.zeros(state.shape)

    memory.add((state, action, reward, next_state))

    if done:
        # Begin a new episode, again with one random step.
        env.reset()
        state, reward, done, _ = env.step(env.action_space.sample())
        state = np.reshape(state, [1, 4])
    else:
        state = next_state
#############
## Training
#############
# Each pass of the inner loop (1) fits the network on a mini-batch sampled
# from the replay memory, then (2) takes one epsilon-greedy step in the
# environment and stores the resulting transition.
step = 0  # total number of training steps; drives the exploration decay
for ep in range(1, train_episodes):
    total_reward = 0
    t = 0
    while t < max_steps:
        # Replay: build the input/target batch from stored experiences.
        inputs = np.zeros((batch_size, 4))
        targets = np.zeros((batch_size, 2))
        minibatch = memory.sample(batch_size)
        for i, (state_b, action_b, reward_b, next_state_b) in enumerate(minibatch):
            inputs[i:i+1] = state_b
            target = reward_b
            # An all-zero next state marks a terminal transition; only
            # non-terminal ones add the discounted future value.
            if np.any(next_state_b):
                # Evaluate the network once and reuse the result (the value
                # was previously computed twice and one copy discarded).
                target_Q = mainQN.predict(next_state_b)[0]
                target = reward_b + gamma * np.amax(target_Q)
            # Start from the current prediction so that only the entry of
            # the action actually taken is changed.
            targets[i] = mainQN.predict(state_b)
            targets[i][action_b] = target
        mainQN.fit(inputs, targets)

        step += 1
        # Explore or exploit: the exploration probability decays with step.
        explore_p = explore_stop + (explore_start - explore_stop)*np.exp(-decay_rate*step)
        if explore_p > np.random.rand():
            # Make a random action
            action = env.action_space.sample()
        else:
            # Pick the action with the largest network output.
            Qs = mainQN.predict(state)[0]
            action = np.argmax(Qs)

        # Take action, get new state and reward
        next_state, reward, done, _ = env.step(action)
        next_state = np.reshape(next_state, [1, 4])
        total_reward += reward

        if done:
            # The episode ends, so there is no next state.
            next_state = np.zeros(state.shape)
            t = max_steps  # forces the while loop to exit
            print('Episode: {}'.format(ep),
                  'Total reward: {}'.format(total_reward),
                  'Loss {:.6f}'.format(mainQN.loss(inputs, targets)),
                  'Explore P: {:.4f}'.format(explore_p))
            # Add experience to memory
            memory.add((state, action, reward, next_state))
            # Start a new episode with one random step.
            env.reset()
            state, reward, done, _ = env.step(env.action_space.sample())
            state = np.reshape(state, [1, 4])
        else:
            # Add experience to memory
            memory.add((state, action, reward, next_state))
            state = next_state
            t += 1
@tsu-nera
Copy link
Author

tsu-nera commented Jul 9, 2017

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment