@trtd56
Created July 30, 2016 06:55
Trying out a DQN(-ish) agent with Chainer ✕ OpenAI Gym! ref: http://qiita.com/trtd56/items/3a09d37788d8d13ff131
$ pip install gym
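The code below also relies on Chainer (and NumPy, which it pulls in) and uses the 2016-era Chainer 1.x and gym monitor APIs, so presumably something like this is needed as well:

$ pip install chainer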
import copy
from collections import deque

import gym
import numpy as np
from chainer import Chain, Variable, optimizers
import chainer.functions as F
import chainer.links as L


class Neuralnet(Chain):

    def __init__(self, n_in, n_out):
        super(Neuralnet, self).__init__(
            L1=L.Linear(n_in, 100),
            L2=L.Linear(100, 100),
            L3=L.Linear(100, 100),
            Q_value=L.Linear(100, n_out, initialW=np.zeros((n_out, 100), dtype=np.float32))
        )

    def Q_func(self, x):
        h = F.leaky_relu(self.L1(x))
        h = F.leaky_relu(self.L2(h))
        h = F.leaky_relu(self.L3(h))
        h = self.Q_value(h)
        return h


class Agent():

    def __init__(self, n_st, n_act, seed):
        np.random.seed(seed)  # seed NumPy's RNG so exploration is reproducible
        self.n_act = n_act
        self.model = Neuralnet(n_st, n_act)
        self.target_model = copy.deepcopy(self.model)
        self.optimizer = optimizers.Adam()
        self.optimizer.setup(self.model)
        self.memory = deque()
        self.loss = 0
        self.step = 0
        self.gamma = 0.99              # discount factor
        self.mem_size = 1000           # number of experiences kept for experience replay
        self.batch_size = 100          # mini-batch size for experience replay
        self.train_freq = 10           # training interval (in steps) for the neural network
        self.target_update_freq = 20   # synchronization interval for the target network
        # epsilon-greedy
        self.epsilon = 1               # initial value of epsilon
        self.epsilon_decay = 0.005     # amount epsilon is reduced by per decay step
        self.epsilon_min = 0           # minimum value of epsilon
        self.exploration = 1000        # steps before epsilon starts to decay (here: until the memory is full)

    def stock_experience(self, st, act, r, st_dash, ep_end):
        self.memory.append((st, act, r, st_dash, ep_end))
        if len(self.memory) > self.mem_size:
            self.memory.popleft()

    def shuffle_memory(self):
        mem = np.array(self.memory)
        return np.random.permutation(mem)

    def parse_batch(self, batch):
        st, act, r, st_dash, ep_end = [], [], [], [], []
        for i in xrange(self.batch_size):
            st.append(batch[i][0])
            act.append(batch[i][1])
            r.append(batch[i][2])
            st_dash.append(batch[i][3])
            ep_end.append(batch[i][4])
        st = np.array(st, dtype=np.float32)
        act = np.array(act, dtype=np.int8)
        r = np.array(r, dtype=np.float32)
        st_dash = np.array(st_dash, dtype=np.float32)
        ep_end = np.array(ep_end, dtype=np.bool)
        return st, act, r, st_dash, ep_end

    def experience_replay(self):
        mem = self.shuffle_memory()
        perm = np.array(xrange(len(mem)))
        for start in perm[::self.batch_size]:
            index = perm[start:start + self.batch_size]
            batch = mem[index]
            st, act, r, st_d, ep_end = self.parse_batch(batch)
            self.model.zerograds()
            loss = self.forward(st, act, r, st_d, ep_end)
            loss.backward()
            self.optimizer.update()

    def forward(self, st, act, r, st_dash, ep_end):
        s = Variable(st)
        s_dash = Variable(st_dash)
        Q = self.model.Q_func(s)
        tmp = self.target_model.Q_func(s_dash)
        tmp = list(map(np.max, tmp.data))
        max_Q_dash = np.asanyarray(tmp, dtype=np.float32)
        target = np.asanyarray(copy.deepcopy(Q.data), dtype=np.float32)
        for i in xrange(self.batch_size):
            # DQN target: r + gamma * max_a' Q_target(s', a'); no bootstrap at episode end
            target[i, act[i]] = r[i] + (self.gamma * max_Q_dash[i]) * (not ep_end[i])
        loss = F.mean_squared_error(Q, Variable(target))
        return loss

    def get_action(self, st):
        if np.random.rand() < self.epsilon:
            return np.random.randint(0, self.n_act)
        else:
            s = Variable(st)
            Q = self.model.Q_func(s)
            Q = Q.data[0]
            a = np.argmax(Q)
            return np.asarray(a, dtype=np.int8)

    def reduce_epsilon(self):
        if self.epsilon > self.epsilon_min and self.exploration < self.step:
            self.epsilon -= self.epsilon_decay

    def train(self):
        if len(self.memory) >= self.mem_size:
            if self.step % self.train_freq == 0:
                self.experience_replay()
                self.reduce_epsilon()
            if self.step % self.target_update_freq == 0:
                self.target_model = copy.deepcopy(self.model)
        self.step += 1


def main(env_name, seed=0):
    env = gym.make(env_name)
    view_path = "./video/" + env_name
    n_st = env.observation_space.shape[0]

    if type(env.action_space) == gym.spaces.discrete.Discrete:
        # CartPole-v0, Acrobot-v0, MountainCar-v0
        n_act = env.action_space.n
        action_list = range(0, n_act)
    elif type(env.action_space) == gym.spaces.box.Box:
        # Pendulum-v0
        action_list = [np.array([a]) for a in [-2.0, 2.0]]
        n_act = len(action_list)

    agent = Agent(n_st, n_act, seed)

    env.monitor.start(view_path, video_callable=None, force=True, seed=seed)
    for i_episode in xrange(1000):
        observation = env.reset()
        for t in xrange(200):
            env.render()
            state = observation.astype(np.float32).reshape((1, n_st))
            act_i = agent.get_action(state)
            action = action_list[act_i]
            observation, reward, ep_end, _ = env.step(action)
            state_dash = observation.astype(np.float32).reshape((1, n_st))
            agent.stock_experience(state, act_i, reward, state_dash, ep_end)
            agent.train()
            if ep_end:
                break
    env.monitor.close()
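
The gist defines main() but never calls it. A minimal entry point might look like the sketch below; the choice of CartPole-v0 and seed 0 is just an example, not part of the original snippet.

if __name__ == "__main__":
    # Example invocation only; any environment handled in main() would work.
    main("CartPole-v0", seed=0)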