@pocokhc
Last active May 23, 2019 13:44
Code that tries out SaliencyMap and Grad-CAM on a DQN Dueling Network.
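For orientation before the full script, here is a minimal, self-contained sketch of the two visualizations it produces. It is independent of the code below and uses made-up arrays `feature_maps` (a conv layer's H×W×C output) and `grads` (the gradient of the chosen Q value with respect to that output); only the channel-weighting step of Grad-CAM and the absolute-gradient form of the saliency map are shown.

import numpy as np

# hypothetical inputs: conv feature maps (H, W, C) and the gradient of the
# selected Q value with respect to those feature maps (same shape)
feature_maps = np.random.rand(21, 21, 64)
grads = np.random.rand(21, 21, 64)

# Grad-CAM: global-average-pool the gradients into one weight per channel,
# take the weighted sum of the feature maps, then clip negatives (ReLU)
channel_weights = grads.mean(axis=(0, 1))            # (C,)
cam = np.maximum(feature_maps @ channel_weights, 0)  # (H, W)
cam /= cam.max() + 1e-8                              # normalize to [0, 1]

# saliency map: in the script this is the absolute gradient of the Q value
# with respect to the *input image*; only the abs/aggregation step is shown here
saliency = np.abs(grads).max(axis=-1)                # (H, W)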
import gym
import pickle
import os
import numpy as np
import random
import tensorflow as tf
from keras.optimizers import Adam
from keras.models import Model
from keras.layers import *
from keras import activations, initializers, regularizers, constraints  # used by NoisyDense
from keras import backend as K
import rl.core
import matplotlib.pyplot as plt
from PIL import Image, ImageDraw
class PendulumProcessorForDQN(rl.core.Processor):
    def __init__(self, enable_image=False, image_size=84):
        self.image_size = image_size
        self.enable_image = enable_image
        self.mode = "train"

    def process_observation(self, observation):
        if not self.enable_image:
            return observation
        return self._get_rgb_state(observation)  # return without reshaping

    def process_action(self, action):
        ACT_ID_TO_VALUE = {
            0: [-2.0],
            1: [-1.0],
            2: [0.0],
            3: [+1.0],
            4: [+2.0],
        }
        return ACT_ID_TO_VALUE[action]

    def process_reward(self, reward):
        if self.mode == "test":  # in test mode, return the raw reward
            return reward
        # return np.clip(reward, -1., 1.)
        # min-max normalize the reward range [-16.5, 0] to [m, M]
        self.max = 0
        self.min = -16.5
        if (self.max - self.min) == 0:
            return 0
        M = 1
        m = -0.5
        return ((reward - self.min) / (self.max - self.min)) * (M - m) + m

    # render the corresponding image from the state (x, y coordinates)
    def _get_rgb_state(self, state):
        img_size = self.image_size
        h_size = img_size / 2.0
        img = Image.new("RGB", (img_size, img_size), (255, 255, 255))
        dr = ImageDraw.Draw(img)

        # length of the pole
        l = img_size / 4.0 * 3.0 / 2.0

        # draw the pole as a line
        dr.line(((h_size - l * state[1], h_size - l * state[0]), (h_size, h_size)), (0, 0, 0), 1)

        # draw a circle at the pivot (just to make it look nicer)
        buff = img_size / 32.0
        dr.ellipse(((h_size - buff, h_size - buff), (h_size + buff, h_size + buff)),
                   outline=(0, 0, 0), fill=(255, 0, 0))

        # convert to grayscale and then to an array
        pilImg = img.convert("L")
        img_arr = np.asarray(pilImg)

        # normalize the image to [0, 1]
        img_arr = img_arr / 255.0
        return img_arr
class RainbowAgent(rl.core.Agent):
    def __init__(self,
                 input_shape,
                 enable_image_layer,
                 nb_actions,
                 window_length=4,            # number of input frames
                 memory_type="replay",       # which memory to use
                 memory_capacity=1000000,    # memory size to allocate
                 per_alpha=0.6,              # PER priority exponent
                 per_beta_initial=0.4,       # initial IS weight exponent
                 per_beta_steps=1000000,     # steps over which the IS exponent is annealed
                 per_enable_is=False,        # whether to enable importance sampling
                 nb_steps_warmup=50000,      # steps used to fill the memory at the start (no training)
                 target_model_update=500,    # update interval of the target network
                 action_interval=4,          # interval at which actions are selected
                 train_interval=4,           # training interval
                 batch_size=32,              # batch size
                 gamma=0.99,                 # Q-learning discount factor
                 initial_epsilon=1.0,        # initial value for epsilon-greedy
                 final_epsilon=0.1,          # final value for epsilon-greedy
                 exploration_steps=1000000,  # steps over which epsilon is decayed
                 multireward_steps=3,        # multi-step reward
                 dence_units_num=512,        # number of units in the Dense layers
                 enable_double_dqn=False,
                 enable_dueling_network=False,
                 enable_noisynet=False,
                 **kwargs):
        super(RainbowAgent, self).__init__(**kwargs)
        self.compiled = False
        self.input_shape = input_shape
        self.enable_image_layer = enable_image_layer
        self.nb_actions = nb_actions
        self.window_length = window_length
        self.nb_steps_warmup = nb_steps_warmup
        self.target_model_update = target_model_update
        self.action_interval = action_interval
        self.train_interval = train_interval
        self.gamma = gamma
        self.batch_size = batch_size
        self.multireward_steps = multireward_steps
        self.dence_units_num = dence_units_num
        self.initial_epsilon = initial_epsilon
        self.epsilon_step = (initial_epsilon - final_epsilon) / exploration_steps
        self.final_epsilon = final_epsilon
        if memory_type == "replay":
            self.memory = ReplayMemory(memory_capacity)
        elif memory_type == "per_greedy":
            self.memory = PERGreedyMemory(memory_capacity)
        elif memory_type == "per_proportional":
            self.memory = PERProportionalMemory(memory_capacity, per_alpha, per_beta_initial, per_beta_steps, per_enable_is)
        elif memory_type == "per_rankbase":
            self.memory = PERRankBaseMemory(memory_capacity, per_alpha, per_beta_initial, per_beta_steps, per_enable_is)
        else:
            raise ValueError('memory_type must be one of ["replay", "per_greedy", "per_proportional", "per_rankbase"]')
        self.enable_double_dqn = enable_double_dqn
        self.enable_dueling_network = enable_dueling_network
        self.enable_noisynet = enable_noisynet
        self.model = self.build_network()         # Q network
        self.target_model = self.build_network()  # target network
        assert memory_capacity > self.batch_size, "Memory capacity must be larger than the batch size."
        assert self.nb_steps_warmup > self.batch_size, "nb_steps_warmup must be larger than the batch size."
    def reset_states(self):
        self.recent_action = 0
        self.repeated_action = 0
        self.recent_reward = [0 for _ in range(self.multireward_steps)]
        self.recent_observations = [np.zeros(self.input_shape) for _ in range(self.window_length + self.multireward_steps)]

    def compile(self, optimizer=None, metrics=[]):
        # the target network is never trained, so any optimizer/loss will do
        self.target_model.compile(optimizer='sgd', loss='mse')

        def clipped_error_loss(y_true, y_pred):
            err = y_true - y_pred  # error
            L2 = 0.5 * K.square(err)
            L1 = K.abs(err) - 0.5
            # use L2 while the error is within [-1, 1], otherwise L1 (Huber loss)
            loss = tf.where((K.abs(err) < 1.0), L2, L1)  # Keras does not expose tensorflow's where :-(
            return K.mean(loss)

        self.model.compile(loss=clipped_error_loss, optimizer=optimizer, metrics=metrics)
        self.compiled = True

    def load_weights(self, filepath):
        self.model.load_weights(filepath)
        self.target_model.load_weights(filepath)

    def save_weights(self, filepath, overwrite=False):
        self.model.save_weights(filepath, overwrite=overwrite)
    def forward(self, observation):
        # keep a window of recent observations
        self.recent_observations.append(observation)  # append to the end
        self.recent_observations.pop(0)               # drop the oldest

        # train here (forward is used because the next state is needed)
        self.forward_train()

        # frame skip (select an action every action_interval steps)
        action = self.repeated_action
        if self.step % self.action_interval == 0:
            # if NoisyNet is enabled, exploration is handled by the noise instead
            if self.training and not self.enable_noisynet:
                # decay epsilon with the step count
                epsilon = self.initial_epsilon - self.step * self.epsilon_step
                if epsilon < self.final_epsilon:
                    epsilon = self.final_epsilon

                # epsilon-greedy
                if epsilon > np.random.uniform(0, 1):
                    # random action
                    action = np.random.randint(0, self.nb_actions)
                else:
                    # take the current state and pick the action with the highest Q value
                    state0 = self.recent_observations[-self.window_length:]
                    q_values = self.model.predict(np.asarray([state0]), batch_size=1)[0]
                    action = np.argmax(q_values)
            else:
                # take the current state and pick the action with the highest Q value
                state0 = self.recent_observations[-self.window_length:]
                q_values = self.model.predict(np.asarray([state0]), batch_size=1)[0]
                action = np.argmax(q_values)

            # remember for frame skipping
            self.repeated_action = action

        self.recent_action = action
        return action
    # split out into its own method because it is long
    def forward_train(self):
        if not self.training:
            return

        # accumulate the recent rewards with discounting (multi-step reward)
        reward = 0
        for i, r in enumerate(self.recent_reward):
            reward += r * (self.gamma ** i)

        self.memory.add(
            (self.recent_observations[:self.window_length],
             self.recent_action,
             reward,
             self.recent_observations[-self.window_length:]))

        # do not train while the replay memory is being filled
        if self.step <= self.nb_steps_warmup:
            return

        # training interval
        if self.step % self.train_interval != 0:
            return

        (indexes, batchs, weights) = self.memory.sample(self.batch_size, self.step)
        state0_batch = []
        action_batch = []
        reward_batch = []
        state1_batch = []
        for batch in batchs:
            state0_batch.append(batch[0])
            action_batch.append(batch[1])
            reward_batch.append(batch[2])
            state1_batch.append(batch[3])

        # current Q-network output, used as the training target (Q network)
        outputs = self.model.predict(np.asarray(state0_batch), self.batch_size)

        if self.enable_double_dqn:
            # get Q values from both the Q network and the target network
            state1_model_qvals_batch = self.model.predict(np.asarray(state1_batch), self.batch_size)
            state1_target_qvals_batch = self.target_model.predict(np.asarray(state1_batch), self.batch_size)
            for i in range(self.batch_size):
                action = np.argmax(state1_model_qvals_batch[i])  # choose the action with the Q network
                maxq = state1_target_qvals_batch[i][action]      # evaluate it with the target network
                td_error = reward_batch[i] + (self.gamma ** self.multireward_steps) * maxq
                td_error *= weights[i]
                td_error_diff = outputs[i][action_batch[i]] - td_error  # TD error
                outputs[i][action_batch[i]] = td_error                  # update the target
                # update the priority with the TD error
                self.memory.update(indexes[i], batchs[i], td_error_diff)
        else:
            # Q values of the next state (target network)
            target_qvals = self.target_model.predict(np.asarray(state1_batch), self.batch_size)
            # Q-learning: Q(St,At) = Q(St,At) + alpha * (r + gamma * max_a Q(St+1,a) - Q(St,At))
            for i in range(self.batch_size):
                maxq = np.max(target_qvals[i])
                td_error = reward_batch[i] + (self.gamma ** self.multireward_steps) * maxq
                td_error *= weights[i]
                td_error_diff = outputs[i][action_batch[i]] - td_error  # TD error
                outputs[i][action_batch[i]] = td_error
                self.memory.update(indexes[i], batchs[i], td_error_diff)

        # train on the batch
        self.model.train_on_batch(np.asarray(state0_batch), np.asarray(outputs))
    def backward(self, reward, terminal):
        self.recent_reward.append(reward)  # append to the end
        self.recent_reward.pop(0)          # drop the oldest

        # copy the weights to the target network at a fixed interval
        if self.step % self.target_model_update == 0:
            self.target_model.set_weights(self.model.get_weights())
        return []

    @property
    def layers(self):
        return self.model.layers[:]
    # build the NN model
    def build_network(self):
        # input layer: (window_length, width, height)
        c = input_ = Input(shape=(self.window_length,) + self.input_shape)

        if self.enable_image_layer:
            c = Permute((2, 3, 1))(c)  # (window, w, h) -> (w, h, window)
            c = Conv2D(32, (8, 8), strides=(4, 4), padding="same", name="c1")(c)
            c = Activation("relu")(c)
            c = Conv2D(64, (4, 4), strides=(2, 2), padding="same", name="c2")(c)
            c = Activation("relu")(c)
            c = Conv2D(64, (3, 3), strides=(1, 1), padding="same", name="c3")(c)
            c = Activation("relu")(c)
        c = Flatten()(c)

        if self.enable_dueling_network:
            # value stream
            v = Dense(self.dence_units_num, activation="relu")(c)
            if self.enable_noisynet:
                v = NoisyDense(1)(v)
            else:
                v = Dense(1, name="v")(v)

            # advantage stream
            adv = Dense(self.dence_units_num, activation='relu')(c)
            if self.enable_noisynet:
                adv = NoisyDense(self.nb_actions)(adv)
            else:
                adv = Dense(self.nb_actions, name="adv")(adv)

            # combine the two streams
            c = Concatenate()([v, adv])
            c = Lambda(lambda a: K.expand_dims(a[:, 0], -1) + a[:, 1:] - K.mean(a[:, 1:], axis=1, keepdims=True),
                       output_shape=(self.nb_actions,))(c)
        else:
            c = Dense(self.dence_units_num, activation="relu")(c)
            if self.enable_noisynet:
                c = NoisyDense(self.nb_actions, activation="linear")(c)
            else:
                c = Dense(self.nb_actions, activation="linear")(c)

        return Model(input_, c)
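# The dueling head in build_network() above combines the state value V and the
# advantages A via the Lambda layer as
#     Q(s, a) = V(s) + A(s, a) - mean_a' A(s, a')
# (the mean-subtracted aggregation from the Dueling Network paper); the
# Concatenate puts V in column 0 and the advantages in columns 1..nb_actions.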
class ReplayMemory():
    def __init__(self, capacity):
        self.capacity = capacity
        self.index = 0
        self.memory = []

    def add(self, experience):
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.index] = experience
        self.index = (self.index + 1) % self.capacity

    def update(self, idx, experience, td_error):
        pass

    def sample(self, batch_size, steps):
        batchs = random.sample(self.memory, batch_size)
        indexes = np.empty(batch_size, dtype='float32')  # unused for plain replay
        weights = [1 for _ in range(batch_size)]
        return (indexes, batchs, weights)
import heapq


class _head_wrapper():
    def __init__(self, data):
        self.d = data

    def __eq__(self, other):
        return True


class PERGreedyMemory():
    def __init__(self, capacity):
        self.buffer = []
        self.capacity = capacity
        self.max_priority = 1

    def add(self, experience):
        if self.capacity <= len(self.buffer):
            # if over capacity, drop the last element
            self.buffer.pop()

        # new experiences start with the maximum priority
        experience = _head_wrapper(experience)
        heapq.heappush(self.buffer, (-self.max_priority, experience))

    def update(self, idx, experience, td_error):
        # heapq pops the smallest element, so negate the priority
        experience = _head_wrapper(experience)
        heapq.heappush(self.buffer, (-td_error, experience))

        # track the maximum priority
        if self.max_priority < td_error:
            self.max_priority = td_error

    def sample(self, batch_size, step):
        # pop the samples (they are re-added after training via update)
        batchs = [heapq.heappop(self.buffer)[1].d for _ in range(batch_size)]
        indexes = np.empty(batch_size, dtype='float32')
        weights = [1 for _ in range(batch_size)]
        return (indexes, batchs, weights)
# copy from https://github.com/jaromiru/AI-blog/blob/5aa9f0b/SumTree.py
import numpy


class SumTree:
    write = 0

    def __init__(self, capacity):
        self.capacity = capacity
        self.tree = numpy.zeros(2 * capacity - 1)
        self.data = numpy.zeros(capacity, dtype=object)

    def _propagate(self, idx, change):
        parent = (idx - 1) // 2
        self.tree[parent] += change
        if parent != 0:
            self._propagate(parent, change)

    def _retrieve(self, idx, s):
        left = 2 * idx + 1
        right = left + 1
        if left >= len(self.tree):
            return idx
        if s <= self.tree[left]:
            return self._retrieve(left, s)
        else:
            return self._retrieve(right, s - self.tree[left])

    def total(self):
        return self.tree[0]

    def add(self, p, data):
        idx = self.write + self.capacity - 1
        self.data[self.write] = data
        self.update(idx, p)
        self.write += 1
        if self.write >= self.capacity:
            self.write = 0

    def update(self, idx, p):
        change = p - self.tree[idx]
        self.tree[idx] = p
        self._propagate(idx, change)

    def get(self, s):
        idx = self._retrieve(0, s)
        dataIdx = idx - self.capacity + 1
        return (idx, self.tree[idx], self.data[dataIdx])
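# How the SumTree is used by the proportional memory below: each leaf holds one
# experience's priority and total() is the sum of all priorities. Drawing a
# uniform value s in [0, total) and walking down with _retrieve() returns leaf i
# with probability priority_i / total, which is exactly proportional
# prioritized sampling.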
class PERProportionalMemory():
    def __init__(self, capacity, alpha, beta_initial, beta_steps, enable_is):
        self.capacity = capacity
        self.tree = SumTree(capacity)
        self.alpha = alpha
        self.beta_initial = beta_initial
        self.beta_steps = beta_steps
        self.enable_is = enable_is
        self.max_priority = 1

    def add(self, experience):
        self.tree.add(self.max_priority, experience)

    def update(self, index, experience, td_error):
        priority = (abs(td_error) + 0.0001) ** self.alpha
        self.tree.update(index, priority)
        if self.max_priority < priority:
            self.max_priority = priority

    def sample(self, batch_size, step):
        indexes = []
        batchs = []
        weights = np.empty(batch_size, dtype='float32')

        if self.enable_is:
            # beta starts low and is annealed to 1 by the end of training
            beta = self.beta_initial + (1 - self.beta_initial) * step / self.beta_steps

        # split the total into equal segments and draw one random value per segment
        total = self.tree.total()
        section = total / batch_size
        for i in range(batch_size):
            r = section * i + random.random() * section
            (idx, priority, experience) = self.tree.get(r)
            indexes.append(idx)
            batchs.append(experience)

            if self.enable_is:
                # importance sampling weight
                weights[i] = (self.capacity * priority / total) ** (-beta)
            else:
                weights[i] = 1  # disabled, so weight is 1

        if self.enable_is:
            # normalize by the maximum weight for stability
            weights = weights / weights.max()

        return (indexes, batchs, weights)
import bisect


class _bisect_wrapper():
    def __init__(self, data):
        self.d = data
        self.priority = 0
        self.p = 0

    def __lt__(self, o):  # a < b
        return self.priority > o.priority


class PERRankBaseMemory():
    def __init__(self, capacity, alpha, beta_initial, beta_steps, enable_is):
        self.capacity = capacity
        self.buffer = []
        self.alpha = alpha
        self.beta_initial = beta_initial
        self.beta_steps = beta_steps
        self.enable_is = enable_is
        self.max_priority = 1

    def add(self, experience):
        if self.capacity <= len(self.buffer):
            # if over capacity, drop the last element
            self.buffer.pop()

        experience = _bisect_wrapper(experience)
        experience.priority = self.max_priority
        bisect.insort(self.buffer, experience)

    def update(self, index, experience, td_error):
        priority = (abs(td_error) + 0.0001)  # compute the priority
        experience = _bisect_wrapper(experience)
        experience.priority = priority
        bisect.insort(self.buffer, experience)

        if self.max_priority < priority:
            self.max_priority = priority

    def sample(self, batch_size, step):
        indexes = []
        batchs = []
        weights = np.empty(batch_size, dtype='float32')

        if self.enable_is:
            # beta starts low and is annealed to 1 by the end of training
            beta = self.beta_initial + (1 - self.beta_initial) * step / self.beta_steps

        total = 0
        for i, o in enumerate(self.buffer):
            o.index = i
            o.p = (len(self.buffer) - i) ** self.alpha
            total += o.p
            o.p_total = total

        # split the total into equal segments and draw one random value per segment
        index_lst = []
        section = total / batch_size
        rand = []
        for i in range(batch_size):
            rand.append(section * i + random.random() * section)

        rand_i = 0
        for i in range(len(self.buffer)):
            if rand[rand_i] < self.buffer[i].p_total:
                index_lst.append(i)
                rand_i += 1
                if rand_i >= len(rand):
                    break

        for i, index in enumerate(reversed(index_lst)):
            o = self.buffer.pop(index)  # popping from the back, so earlier indexes stay valid
            batchs.append(o.d)
            indexes.append(index)

            if self.enable_is:
                # importance sampling weight
                priority = o.p
                weights[i] = (self.capacity * priority / total) ** (-beta)
            else:
                weights[i] = 1  # disabled, so weight is 1

        if self.enable_is:
            # normalize by the maximum weight for stability
            weights = weights / weights.max()

        return (indexes, batchs, weights)
# copy from https://github.com/OctThe16th/Noisy-A3C-Keras
class NoisyDense(Layer):
    def __init__(self, units,
                 sigma_init=0.02,
                 activation=None,
                 use_bias=True,
                 kernel_initializer='glorot_uniform',
                 bias_initializer='zeros',
                 kernel_regularizer=None,
                 bias_regularizer=None,
                 activity_regularizer=None,
                 kernel_constraint=None,
                 bias_constraint=None,
                 **kwargs):
        if 'input_shape' not in kwargs and 'input_dim' in kwargs:
            kwargs['input_shape'] = (kwargs.pop('input_dim'),)
        super(NoisyDense, self).__init__(**kwargs)
        self.units = units
        self.sigma_init = sigma_init
        self.activation = activations.get(activation)
        self.use_bias = use_bias
        self.kernel_initializer = initializers.get(kernel_initializer)
        self.bias_initializer = initializers.get(bias_initializer)
        self.kernel_regularizer = regularizers.get(kernel_regularizer)
        self.bias_regularizer = regularizers.get(bias_regularizer)
        self.activity_regularizer = regularizers.get(activity_regularizer)
        self.kernel_constraint = constraints.get(kernel_constraint)
        self.bias_constraint = constraints.get(bias_constraint)

    def build(self, input_shape):
        assert len(input_shape) >= 2
        self.input_dim = input_shape[-1]

        self.kernel = self.add_weight(shape=(self.input_dim, self.units),
                                      initializer=self.kernel_initializer,
                                      name='kernel',
                                      regularizer=self.kernel_regularizer,
                                      constraint=self.kernel_constraint)
        self.sigma_kernel = self.add_weight(shape=(self.input_dim, self.units),
                                            initializer=initializers.Constant(value=self.sigma_init),
                                            name='sigma_kernel')

        if self.use_bias:
            self.bias = self.add_weight(shape=(self.units,),
                                        initializer=self.bias_initializer,
                                        name='bias',
                                        regularizer=self.bias_regularizer,
                                        constraint=self.bias_constraint)
            self.sigma_bias = self.add_weight(shape=(self.units,),
                                              initializer=initializers.Constant(value=self.sigma_init),
                                              name='sigma_bias')
        else:
            self.bias = None
            self.epsilon_bias = None

        self.epsilon_kernel = K.zeros(shape=(self.input_dim, self.units))
        self.epsilon_bias = K.zeros(shape=(self.units,))
        self.sample_noise()
        super(NoisyDense, self).build(input_shape)

    def call(self, X):
        perturbation = self.sigma_kernel * self.epsilon_kernel
        perturbed_kernel = self.kernel + perturbation
        output = K.dot(X, perturbed_kernel)

        if self.use_bias:
            bias_perturbation = self.sigma_bias * self.epsilon_bias
            perturbed_bias = self.bias + bias_perturbation
            output = K.bias_add(output, perturbed_bias)

        if self.activation is not None:
            output = self.activation(output)
        return output

    def compute_output_shape(self, input_shape):
        assert input_shape and len(input_shape) >= 2
        assert input_shape[-1]
        output_shape = list(input_shape)
        output_shape[-1] = self.units
        return tuple(output_shape)

    def sample_noise(self):
        K.set_value(self.epsilon_kernel, np.random.normal(0, 1, (self.input_dim, self.units)))
        K.set_value(self.epsilon_bias, np.random.normal(0, 1, (self.units,)))

    def remove_noise(self):
        K.set_value(self.epsilon_kernel, np.zeros(shape=(self.input_dim, self.units)))
        K.set_value(self.epsilon_bias, np.zeros(shape=(self.units,)))
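# NoisyDense above computes y = (W + sigma_W * eps_W) x + (b + sigma_b * eps_b),
# where W, b, sigma_W and sigma_b are trainable and eps_* are drawn from N(0, 1)
# by sample_noise(); remove_noise() zeroes eps_* to recover a plain Dense layer.
# In this script the noise is sampled once in build() and never resampled.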
# -------------------------------------------
env = gym.make("Pendulum-v0")
nb_actions = 5  # 5 because PendulumProcessor defines 5 discrete actions
processor = PendulumProcessorForDQN(enable_image=True, image_size=84)

# There are a lot of arguments, so they are collected in a dict and passed in.
args = {
    "input_shape": (84, 84),
    "enable_image_layer": True,
    "nb_actions": nb_actions,
    "window_length": 4,            # number of input frames
    "memory_capacity": 1_000_000,  # memory size to allocate
    "nb_steps_warmup": 200,        # steps used to fill the memory at the start (no training)
    "target_model_update": 500,    # update interval of the target network
    "action_interval": 1,          # interval at which actions are selected
    "train_interval": 1,           # training interval
    "batch_size": 16,              # batch size
    "gamma": 0.99,                 # Q-learning discount factor
    "initial_epsilon": 1.0,        # initial value for epsilon-greedy
    "final_epsilon": 0.1,          # final value for epsilon-greedy
    "exploration_steps": 1000,     # steps over which epsilon is decayed
    "processor": processor,
    # options added this time
    "memory_type": "per_proportional",  # which memory to use
    "per_alpha": 1.0,              # PER priority exponent
    "per_beta_initial": 0.0,       # initial IS weight exponent
    "per_beta_steps": 5000,        # steps over which the IS exponent is annealed
    "per_enable_is": False,        # whether to enable importance sampling
    "multireward_steps": 1,        # multi-step reward
    "enable_double_dqn": True,
    "enable_dueling_network": True,
    "enable_noisynet": False,
    "dence_units_num": 64,         # number of units in the Dense layers
}
agent = RainbowAgent(**args)
agent.compile(optimizer=Adam(lr=0.00025))
agent.model.summary()
# training
print("--- start ---")
print("'Ctrl + C' to stop.")
history = agent.fit(env, nb_steps=50_000, visualize=False, verbose=1)


# inspect the training result
class ObservationLogger(rl.callbacks.Callback):
    def __init__(self):
        self.observations = []

    def on_step_end(self, step, logs):
        self.observations.append(logs["observation"])


processor.mode = "test"  # return the env's real reward
logger = ObservationLogger()
agent.test(env, nb_episodes=1, visualize=True, callbacks=[logger])
# ----------------------
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.animation
import cv2


def grad_cam(c_output, c_val, img, shape):
    c_output = c_output[0]
    c_val = c_val[0]

    # global-average-pool the gradients into per-channel weights and take the
    # weighted sum of the feature maps, keeping only positive contributions
    weights = np.mean(c_val, axis=(0, 1))
    cam = np.dot(c_output, weights)
    cam = cv2.resize(cam, shape, interpolation=cv2.INTER_LINEAR)
    cam = np.maximum(cam, 0)
    cam = cam / cam.max()

    # colorize the heatmap and blend it with the original image
    cam = cv2.applyColorMap(np.uint8(255 * cam), cv2.COLORMAP_JET)
    rate = 0.4
    cam = cv2.addWeighted(src1=img, alpha=(1 - rate), src2=cam, beta=rate, gamma=0)
    cam = cv2.cvtColor(cam, cv2.COLOR_BGR2RGB)  # convert the colors to RGB
    return cam
def plot(frame):
    if frame % 50 == 0:  # debug
        print(frame)

    # note: these are pulled in from the global scope
    observations = logger.observations
    window_length = agent.window_length
    model = agent.model

    # wait until enough frames for one input window have accumulated
    if frame < window_length:
        return

    # build the network input
    # the input needs window_length frames (see the DQN article)
    input_state = observations[frame - window_length:frame]

    # also grab the observation shape
    shape = np.asarray(observations[0]).shape

    # build the original image for display
    # it is stored as (w, h) normalized to [0, 1], so convert it back to an image
    img = np.asarray(observations[frame]) * 255  # new array, so the logged data is not modified
    img = cv2.cvtColor(np.uint8(img), cv2.COLOR_GRAY2BGR)  # (w, h) -> (w, h, 3)

    c1_output = model.get_layer("c1").output
    c2_output = model.get_layer("c2").output
    c3_output = model.get_layer("c3").output
    v_output = model.get_layer("v").output
    adv_output = model.get_layer("adv").output

    # run a prediction
    prediction = model.predict(np.asarray([input_state]), 1)[0]
    class_idx = np.argmax(prediction)
    class_output = model.output[0][class_idx]

    # define the gradients
    # the adv layer has one unit per action, so use the predicted action
    # the v layer has only one unit, so use index 0
    grads_c1 = K.gradients(class_output, c1_output)[0]
    grads_c2 = K.gradients(class_output, c2_output)[0]
    grads_c3 = K.gradients(class_output, c3_output)[0]
    grads_adv = K.gradients(adv_output[0][class_idx], model.input)[0]
    grads_v = K.gradients(v_output[0][0], model.input)[0]

    # define one function and compute everything in a single call
    grads_func = K.function([model.input, K.learning_phase()],
                            [c1_output, grads_c1, c2_output, grads_c2, c3_output, grads_c3, grads_adv, grads_v])

    # compute the gradients
    (c1_output, c1_val, c2_output, c2_val, c3_output, c3_val, adv_val, v_val) = grads_func([np.asarray([input_state]), 0])
    adv_val = adv_val[0][window_length - 1]  # there are window_length frames, take the last one
    v_val = v_val[0][window_length - 1]      # there are window_length frames, take the last one

    # SaliencyMap: absolute value of the input gradients
    adv_val = np.abs(adv_val.reshape(shape))
    v_val = np.abs(v_val.reshape(shape))

    # Grad-CAM computation and rendering; wrapped in a function to avoid writing it three times
    cam1 = grad_cam(c1_output, c1_val, img, shape)
    cam2 = grad_cam(c2_output, c2_val, img, shape)
    cam3 = grad_cam(c3_output, c3_val, img, shape)

    # plot
    imgs = [img, cam1, cam2, cam3, adv_val, v_val]
    names = ["original", "c1", "c2", "c3", "advance", "value"]
    cmaps = ["", "", "", "", "gray", "gray"]
    for i in range(len(imgs)):
        plt.subplot(2, 3, i + 1)
        plt.gca().tick_params(labelbottom="off", bottom="off")  # hide the x axis
        plt.gca().tick_params(labelleft="off", left="off")      # hide the y axis
        plt.title(names[i]).set_fontsize(12)
        if cmaps[i] == "":
            plt.imshow(imgs[i])
        else:
            plt.imshow(imgs[i], cmap=cmaps[i])
# animation
plt.figure(figsize=(8.0, 6.0), dpi=100)  # set the figure size
plt.axis('off')

# hand the plot function to FuncAnimation
ani = matplotlib.animation.FuncAnimation(plt.gcf(), plot, frames=len(logger.observations), interval=5)
# ani.save('anim.mp4', writer="ffmpeg")
# ani.save('anim.gif', writer="imagemagick", fps=60)
plt.show()