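"""A DQN that learns tic-tac-toe.

Two Keras models (an online net and a periodically-synced target net) are
trained from replayed transitions; the opponent plays random valid moves,
and an optional interactive mode lets a human play against the agent.
Board cells hold 1 ('x'), -1 ('o') or 0 (empty); states are the flattened
3x3 board and actions are phone-keypad cell indices 0-8.
"""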
import atexit
import itertools
import random
from collections import deque

import numpy as np
from keras import backend as K
from keras.layers import Dense
from keras.models import Sequential
from keras.optimizers import Adam
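# Note: this targets the standalone `keras` package of the era; under
# tf.keras 2.x the imports move to `tensorflow.keras` and Adam's `lr`
# argument is spelled `learning_rate`.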
class DQNAgent:
    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=2000)  # replay buffer
        self.gamma = 1.0                  # discount rate
        self.epsilon = 1.0                # exploration rate
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.99
        self.learning_rate = 0.1
        self.model = self._build_model()
        self.target_model = self._build_model()
        self.update_target_model()
    def _huber_loss(self, target, prediction):
        # pseudo-Huber loss: sqrt(1 + error^2) - 1
        error = prediction - target
        return K.mean(K.sqrt(1 + K.square(error)) - 1, axis=-1)
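    # The pseudo-Huber loss above is quadratic for small errors but only
    # linear for large ones, so a single bad bootstrapped target cannot
    # blow up the gradients -- the usual motivation for Huber-style losses
    # in DQN training.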
    def _build_model(self):
        # neural net for the Deep Q-learning model
        model = Sequential()
        model.add(Dense(12, input_dim=self.state_size, activation='relu'))
        model.add(Dense(12, activation='relu'))
        model.add(Dense(self.action_size, activation='linear'))
        model.compile(loss=self._huber_loss, optimizer=Adam(lr=self.learning_rate))
        return model
    def update_target_model(self):
        # copy weights from the online model to the target model
        self.target_model.set_weights(self.model.get_weights())
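    # Keeping a separate target network that is only synced between episodes
    # holds the bootstrap targets in replay() fixed for a while, which
    # stabilizes training compared to chasing a constantly-moving target.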
    def remember(self, state, action, reward, next_state, done):
        # store one transition in the replay buffer
        self.memory.append((state, action, reward, next_state, done))
    def act(self, state):
        # epsilon-greedy action selection
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        act_values = self.model.predict(state)
        return np.argmax(act_values[0])  # greedy action
    def replay(self, batch_size):
        minibatch = random.sample(self.memory, batch_size)
        for state, action, reward, next_state, done in minibatch:
            target = self.model.predict(state)
            if done:
                target[0][action] = reward
            else:
                # Double DQN update: the online model selects the next
                # action, the target model evaluates it
                a = self.model.predict(next_state)[0]
                t = self.target_model.predict(next_state)[0]
                target[0][action] = reward + self.gamma * t[np.argmax(a)]
            self.model.fit(state, target, epochs=1, verbose=0)
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay
    def load(self, name):
        self.model.load_weights(name)

    def save(self, name):
        self.model.save_weights(name)
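# Minimal usage sketch (illustrative only; the real loop lives under
# __main__ below):
#   agent = DQNAgent(state_size=9, action_size=9)
#   state = np.zeros((1, 9))        # flattened empty board, batch of 1
#   action = agent.act(state)       # epsilon-greedy keypad index 0-8
#   agent.remember(state, action, 0, state, False)
#   if len(agent.memory) > 32:
#       agent.replay(32)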
class TicTacToe:
    def __init__(self):
        self.reset()

    def reset(self):
        # cells hold 1 ('x'), -1 ('o') or 0 (empty); a random player starts
        self.turn = random.choice([1, -1])
        self.board = [[0 for _ in range(3)] for _ in range(3)]
        return self.gen_state()
    def winner(self):
        for x in range(3):  # rows
            if self.board[x][0] != 0 and self.board[x][0] == self.board[x][1] == self.board[x][2]:
                return self.board[x][0]
        for y in range(3):  # cols
            if self.board[0][y] != 0 and self.board[0][y] == self.board[1][y] == self.board[2][y]:
                return self.board[0][y]
        if self.board[1][1] != 0 and self.board[0][0] == self.board[1][1] == self.board[2][2]:  # diagonal
            return self.board[1][1]
        if self.board[1][1] != 0 and self.board[0][2] == self.board[1][1] == self.board[2][0]:  # anti-diagonal
            return self.board[1][1]
        return None
    def phone2xy(self, phone):
        # map a phone-keypad index 0-8 to (row, col):
        #   0 1 2
        #   3 4 5
        #   6 7 8
        return [i for i in itertools.product(range(3), repeat=2)][phone]  # <3

    def available_move(self):
        pass  # unused stub
    def step(self, action):
        # actions 0-8 always map inside the board, so no bounds check is needed
        x, y = self.phone2xy(action)
        if self.board[x][y] == 0:
            self.board[x][y] = self.turn
        else:
            # illegal move: penalize and leave the board (and turn) unchanged
            return self.gen_state(), -1, False
        winner = self.winner()
        # only the player who just moved can have completed a line
        reward = 1 if winner == self.turn else 0
        self.turn = -self.turn
        board_full = not any(0 in row for row in self.board)
        return self.gen_state(), reward, bool(winner) or board_full
    def gen_state(self):
        # flatten the 3x3 board into a length-9 vector
        return np.array([e for row in self.board for e in row])

    def __str__(self):
        s = '-' * 10 + '\n'
        for row in self.board:
            for cell in row:
                s += 'x' if cell > 0 else ' ' if cell == 0 else 'o'
            s += '\n'
        return s + '-' * 10
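# Environment sketch (illustrative): states are flattened boards, actions
# are keypad indices, and step() returns (state, reward, done):
#   ttt = TicTacToe()
#   state = ttt.reset()                 # a random player moves first
#   state, reward, done = ttt.step(4)   # current player takes the center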
def swap(agents):
    # exchange the agents mapped to marks -1 and 1
    agents[-1], agents[1] = agents[1], agents[-1]
if __name__ == "__main__":
    INTERACTIVE = 0
    EPISODES = 500000
    state_size = action_size = 9
    agent = DQNAgent(state_size, action_size)
    agent2 = DQNAgent(state_size, action_size)
    filename = 'ttt'
    try:
        agent.load(filename)
        agent2.load(filename + '2')
    except (IOError, OSError):
        # no saved weights yet: start training from scratch
        print('No saved weights found, starting fresh')
    done = False
    batch_size = 32
    ttt = TicTacToe()
    atexit.register(agent.save, filename)
    atexit.register(agent2.save, filename + '2')
    # agents = {-1: agent, 1: agent2}
    # if random.getrandbits(1):  # avoid local minima?
    #     swap(agents)
    if INTERACTIVE:  # exploit the learned policy instead of exploring
        agent.epsilon = agent2.epsilon = agent.epsilon_min
    agent.total_reward = agent2.total_reward = 0
    i = 1  # counts the agent's moves, for the running average reward
    for e in range(EPISODES):
        state = ttt.reset()
        state = np.reshape(state, [1, state_size])
        for time in range(500):
            if INTERACTIVE and ttt.turn == -1:
                # human plays 'o' by typing a keypad index 0-8
                tile = int(input('tile='))
                state, _, done = ttt.step(tile)
                state = np.reshape(state, [1, state_size])
            elif not INTERACTIVE and ttt.turn == 1:
                # random valid move eliminates negative competition (?)
                while True:
                    action = random.choice(range(action_size))
                    x, y = ttt.phone2xy(action)
                    if ttt.board[x][y] == 0:
                        break
                next_state, _, done = ttt.step(action)  # opponent's reward is discarded
                state = np.reshape(next_state, [1, state_size])
            else:
                action = agent.act(state)
                next_state, reward, done = ttt.step(action)
                next_state = np.reshape(next_state, [1, state_size])
                agent.total_reward += reward
                print('Current reward =', reward)
                agent.remember(state, action, reward, next_state, done)
                state = next_state
                i += 1
            if done:
                if not INTERACTIVE:
                    print(ttt)
                print('Winner:', ttt.winner())
                print('Epsilon', agent.epsilon)
                print(
                    "episode: {}/{}, agent: {}, time: {}, e: {:.2}".format(e, EPISODES, agent, time, agent.epsilon))
                print('Avg Reward=', agent.total_reward / i)
                agent.update_target_model()
                # if random.getrandbits(1): swap(agents)
                break
            if len(agent.memory) > batch_size:
                agent.replay(batch_size)
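    # To play against the trained weights, set INTERACTIVE = 1 above and
    # type keypad indices 0-8 at the 'tile=' prompt.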