@hnakagawa
Last active March 22, 2018 06:33
DDPG: a TensorFlow implementation of Deep Deterministic Policy Gradient (actor-critic with target networks, a replay buffer, and Ornstein-Uhlenbeck exploration noise), trained on OpenAI Gym's Pendulum-v0.
# actor.py
import tensorflow as tf
import numpy as np
import tf_utils as tfu

LAYER1_SIZE = 400
LAYER2_SIZE = 300
LEARNING_RATE = 1e-4
TAU = 0.001
BATCH_SIZE = 64


class Actor:
    def __init__(self, session, state_dim, action_dim):
        self.session = session
        self.state_dim = state_dim
        self.action_dim = action_dim

        self.state_input, \
        self.action_output, \
        self.net = self.create_network(state_dim, action_dim)

        self.target_state_input, \
        self.target_action_output, \
        self.target_update, \
        self.target_net = self.create_target_network(state_dim, action_dim, self.net)

        self.create_training_method()
        self.session.run(tf.global_variables_initializer())
        self.update_target()

    def create_training_method(self):
        # The critic supplies dQ/da; feeding its negation as grad_ys turns
        # gradient descent into ascent on the expected Q value.
        self.q_gradient_input = tf.placeholder("float", [None, self.action_dim])
        self.parameters_gradients = tf.gradients(self.action_output,
                                                 self.net, -self.q_gradient_input)
        self.optimizer = tf.train.AdamOptimizer(LEARNING_RATE).apply_gradients(
            zip(self.parameters_gradients, self.net))

    def create_network(self, state_dim, action_dim):
        layer1_size = LAYER1_SIZE
        layer2_size = LAYER2_SIZE
        state_input = tf.placeholder("float", [None, state_dim])
        W1 = tfu.variable([state_dim, layer1_size], state_dim)
        b1 = tfu.variable([layer1_size], state_dim)
        W2 = tfu.variable([layer1_size, layer2_size], layer1_size)
        b2 = tfu.variable([layer2_size], layer1_size)
        W3 = tf.Variable(tf.random_uniform([layer2_size, action_dim], -3e-3, 3e-3))
        b3 = tf.Variable(tf.random_uniform([action_dim], -3e-3, 3e-3))
        layer1 = tf.nn.relu(tf.matmul(state_input, W1) + b1)
        layer2 = tf.nn.relu(tf.matmul(layer1, W2) + b2)
        action_output = tf.tanh(tf.matmul(layer2, W3) + b3)
        return state_input, action_output, [W1, b1, W2, b2, W3, b3]

    def create_target_network(self, state_dim, action_dim, net):
        # Target weights are exponential moving averages of the online weights
        # (soft update with rate TAU).
        state_input = tf.placeholder("float", [None, state_dim])
        ema = tf.train.ExponentialMovingAverage(decay=1 - TAU)
        target_update = ema.apply(net)
        target_net = [ema.average(x) for x in net]
        layer1 = tf.nn.relu(tf.matmul(state_input, target_net[0]) + target_net[1])
        layer2 = tf.nn.relu(tf.matmul(layer1, target_net[2]) + target_net[3])
        action_output = tf.tanh(tf.matmul(layer2, target_net[4]) + target_net[5])
        return state_input, action_output, target_update, target_net

    def update_target(self):
        self.session.run(self.target_update)

    def train(self, q_gradient_batch, state_batch):
        self.session.run(self.optimizer, feed_dict={
            self.q_gradient_input: q_gradient_batch,
            self.state_input: state_batch})

    def actions(self, state_batch):
        return self.session.run(self.action_output, feed_dict={
            self.state_input: state_batch})

    def action(self, state):
        return self.session.run(self.action_output, feed_dict={
            self.state_input: [state]})[0]

    def target_actions(self, state_batch):
        return self.session.run(self.target_action_output, feed_dict={
            self.target_state_input: state_batch})
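
A minimal usage sketch for the Actor class above (my own example, not part of the gist; it assumes actor.py and tf_utils.py are importable and uses Pendulum-v0's dimensions of 3 state / 1 action):

import numpy as np
import tensorflow as tf
from actor import Actor

with tf.Session() as sess:
    actor = Actor(sess, state_dim=3, action_dim=1)
    single = actor.action(np.zeros(3))                 # one tanh-squashed action in [-1, 1]
    batch = actor.actions(np.zeros((16, 3)))           # batched forward pass, shape (16, 1)
    target = actor.target_actions(np.zeros((16, 3)))   # same, through the target network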

# critic.py
import tensorflow as tf
import numpy as np
import tf_utils as tfu

LAYER1_SIZE = 400
LAYER2_SIZE = 300
LEARNING_RATE = 1e-3
TAU = 0.001
L2 = 0.01


class Critic:
    def __init__(self, session, state_dim, action_dim):
        self.session = session
        self.time_step = 0

        self.state_input, \
        self.action_input, \
        self.q_value_output, \
        self.net = self.create_q_network(state_dim, action_dim)

        self.target_state_input, \
        self.target_action_input, \
        self.target_q_value_output, \
        self.target_update = self.create_target_q_network(state_dim, action_dim, self.net)

        self.create_training_method()
        self.session.run(tf.global_variables_initializer())
        self.update_target()

    def create_training_method(self):
        # Mean-squared Bellman error plus L2 weight decay; also expose dQ/da for the actor.
        self.y_input = tf.placeholder("float", [None, 1])
        weight_decay = tf.add_n([L2 * tf.nn.l2_loss(var) for var in self.net])
        self.cost = tf.reduce_mean(tf.square(self.y_input - self.q_value_output)) + weight_decay
        self.optimizer = tf.train.AdamOptimizer(LEARNING_RATE).minimize(self.cost)
        self.action_gradients = tf.gradients(self.q_value_output, self.action_input)

    def create_q_network(self, state_dim, action_dim):
        layer1_size = LAYER1_SIZE
        layer2_size = LAYER2_SIZE
        state_input = tf.placeholder("float", [None, state_dim])
        action_input = tf.placeholder("float", [None, action_dim])
        W1 = tfu.variable([state_dim, layer1_size], state_dim)
        b1 = tfu.variable([layer1_size], state_dim)
        W2 = tfu.variable([layer1_size, layer2_size], layer1_size + action_dim)
        W2_action = tfu.variable([action_dim, layer2_size], layer1_size + action_dim)
        b2 = tfu.variable([layer2_size], layer1_size + action_dim)
        W3 = tf.Variable(tf.random_uniform([layer2_size, 1], -3e-3, 3e-3))
        b3 = tf.Variable(tf.random_uniform([1], -3e-3, 3e-3))
        layer1 = tf.nn.relu(tf.matmul(state_input, W1) + b1)
        # Actions enter at the second hidden layer, following the DDPG paper.
        layer2 = tf.nn.relu(tf.matmul(layer1, W2) + tf.matmul(action_input, W2_action) + b2)
        q_value_output = tf.identity(tf.matmul(layer2, W3) + b3)
        return state_input, action_input, q_value_output, [W1, b1, W2, W2_action, b2, W3, b3]

    def create_target_q_network(self, state_dim, action_dim, net):
        state_input = tf.placeholder("float", [None, state_dim])
        action_input = tf.placeholder("float", [None, action_dim])
        ema = tf.train.ExponentialMovingAverage(decay=1 - TAU)
        target_update = ema.apply(net)
        target_net = [ema.average(x) for x in net]
        layer1 = tf.nn.relu(tf.matmul(state_input, target_net[0]) + target_net[1])
        layer2 = tf.nn.relu(tf.matmul(layer1, target_net[2]) + tf.matmul(action_input, target_net[3]) + target_net[4])
        q_value_output = tf.identity(tf.matmul(layer2, target_net[5]) + target_net[6])
        return state_input, action_input, q_value_output, target_update

    def update_target(self):
        self.session.run(self.target_update)

    def train(self, y_batch, state_batch, action_batch):
        self.time_step += 1
        self.session.run(self.optimizer, feed_dict={
            self.y_input: y_batch,
            self.state_input: state_batch,
            self.action_input: action_batch})

    def gradients(self, state_batch, action_batch):
        return self.session.run(self.action_gradients, feed_dict={
            self.state_input: state_batch,
            self.action_input: action_batch})[0]

    def target_q(self, state_batch, action_batch):
        return self.session.run(self.target_q_value_output, feed_dict={
            self.target_state_input: state_batch,
            self.target_action_input: action_batch})

    def q_value(self, state_batch, action_batch):
        return self.session.run(self.q_value_output, feed_dict={
            self.state_input: state_batch,
            self.action_input: action_batch})
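
A similar sketch for the Critic (again my own example, not from the gist): the gradients method returns dQ/da, which is exactly what Actor.train expects as q_gradient_batch.

import numpy as np
import tensorflow as tf
from critic import Critic

with tf.Session() as sess:
    critic = Critic(sess, state_dim=3, action_dim=1)
    states = np.zeros((16, 3))
    actions = np.zeros((16, 1))
    q = critic.q_value(states, actions)        # Q estimates, shape (16, 1)
    dq_da = critic.gradients(states, actions)  # dQ/da, shape (16, 1), fed to the actor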

# ddpg.py
import tensorflow as tf
import numpy as np
from ou_noise import OUNoise
from critic import Critic
from actor import Actor
from replay_buffer import ReplayBuffer

BATCH_SIZE = 64
GAMMA = 0.99
REPLAY_BUFFER_SIZE = 1000000
REPLAY_START_SIZE = 10000


class DDPG:
    def __init__(self, session, state_dim, action_dim):
        self.session = session
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.actor = Actor(self.session, self.state_dim, self.action_dim)
        self.critic = Critic(self.session, self.state_dim, self.action_dim)
        self.replay_buffer = ReplayBuffer(REPLAY_BUFFER_SIZE)
        self.exploration_noise = OUNoise(self.action_dim)

    def train(self):
        minibatch = self.replay_buffer.get_batch(BATCH_SIZE)
        state_batch = np.asarray([data[0] for data in minibatch])
        action_batch = np.asarray([data[1] for data in minibatch])
        reward_batch = np.asarray([data[2] for data in minibatch])
        next_state_batch = np.asarray([data[3] for data in minibatch])
        done_batch = np.asarray([data[4] for data in minibatch])
        action_batch = np.resize(action_batch, [BATCH_SIZE, self.action_dim])

        # Bellman targets: y_i = r_i for terminal transitions,
        # otherwise y_i = r_i + GAMMA * Q'(s_{i+1}, mu'(s_{i+1})).
        next_action_batch = self.actor.target_actions(next_state_batch)
        q_value_batch = self.critic.target_q(next_state_batch, next_action_batch)
        y_batch = []
        for i in range(len(minibatch)):
            rw = reward_batch[i] if done_batch[i] \
                else reward_batch[i] + GAMMA * q_value_batch[i]
            y_batch.append(rw)
        y_batch = np.resize(y_batch, [BATCH_SIZE, 1])
        self.critic.train(y_batch, state_batch, action_batch)

        # Policy update: ascend the critic's action gradient dQ/da.
        action_batch_for_gradients = self.actor.actions(state_batch)
        q_gradient_batch = self.critic.gradients(state_batch, action_batch_for_gradients)
        self.actor.train(q_gradient_batch, state_batch)

        # Soft-update both target networks.
        self.actor.update_target()
        self.critic.update_target()

    def action(self, state, noise=False):
        action = self.actor.action(state)
        return action if not noise else action + self.exploration_noise.noise()

    def perceive(self, state, action, reward, next_state, done):
        self.replay_buffer.append(state, action, reward, next_state, done)
        if self.replay_buffer.count > REPLAY_START_SIZE:
            self.train()
        if done:
            self.exploration_noise.reset()
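
The pieces above combine into the usual DDPG loop: act, store the transition, and, once the buffer holds REPLAY_START_SIZE transitions, update actor, critic, and targets on every step. A bare-bones interaction sketch of my own, using a raw Gym env rather than the normalizing wrapper defined below (the full training loop lives in the runner script further down):

import gym
import tensorflow as tf
from ddpg import DDPG

env = gym.make('Pendulum-v0')
with tf.Session() as sess:
    agent = DDPG(sess, env.observation_space.shape[0], env.action_space.shape[0])
    state = env.reset()
    for _ in range(1000):
        action = agent.action(state, True)                        # add OU exploration noise
        next_state, reward, done, _ = env.step(action)
        agent.perceive(state, action, reward, next_state, done)   # trains once the buffer is warm
        state = env.reset() if done else next_state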

# env.py
import numpy as np
import gym
from gym import wrappers

ENV_NAME = 'Pendulum-v0'


class Environment:
    @classmethod
    def create(cls):
        base_env = wrappers.Monitor(env=gym.make(ENV_NAME),
                                    directory='environment', force=True)
        action_space = base_env.action_space
        observation_space = base_env.observation_space
        env_type = type(base_env)

        class EnvironmentWrapper(env_type):
            def __init__(self):
                self.__dict__.update(base_env.__dict__)
                # Observation space: centre and half-range used for normalization.
                if np.any(observation_space.high < 1e10):
                    high = observation_space.high
                    low = observation_space.low
                    self.ob_abs = (high + low) / 2.
                    self.ob_rel = (high - low) / 2.
                else:
                    self.ob_abs = np.zeros_like(observation_space.high)
                    self.ob_rel = np.ones_like(observation_space.high)
                # Action space: map agent actions in [-1, 1] back to the env's range.
                high = action_space.high
                low = action_space.low
                self.action_abs = (high + low) / 2.
                self.action_rel = (high - low) / 2.
                # Rewards
                self.r_sc = 0.1
                self.r_c = 0.
                self.observation_space = gym.spaces.Box(self.normalized_observation(observation_space.low),
                                                        self.normalized_observation(observation_space.high))
                self.action_space = gym.spaces.Box(-np.ones_like(action_space.high),
                                                   np.ones_like(action_space.high))

                def assertEqual(a, b): assert np.all(a == b), '{} != {}'.format(a, b)
                assertEqual(self.normalized_action(self.action_space.low), action_space.low)
                assertEqual(self.normalized_action(self.action_space.high), action_space.high)

            def normalized_observation(self, ob):
                return (ob - self.ob_abs) / self.ob_rel

            def normalized_action(self, action):
                # Scales an action in [-1, 1] to the underlying env's action range.
                return self.action_rel * action + self.action_abs

            def normalized_reward(self, reward):
                return self.r_sc * reward + self.r_c

            def step(self, action):
                # Rescale from [-1, 1] and clip to the underlying env's action bounds.
                a = np.clip(self.normalized_action(action), action_space.low, action_space.high)
                ob, reward, term, info = env_type.step(self, a)
                ob = self.normalized_observation(ob)
                return ob, reward, term, info

        return EnvironmentWrapper()

# ou_noise.py
import numpy as np
import numpy.random as nr


# Ornstein-Uhlenbeck Noise
class OUNoise:
    def __init__(self, action_dim, mu=0, theta=0.15, sigma=0.2):
        self.action_dim = action_dim
        self.mu = mu
        self.theta = theta
        self.sigma = sigma
        self.state = np.ones(self.action_dim) * self.mu
        self.reset()

    def reset(self):
        self.state = np.ones(self.action_dim) * self.mu

    def noise(self):
        x = self.state
        dx = self.theta * (self.mu - x) + self.sigma * nr.randn(len(x))
        self.state = x + dx
        return self.state
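
A quick sanity-check sketch of my own for the OU process: successive samples are temporally correlated and mean-revert toward mu, which is what makes it a better exploration signal for physical control than i.i.d. Gaussian noise.

from ou_noise import OUNoise

noise = OUNoise(action_dim=1)
samples = [noise.noise()[0] for _ in range(5)]  # correlated samples drifting back toward mu=0
noise.reset()                                   # called at episode boundaries in DDPG.perceive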

# replay_buffer.py
from collections import deque
import random


class ReplayBuffer:
    def __init__(self, buffer_size):
        self.buffer_size = buffer_size
        self.buffer = deque()
        self.count = 0

    def get_batch(self, batch_size):
        return random.sample(self.buffer, batch_size)

    def append(self, state, action, reward, new_state, done):
        experience = (state, action, reward, new_state, done)
        if self.count < self.buffer_size:
            self.buffer.append(experience)
            self.count += 1
        else:
            self.buffer.popleft()
            self.buffer.append(experience)

    def clear(self):
        self.buffer = deque()
        self.count = 0
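
ReplayBuffer usage sketch (my own, not from the gist): append transitions as 5-tuples and sample uniform minibatches once count is large enough; when full, the oldest transition is dropped.

from replay_buffer import ReplayBuffer

buf = ReplayBuffer(buffer_size=1000)
for t in range(100):
    buf.append(state=[0.0], action=[0.0], reward=0.0, new_state=[0.0], done=False)
if buf.count >= 64:
    minibatch = buf.get_batch(64)   # list of (state, action, reward, new_state, done) tuples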

# entry-point script (filename not shown in the gist)
import os
import argparse
import tensorflow as tf
from env import *
from ddpg import DDPG

NETWORK_SAVE_PATH = 'network/'
TEST_EPOCH = 10


class Runner:
    def run(self, test_only=False, render=False):
        if not os.path.exists(NETWORK_SAVE_PATH):
            os.makedirs(NETWORK_SAVE_PATH)
        env = Environment.create()
        session = tf.InteractiveSession()
        agent = DDPG(session, env.observation_space.shape[0], env.action_space.shape[0])
        try:
            session.run(tf.global_variables_initializer())
            num_steps = env.spec.tags.get('wrapper_config.TimeLimit.max_episode_steps')
            for episode in range(1000000):
                print('episode: {}'.format(episode))
                if (episode % 100 == 0 and episode > 100) or test_only:
                    self._test(episode, env, agent, num_steps, render)
                else:
                    self._train(episode, env, agent, num_steps, render)
        finally:
            if session is not None:
                session.close()

    def _train(self, episode, env, agent, num_steps, render):
        state = env.reset()
        for step in range(num_steps):
            action, next_state, reward, done, _ = self._step(env, agent,
                                                             state, True, render)
            agent.perceive(state, action, reward, next_state, done)
            state = next_state
            if done:
                break

    def _test(self, episode, env, agent, num_steps, render):
        total_reward = 0
        for c in range(TEST_EPOCH):
            state = env.reset()
            for i in range(num_steps):
                action, state, reward, done, _ = self._step(env, agent,
                                                            state, False, render)
                total_reward += reward
                if done:
                    break
        ave_reward = total_reward / TEST_EPOCH
        print('episode: {}, Average Reward: {}'.format(episode, ave_reward))

    def _step(self, env, agent, state, train, render):
        if render:
            env.render()
        # With train=True the agent adds exploration noise to its action.
        action = agent.action(state, train)
        arr = list(env.step(action))
        arr.insert(0, action)
        return arr


def main():
    parser = argparse.ArgumentParser(description='I am DDPG')
    parser.add_argument('--test-only', action='store_const',
                        const=True, default=False, help='test only or not')
    parser.add_argument('--render', action='store_const',
                        const=True, default=True, help='render or not')
    args = parser.parse_args()
    Runner().run(args.test_only, args.render)


if __name__ == '__main__':
    main()

# tf_utils.py
import math
import tensorflow as tf


def variable(shape, f):
    # Uniform initialization in [-1/sqrt(f), 1/sqrt(f)], where f is the layer's fan-in.
    return tf.Variable(tf.random_uniform(shape, -1 / math.sqrt(f), 1 / math.sqrt(f)))
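
variable implements the fan-in-scaled uniform initialization used for the hidden layers of both networks (the output layers instead use the fixed +/-3e-3 range). A tiny illustration of my own:

import tf_utils as tfu

# First hidden layer of the actor: 3 inputs, 400 units, bounds +/- 1/sqrt(3).
W1 = tfu.variable([3, 400], 3)
b1 = tfu.variable([400], 3)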