Created: May 5, 2019 03:47
Code implementing a DQN agent for keras-rl. The first file works without image input; the second works with image input.
import gym
import pickle
import os
import numpy as np
import random
import tensorflow as tf
from keras.optimizers import Adam
from keras.models import Model
from keras.layers import *
from keras import backend as K
import rl.core
import matplotlib.pyplot as plt
from PIL import Image, ImageDraw
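
# Assumed environment (not pinned in the original gist): standalone Keras on a
# TensorFlow 1.x backend, keras-rl (rl.core), and a gym release that still
# provides Pendulum-v0. Newer stacks (tf.keras / gymnasium) need small changes.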
class PendulumProcessorForDQN(rl.core.Processor):
    def __init__(self, enable_image=False, reshape_size=(84, 84)):
        self.shape = reshape_size
        self.enable_image = enable_image

    def process_observation(self, observation):
        if not self.enable_image:
            return observation
        img = self._get_rgb_state(observation)  # uint8 grayscale array
        img = Image.fromarray(img)
        img = img.resize(self.shape).convert('L')  # resize and convert to grayscale
        return np.array(img) / 255  # normalize to [0, 1]

    def process_action(self, action):
        # Map the discrete action id to the continuous torque that Pendulum expects.
        ACT_ID_TO_VALUE = {
            0: [-2.0],
            1: [-1.0],
            2: [0.0],
            3: [+1.0],
            4: [+2.0],
        }
        return ACT_ID_TO_VALUE[action]

    def process_reward(self, reward):
        # Note: Pendulum rewards lie roughly in [-16.27, 0], so this clip maps
        # most rewards to -1.
        return np.clip(reward, -1., 1.)
    # Render an image of the pendulum from the state (x, y coordinates).
    def _get_rgb_state(self, state):
        img_size = 84
        h_size = img_size / 2.0
        img = Image.new("RGB", (img_size, img_size), (255, 255, 255))
        dr = ImageDraw.Draw(img)

        # Length of the pole.
        l = img_size / 4.0 * 3.0 / 2.0

        # Draw the pole; state[0] is cos(theta), state[1] is sin(theta).
        dr.line(((h_size - l * state[1], h_size - l * state[0]), (h_size, h_size)), (0, 0, 0), 1)

        # Draw a circle at the pivot (purely cosmetic).
        buff = img_size / 32.0
        dr.ellipse(((h_size - buff, h_size - buff), (h_size + buff, h_size + buff)),
                   outline=(0, 0, 0), fill=(255, 0, 0))

        # Convert to grayscale and return the uint8 array as-is. Normalizing to
        # [0, 1] here (as the original did) would break Image.fromarray in
        # process_observation, which cannot handle float64 arrays; the caller
        # performs the normalization instead.
        pilImg = img.convert("L")
        return np.asarray(pilImg)
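
# Illustrative check of the processor (not in the original gist; the _demo_*
# names are hypothetical). Pendulum observations are [cos(theta), sin(theta), theta_dot].
_demo_processor = PendulumProcessorForDQN(enable_image=True)
_demo_obs = _demo_processor.process_observation(np.array([1.0, 0.0, 0.0]))
assert _demo_obs.shape == (84, 84)  # resized grayscale image, values in [0, 1]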
class DQNAgent(rl.core.Agent):
    def __init__(self,
                 input_shape,
                 enable_image_layer,
                 nb_actions,
                 window_length=4,            # number of input frames
                 memory_capacity=1000000,    # replay memory size
                 nb_steps_warmup=50000,      # steps used only to fill the memory (no training)
                 target_model_update=500,    # update interval of the target network
                 action_interval=4,          # interval between action selections (frame skip)
                 train_interval=4,           # training interval
                 batch_size=32,
                 gamma=0.99,                 # discount rate for Q-learning
                 initial_epsilon=1.0,        # initial value for epsilon-greedy
                 final_epsilon=0.1,          # final value for epsilon-greedy
                 exploration_steps=1000000,  # steps over which epsilon is annealed
                 **kwargs):
        super(DQNAgent, self).__init__(**kwargs)
        self.compiled = False

        self.input_shape = input_shape
        self.enable_image_layer = enable_image_layer
        self.nb_actions = nb_actions
        self.window_length = window_length
        self.nb_steps_warmup = nb_steps_warmup
        self.target_model_update = target_model_update
        self.action_interval = action_interval
        self.train_interval = train_interval
        self.gamma = gamma
        self.batch_size = batch_size
        self.initial_epsilon = initial_epsilon
        self.epsilon_step = (initial_epsilon - final_epsilon) / exploration_steps
        self.final_epsilon = final_epsilon

        assert memory_capacity > batch_size, "memory_capacity must be larger than batch_size."
        assert nb_steps_warmup > batch_size, "nb_steps_warmup must be larger than batch_size."

        self.memory = ReplayMemory(capacity=memory_capacity)
        self.model = self.build_network()         # Q network
        self.target_model = self.build_network()  # target network
    def reset_states(self):
        # Keep window_length + 1 observations: the window for the current
        # state plus the newest observation for the next state.
        self.recent_observations = [np.zeros(self.input_shape) for _ in range(self.window_length + 1)]
        self.recent_action = 0
        self.recent_reward = 0
        self.repeated_action = 0

    def compile(self, optimizer=None, metrics=[]):
        # The target network is never trained directly, so any optimizer/loss will do.
        self.target_model.compile(optimizer='sgd', loss='mse')

        def clipped_error_loss(y_true, y_pred):
            # Huber loss: quadratic for errors in [-1, 1], linear outside,
            # which keeps gradients bounded for large TD errors.
            err = y_true - y_pred
            L2 = 0.5 * K.square(err)
            L1 = K.abs(err) - 0.5
            loss = tf.where(K.abs(err) < 1.0, L2, L1)  # the Keras backend has no where(), so use tf directly
            return K.mean(loss)

        self.model.compile(loss=clipped_error_loss, optimizer=optimizer, metrics=metrics)
        self.compiled = True

    def load_weights(self, filepath):
        self.model.load_weights(filepath)

    def save_weights(self, filepath, overwrite=False):
        self.model.save_weights(filepath, overwrite=overwrite)
    def forward(self, observation):
        # Keep a sliding window of observations.
        self.recent_observations.append(observation)  # append the newest
        self.recent_observations.pop(0)               # drop the oldest

        # Train here (in forward) because the next state is available now.
        self.forward_train()

        # Frame skip: select a new action only every action_interval steps.
        action = self.repeated_action
        if self.step % self.action_interval == 0:
            if self.training:
                # Anneal epsilon linearly with the step count.
                epsilon = self.initial_epsilon - self.step * self.epsilon_step
                if epsilon < self.final_epsilon:
                    epsilon = self.final_epsilon
            else:
                epsilon = 0.0  # act greedily during testing

            # epsilon-greedy action selection
            if epsilon > np.random.uniform(0, 1):
                # random action
                action = np.random.randint(0, self.nb_actions)
            else:
                # greedy action: argmax over the Q values of the current state
                state0 = self.recent_observations[1:]
                q_values = self.model.predict(np.asarray([state0]), batch_size=1)[0]
                action = np.argmax(q_values)
            self.repeated_action = action

        self.recent_action = action
        return action
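
    # With the constructor defaults, epsilon decays linearly from 1.0 to 0.1 over
    # exploration_steps = 1,000,000 steps (epsilon_step = 0.9 / 1e6 = 9e-7 per step);
    # the scripts below shorten this schedule considerably.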
    # Split out into its own method because forward was getting long.
    def forward_train(self):
        if not self.training:
            return

        # Store (state0, action, reward, state1) in replay memory.
        self.memory.add((self.recent_observations[:self.window_length],
                         self.recent_action,
                         self.recent_reward,
                         self.recent_observations[1:]))

        # Do not train until the replay memory has warmed up.
        if self.step <= self.nb_steps_warmup:
            return

        # Train only every train_interval steps.
        if self.step % self.train_interval != 0:
            return

        batches = self.memory.sample(self.batch_size)
        state0_batch = []
        action_batch = []
        reward_batch = []
        state1_batch = []
        for batch in batches:
            state0_batch.append(batch[0])
            action_batch.append(batch[1])
            reward_batch.append(batch[2])
            state1_batch.append(batch[3])

        # Current Q values from the Q network, used as the regression targets below.
        outputs = self.model.predict(np.asarray(state0_batch), self.batch_size)

        # Q values of the next states from the target network.
        target_qvals = self.target_model.predict(np.asarray(state1_batch), self.batch_size)

        # Q-learning: Q(s_t, a_t) <- Q(s_t, a_t) + alpha * (r + gamma * max_a Q(s_{t+1}, a) - Q(s_t, a_t))
        for i in range(self.batch_size):
            maxq = np.max(target_qvals[i])
            td_target = reward_batch[i] + self.gamma * maxq  # the TD target (the original misnamed it td_error)
            outputs[i][action_batch[i]] = td_target

        # Fit the Q network toward the targets.
        self.model.train_on_batch(np.asarray(state0_batch), np.asarray(outputs))
    def backward(self, reward, terminal):
        # Copy the weights to the target model at a fixed interval.
        if self.step % self.target_model_update == 0:
            self.target_model.set_weights(self.model.get_weights())
        self.recent_reward = reward
        return []

    @property
    def layers(self):
        return self.model.layers[:]
    # Build the neural-network model.
    def build_network(self):
        # Input layer: (window_length,) + input_shape
        c = input_ = Input(shape=(self.window_length,) + self.input_shape)
        if self.enable_image_layer:
            # (window, w, h) -> (w, h, window): treat the frame window as channels.
            c = Permute((2, 3, 1))(c)
            # The conv stack below mirrors the DeepMind DQN architecture.
            c = Conv2D(32, (8, 8), strides=(4, 4), padding="same")(c)
            c = Activation("relu")(c)
            c = Conv2D(64, (4, 4), strides=(2, 2), padding="same")(c)
            c = Activation("relu")(c)
            c = Conv2D(64, (3, 3), strides=(1, 1), padding="same")(c)
            c = Activation("relu")(c)
        c = Flatten()(c)
        c = Dense(512, activation="relu")(c)
        c = Dense(self.nb_actions, activation="linear")(c)  # output layer: one Q value per action
        return Model(input_, c)
class ReplayMemory():
    def __init__(self, capacity):
        self.capacity = capacity
        self.index = 0
        self.memory = []

    def add(self, experience):
        # Grow until capacity is reached, then overwrite the oldest entry (ring buffer).
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.index] = experience
        self.index = (self.index + 1) % self.capacity

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)
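
# Quick illustration of the ring buffer (not in the original gist; _demo_mem
# is a hypothetical name): once capacity is reached, the oldest entries are
# overwritten in FIFO order.
_demo_mem = ReplayMemory(capacity=3)
for _i in range(5):
    _demo_mem.add(_i)
assert _demo_mem.memory == [3, 4, 2]  # entries 0 and 1 were overwritten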
env = gym.make("Pendulum-v0")

nb_actions = 5  # 5 discrete actions, as defined in PendulumProcessorForDQN
processor = PendulumProcessorForDQN(enable_image=False)

# There are many arguments, so collect them in a dict and unpack it.
args = {
    "input_shape": env.observation_space.shape,
    "enable_image_layer": False,
    "nb_actions": nb_actions,
    "window_length": 1,          # number of input frames
    "memory_capacity": 100000,   # replay memory size
    "nb_steps_warmup": 200,      # steps used only to fill the memory (no training)
    "target_model_update": 100,  # update interval of the target network
    "action_interval": 1,        # interval between action selections
    "train_interval": 1,         # training interval
    "batch_size": 64,
    "gamma": 0.99,               # discount rate for Q-learning
    "initial_epsilon": 1.0,      # initial value for epsilon-greedy
    "final_epsilon": 0.1,        # final value for epsilon-greedy
    "exploration_steps": 5000,   # steps over which epsilon is annealed
    "processor": processor,
}
agent = DQNAgent(**args)
agent.compile(optimizer=Adam())

# Training
print("--- start ---")
print("Press Ctrl+C to stop.")
history = agent.fit(env, nb_steps=50_000, visualize=False, verbose=1)
# Plot the results
plt.subplot(2, 1, 1)
plt.plot(history.history["nb_episode_steps"])
plt.ylabel("step")
plt.subplot(2, 1, 2)
plt.plot(history.history["episode_reward"])
plt.xlabel("episode")
plt.ylabel("reward")
plt.show()

# Watch the trained agent
agent.test(env, nb_episodes=5, visualize=True)
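
# Optionally persist the trained weights using the agent's save_weights method
# (the filename here is just an example):
# agent.save_weights("dqn_pendulum.h5", overwrite=True)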
Second file (with image input). The imports and the PendulumProcessorForDQN, DQNAgent, and ReplayMemory definitions are identical to the first file above; only the setup below differs.
env = gym.make("Pendulum-v0")

nb_actions = 5  # 5 discrete actions, as defined in PendulumProcessorForDQN
processor = PendulumProcessorForDQN(enable_image=True, reshape_size=(84, 84))

# There are many arguments, so collect them in a dict and unpack it.
args = {
    "input_shape": (84, 84),
    "enable_image_layer": True,
    "nb_actions": nb_actions,
    "window_length": 3,          # number of input frames
    "memory_capacity": 100000,   # replay memory size
    "nb_steps_warmup": 200,      # steps used only to fill the memory (no training)
    "target_model_update": 100,  # update interval of the target network
    "action_interval": 1,        # interval between action selections
    "train_interval": 1,         # training interval
    "batch_size": 8,
    "gamma": 0.99,               # discount rate for Q-learning
    "initial_epsilon": 1.0,      # initial value for epsilon-greedy
    "final_epsilon": 0.1,        # final value for epsilon-greedy
    "exploration_steps": 50000,  # steps over which epsilon is annealed
    "processor": processor,
}
agent = DQNAgent(**args)
agent.compile(optimizer=Adam())

# Training
print("--- start ---")
print("Press Ctrl+C to stop.")
history = agent.fit(env, nb_steps=100_000, visualize=False, verbose=1)
# Plot the results
plt.subplot(2, 1, 1)
plt.plot(history.history["nb_episode_steps"])
plt.ylabel("step")
plt.subplot(2, 1, 2)
plt.plot(history.history["episode_reward"])
plt.xlabel("episode")
plt.ylabel("reward")
plt.show()

# Watch the trained agent
agent.test(env, nb_episodes=5, visualize=True)