TieTacToe game agent using a kind of reinforcement learning algorithms
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import pickle
# TicTacToe game has nine stateus with nine actions. An user can put his ston on any postion in the borad except
def set_state_inplace(S, action, P_no):
''' S is numpy array.'''
assert S[action] == 0, 'position should be empty to put a new stone'
S[action] = P_no # User numpy to insert action in the specific position
def calc_reward(S):
mask_l = tf.constant([[1,1,1,0,0,0,0,0,0], [0,0,0,1,1,1,0,0,0], [0,0,0,0,0,0,1,1,1],
[1,0,0,1,0,0,1,0,0], [0,1,0,0,1,0,0,1,0], [0,0,1,0,0,1,0,0,1],
[1,0,0,0,1,0,0,0,1], [0,0,1,0,1,0,1,0,0]], dtype=tf.int16)
for mask in mask_l:
# print(mask)
mask_S = mask * S
# print(mask_S)
for player in [1,2]:
abs_err = tf.reduce_sum(tf.abs(mask_S - player * mask))
# print(abs_err)
if abs_err == 0:
# print(f'Player{player} wins')
return player
return 0
def calc_reward_tf(S):
mask_l = tf.constant([[1,1,1,0,0,0,0,0,0], [0,0,0,1,1,1,0,0,0], [0,0,0,0,0,0,1,1,1],
[1,0,0,1,0,0,1,0,0], [0,1,0,0,1,0,0,1,0], [0,0,1,0,0,1,0,0,1],
[1,0,0,0,1,0,0,0,1], [0,0,1,0,1,0,1,0,0]], dtype=tf.int32)
S = tf.constant(S, dtype=tf.int32)
S = tf.reshape(S, shape=(1,-1))
S_cp = tf.matmul(tf.ones((mask_l.shape[0],1),dtype=tf.int32), S)
mask_S = mask_l * S_cp
for player in [1, 2]:
if tf.reduce_any(tf.reduce_sum(tf.abs(mask_S - player * mask_l),axis=1) == 0):
return player
return 0
def one_of_amax(arr, disp_flag=False):
results = np.where(arr == np.amax(arr))[0]
if disp_flag:
print('Equally max actions:', results)
action = results[np.random.randint(0, len(results), 1)[0]]
return action
class Q_System:
def __init__(self, N_A=9, N_Symbols=3, epsilon=0.01, disp_flag=False):
N_A : Number of actions
N_Symbols : Number of possible symbols in each point: 0, 1, 2,
representing empty, player1, player2
N_S : Number of states
if N_A is not None:
self.disp_flag = disp_flag
N_S = N_Symbols**N_A
self.Qsa = [np.zeros((N_S,N_A)), np.zeros((N_S,N_A))]
self.N_A = N_A
self.N_Symbols = N_Symbols
self.epsilon = epsilon
self.disp_flag = False
self.Qsa = None
self.N_A = None
self.N_Symbols = None
self.epsilon = None
def save(self):
f = open('tictactoe_data.pckl', 'wb')
obj = [self.N_A, self.N_Symbols, self.epsilon, self.Qsa]
pickle.dump(obj, f)
def load(self):
f = open('tictactoe_data.pckl', 'rb')
obj = pickle.load(f)
[self.N_A, self.N_Symbols, self.epsilon, self.Qsa] = obj
def calc_S_idx(self, S):
S_idx = 0
unit = 1
for s in S:
S_idx += s*unit
unit *= self.N_Symbols
return S_idx
def _policy_random(self, S, a):
return 1 / self.N_A
def policy_random(self, P_no, S, action_list):
action_prob = []
S_idx = self.calc_S_idx(S)
for _ in action_list:
action_idx = tf.squeeze(tf.random.categorical(tf.math.log([action_prob]),1)).numpy()
if action_idx == len(action_prob): # if all zeros in actoin_prob
action = action_list[tf.squeeze(np.random.randint(0, len(action_list), 1))]
action = action_list[action_idx]
if self.disp_flag:
print('S_idx', S_idx, 'action', action, 'action_list', action_list, 'action_prob', action_prob)
return action
def policy(self, P_no, S, action_list):
action_prob = []
S_idx = self.calc_S_idx(S)
for a in action_list:
# We consider max Q with epsilon greedy
if tf.squeeze(tf.random.uniform([1,1])) > self.epsilon:
action = action_list[one_of_amax(action_prob)]
action = action_list[np.random.randint(0,len(action_list),1)[0]]
if self.disp_flag:
print('S_idx', S_idx, 'action', action,
'action_list', action_list, 'action_prob', action_prob)
return action
def _r0_policy(self, P_no, S, action_list):
action_prob = []
S_idx = self.calc_S_idx(S)
for a in action_list:
action_prob.append(self.Qsa[P_no-1][S_idx, a])
action_idx = tf.squeeze(tf.random.categorical(tf.math.log([action_prob]),1)).numpy()
if action_idx == len(action_prob): # if all zeros in actoin_prob
action = action_list[tf.squeeze(np.random.randint(0, len(action_list), 1))]
action = action_list[action_idx]
if self.disp_flag:
print('S_idx', S_idx, 'action', action, 'action_list', action_list, 'action_prob', action_prob)
return action
def find_action_list(self, S):
action_list = []
no_occupied = 0
for a in range(self.N_A):
if S[a] == 0:
no_occupied += 1
return action_list, no_occupied
# Take action_prob at the given state
def get_action(self, P_no, S):
"""Return action, done
action_list, no_occupied = self.find_action_list(S)
# Since number of possible actions are reduced,
# denominator is also updated.
action = self.policy(P_no, S, action_list)
done = no_occupied == (self.N_A - 1)
return action, done
def get_action_with_random(self, P_no, S):
"""Return action, done
action_list, no_occupied = self.find_action_list(S)
# Since number of possible actions are reduced,
# denominator is also updated.
if P_no == 1:
action = self.policy(P_no, S, action_list)
action = self.policy_random(P_no, S, action_list)
done = no_occupied == (self.N_A - 1)
return action, done
def get_action_with_human(self, P_no, S):
action, done = get_action_with_human(self, P_no, S)
- Playing with human
P_no : Human player index which represents 1=fist, 2=second playing
action_list, no_occupied = self.find_action_list(S)
# Since number of possible actions are reduced,
# denominator is also updated.
print('The current game state is:')
print('Action index:')
print('Avaiable actions: ', action_list)
rand_idx = np.random.randint(0, len(action_list))
random_action = action_list[int(rand_idx)]
action = None
while action not in action_list:
action = input_default(f'Type your action (default={random_action}): ', random_action, int)
if action not in action_list:
print('Type action again among in the avaible action list:', action_list)
done = no_occupied == (self.N_A - 1)
return action, done
def play(self, P_no):
Buff = play(self, P_no)
P_no: player number, which is 1 or 2
Buff = {'P_no': [], 'S':[], 'a':[], 'r':[], 'S_next': []}: gathered information during learning
where S, a, r, S_next are state, action, rewrd, and next state
1. Buff =
2. Buff =
N_A = self.N_A
Buff = {'P_no': [], 'S':[], 'a':[], 'r':[], 'S_next': []}
S = np.zeros((N_A,),dtype='int16') # #state == #action
if self.disp_flag:
print('S:', S)
done = False
while done == False:
action, done = self.get_action(P_no, S)
set_state_inplace(S, action, P_no)
if self.disp_flag:
print('S:', S)
win_player = calc_reward_tf(S)
reward = 0 if win_player == 0 else 1
P_no = 1 if P_no == 2 else 2
if win_player:
done = True
if self.disp_flag:
if win_player:
print(f'player {win_player} win')
print(f'Tie game')
return Buff
def play_with_random(self, P_no):
Buff = play(self, P_no)
P_no: player number, which is 1 or 2
Buff = {'P_no': [], 'S':[], 'a':[], 'r':[], 'S_next': []}: gathered information during learning
where S, a, r, S_next are state, action, rewrd, and next state
1. Buff =
2. Buff =
N_A = self.N_A
Buff = {'P_no': [], 'S':[], 'a':[], 'r':[], 'S_next': []}
S = np.zeros((N_A,),dtype='int16') # #state == #action
if self.disp_flag:
print('S:', S)
done = False
while done == False:
action, done = self.get_action_with_random(P_no, S)
set_state_inplace(S, action, P_no)
if self.disp_flag:
print('S:', S)
win_player = calc_reward_tf(S)
reward = 0 if win_player == 0 else 1
P_no = 1 if P_no == 2 else 2
if win_player:
done = True
if self.disp_flag:
if win_player:
print(f'player {win_player} win')
print(f'Tie game')
return Buff
def play_with_human(self, player_human=1):
Buff = play_with_human(self, P_no)
- Playing with human
P_no: player number, which is 1 or 2
Buff = {'P_no': [], 'S':[], 'a':[], 'r':[], 'S_next': []}: gathered information during learning
where S, a, r, S_next are state, action, rewrd, and next state
1. Buff =
2. Buff =
N_A = self.N_A
Buff = {'P_no': [], 'S':[], 'a':[], 'r':[], 'S_next': []}
S = np.zeros((N_A,),dtype='int16') # #state == #action
if self.disp_flag:
print('S:', S)
P_no = 1 # set P_no = 1 while player_human could be 1 or 2
done = False
while done == False:
if player_human == P_no:
action, done = self.get_action_with_human(P_no, S)
P_no_trained_agent = 1 # random agent is 2
action, done = self.get_action_with_random(P_no_trained_agent, S)
set_state_inplace(S, action, P_no)
if self.disp_flag:
print('S:', S)
win_player = calc_reward_tf(S)
reward = 0 if win_player == 0 else 1
P_no = 1 if P_no == 2 else 2
if win_player:
done = True
if win_player == player_human:
print('You win')
elif win_player != 0:
print('You lose')
print('Tie game')
return Buff
def update_Qsa_inplace(self, Buff, ff=0.9, lr=0.01):
def discounted_inplace(Buff_r):
Convert a reward vector to a discounted return vector using ff,
where ff means forgeting factor.
[Input] Buff_r = Buff[r]: stores rewards in each episode
g_prev = 0
for i, r_l in enumerate(reversed(Buff_r)):
Buff_r[-i-1] = r_l + ff * g_prev
g_prev = Buff_r[-i-1]
def updateQsa_inplace(Qsa_player, Buff_player):
if self.disp_flag:
print('S, S_idx, a, lr * r, Qsa_player[S_idx,a]')
for S, a, r in zip(Buff_player['S'], Buff_player['a'], Buff_player['r']):
S_idx = self.calc_S_idx(S)
Qsa_player[S_idx,a] += lr * r
if self.disp_flag:
print(S, S_idx, a, lr * r, Qsa_player[S_idx,a])
# def updateQsa_stages_inplace(player, Qsa_player, Buff_player, disp_flag=True):
def buff_depart(Buff):
Buff_dual = [{'S':[], 'a':[], 'r':[]}, {'S':[], 'a':[], 'r':[]}]
for i, (p, S, a, r) in enumerate(zip(Buff['P_no'], Buff['S'], Buff['a'], Buff['r'])):
if i > 0:
# final reward for a player is reward of a next player
prev_p = 2 if p==1 else 1
Buff_dual[prev_p-1]['r'][-1] = -r # 1 for player#2 --> -1 for player#1, vice versa
if self.disp_flag:
print('i, prev_p, Buff_dual[prev_p-1]')
print(i, prev_p, Buff_dual[prev_p-1])
if self.disp_flag:
print('i, p, Buff_dual[p-1]')
print(i, p, Buff_dual[p-1])
return Buff_dual
Buff_dual = buff_depart(Buff)
# player#1
for player in [1,2]:
discounted_inplace(Buff_dual[player-1]['r']) # for player#1
if self.disp_flag:
print('player:', player)
print("Buff_dual[player-1]['r']", Buff_dual[player-1]['r'])
updateQsa_inplace(self.Qsa[player-1], Buff_dual[player-1])
# updateQsa_stages_inplace(player, self.Qsa_stages[player-1], Buff_dual[player-1])
def learning(self, N_episodes=2, ff=0.9, lr=0.01, print_cnt=10):
cnt_trace = [cnt, ...]: cnt vector are stacked in cnt_trace
cnt = [0, 0, 0] # tie, p1, p2
cnt_trace = [cnt.copy()]
player = 1
for episode in range(N_episodes):
# print('===================================')
# Can save this data for play 2 as well
# Decrease epsilon with respect to epside
self.epsilon = 1 / (1 + episode/100)
# self.epsilon = 1 / (1 + episode)
Buff =
self.update_Qsa_inplace(Buff, ff=ff, lr=lr)
win_player = 0 if Buff['r'][-1] == 0 else Buff['P_no'][-1]
cnt[win_player] += 1
player = 2 if player == 1 else 1
if episode % print_cnt == 0:
print(episode, cnt)
print('S = [0,0,0, 0,0,0, 0,0,0]')
print('Qsa[0][0,:]', [f'{self.Qsa[0][0,a]:.1e}' for a in range(9)])
print('Qsa[1][0,:]', [f'{self.Qsa[1][0,a]:.1e}' for a in range(9)])
S = [1,1,0, 2,1,2, 1,2,2]
S_idx = self.calc_S_idx(S)
print('S = ', S)
print(f'Qsa[0][{S_idx},:]', [f'{self.Qsa[0][S_idx,a]:.1e}' for a in range(9)])
print(f'Qsa[1][{S_idx},:]', [f'{self.Qsa[1][S_idx,a]:.1e}' for a in range(9)])
S = [1,1,0, 2,0,0, 2,0,0]
S_idx = self.calc_S_idx(S)
print('S = ', S)
print(f'Qsa[0][{S_idx},:]', [f'{self.Qsa[0][S_idx,a]:.1e}' for a in range(9)])
print(f'Qsa[1][{S_idx},:]', [f'{self.Qsa[1][S_idx,a]:.1e}' for a in range(9)])
return cnt_trace
class Testing:
def __init__(self, fn_name):
- Testing('calc_reward_tf')
if fn_name == 'calc_reward_tf':
elif fn_name == 'find_action_list':
elif fn_name == 'get_action':
elif fn_name == 'all':
def test_calc_reward_tf(self):
S_examples = tf.constant([[0,0,0, 0,0,0, 0,0,0],
[1,1,1, 2,0,2, 2,0,0],
[0,0,2, 1,2,1, 2,0,0]])
print('Testing: calc_reward_tf')
answer = [0, 1, 2]
test = [calc_reward_tf(S) for S in S_examples]
if test == answer:
print('Test Ok')
print('Test fail')
def test_find_action_list(self):
print('Testing: test_find_action_list')
print('''[[0 0 0]
[0 0 0]
[0 0 0]] [0, 1, 2, 3, 4, 5, 6, 7, 8] 0
[[0 2 0]
[0 1 0]
[1 0 2]] [0, 2, 3, 5, 7] 4''')
N_A = 9
N_Symbols = 3
my_Q_System = Q_System(N_A, N_Symbols)
S_l = [[0,0,0, 0,0,0, 0,0,0], [0,2,0, 0,1,0, 1,0,2]]
for S in S_l:
action_list, no_occupied = my_Q_System.find_action_list(S)
print(np.reshape(S,(3,3)), action_list, no_occupied)
def test_get_action(self):
print('Testing: get_action')
Equally max actions: [0]
S_idx 0 action 0 action_list [0, 1, 2, 3, 4, 5, 6, 7, 8] action_prob [1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
[[0 0 0]
[0 0 0]
[0 0 0]] 1 0
Equally max actions: [0]
S_idx 13950 action 0 action_list [0, 1, 3, 5, 7] action_prob [1.0, 0.0, 0.0, 0.0, 0.0]
[[0 0 2]
[0 1 0]
[1 0 2]] 1 0
Equally max actions: [0]
S_idx 0 action 0 action_list [0, 1, 2, 3, 4, 5, 6, 7, 8] action_prob [1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
[[0 0 0]
[0 0 0]
[0 0 0]] 2 0
Equally max actions: [0]
S_idx 13950 action 0 action_list [0, 1, 3, 5, 7] action_prob [1.0, 0.0, 0.0, 0.0, 0.0]
[[0 0 2]
[0 1 0]
[1 0 2]] 2 0''')
N_A = 9
N_Symbols = 3
my_Q_System = Q_System(N_A, N_Symbols)
S_l = [[0,0,0, 0,0,0, 0,0,0], [0,0,2, 0,1,0, 1,0,2]]
for P_no in [1,2]:
for S in S_l:
S_idx = my_Q_System.calc_S_idx(S)
my_Q_System.Qsa[P_no-1][S_idx,:] = np.array([1.0,0.0,0, 0,0,0, 0,0,0])
action, _ = my_Q_System.get_action(P_no, S)
print(np.reshape(S,(3,3)), P_no, action)
def _main():
ff = 0.9
lr = 0.01
N_episodes = 2
N_Symbols = 3 # 0=empty, 1=plyaer1, 2=player2
N_A = 9 # (0,0), (0,1), ..., (2,2)
my_Q_System = Q_System(N_A, N_Symbols)
cnt = [0, 0, 0] # tie, p1, p2
player = 1
for episode in range(N_episodes):
# print('===================================')
# Can save this data for play 2 as well
Buff =
my_Q_System.update_Qsa_inplace(Buff, ff=ff, lr=lr)
win_player = 0 if Buff['r'][-1] == 0 else Buff['P_no'][-1]
cnt[win_player] += 1
player = 2 if player == 1 else 1
if episode % 10 == 0:
print(episode, cnt)
def plot_cnt_trace(cnt_trace):
N_cnt = len(cnt_trace)
cnt_d = {'Equal':np.zeros(N_cnt,dtype=int), 'P1':np.zeros(N_cnt,dtype=int), 'P2':np.zeros(N_cnt,dtype=int)}
for i, cnt in enumerate(cnt_trace):
cnt_d['Equal'][i] = cnt[0]
cnt_d['P1'][i] = cnt[1]
cnt_d['P2'][i] = cnt[2]
plt.plot(range(N_cnt), cnt_d['Equal'], label='Equal')
plt.plot(range(N_cnt), cnt_d['P1'], label='Plyaer1 wins')
plt.plot(range(N_cnt), cnt_d['P2'], label='Plyaer2 wins')
plt.title('Learned (P#1) vs. Random (P#2) policies during learning')
def learning_stage(N_episodes=100, save_flag=True, fig_flag=False):
ff = 0.9
lr = 0.01
N_Symbols = 3 # 0=empty, 1=plyaer1, 2=player2
N_A = 9 # (0,0), (0,1), ..., (2,2)
print_cnt = N_episodes / 10
my_Q_System = Q_System(N_A, N_Symbols)
cnt_trace = my_Q_System.learning(N_episodes=N_episodes, ff=ff, lr=lr, print_cnt=print_cnt)
cnt = cnt_trace[-1]
print(N_episodes, cnt)
if save_flag:
if fig_flag:
return my_Q_System
def input_default(str, defalut_value, dtype=int):
answer = input(str)
if answer == '':
return defalut_value
return dtype(answer)
def main():
Q1 = input_default('1. Loading a trained agent (0) or Learning a new agent (1)? (default=0) ', 0, int)
if Q1 == 0:
print('Loading the trained agent...')
Q2 = input_default('2. Do you want to start first?(0=yes,1=no,default=0) ', 0, int)
player_human = Q2 + 1
if player_human == 1:
print('You=1, Agent=2')
print('Agent=1, You=2')
trained_Q_System = Q_System(None)
# print(len(trained_Q_System.Qsa))
print('Start to learn a new agent...')
Q2 = input_default('2. How many episode do you want to learn?(default=10000) ', 10000, int)
# my_Q_System = learning_stage(N_episodes=Q2, fig_flag=True)
_ = learning_stage(N_episodes=Q2, fig_flag=True)
# print(len(my_Q_System.Qsa))
if __name__ == "__main__":
# Testing('all')
