DDPG
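A compact TensorFlow 1.x implementation of Deep Deterministic Policy Gradient (DDPG, Lillicrap et al., 2015) on gym's Pendulum-v0. An actor network \mu(s) and a critic network Q(s, a) each keep a soft-updated target copy and are trained off-policy from a replay buffer, with Ornstein-Uhlenbeck noise for exploration. In the notation of the paper, the updates implemented in ddpg.py, actor.py, and critic.py are:

    y_i = r_i + \gamma \, Q'(s_{i+1}, \mu'(s_{i+1})) \qquad (y_i = r_i \text{ on terminal steps})
    L_{\text{critic}} = \tfrac{1}{N} \sum_i \bigl(y_i - Q(s_i, a_i)\bigr)^2 + \text{L2 weight decay}
    \nabla_{\theta^\mu} J \approx \tfrac{1}{N} \sum_i \nabla_a Q(s_i, a)\bigr|_{a = \mu(s_i)} \, \nabla_{\theta^\mu} \mu(s_i)
    \theta' \leftarrow \tau \theta + (1 - \tau) \theta' \qquad (\tau = \texttt{TAU} = 0.001)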
actor.py
import tensorflow as tf
import numpy as np
import tf_utils as tfu

LAYER1_SIZE = 400
LAYER2_SIZE = 300
LEARNING_RATE = 1e-4
TAU = 0.001
BATCH_SIZE = 64


class Actor:
    def __init__(self, session, state_dim, action_dim):
        self.session = session
        self.state_dim = state_dim
        self.action_dim = action_dim
        (self.state_input,
         self.action_output,
         self.net) = self.create_network(state_dim, action_dim)
        (self.target_state_input,
         self.target_action_output,
         self.target_update,
         self.target_net) = self.create_target_network(state_dim, action_dim, self.net)
        self.create_training_method()
        self.session.run(tf.global_variables_initializer())
        self.update_target()

    def create_training_method(self):
        # Deterministic policy gradient: weight the actor's parameter gradients
        # by -dQ/da so that Adam (a minimizer) ascends the Q-value.
        self.q_gradient_input = tf.placeholder("float", [None, self.action_dim])
        self.parameters_gradients = tf.gradients(self.action_output,
                                                 self.net, -self.q_gradient_input)
        self.optimizer = tf.train.AdamOptimizer(LEARNING_RATE).apply_gradients(
            zip(self.parameters_gradients, self.net))

    def create_network(self, state_dim, action_dim):
        layer1_size = LAYER1_SIZE
        layer2_size = LAYER2_SIZE
        state_input = tf.placeholder("float", [None, state_dim])
        # Hidden layers use fan-in initialization; the output layer uses small
        # uniform weights so initial actions are near zero.
        W1 = tfu.variable([state_dim, layer1_size], state_dim)
        b1 = tfu.variable([layer1_size], state_dim)
        W2 = tfu.variable([layer1_size, layer2_size], layer1_size)
        b2 = tfu.variable([layer2_size], layer1_size)
        W3 = tf.Variable(tf.random_uniform([layer2_size, action_dim], -3e-3, 3e-3))
        b3 = tf.Variable(tf.random_uniform([action_dim], -3e-3, 3e-3))
        layer1 = tf.nn.relu(tf.matmul(state_input, W1) + b1)
        layer2 = tf.nn.relu(tf.matmul(layer1, W2) + b2)
        action_output = tf.tanh(tf.matmul(layer2, W3) + b3)
        return state_input, action_output, [W1, b1, W2, b2, W3, b3]

    def create_target_network(self, state_dim, action_dim, net):
        # The target network shares the online network's topology but reads
        # exponentially averaged copies of its weights (soft update, rate TAU).
        state_input = tf.placeholder("float", [None, state_dim])
        ema = tf.train.ExponentialMovingAverage(decay=1 - TAU)
        target_update = ema.apply(net)
        target_net = [ema.average(x) for x in net]
        layer1 = tf.nn.relu(tf.matmul(state_input, target_net[0]) + target_net[1])
        layer2 = tf.nn.relu(tf.matmul(layer1, target_net[2]) + target_net[3])
        action_output = tf.tanh(tf.matmul(layer2, target_net[4]) + target_net[5])
        return state_input, action_output, target_update, target_net

    def update_target(self):
        self.session.run(self.target_update)

    def train(self, q_gradient_batch, state_batch):
        self.session.run(self.optimizer, feed_dict={
            self.q_gradient_input: q_gradient_batch,
            self.state_input: state_batch})

    def actions(self, state_batch):
        return self.session.run(self.action_output, feed_dict={
            self.state_input: state_batch})

    def action(self, state):
        return self.session.run(self.action_output, feed_dict={
            self.state_input: [state]})[0]

    def target_actions(self, state_batch):
        return self.session.run(self.target_action_output, feed_dict={
            self.target_state_input: state_batch})
critic.py
import tensorflow as tf
import numpy as np
import tf_utils as tfu

LAYER1_SIZE = 400
LAYER2_SIZE = 300
LEARNING_RATE = 1e-3
TAU = 0.001
L2 = 0.01


class Critic:
    def __init__(self, session, state_dim, action_dim):
        self.session = session
        self.time_step = 0
        (self.state_input,
         self.action_input,
         self.q_value_output,
         self.net) = self.create_q_network(state_dim, action_dim)
        (self.target_state_input,
         self.target_action_input,
         self.target_q_value_output,
         self.target_update) = self.create_target_q_network(state_dim, action_dim, self.net)
        self.create_training_method()
        self.session.run(tf.global_variables_initializer())
        self.update_target()

    def create_training_method(self):
        # Mean squared Bellman error against the externally supplied target y,
        # plus L2 weight decay on all critic parameters.
        self.y_input = tf.placeholder("float", [None, 1])
        weight_decay = tf.add_n([L2 * tf.nn.l2_loss(var) for var in self.net])
        self.cost = tf.reduce_mean(tf.square(self.y_input - self.q_value_output)) + weight_decay
        self.optimizer = tf.train.AdamOptimizer(LEARNING_RATE).minimize(self.cost)
        # dQ/da, fed back to the actor as its policy-gradient signal.
        self.action_gradients = tf.gradients(self.q_value_output, self.action_input)

    def create_q_network(self, state_dim, action_dim):
        layer1_size = LAYER1_SIZE
        layer2_size = LAYER2_SIZE
        state_input = tf.placeholder("float", [None, state_dim])
        action_input = tf.placeholder("float", [None, action_dim])
        W1 = tfu.variable([state_dim, layer1_size], state_dim)
        b1 = tfu.variable([layer1_size], state_dim)
        # Actions are injected at the second layer, as in the DDPG paper.
        W2 = tfu.variable([layer1_size, layer2_size], layer1_size + action_dim)
        W2_action = tfu.variable([action_dim, layer2_size], layer1_size + action_dim)
        b2 = tfu.variable([layer2_size], layer1_size + action_dim)
        W3 = tf.Variable(tf.random_uniform([layer2_size, 1], -3e-3, 3e-3))
        b3 = tf.Variable(tf.random_uniform([1], -3e-3, 3e-3))
        layer1 = tf.nn.relu(tf.matmul(state_input, W1) + b1)
        layer2 = tf.nn.relu(tf.matmul(layer1, W2) + tf.matmul(action_input, W2_action) + b2)
        q_value_output = tf.matmul(layer2, W3) + b3
        return state_input, action_input, q_value_output, [W1, b1, W2, W2_action, b2, W3, b3]

    def create_target_q_network(self, state_dim, action_dim, net):
        state_input = tf.placeholder("float", [None, state_dim])
        action_input = tf.placeholder("float", [None, action_dim])
        ema = tf.train.ExponentialMovingAverage(decay=1 - TAU)
        target_update = ema.apply(net)
        target_net = [ema.average(x) for x in net]
        layer1 = tf.nn.relu(tf.matmul(state_input, target_net[0]) + target_net[1])
        layer2 = tf.nn.relu(tf.matmul(layer1, target_net[2]) + tf.matmul(action_input, target_net[3]) + target_net[4])
        q_value_output = tf.matmul(layer2, target_net[5]) + target_net[6]
        return state_input, action_input, q_value_output, target_update

    def update_target(self):
        self.session.run(self.target_update)

    def train(self, y_batch, state_batch, action_batch):
        self.time_step += 1
        self.session.run(self.optimizer, feed_dict={
            self.y_input: y_batch,
            self.state_input: state_batch,
            self.action_input: action_batch})

    def gradients(self, state_batch, action_batch):
        return self.session.run(self.action_gradients, feed_dict={
            self.state_input: state_batch,
            self.action_input: action_batch})[0]

    def target_q(self, state_batch, action_batch):
        return self.session.run(self.target_q_value_output, feed_dict={
            self.target_state_input: state_batch,
            self.target_action_input: action_batch})

    def q_value(self, state_batch, action_batch):
        return self.session.run(self.q_value_output, feed_dict={
            self.state_input: state_batch,
            self.action_input: action_batch})
ddpg.py
import tensorflow as tf
import numpy as np
from ou_noise import OUNoise
from critic import Critic
from actor import Actor
from replay_buffer import ReplayBuffer

BATCH_SIZE = 64
GAMMA = 0.99
REPLAY_BUFFER_SIZE = 1000000
REPLAY_START_SIZE = 10000


class DDPG:
    def __init__(self, session, state_dim, action_dim):
        self.session = session
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.actor = Actor(self.session, self.state_dim, self.action_dim)
        self.critic = Critic(self.session, self.state_dim, self.action_dim)
        self.replay_buffer = ReplayBuffer(REPLAY_BUFFER_SIZE)
        self.exploration_noise = OUNoise(self.action_dim)

    def train(self):
        minibatch = self.replay_buffer.get_batch(BATCH_SIZE)
        state_batch = np.asarray([data[0] for data in minibatch])
        action_batch = np.asarray([data[1] for data in minibatch])
        reward_batch = np.asarray([data[2] for data in minibatch])
        next_state_batch = np.asarray([data[3] for data in minibatch])
        done_batch = np.asarray([data[4] for data in minibatch])
        action_batch = np.resize(action_batch, [BATCH_SIZE, self.action_dim])
        # Critic target: y = r for terminal transitions, else r + gamma * Q'(s', mu'(s')).
        next_action_batch = self.actor.target_actions(next_state_batch)
        q_value_batch = self.critic.target_q(next_state_batch, next_action_batch)
        y_batch = []
        for i in range(len(minibatch)):
            y = reward_batch[i] if done_batch[i] \
                else reward_batch[i] + GAMMA * q_value_batch[i]
            y_batch.append(y)
        y_batch = np.resize(y_batch, [BATCH_SIZE, 1])
        self.critic.train(y_batch, state_batch, action_batch)
        # Actor update: ascend Q along dQ/da evaluated at the actor's own actions.
        action_batch_for_gradients = self.actor.actions(state_batch)
        q_gradient_batch = self.critic.gradients(state_batch, action_batch_for_gradients)
        self.actor.train(q_gradient_batch, state_batch)
        # Soft-update both target networks.
        self.actor.update_target()
        self.critic.update_target()

    def action(self, state, noise=False):
        action = self.actor.action(state)
        return action if not noise else action + self.exploration_noise.noise()

    def perceive(self, state, action, reward, next_state, done):
        self.replay_buffer.append(state, action, reward, next_state, done)
        if self.replay_buffer.count > REPLAY_START_SIZE:
            self.train()
        if done:
            self.exploration_noise.reset()
env.py
import numpy as np
import gym
from gym import wrappers

ENV_NAME = 'Pendulum-v0'


class Environment:
    @classmethod
    def create(cls):
        base_env = wrappers.Monitor(env=gym.make(ENV_NAME),
                                    directory='environment', force=True)
        action_space = base_env.action_space
        observation_space = base_env.observation_space
        env_type = type(base_env)

        class EnvironmentWrapper(env_type):
            def __init__(self):
                self.__dict__.update(base_env.__dict__)
                # Observation space: rescale to roughly [-1, 1] when fully bounded.
                if np.all(observation_space.high < 1e10):
                    high = observation_space.high
                    low = observation_space.low
                    self.ob_abs = (high + low) / 2.
                    self.ob_rel = (high - low) / 2.
                else:
                    self.ob_abs = np.zeros_like(observation_space.high)
                    self.ob_rel = np.ones_like(observation_space.high)
                # Action space: the agent acts in [-1, 1]^n.
                high = action_space.high
                low = action_space.low
                self.action_abs = (high + low) / 2.
                self.action_rel = (high - low) / 2.
                # Reward scaling.
                self.r_sc = 0.1
                self.r_c = 0.
                self.observation_space = gym.spaces.Box(self.normalized_observation(observation_space.low),
                                                        self.normalized_observation(observation_space.high))
                self.action_space = gym.spaces.Box(-np.ones_like(action_space.high),
                                                   np.ones_like(action_space.high))

                def assert_equal(a, b):
                    assert np.all(a == b), '{} != {}'.format(a, b)
                assert_equal(self.normalized_action(self.action_space.low), action_space.low)
                assert_equal(self.normalized_action(self.action_space.high), action_space.high)

            def normalized_observation(self, ob):
                return (ob - self.ob_abs) / self.ob_rel

            def normalized_action(self, action):
                # Maps an action in [-1, 1] back to the base environment's range.
                return self.action_rel * action + self.action_abs

            def normalized_reward(self, reward):
                return self.r_sc * reward + self.r_c

            def step(self, action):
                # Clip the agent's action to [-1, 1] first, then map it to the
                # base environment's native action range.
                a = self.normalized_action(np.clip(action, self.action_space.low,
                                                   self.action_space.high))
                ob, reward, term, info = env_type.step(self, a)
                ob = self.normalized_observation(ob)
                return ob, reward, term, info

        return EnvironmentWrapper()
ou_noise.py
import numpy as np
import numpy.random as nr


# Ornstein-Uhlenbeck noise for temporally correlated exploration.
class OUNoise:
    def __init__(self, action_dim, mu=0, theta=0.15, sigma=0.2):
        self.action_dim = action_dim
        self.mu = mu
        self.theta = theta
        self.sigma = sigma
        self.reset()

    def reset(self):
        self.state = np.ones(self.action_dim) * self.mu

    def noise(self):
        # Mean-reverting drift towards mu plus a Gaussian perturbation.
        x = self.state
        dx = self.theta * (self.mu - x) + self.sigma * nr.randn(len(x))
        self.state = x + dx
        return self.state
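For reference, noise() above is a unit-time-step Euler discretization of the Ornstein-Uhlenbeck process

    dx_t = \theta (\mu - x_t) \, dt + \sigma \, dW_t

so each call computes x_{t+1} = x_t + \theta (\mu - x_t) + \sigma \varepsilon_t with \varepsilon_t \sim \mathcal{N}(0, I). The resulting noise is temporally correlated, which suits exploration in control tasks with momentum.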
replay_buffer.py
from collections import deque
import random


class ReplayBuffer:
    def __init__(self, buffer_size):
        self.buffer_size = buffer_size
        self.buffer = deque()
        self.count = 0

    def get_batch(self, batch_size):
        # Uniform random minibatch; callers must ensure batch_size <= count.
        return random.sample(self.buffer, batch_size)

    def append(self, state, action, reward, new_state, done):
        experience = (state, action, reward, new_state, done)
        if self.count < self.buffer_size:
            self.buffer.append(experience)
            self.count += 1
        else:
            # Buffer full: evict the oldest experience.
            self.buffer.popleft()
            self.buffer.append(experience)

    def clear(self):
        self.buffer = deque()
        self.count = 0
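For illustration, a minimal usage sketch of ReplayBuffer; the shapes and values below are placeholders, not part of the gist:

import numpy as np
from replay_buffer import ReplayBuffer

buf = ReplayBuffer(buffer_size=100)
for t in range(10):
    s = np.zeros(3)   # placeholder state
    a = np.zeros(1)   # placeholder action
    buf.append(s, a, reward=0.0, new_state=s, done=False)

# random.sample raises ValueError if fewer than batch_size experiences
# are stored, so sample only once buf.count >= batch_size.
batch = buf.get_batch(batch_size=4)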
main script (filename not shown)
import os
import argparse
import tensorflow as tf
from env import *
from ddpg import DDPG

NETWORK_SAVE_PATH = 'network/'
TEST_EPOCH = 10


class Runner:
    def run(self, test_only=False, render=False):
        if not os.path.exists(NETWORK_SAVE_PATH):
            os.makedirs(NETWORK_SAVE_PATH)
        env = Environment.create()
        session = tf.InteractiveSession()
        agent = DDPG(session, env.observation_space.shape[0], env.action_space.shape[0])
        try:
            session.run(tf.global_variables_initializer())
            num_steps = env.spec.tags.get('wrapper_config.TimeLimit.max_episode_steps')
            for episode in range(1000000):
                print('episode: {}'.format(episode))
                # Evaluate every 100th episode (or always in test-only mode).
                if (episode % 100 == 0 and episode > 100) or test_only:
                    self._test(episode, env, agent, num_steps, render)
                else:
                    self._train(episode, env, agent, num_steps, render)
        finally:
            if session is not None:
                session.close()

    def _train(self, episode, env, agent, num_steps, render):
        state = env.reset()
        for step in range(num_steps):
            action, next_state, reward, done, _ = self._step(env, agent,
                                                             state, True, render)
            agent.perceive(state, action, reward, next_state, done)
            state = next_state
            if done:
                break

    def _test(self, episode, env, agent, num_steps, render):
        total_reward = 0
        for _ in range(TEST_EPOCH):
            state = env.reset()
            for _ in range(num_steps):
                # Act greedily (no exploration noise) during evaluation.
                action, state, reward, done, _ = self._step(env, agent,
                                                            state, False, render)
                total_reward += reward
                if done:
                    break
        ave_reward = total_reward / TEST_EPOCH
        print('episode: {}, Average Reward: {}'.format(episode, ave_reward))

    def _step(self, env, agent, state, train, render):
        if render:
            env.render()
        # With train=True the agent adds exploration noise to its action.
        action = agent.action(state, train)
        arr = list(env.step(action))
        arr.insert(0, action)
        return arr


def main():
    parser = argparse.ArgumentParser(description='I am DDPG')
    parser.add_argument('--test-only', action='store_true',
                        help='run evaluation episodes only')
    parser.add_argument('--render', action='store_true',
                        help='render the environment')
    args = parser.parse_args()
    Runner().run(args.test_only, args.render)


if __name__ == '__main__':
    main()
tf_utils.py
import math
import tensorflow as tf


def variable(shape, f):
    # Fan-in initialization: uniform in [-1/sqrt(f), 1/sqrt(f)],
    # where f is the number of inputs to the layer.
    return tf.Variable(tf.random_uniform(shape, -1 / math.sqrt(f), 1 / math.sqrt(f)))