@mvrozanti
Created November 19, 2017 17:43
import atexit
import itertools
import os
import random
from collections import deque
import numpy as np
from keras import backend as K
from keras.layers import Dense
from keras.models import Sequential
from keras.optimizers import Adam
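
# Tic-tac-toe with a Deep Q-Network (Keras): a DQN agent learns by playing
# against a uniformly random opponent on a 3x3 board encoded with 1, -1 and 0.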


class DQNAgent:
    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=2000)
        self.gamma = 1.0  # discount rate
        self.epsilon = 1.0  # exploration rate
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.99
        self.learning_rate = 0.1
        self.model = self._build_model()
        self.target_model = self._build_model()
        self.update_target_model()

    def _huber_loss(self, target, prediction):
        # pseudo-Huber loss: sqrt(1 + error^2) - 1
        error = prediction - target
        return K.mean(K.sqrt(1 + K.square(error)) - 1, axis=-1)

    def _build_model(self):
        # Neural net for the Deep Q-learning model:
        # state_size inputs -> two hidden layers of 12 -> action_size linear Q-values
        model = Sequential()
        model.add(Dense(12, input_dim=self.state_size, activation='relu'))
        model.add(Dense(12, activation='relu'))
        model.add(Dense(self.action_size, activation='linear'))
        model.compile(loss=self._huber_loss, optimizer=Adam(lr=self.learning_rate))
        return model

    def update_target_model(self):
        # copy weights from the online model to the target model
        self.target_model.set_weights(self.model.get_weights())

    def remember(self, state, action, reward, next_state, done):
        # store a transition in the replay memory
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        # epsilon-greedy action selection
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        act_values = self.model.predict(state)
        return np.argmax(act_values[0])  # greedy action
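
    # replay() performs a double-DQN style update: the online model selects the
    # greedy next action while the target network supplies its value estimate.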
    def replay(self, batch_size):
        minibatch = random.sample(self.memory, batch_size)
        for state, action, reward, next_state, done in minibatch:
            target = self.model.predict(state)
            if done:
                target[0][action] = reward
            else:
                a = self.model.predict(next_state)[0]  # action chosen by the online model
                t = self.target_model.predict(next_state)[0]  # values from the target model
                target[0][action] = reward + self.gamma * t[np.argmax(a)]
            self.model.fit(state, target, epochs=1, verbose=0)
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

    def load(self, name):
        self.model.load_weights(name)

    def save(self, name):
        self.model.save_weights(name)
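

# Board encoding: 1 is 'x', -1 is 'o', 0 is an empty cell. gen_state() flattens
# the 3x3 board into a length-9 vector (the network input); actions 0-8 address
# cells in row-major "phone keypad" order via phone2xy().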
class TicTacToe:
    def __init__(self):
        self.reset()

    def reset(self):
        # randomly choose which player starts and clear the board
        self.turn = random.choice([1, -1])
        self.board = [[0 for _ in range(3)] for _ in range(3)]
        return self.gen_state()

    def winner(self):
        # return 1 or -1 if that player has three in a row, else None
        for x in range(3):  # rows
            if self.board[x][0] != 0 and self.board[x][0] == self.board[x][1] == self.board[x][2]:
                return self.board[x][0]
        for y in range(3):  # columns
            if self.board[0][y] != 0 and self.board[0][y] == self.board[1][y] == self.board[2][y]:
                return self.board[0][y]
        if self.board[1][1] != 0 and self.board[0][0] == self.board[1][1] == self.board[2][2]:  # diagonal
            return self.board[1][1]
        if self.board[1][1] != 0 and self.board[0][2] == self.board[1][1] == self.board[2][0]:  # anti-diagonal
            return self.board[1][1]
        return None

    def phone2xy(self, phone):
        # map a 0-8 index to (row, col) in row-major order
        return [i for i in itertools.product(range(3), repeat=2)][phone]  # <3

    def available_move(self):
        pass

    def step(self, action):
        x, y = self.phone2xy(action)
        if not (0 <= x <= 2) or not (0 <= y <= 2):
            return  # self.gen_state(), -10, False
        if self.board[x][y] == 0:
            self.board[x][y] = self.turn
        else:
            # move onto an occupied cell: board unchanged, episode continues
            return self.gen_state(), 1, False
        winner = self.winner()
        if winner:
            reward = -1 if winner == self.turn else 0
        else:
            reward = 0
        self.turn = -self.turn
        # done when someone has won or the board is full
        done = bool(winner) or not sum(1 for row in self.board if 0 in row)
        return self.gen_state(), reward, done

    def gen_state(self):
        # flatten the 3x3 board into a length-9 numpy vector
        return np.array([e for row in self.board for e in row])

    def __str__(self):
        s = '-' * 10 + '\n'
        for row in self.board:
            for cell in row:
                s += 'x' if cell > 0 else ' ' if cell == 0 else 'o'
            s += '\n'
        return s + '-' * 10


def swap(agents):
    # swap which agent plays which side
    agents[-1], agents[1] = agents[1], agents[-1]
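
# Training loop: the DQN agent plays side -1 ('o') against a uniformly random
# opponent on side 1 ('x'). agent2 is loaded and saved but takes no moves here;
# the commented-out swap/agents logic hints at an intended self-play setup.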

if __name__ == "__main__":
    INTERACTIVE = 0
    EPISODES = 500000
    state_size = action_size = 9
    agent = DQNAgent(state_size, action_size)
    agent2 = DQNAgent(state_size, action_size)
    filename = 'ttt'
    try:
        # resume from previously saved weights; exit if they are missing
        agent.load(filename)
        agent2.load(filename + '2')
    except Exception:
        print('SOMETHING WENT VERY WRONG')
        os._exit(0)
    done = False
    batch_size = 32
    ttt = TicTacToe()
    atexit.register(agent.save, filename)
    atexit.register(agent2.save, filename + '2')
    # agents = {-1: agent, 1: agent2}
    # if random.getrandbits(1):  # avoid local minima?
    #     swap(agents)
    if INTERACTIVE:
        agent.epsilon = agent2.epsilon = agent.epsilon_min
    agent.total_reward = agent2.total_reward = 0
    i = 1

    for e in range(EPISODES):
        state = ttt.reset()
        state = np.reshape(state, [1, state_size])
        for time in range(500):
            if INTERACTIVE:
                # human plays side -1 by typing a cell index 0-8 (row-major)
                if ttt.turn == -1:
                    tile = int(input('tile='))
                    state, _, done = ttt.step(tile)
                    state = np.reshape(state, [1, state_size])
            else:
                if ttt.turn == 1:  # random choice eliminates negative competition (?)
                    while True:
                        action = random.choice(range(action_size))
                        x, y = ttt.phone2xy(action)
                        if ttt.board[x][y] == 0:
                            break
                    next_state, r, done = ttt.step(action)
                    state = np.reshape(next_state, [1, state_size])
                else:
                    # DQN agent's turn: act, store the transition, advance the state
                    action = agent.act(state)
                    next_state, reward, done = ttt.step(action)
                    next_state = np.reshape(next_state, [1, state_size])
                    agent.total_reward += reward
                    print('Current reward =', reward)
                    agent.remember(state, action, reward, next_state, done)
                    state = next_state
                    i += 1
            if done:
                if not INTERACTIVE:
                    print(ttt)
                    print('Winner:', ttt.winner())
                    print('Epsilon', agent.epsilon)
                    print("episode: {}/{}, agent: {}, time: {}, e: {:.2}".format(
                        e, EPISODES, agent, time, agent.epsilon))
                    print('Avg Reward=', agent.total_reward / i)
                agent.update_target_model()
                # if random.getrandbits(1): swap(agents)
                break
            if len(agent.memory) > batch_size:
                agent.replay(batch_size)
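
# Note: the script assumes weight files 'ttt' and 'ttt2' already exist in the
# working directory (the load step exits otherwise); updated weights are saved
# back to those files at exit via atexit. The initial files would have to be
# created beforehand, e.g. by skipping the load on a first run (assumption).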