Code that implements a Rainbow agent for keras-rl.
import gym
import pickle
import os
import numpy as np
import random
import tensorflow as tf
from keras.optimizers import Adam
from keras.models import Model
from keras.layers import *
from keras import backend as K
import rl.core
import matplotlib.pyplot as plt
from PIL import Image, ImageDraw
class PendulumProcessorForDQN(rl.core.Processor):
    def __init__(self, enable_image=False, image_size=84):
        self.image_size = image_size
        self.enable_image = enable_image
        self.mode = "train"

    def process_observation(self, observation):
        if not self.enable_image:
            return observation
        return self._get_rgb_state(observation)  # return without reshaping

    def process_action(self, action):
        # Map the discrete action id to the continuous torque expected by Pendulum.
        ACT_ID_TO_VALUE = {
            0: [-2.0],
            1: [-1.0],
            2: [0.0],
            3: [+1.0],
            4: [+2.0],
        }
        return ACT_ID_TO_VALUE[action]

    def process_reward(self, reward):
        if self.mode == "test":  # in test mode, return the raw reward
            return reward
        # return np.clip(reward, -1., 1.)
        # Normalize the reward range -16.5..0 to m..M (-0.5..1).
        self.max = 0
        self.min = -16.5
        # min-max normalization
        if (self.max - self.min) == 0:
            return 0
        M = 1
        m = -0.5
        return ((reward - self.min) / (self.max - self.min)) * (M - m) + m

    # Render the state (x, y coordinates) to an image.
    def _get_rgb_state(self, state):
        img_size = self.image_size
        h_size = img_size / 2.0

        img = Image.new("RGB", (img_size, img_size), (255, 255, 255))
        dr = ImageDraw.Draw(img)

        # pendulum length
        l = img_size / 4.0 * 3.0 / 2.0

        # draw the pendulum rod
        dr.line(((h_size - l * state[1], h_size - l * state[0]), (h_size, h_size)), (0, 0, 0), 1)

        # draw a circle at the pivot (purely cosmetic)
        buff = img_size / 32.0
        dr.ellipse(((h_size - buff, h_size - buff), (h_size + buff, h_size + buff)),
                   outline=(0, 0, 0), fill=(255, 0, 0))

        # convert to grayscale and to a numpy array
        pilImg = img.convert("L")
        img_arr = np.asarray(pilImg)

        # scale pixel values to 0..1
        img_arr = img_arr / 255.0
        return img_arr
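
# Rainbow-style agent for keras-rl.  Of the Rainbow components, this code covers
# Double DQN, Dueling networks, multi-step returns, prioritized experience replay
# (greedy / proportional / rank-based) and NoisyNet exploration, each switchable
# via the constructor arguments below; the distributional (C51) head is not included.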
class RainbowAgent(rl.core.Agent):
    def __init__(self,
                 input_shape,
                 enable_image_layer,
                 nb_actions,
                 window_length=4,             # number of input frames
                 memory_type="replay",        # which replay memory to use
                 memory_capacity=1000000,     # replay memory size
                 per_alpha=0.6,               # PER: how strongly priorities affect sampling
                 per_beta_initial=0.4,        # PER: initial importance-sampling exponent
                 per_beta_steps=1000000,      # PER: steps over which beta anneals to 1
                 per_enable_is=False,         # PER: enable importance sampling
                 nb_steps_warmup=50000,       # steps used only to fill the memory (no training)
                 target_model_update=500,     # target network update interval
                 action_interval=4,           # interval between action selections
                 train_interval=4,            # training interval
                 batch_size=32,               # batch size
                 gamma=0.99,                  # Q-learning discount factor
                 initial_epsilon=1.0,         # epsilon-greedy: initial value
                 final_epsilon=0.1,           # epsilon-greedy: final value
                 exploration_steps=1000000,   # epsilon-greedy: annealing steps
                 multireward_steps=3,         # multi-step reward
                 dence_units_num=512,         # number of units in the Dense layers
                 enable_double_dqn=False,
                 enable_dueling_network=False,
                 enable_noisynet=False,
                 **kwargs):
        super(RainbowAgent, self).__init__(**kwargs)
        self.compiled = False

        self.input_shape = input_shape
        self.enable_image_layer = enable_image_layer
        self.nb_actions = nb_actions
        self.window_length = window_length
        self.nb_steps_warmup = nb_steps_warmup
        self.target_model_update = target_model_update
        self.action_interval = action_interval
        self.train_interval = train_interval
        self.gamma = gamma
        self.batch_size = batch_size
        self.multireward_steps = multireward_steps
        self.dence_units_num = dence_units_num

        self.initial_epsilon = initial_epsilon
        self.epsilon_step = (initial_epsilon - final_epsilon) / exploration_steps
        self.final_epsilon = final_epsilon

        if memory_type == "replay":
            self.memory = ReplayMemory(memory_capacity)
        elif memory_type == "per_greedy":
            self.memory = PERGreedyMemory(memory_capacity)
        elif memory_type == "per_proportional":
            self.memory = PERProportionalMemory(memory_capacity, per_alpha, per_beta_initial, per_beta_steps, per_enable_is)
        elif memory_type == "per_rankbase":
            self.memory = PERRankBaseMemory(memory_capacity, per_alpha, per_beta_initial, per_beta_steps, per_enable_is)
        else:
            raise ValueError('memory_type must be one of ["replay", "per_greedy", "per_proportional", "per_rankbase"]')

        self.enable_double_dqn = enable_double_dqn
        self.enable_dueling_network = enable_dueling_network
        self.enable_noisynet = enable_noisynet

        self.model = self.build_network()         # Q network
        self.target_model = self.build_network()  # target network

        assert memory_capacity > self.batch_size, "Memory capacity is too small (it must be larger than the batch size)."
        assert self.nb_steps_warmup > self.batch_size, "Too few warmup steps (must be larger than the batch size)."
    def reset_states(self):
        self.recent_action = 0
        self.repeated_action = 0
        self.recent_reward = [0 for _ in range(self.multireward_steps)]
        self.recent_observations = [np.zeros(self.input_shape) for _ in range(self.window_length + self.multireward_steps)]
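
    # The loss defined in compile() below is the Huber loss with delta = 1:
    #   0.5 * err^2      if |err| < 1
    #   |err| - 0.5      otherwise
    # which bounds the gradient of large TD errors (the "error clipping" used in DQN).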
    def compile(self, optimizer=None, metrics=[]):
        # The target network is never trained, so any optimizer/loss will do.
        self.target_model.compile(optimizer='sgd', loss='mse')

        def clipped_error_loss(y_true, y_pred):
            err = y_true - y_pred  # error
            L2 = 0.5 * K.square(err)
            L1 = K.abs(err) - 0.5
            # Use L2 while the error is within [-1, 1], L1 otherwise.
            loss = tf.where((K.abs(err) < 1.0), L2, L1)  # Keras does not wrap tensorflow's where :-(
            return K.mean(loss)

        self.model.compile(loss=clipped_error_loss, optimizer=optimizer, metrics=metrics)
        self.compiled = True
    def load_weights(self, filepath):
        self.model.load_weights(filepath)

    def save_weights(self, filepath, overwrite=False):
        self.model.save_weights(filepath, overwrite=overwrite)
    def forward(self, observation):
        # Keep a window of recent observations.
        self.recent_observations.append(observation)  # append at the end
        self.recent_observations.pop(0)               # drop the oldest

        # Train here (done in forward because the next state is needed).
        self.forward_train()

        # Frame skip: choose a new action only every action_interval steps.
        action = self.repeated_action
        if self.step % self.action_interval == 0:
            # When NoisyNet is enabled, exploration comes from the noisy layers instead.
            if self.training and not self.enable_noisynet:
                # Anneal epsilon with the step count.
                epsilon = self.initial_epsilon - self.step * self.epsilon_step
                if epsilon < self.final_epsilon:
                    epsilon = self.final_epsilon

                # epsilon-greedy
                if epsilon > np.random.uniform(0, 1):
                    # random action
                    action = np.random.randint(0, self.nb_actions)
                else:
                    # Take the current state and pick the action with the highest Q value.
                    state0 = self.recent_observations[-self.window_length:]
                    q_values = self.model.predict(np.asarray([state0]), batch_size=1)[0]
                    action = np.argmax(q_values)
            else:
                # Take the current state and pick the action with the highest Q value.
                state0 = self.recent_observations[-self.window_length:]
                q_values = self.model.predict(np.asarray([state0]), batch_size=1)[0]
                action = np.argmax(q_values)

            # remember the action for the skipped frames
            self.repeated_action = action

        self.recent_action = action
        return action

    # Split into its own function because it is long.
    def forward_train(self):
        if not self.training:
            return

        # Accumulate the recent rewards with discounting (n-step return).
        reward = 0
        for i, r in enumerate(self.recent_reward):
            reward += r * (self.gamma ** i)

        self.memory.add(
            (self.recent_observations[:self.window_length],
             self.recent_action,
             reward,
             self.recent_observations[-self.window_length:]))

        # Do not train until the replay memory has been filled for a while.
        if self.step <= self.nb_steps_warmup:
            return

        # training interval
        if self.step % self.train_interval != 0:
            return

        (indexes, batchs, weights) = self.memory.sample(self.batch_size, self.step)
        state0_batch = []
        action_batch = []
        reward_batch = []
        state1_batch = []
        for batch in batchs:
            state0_batch.append(batch[0])
            action_batch.append(batch[1])
            reward_batch.append(batch[2])
            state1_batch.append(batch[3])

        # Current Q-network output, used as the training target template (Q network).
        outputs = self.model.predict(np.asarray(state0_batch), self.batch_size)
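
        # The bootstrap target is the n-step return plus gamma^n * max_a Q(s_{t+n}, a).
        # With Double DQN the action is chosen by the online network and its value is
        # read from the target network, which reduces the overestimation of plain DQN.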
        if self.enable_double_dqn:
            # Q values from both the online network and the target network.
            state1_model_qvals_batch = self.model.predict(np.asarray(state1_batch), self.batch_size)
            state1_target_qvals_batch = self.target_model.predict(np.asarray(state1_batch), self.batch_size)

            for i in range(self.batch_size):
                action = np.argmax(state1_model_qvals_batch[i])  # action chosen by the online model
                maxq = state1_target_qvals_batch[i][action]      # its Q value taken from the target model

                td_error = reward_batch[i] + (self.gamma ** self.multireward_steps) * maxq
                td_error *= weights[i]
                td_error_diff = outputs[i][action_batch[i]] - td_error  # TD error
                outputs[i][action_batch[i]] = td_error                  # update the target

                # update the priority with the TD error
                self.memory.update(indexes[i], batchs[i], td_error_diff)
        else:
            # Q values of the next state (target network).
            target_qvals = self.target_model.predict(np.asarray(state1_batch), self.batch_size)

            # Q-learning: Q(St,At) = Q(St,At) + alpha * (r + gamma * max Q(St+1,At+1) - Q(St,At))
            for i in range(self.batch_size):
                maxq = np.max(target_qvals[i])

                td_error = reward_batch[i] + (self.gamma ** self.multireward_steps) * maxq
                td_error *= weights[i]
                td_error_diff = outputs[i][action_batch[i]] - td_error  # TD error
                outputs[i][action_batch[i]] = td_error

                self.memory.update(indexes[i], batchs[i], td_error_diff)

        # train on the batch
        self.model.train_on_batch(np.asarray(state0_batch), np.asarray(outputs))
    def backward(self, reward, terminal):
        self.recent_reward.append(reward)  # append at the end
        self.recent_reward.pop(0)          # drop the oldest

        # Copy the weights to the target model at a fixed interval.
        if self.step % self.target_model_update == 0:
            self.target_model.set_weights(self.model.get_weights())
        return []

    @property
    def layers(self):
        return self.model.layers[:]
    # Build the neural network model.
    def build_network(self):
        # input layer: (window_length, width, height)
        c = input_ = Input(shape=(self.window_length,) + self.input_shape)

        if self.enable_image_layer:
            c = Permute((2, 3, 1))(c)  # (window, w, h) -> (w, h, window)
            c = Conv2D(32, (8, 8), strides=(4, 4), padding="same")(c)
            c = Activation("relu")(c)
            c = Conv2D(64, (4, 4), strides=(2, 2), padding="same")(c)
            c = Activation("relu")(c)
            c = Conv2D(64, (3, 3), strides=(1, 1), padding="same")(c)
            c = Activation("relu")(c)
        c = Flatten()(c)
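
        # Dueling network: the Q values are aggregated from a state value V(s) and
        # advantages A(s, a) as Q(s, a) = V(s) + A(s, a) - mean_a A(s, a),
        # implemented by the Lambda layer below.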
        if self.enable_dueling_network:
            # value stream
            v = Dense(self.dence_units_num, activation="relu")(c)
            if self.enable_noisynet:
                v = NoisyDense(1)(v)
            else:
                v = Dense(1)(v)

            # advantage stream
            adv = Dense(self.dence_units_num, activation='relu')(c)
            if self.enable_noisynet:
                adv = NoisyDense(self.nb_actions)(adv)
            else:
                adv = Dense(self.nb_actions)(adv)

            # concatenate and aggregate
            c = Concatenate()([v, adv])
            c = Lambda(lambda a: K.expand_dims(a[:, 0], -1) + a[:, 1:] - K.mean(a[:, 1:], axis=1, keepdims=True),
                       output_shape=(self.nb_actions,))(c)
        else:
            c = Dense(self.dence_units_num, activation="relu")(c)
            if self.enable_noisynet:
                c = NoisyDense(self.nb_actions, activation="linear")(c)
            else:
                c = Dense(self.nb_actions, activation="linear")(c)

        return Model(input_, c)
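
# Plain replay memory: a ring buffer with uniform random sampling.  sample()
# returns dummy indexes and all-ones weights so that it exposes the same
# interface as the prioritized memories below.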
class ReplayMemory():
    def __init__(self, capacity):
        self.capacity = capacity
        self.index = 0
        self.memory = []

    def add(self, experience):
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.index] = experience
        self.index = (self.index + 1) % self.capacity

    def update(self, idx, experience, td_error):
        pass

    def sample(self, batch_size, steps):
        batchs = random.sample(self.memory, batch_size)
        indexes = np.empty(batch_size, dtype='float32')
        weights = [1 for _ in range(batch_size)]
        return (indexes, batchs, weights)
import heapq
class _head_wrapper():
    def __init__(self, data):
        self.d = data
    def __eq__(self, other):
        return True
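
# Greedy prioritized replay: transitions are kept in a max-heap keyed on priority,
# sample() always pops the highest-priority (largest TD error) transitions, and
# they are pushed back with their new priority in update().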
class PERGreedyMemory():
    def __init__(self, capacity):
        self.buffer = []
        self.capacity = capacity
        self.max_priority = 1

    def add(self, experience):
        if self.capacity <= len(self.buffer):
            # Drop the last element when the buffer is full.
            self.buffer.pop()

        # New experiences get the maximum priority.
        experience = _head_wrapper(experience)
        heapq.heappush(self.buffer, (-self.max_priority, experience))

    def update(self, idx, experience, td_error):
        # heapq is a min-heap, so priorities are negated.
        experience = _head_wrapper(experience)
        heapq.heappush(self.buffer, (-td_error, experience))

        # keep track of the maximum priority
        if self.max_priority < td_error:
            self.max_priority = td_error

    def sample(self, batch_size, step):
        # Pop the samples (they are pushed back after training via update()).
        batchs = [heapq.heappop(self.buffer)[1].d for _ in range(batch_size)]
        indexes = np.empty(batch_size, dtype='float32')
        weights = [1 for _ in range(batch_size)]
        return (indexes, batchs, weights)
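
# A sum tree stores priorities in the leaves of a binary tree where every inner
# node holds the sum of its children: total() is the root and get(s) walks down
# the tree to the leaf whose prefix-sum interval contains s.  Drawing s uniformly
# from [0, total()) therefore samples a leaf with probability proportional to its
# priority, in O(log capacity) time.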
# copied from https://github.com/jaromiru/AI-blog/blob/5aa9f0b/SumTree.py
import numpy
class SumTree:
    write = 0

    def __init__(self, capacity):
        self.capacity = capacity
        self.tree = numpy.zeros(2 * capacity - 1)
        self.data = numpy.zeros(capacity, dtype=object)

    def _propagate(self, idx, change):
        parent = (idx - 1) // 2
        self.tree[parent] += change
        if parent != 0:
            self._propagate(parent, change)

    def _retrieve(self, idx, s):
        left = 2 * idx + 1
        right = left + 1
        if left >= len(self.tree):
            return idx
        if s <= self.tree[left]:
            return self._retrieve(left, s)
        else:
            return self._retrieve(right, s - self.tree[left])

    def total(self):
        return self.tree[0]

    def add(self, p, data):
        idx = self.write + self.capacity - 1
        self.data[self.write] = data
        self.update(idx, p)
        self.write += 1
        if self.write >= self.capacity:
            self.write = 0

    def update(self, idx, p):
        change = p - self.tree[idx]
        self.tree[idx] = p
        self._propagate(idx, change)

    def get(self, s):
        idx = self._retrieve(0, s)
        dataIdx = idx - self.capacity + 1
        return (idx, self.tree[idx], self.data[dataIdx])
class PERProportionalMemory():
    def __init__(self, capacity, alpha, beta_initial, beta_steps, enable_is):
        self.capacity = capacity
        self.tree = SumTree(capacity)
        self.alpha = alpha
        self.beta_initial = beta_initial
        self.beta_steps = beta_steps
        self.enable_is = enable_is
        self.max_priority = 1
        self.size = 0

    def add(self, experience):
        self.tree.add(self.max_priority, experience)
        self.size += 1
        if self.size > self.capacity:
            self.size = self.capacity

    def update(self, index, experience, td_error):
        priority = (abs(td_error) + 0.0001) ** self.alpha
        self.tree.update(index, priority)
        if self.max_priority < priority:
            self.max_priority = priority
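
    # Proportional PER: a transition is sampled with probability P(i) = p_i / sum_k p_k,
    # where p_i = (|td_error| + eps)^alpha.  The sampling bias is corrected with
    # importance-sampling weights w_i = (N * P(i))^(-beta), normalized by the largest
    # weight in the batch, with beta annealed towards 1 over training.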
    def sample(self, batch_size, step):
        indexes = []
        batchs = []
        weights = np.empty(batch_size, dtype='float32')

        if self.enable_is:
            # beta starts low and is annealed towards 1 by the end of training.
            beta = self.beta_initial + (1 - self.beta_initial) * step / self.beta_steps

        # Draw random values over the total priority mass.
        total = self.tree.total()
        for i in range(batch_size):
            # Keep drawing until we get an index that is not already in the batch.
            loop_over = True
            for _ in range(100):  # for safety
                r = random.random() * total
                (idx, priority, experience) = self.tree.get(r)
                if idx not in indexes:
                    loop_over = False
                    break
            assert not loop_over

            indexes.append(idx)
            batchs.append(experience)

            if self.enable_is:
                # importance-sampling weight
                weights[i] = (self.size * priority / total) ** (-beta)
            else:
                weights[i] = 1  # disabled -> weight 1

        if self.enable_is:
            # Normalize by the maximum weight for stability.
            weights = weights / weights.max()

        return (indexes, batchs, weights)
import bisect
class _bisect_wrapper():
    def __init__(self, data):
        self.d = data
        self.priority = 0
        self.p = 0
    def __lt__(self, o):  # a < b
        return self.priority > o.priority
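
# Helpers for rank-based PER (defined just below): rank_sum(k, a) = k * (2 + (k - 1) * a) / 2
# is the sum of the arithmetic series 1, 1 + a, ..., 1 + (k - 1) * a, i.e. the cumulative
# sampling mass of the first k rank indexes, and rank_sum_inverse solves rank_sum(k, a) = x
# for k, mapping a uniform draw over the total mass back to a rank index.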
import math
def rank_sum(k, a):
    return k * (2 + (k - 1) * a) / 2

def rank_sum_inverse(k, a):
    if a == 0:
        return k
    t = a - 2 + math.sqrt((2 - a) ** 2 + 8 * a * k)
    return t / (2 * a)
class PERRankBaseMemory():
    def __init__(self, capacity, alpha, beta_initial, beta_steps, enable_is):
        self.capacity = capacity
        self.buffer = []
        self.alpha = alpha
        self.beta_initial = beta_initial
        self.beta_steps = beta_steps
        self.enable_is = enable_is
        self.max_priority = 1

    def add(self, experience):
        if self.capacity <= len(self.buffer):
            # Drop the last element when the buffer is full.
            self.buffer.pop()

        experience = _bisect_wrapper(experience)
        experience.priority = self.max_priority
        bisect.insort(self.buffer, experience)

    def update(self, index, experience, td_error):
        priority = (abs(td_error) + 0.0001)  # compute the priority
        experience = _bisect_wrapper(experience)
        experience.priority = priority
        bisect.insort(self.buffer, experience)

        if self.max_priority < priority:
            self.max_priority = priority

    def sample(self, batch_size, step):
        indexes = []
        batchs = []
        weights = np.empty(batch_size, dtype='float32')

        if self.enable_is:
            # beta starts low and is annealed towards 1 by the end of training.
            beta = self.beta_initial + (1 - self.beta_initial) * step / self.beta_steps

        # total sampling mass over all ranks
        total = rank_sum(len(self.buffer), self.alpha)

        # draw the rank indexes
        index_lst = []
        for _ in range(batch_size):
            # Keep drawing until we get an index that is not already in the list.
            for _ in range(100):  # for safety
                r = random.random() * total
                index = rank_sum_inverse(r, self.alpha)
                index = int(index)  # truncate to an integer
                if index not in index_lst:
                    index_lst.append(index)
                    break
        assert len(index_lst) == batch_size

        index_lst.sort()
        buffer_size = len(self.buffer)
        for i, index in enumerate(reversed(index_lst)):
            o = self.buffer.pop(index)  # popping from the back keeps the remaining indexes valid
            batchs.append(o.d)
            indexes.append(index)

            if self.enable_is:
                # importance-sampling weight
                priority = (rank_sum(index + 1, self.alpha) - rank_sum(index, self.alpha)) / total
                weights[i] = (buffer_size * priority) ** (-beta)
            else:
                weights[i] = 1  # disabled -> weight 1

        if self.enable_is:
            # Normalize by the maximum weight for stability.
            weights = weights / weights.max()

        return (indexes, batchs, weights)
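
# NoisyNet exploration layer: each weight is perturbed as w = mu + sigma * epsilon,
# where mu and sigma are learned parameters and epsilon is Gaussian noise (independent
# noise per weight here, not the factorized variant), so exploration is learned
# instead of relying on epsilon-greedy.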
# copied from https://github.com/OctThe16th/Noisy-A3C-Keras
from keras import activations, initializers, regularizers, constraints
class NoisyDense(Layer):
    def __init__(self, units,
                 sigma_init=0.02,
                 activation=None,
                 use_bias=True,
                 kernel_initializer='glorot_uniform',
                 bias_initializer='zeros',
                 kernel_regularizer=None,
                 bias_regularizer=None,
                 activity_regularizer=None,
                 kernel_constraint=None,
                 bias_constraint=None,
                 **kwargs):
        if 'input_shape' not in kwargs and 'input_dim' in kwargs:
            kwargs['input_shape'] = (kwargs.pop('input_dim'),)
        super(NoisyDense, self).__init__(**kwargs)
        self.units = units
        self.sigma_init = sigma_init
        self.activation = activations.get(activation)
        self.use_bias = use_bias
        self.kernel_initializer = initializers.get(kernel_initializer)
        self.bias_initializer = initializers.get(bias_initializer)
        self.kernel_regularizer = regularizers.get(kernel_regularizer)
        self.bias_regularizer = regularizers.get(bias_regularizer)
        self.activity_regularizer = regularizers.get(activity_regularizer)
        self.kernel_constraint = constraints.get(kernel_constraint)
        self.bias_constraint = constraints.get(bias_constraint)

    def build(self, input_shape):
        assert len(input_shape) >= 2
        self.input_dim = input_shape[-1]

        self.kernel = self.add_weight(shape=(self.input_dim, self.units),
                                      initializer=self.kernel_initializer,
                                      name='kernel',
                                      regularizer=self.kernel_regularizer,
                                      constraint=self.kernel_constraint)
        self.sigma_kernel = self.add_weight(shape=(self.input_dim, self.units),
                                            initializer=initializers.Constant(value=self.sigma_init),
                                            name='sigma_kernel')

        if self.use_bias:
            self.bias = self.add_weight(shape=(self.units,),
                                        initializer=self.bias_initializer,
                                        name='bias',
                                        regularizer=self.bias_regularizer,
                                        constraint=self.bias_constraint)
            self.sigma_bias = self.add_weight(shape=(self.units,),
                                              initializer=initializers.Constant(value=self.sigma_init),
                                              name='sigma_bias')
        else:
            self.bias = None
            self.epsilon_bias = None

        self.epsilon_kernel = K.zeros(shape=(self.input_dim, self.units))
        self.epsilon_bias = K.zeros(shape=(self.units,))
        self.sample_noise()
        super(NoisyDense, self).build(input_shape)

    def call(self, X):
        perturbation = self.sigma_kernel * self.epsilon_kernel
        perturbed_kernel = self.kernel + perturbation
        output = K.dot(X, perturbed_kernel)

        if self.use_bias:
            bias_perturbation = self.sigma_bias * self.epsilon_bias
            perturbed_bias = self.bias + bias_perturbation
            output = K.bias_add(output, perturbed_bias)

        if self.activation is not None:
            output = self.activation(output)
        return output

    def compute_output_shape(self, input_shape):
        assert input_shape and len(input_shape) >= 2
        assert input_shape[-1]
        output_shape = list(input_shape)
        output_shape[-1] = self.units
        return tuple(output_shape)

    def sample_noise(self):
        K.set_value(self.epsilon_kernel, np.random.normal(0, 1, (self.input_dim, self.units)))
        K.set_value(self.epsilon_bias, np.random.normal(0, 1, (self.units,)))

    def remove_noise(self):
        K.set_value(self.epsilon_kernel, np.zeros(shape=(self.input_dim, self.units)))
        K.set_value(self.epsilon_bias, np.zeros(shape=(self.units,)))
# -------------------------------------------
def main_no_image():
    env = gym.make("Pendulum-v0")
    nb_actions = 5  # 5 discrete actions, as defined in PendulumProcessorForDQN

    processor = PendulumProcessorForDQN(enable_image=False)

    # The arguments are numerous, so they are passed as a dict.
    args = {
        "input_shape": env.observation_space.shape,
        "enable_image_layer": False,
        "nb_actions": nb_actions,
        "window_length": 1,           # number of input frames
        "memory_capacity": 10_000,    # replay memory size
        "nb_steps_warmup": 200,       # steps used only to fill the memory (no training)
        "target_model_update": 100,   # target network update interval
        "action_interval": 1,         # interval between action selections
        "train_interval": 1,          # training interval
        "batch_size": 64,             # batch size
        "gamma": 0.99,                # Q-learning discount factor
        "initial_epsilon": 1.0,       # epsilon-greedy: initial value
        "final_epsilon": 0.1,         # epsilon-greedy: final value
        "exploration_steps": 5000,    # epsilon-greedy: annealing steps
        "processor": processor,

        # Rainbow additions
        "memory_type": "per_proportional",  # which replay memory to use
        "per_alpha": 0.6,             # PER: how strongly priorities affect sampling
        "per_beta_initial": 0.0,      # PER: initial importance-sampling exponent
        "per_beta_steps": 100_000,    # PER: steps over which beta anneals to 1
        "per_enable_is": False,       # PER: enable importance sampling
        "multireward_steps": 3,       # multi-step reward
        "enable_double_dqn": True,
        "enable_dueling_network": True,
        "enable_noisynet": False,
        "dence_units_num": 64,        # number of units in the Dense layers
    }
    agent = RainbowAgent(**args)
    agent.compile(optimizer=Adam())
    print(agent.model.summary())

    # training
    print("--- start ---")
    print("Press 'Ctrl + C' to stop.")
    history = agent.fit(env, nb_steps=50_000, visualize=False, verbose=1)

    # plot the result
    plt.subplot(1, 1, 1)
    plt.plot(history.history["episode_reward"])
    plt.xlabel("episode")
    plt.ylabel("reward")
    plt.show()

    # watch the trained agent
    processor.mode = "test"  # return the env's raw reward
    agent.test(env, nb_episodes=5, visualize=True)


def main_image():
    env = gym.make("Pendulum-v0")
    nb_actions = 5  # 5 discrete actions, as defined in PendulumProcessorForDQN

    processor = PendulumProcessorForDQN(enable_image=True, image_size=84)

    # The arguments are numerous, so they are passed as a dict.
    args = {
        "input_shape": (84, 84),
        "enable_image_layer": True,
        "nb_actions": nb_actions,
        "window_length": 4,             # number of input frames
        "memory_capacity": 1_000_000,   # replay memory size
        "nb_steps_warmup": 200,         # steps used only to fill the memory (no training)
        "target_model_update": 500,     # target network update interval
        "action_interval": 1,           # interval between action selections
        "train_interval": 1,            # training interval
        "batch_size": 16,               # batch size
        "gamma": 0.99,                  # Q-learning discount factor
        "initial_epsilon": 1.0,         # epsilon-greedy: initial value
        "final_epsilon": 0.01,          # epsilon-greedy: final value
        "exploration_steps": 1000,      # epsilon-greedy: annealing steps
        "processor": processor,

        # Rainbow additions
        "memory_type": "per_proportional",  # which replay memory to use
        "per_alpha": 1.0,               # PER: how strongly priorities affect sampling
        "per_beta_initial": 0.0,        # PER: initial importance-sampling exponent
        "per_beta_steps": 5000,         # PER: steps over which beta anneals to 1
        "per_enable_is": False,         # PER: enable importance sampling
        "multireward_steps": 1,         # multi-step reward
        "enable_double_dqn": True,
        "enable_dueling_network": True,
        "enable_noisynet": False,
        "dence_units_num": 64,          # number of units in the Dense layers
    }
    agent = RainbowAgent(**args)
    agent.compile(optimizer=Adam())
    print(agent.model.summary())

    # training
    print("--- start ---")
    print("Press 'Ctrl + C' to stop.")
    history = agent.fit(env, nb_steps=50_000, visualize=False, verbose=1)

    # plot the result
    plt.subplot(1, 1, 1)
    plt.plot(history.history["episode_reward"])
    plt.xlabel("episode")
    plt.ylabel("reward")
    plt.show()

    # watch the trained agent
    processor.mode = "test"  # return the env's raw reward
    agent.test(env, nb_episodes=5, visualize=True)


# Switch the entry point by commenting out.
main_no_image()
# main_image()