@jskDr
Last active March 15, 2020 05:42
TicTacToe game agent using a kind of reinforcement learning algorithm
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import pickle

# TicTacToe has nine board positions (states) and nine possible actions. A player can put a
# stone on any position of the board except positions that are already occupied.
def set_state_inplace(S, action, P_no):
    '''S is a numpy array.'''
    assert S[action] == 0, 'position should be empty to put a new stone'
    S[action] = P_no  # Use numpy indexing to place the stone at the chosen position

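# --- Illustrative sketch (added for clarity, not part of the original gist) ---
# set_state_inplace mutates the board array directly, which is why play() stores
# S.copy() in its buffers. A minimal usage example (the _demo_* name is hypothetical):
def _demo_set_state_inplace():
    S = np.zeros((9,), dtype='int16')        # empty 3x3 board, flattened
    set_state_inplace(S, action=4, P_no=1)   # player 1 takes the center cell
    print(S.reshape(3, 3))                   # the center entry is now 1
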
def calc_reward(S):
    mask_l = tf.constant([[1,1,1,0,0,0,0,0,0], [0,0,0,1,1,1,0,0,0], [0,0,0,0,0,0,1,1,1],
                          [1,0,0,1,0,0,1,0,0], [0,1,0,0,1,0,0,1,0], [0,0,1,0,0,1,0,0,1],
                          [1,0,0,0,1,0,0,0,1], [0,0,1,0,1,0,1,0,0]], dtype=tf.int16)
    for mask in mask_l:
        # print(mask)
        mask_S = mask * S
        # print(mask_S)
        for player in [1, 2]:
            abs_err = tf.reduce_sum(tf.abs(mask_S - player * mask))
            # print(abs_err)
            if abs_err == 0:
                # print(f'Player{player} wins')
                return player
    return 0

def calc_reward_tf(S):
    mask_l = tf.constant([[1,1,1,0,0,0,0,0,0], [0,0,0,1,1,1,0,0,0], [0,0,0,0,0,0,1,1,1],
                          [1,0,0,1,0,0,1,0,0], [0,1,0,0,1,0,0,1,0], [0,0,1,0,0,1,0,0,1],
                          [1,0,0,0,1,0,0,0,1], [0,0,1,0,1,0,1,0,0]], dtype=tf.int32)
    S = tf.constant(S, dtype=tf.int32)
    S = tf.reshape(S, shape=(1, -1))
    S_cp = tf.matmul(tf.ones((mask_l.shape[0], 1), dtype=tf.int32), S)
    mask_S = mask_l * S_cp
    for player in [1, 2]:
        if tf.reduce_any(tf.reduce_sum(tf.abs(mask_S - player * mask_l), axis=1) == 0):
            return player
    return 0

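# --- Illustrative sketch (added for clarity, not part of the original gist) ---
# calc_reward_tf checks all eight winning lines at once by broadcasting the board
# against the mask matrix; it returns 1 or 2 for the winning player, else 0.
# The _demo_* helper below is a hypothetical usage example.
def _demo_calc_reward_tf():
    S_win1 = np.array([1,1,1, 2,0,2, 2,0,0], dtype='int16')  # player 1 owns the top row
    S_open = np.zeros((9,), dtype='int16')                   # no winner yet
    print(calc_reward_tf(S_win1))  # expected: 1
    print(calc_reward_tf(S_open))  # expected: 0
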
def one_of_amax(arr, disp_flag=False):
    results = np.where(arr == np.amax(arr))[0]
    if disp_flag:
        print('Equally max actions:', results)
    action = results[np.random.randint(0, len(results), 1)[0]]
    return action

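# --- Illustrative sketch (added for clarity, not part of the original gist) ---
# one_of_amax breaks ties uniformly at random among all indices attaining the
# maximum, so untrained (all-zero) Q rows still spread their greedy choices.
# The _demo_* helper below is a hypothetical usage example.
def _demo_one_of_amax():
    print(one_of_amax(np.array([0.5, 0.1, 0.5])))  # prints 0 or 2, chosen at random
    print(one_of_amax(np.array([0.0, 0.0, 0.0])))  # any of 0, 1, 2
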
class Q_System:
    def __init__(self, N_A=9, N_Symbols=3, epsilon=0.01, disp_flag=False):
        """
        N_A : Number of actions
        N_Symbols : Number of possible symbols at each position: 0, 1, 2,
                    representing empty, player1, player2
        N_S : Number of states
        """
        if N_A is not None:
            self.disp_flag = disp_flag
            N_S = N_Symbols**N_A
            self.Qsa = [np.zeros((N_S, N_A)), np.zeros((N_S, N_A))]
            self.N_A = N_A
            self.N_Symbols = N_Symbols
            self.epsilon = epsilon
        else:
            self.disp_flag = False
            self.Qsa = None
            self.N_A = None
            self.N_Symbols = None
            self.epsilon = None

    def save(self):
        f = open('tictactoe_data.pckl', 'wb')
        obj = [self.N_A, self.N_Symbols, self.epsilon, self.Qsa]
        pickle.dump(obj, f)
        f.close()

    def load(self):
        f = open('tictactoe_data.pckl', 'rb')
        obj = pickle.load(f)
        [self.N_A, self.N_Symbols, self.epsilon, self.Qsa] = obj
        f.close()

    def calc_S_idx(self, S):
        S_idx = 0
        unit = 1
        for s in S:
            S_idx += s * unit
            unit *= self.N_Symbols
        return S_idx

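    # Note (added for clarity): calc_S_idx encodes the board as a base-3 number with
    # position 0 as the least significant digit. For example, the board
    # S = [0,0,2, 0,1,0, 1,0,2] maps to
    #     2*3**2 + 1*3**4 + 1*3**6 + 2*3**8 = 13950,
    # which matches the S_idx values printed by the tests below.
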
    def _policy_random(self, S, a):
        return 1 / self.N_A

    def policy_random(self, P_no, S, action_list):
        action_prob = []
        S_idx = self.calc_S_idx(S)
        for _ in action_list:
            action_prob.append(1 / len(action_list))
        action_idx = tf.squeeze(tf.random.categorical(tf.math.log([action_prob]), 1)).numpy()
        if action_idx == len(action_prob):  # if all entries in action_prob are zero
            action = action_list[tf.squeeze(np.random.randint(0, len(action_list), 1))]
        else:
            action = action_list[action_idx]
        if self.disp_flag:
            print('S_idx', S_idx, 'action', action, 'action_list', action_list, 'action_prob', action_prob)
        return action

    def policy(self, P_no, S, action_list):
        action_prob = []
        S_idx = self.calc_S_idx(S)
        for a in action_list:
            action_prob.append(self.Qsa[P_no-1][S_idx, a])
        # Choose the max-Q action with epsilon-greedy exploration
        if tf.squeeze(tf.random.uniform([1, 1])) > self.epsilon:
            action = action_list[one_of_amax(action_prob)]
        else:
            action = action_list[np.random.randint(0, len(action_list), 1)[0]]
        if self.disp_flag:
            print('S_idx', S_idx, 'action', action,
                  'action_list', action_list, 'action_prob', action_prob)
        return action

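    # Note (added for clarity): policy() is epsilon-greedy over the Q row of the
    # current state. With probability 1 - epsilon it picks (one of) the highest
    # Q-valued legal actions via one_of_amax; with probability epsilon it picks a
    # legal action uniformly at random. learning() anneals epsilon as
    # 1 / (1 + episode/100), so exploration fades as training proceeds.
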
    def _r0_policy(self, P_no, S, action_list):
        action_prob = []
        S_idx = self.calc_S_idx(S)
        for a in action_list:
            action_prob.append(self.Qsa[P_no-1][S_idx, a])
        action_idx = tf.squeeze(tf.random.categorical(tf.math.log([action_prob]), 1)).numpy()
        if action_idx == len(action_prob):  # if all entries in action_prob are zero
            action = action_list[tf.squeeze(np.random.randint(0, len(action_list), 1))]
        else:
            action = action_list[action_idx]
        if self.disp_flag:
            print('S_idx', S_idx, 'action', action, 'action_list', action_list, 'action_prob', action_prob)
        return action

    def find_action_list(self, S):
        action_list = []
        no_occupied = 0
        for a in range(self.N_A):
            if S[a] == 0:
                action_list.append(a)
            else:
                no_occupied += 1
        return action_list, no_occupied

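    # Note (added for clarity): find_action_list returns the indices of empty cells
    # plus the count of occupied cells. For S = [0,2,0, 0,1,0, 1,0,2] it returns
    # ([0, 2, 3, 5, 7], 4), as shown in the Testing class further below.
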
    # Take an action at the given state
    def get_action(self, P_no, S):
        """Return action, done
        """
        action_list, no_occupied = self.find_action_list(S)
        # Since the number of possible actions is reduced,
        # the denominator is updated as well.
        action = self.policy(P_no, S, action_list)
        done = no_occupied == (self.N_A - 1)
        return action, done

    def get_action_with_random(self, P_no, S):
        """Return action, done
        """
        action_list, no_occupied = self.find_action_list(S)
        # Since the number of possible actions is reduced,
        # the denominator is updated as well.
        if P_no == 1:
            action = self.policy(P_no, S, action_list)
        else:
            action = self.policy_random(P_no, S, action_list)
        done = no_occupied == (self.N_A - 1)
        return action, done

    def get_action_with_human(self, P_no, S):
        """
        action, done = get_action_with_human(self, P_no, S)
        - Playing with a human
        [Inputs]
        P_no : Human player index, where 1=first and 2=second player
        """
        action_list, no_occupied = self.find_action_list(S)
        # Since the number of possible actions is reduced,
        # the denominator is updated as well.
        print('The current game state is:')
        print(S.reshape(3, 3))
        print('Action index:')
        print(np.array(range(9)).reshape(3, 3))
        print('Available actions: ', action_list)
        rand_idx = np.random.randint(0, len(action_list))
        random_action = action_list[int(rand_idx)]
        action = None
        while action not in action_list:
            action = input_default(f'Type your action (default={random_action}): ', random_action, int)
            if action not in action_list:
                print('Type your action again, choosing from the available action list:', action_list)
        done = no_occupied == (self.N_A - 1)
        return action, done

    def play(self, P_no):
        """
        Buff = play(self, P_no)
        [Inputs]
        P_no: player number, which is 1 or 2
        [Returns]
        Buff = {'P_no': [], 'S': [], 'a': [], 'r': [], 'S_next': []}: information gathered during learning,
            where S, a, r, S_next are the state, action, reward, and next state
        [Examples]
        1. Buff = self.play(1)
        2. Buff = self.play(2)
        """
        N_A = self.N_A
        Buff = {'P_no': [], 'S': [], 'a': [], 'r': [], 'S_next': []}
        S = np.zeros((N_A,), dtype='int16')  # #states == #actions
        if self.disp_flag:
            print('S:', S)
        done = False
        while not done:
            action, done = self.get_action(P_no, S)
            Buff['P_no'].append(P_no)
            Buff['S'].append(S.copy())
            Buff['a'].append(action)
            set_state_inplace(S, action, P_no)
            Buff['S_next'].append(S.copy())
            if self.disp_flag:
                print('S:', S)
            win_player = calc_reward_tf(S)
            reward = 0 if win_player == 0 else 1
            Buff['r'].append(reward)
            P_no = 1 if P_no == 2 else 2
            if win_player:
                done = True
        if self.disp_flag:
            if win_player:
                print(f'player {win_player} wins')
            else:
                print('Tie game')
        return Buff

    def play_with_random(self, P_no):
        """
        Buff = play_with_random(self, P_no)
        - Player 1 uses the learned policy; player 2 plays at random
        [Inputs]
        P_no: player number, which is 1 or 2
        [Returns]
        Buff = {'P_no': [], 'S': [], 'a': [], 'r': [], 'S_next': []}: information gathered during learning,
            where S, a, r, S_next are the state, action, reward, and next state
        [Examples]
        1. Buff = self.play_with_random(1)
        2. Buff = self.play_with_random(2)
        """
        N_A = self.N_A
        Buff = {'P_no': [], 'S': [], 'a': [], 'r': [], 'S_next': []}
        S = np.zeros((N_A,), dtype='int16')  # #states == #actions
        if self.disp_flag:
            print('S:', S)
        done = False
        while not done:
            action, done = self.get_action_with_random(P_no, S)
            Buff['P_no'].append(P_no)
            Buff['S'].append(S.copy())
            Buff['a'].append(action)
            set_state_inplace(S, action, P_no)
            Buff['S_next'].append(S.copy())
            if self.disp_flag:
                print('S:', S)
            win_player = calc_reward_tf(S)
            reward = 0 if win_player == 0 else 1
            Buff['r'].append(reward)
            P_no = 1 if P_no == 2 else 2
            if win_player:
                done = True
        if self.disp_flag:
            if win_player:
                print(f'player {win_player} wins')
            else:
                print('Tie game')
        return Buff

    def play_with_human(self, player_human=1):
        """
        Buff = play_with_human(self, player_human)
        - Playing against a human
        [Inputs]
        player_human: the human player's number, which is 1 or 2
        [Returns]
        Buff = {'P_no': [], 'S': [], 'a': [], 'r': [], 'S_next': []}: information gathered during learning,
            where S, a, r, S_next are the state, action, reward, and next state
        [Examples]
        1. Buff = self.play_with_human(1)
        2. Buff = self.play_with_human(2)
        """
        N_A = self.N_A
        Buff = {'P_no': [], 'S': [], 'a': [], 'r': [], 'S_next': []}
        S = np.zeros((N_A,), dtype='int16')  # #states == #actions
        if self.disp_flag:
            print('S:', S)
        P_no = 1  # set P_no = 1 while player_human could be 1 or 2
        done = False
        while not done:
            if player_human == P_no:
                action, done = self.get_action_with_human(P_no, S)
            else:
                P_no_trained_agent = 1  # the random agent is 2
                action, done = self.get_action_with_random(P_no_trained_agent, S)
            Buff['P_no'].append(P_no)
            Buff['S'].append(S.copy())
            Buff['a'].append(action)
            set_state_inplace(S, action, P_no)
            Buff['S_next'].append(S.copy())
            if self.disp_flag:
                print('S:', S)
            win_player = calc_reward_tf(S)
            reward = 0 if win_player == 0 else 1
            Buff['r'].append(reward)
            P_no = 1 if P_no == 2 else 2
            if win_player:
                done = True
        print(S.reshape(3, 3))
        if win_player == player_human:
            print('You win')
        elif win_player != 0:
            print('You lose')
        else:
            print('Tie game')
        return Buff

    def update_Qsa_inplace(self, Buff, ff=0.9, lr=0.01):
        def discounted_inplace(Buff_r):
            """discounted_inplace(Buff_r):
            Convert a reward vector to a discounted return vector using ff,
            where ff is the forgetting (discount) factor.
            [Input] Buff_r = Buff['r']: rewards stored during an episode
            """
            g_prev = 0
            for i, r_l in enumerate(reversed(Buff_r)):
                Buff_r[-i-1] = r_l + ff * g_prev
                g_prev = Buff_r[-i-1]

        def updateQsa_inplace(Qsa_player, Buff_player):
            if self.disp_flag:
                print('---------------------------------------')
                print('S, S_idx, a, lr * r, Qsa_player[S_idx,a]')
            for S, a, r in zip(Buff_player['S'], Buff_player['a'], Buff_player['r']):
                S_idx = self.calc_S_idx(S)
                Qsa_player[S_idx, a] += lr * r
                if self.disp_flag:
                    print(S, S_idx, a, lr * r, Qsa_player[S_idx, a])

        # def updateQsa_stages_inplace(player, Qsa_player, Buff_player, disp_flag=True):

        def buff_depart(Buff):
            Buff_dual = [{'S': [], 'a': [], 'r': []}, {'S': [], 'a': [], 'r': []}]
            for i, (p, S, a, r) in enumerate(zip(Buff['P_no'], Buff['S'], Buff['a'], Buff['r'])):
                if i > 0:
                    # the final reward for a player is the reward of the next player
                    prev_p = 2 if p == 1 else 1
                    Buff_dual[prev_p-1]['r'][-1] = -r  # 1 for player#2 --> -1 for player#1, and vice versa
                    if self.disp_flag:
                        print('i, prev_p, Buff_dual[prev_p-1]')
                        print(i, prev_p, Buff_dual[prev_p-1])
                Buff_dual[p-1]['S'].append(S)
                Buff_dual[p-1]['a'].append(a)
                Buff_dual[p-1]['r'].append(r)
                if self.disp_flag:
                    print('i, p, Buff_dual[p-1]')
                    print(i, p, Buff_dual[p-1])
            return Buff_dual

        Buff_dual = buff_depart(Buff)
        # update both players' Q tables
        for player in [1, 2]:
            discounted_inplace(Buff_dual[player-1]['r'])  # for each player
            if self.disp_flag:
                print('player:', player)
                print("Buff_dual[player-1]['r']", Buff_dual[player-1]['r'])
            updateQsa_inplace(self.Qsa[player-1], Buff_dual[player-1])
            # updateQsa_stages_inplace(player, self.Qsa_stages[player-1], Buff_dual[player-1])

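    # Note (added for clarity): update_Qsa_inplace first splits the shared buffer
    # into one trajectory per player (buff_depart flips the loser's final reward to
    # -1), then converts each reward list to discounted returns. For example, with
    # ff = 0.9 a per-player reward list [0, 0, 1] becomes [0.81, 0.9, 1], and each
    # visited (state, action) entry of Qsa is then nudged by lr times its return.
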
    def learning(self, N_episodes=2, ff=0.9, lr=0.01, print_cnt=10):
        """Return:
        cnt_trace = [cnt, ...]: cnt vectors stacked over episodes
        """
        cnt = [0, 0, 0]  # tie, p1, p2
        cnt_trace = [cnt.copy()]
        player = 1
        for episode in range(N_episodes):
            # print('===================================')
            # This data could be saved for play 2 as well
            # Decrease epsilon with respect to the episode index
            self.epsilon = 1 / (1 + episode/100)
            # self.epsilon = 1 / (1 + episode)
            Buff = self.play(player)
            self.update_Qsa_inplace(Buff, ff=ff, lr=lr)
            win_player = 0 if Buff['r'][-1] == 0 else Buff['P_no'][-1]
            cnt[win_player] += 1
            cnt_trace.append(cnt.copy())
            player = 2 if player == 1 else 1
            if episode % print_cnt == 0:
                print(episode, cnt)
                print('S = [0,0,0, 0,0,0, 0,0,0]')
                print('Qsa[0][0,:]', [f'{self.Qsa[0][0,a]:.1e}' for a in range(9)])
                print('Qsa[1][0,:]', [f'{self.Qsa[1][0,a]:.1e}' for a in range(9)])
                S = [1,1,0, 2,1,2, 1,2,2]
                S_idx = self.calc_S_idx(S)
                print('S = ', S)
                print(f'Qsa[0][{S_idx},:]', [f'{self.Qsa[0][S_idx,a]:.1e}' for a in range(9)])
                print(f'Qsa[1][{S_idx},:]', [f'{self.Qsa[1][S_idx,a]:.1e}' for a in range(9)])
                S = [1,1,0, 2,0,0, 2,0,0]
                S_idx = self.calc_S_idx(S)
                print('S = ', S)
                print(f'Qsa[0][{S_idx},:]', [f'{self.Qsa[0][S_idx,a]:.1e}' for a in range(9)])
                print(f'Qsa[1][{S_idx},:]', [f'{self.Qsa[1][S_idx,a]:.1e}' for a in range(9)])
        return cnt_trace

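# --- Illustrative sketch (added for clarity, not part of the original gist) ---
# A minimal end-to-end run: train for a few self-play episodes and inspect the
# tie/win counts. learning_stage() below wraps the same steps with saving and
# plotting; the _demo_* name here is hypothetical.
def _demo_quick_training(N_episodes=100):
    agent = Q_System(N_A=9, N_Symbols=3)
    cnt_trace = agent.learning(N_episodes=N_episodes, ff=0.9, lr=0.01, print_cnt=N_episodes)
    print('[tie, player1 wins, player2 wins] =', cnt_trace[-1])
    return agent
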
class Testing:
    def __init__(self, fn_name):
        '''Usage:
        - Testing('calc_reward_tf')
        '''
        if fn_name == 'calc_reward_tf':
            self.test_calc_reward_tf()
        elif fn_name == 'find_action_list':
            self.test_find_action_list()
        elif fn_name == 'get_action':
            self.test_get_action()
        elif fn_name == 'all':
            self.test_calc_reward_tf()
            self.test_find_action_list()
            self.test_get_action()

    def test_calc_reward_tf(self):
        S_examples = tf.constant([[0,0,0, 0,0,0, 0,0,0],
                                  [1,1,1, 2,0,2, 2,0,0],
                                  [0,0,2, 1,2,1, 2,0,0]])
        print('===================================')
        print('Testing: calc_reward_tf')
        print('[Answer]')
        answer = [0, 1, 2]
        print(answer)
        print('-------------------------------------')
        print('[Test]')
        test = [calc_reward_tf(S) for S in S_examples]
        print(test)
        if test == answer:
            print('Test OK')
        else:
            print('Test fail')

    def test_find_action_list(self):
        print('===================================')
        print('Testing: test_find_action_list')
        print('[Answer]')
        print('''[[0 0 0]
 [0 0 0]
 [0 0 0]] [0, 1, 2, 3, 4, 5, 6, 7, 8] 0
[[0 2 0]
 [0 1 0]
 [1 0 2]] [0, 2, 3, 5, 7] 4''')
        N_A = 9
        N_Symbols = 3
        my_Q_System = Q_System(N_A, N_Symbols)
        print('-------------------------------------')
        print('[Test]')
        S_l = [[0,0,0, 0,0,0, 0,0,0], [0,2,0, 0,1,0, 1,0,2]]
        for S in S_l:
            action_list, no_occupied = my_Q_System.find_action_list(S)
            print(np.reshape(S, (3, 3)), action_list, no_occupied)

    def test_get_action(self):
        print('===================================')
        print('Testing: get_action')
        print('''[Answer]
Equally max actions: [0]
S_idx 0 action 0 action_list [0, 1, 2, 3, 4, 5, 6, 7, 8] action_prob [1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
[[0 0 0]
 [0 0 0]
 [0 0 0]] 1 0
Equally max actions: [0]
S_idx 13950 action 0 action_list [0, 1, 3, 5, 7] action_prob [1.0, 0.0, 0.0, 0.0, 0.0]
[[0 0 2]
 [0 1 0]
 [1 0 2]] 1 0
Equally max actions: [0]
S_idx 0 action 0 action_list [0, 1, 2, 3, 4, 5, 6, 7, 8] action_prob [1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
[[0 0 0]
 [0 0 0]
 [0 0 0]] 2 0
Equally max actions: [0]
S_idx 13950 action 0 action_list [0, 1, 3, 5, 7] action_prob [1.0, 0.0, 0.0, 0.0, 0.0]
[[0 0 2]
 [0 1 0]
 [1 0 2]] 2 0''')
        N_A = 9
        N_Symbols = 3
        my_Q_System = Q_System(N_A, N_Symbols)
        print('-------------------------------------')
        print('[Test]')
        S_l = [[0,0,0, 0,0,0, 0,0,0], [0,0,2, 0,1,0, 1,0,2]]
        for P_no in [1, 2]:
            for S in S_l:
                S_idx = my_Q_System.calc_S_idx(S)
                my_Q_System.Qsa[P_no-1][S_idx, :] = np.array([1.0,0.0,0, 0,0,0, 0,0,0])
                action, _ = my_Q_System.get_action(P_no, S)
                print(np.reshape(S, (3, 3)), P_no, action)

def _main():
    ff = 0.9
    lr = 0.01
    N_episodes = 2
    N_Symbols = 3  # 0=empty, 1=player1, 2=player2
    N_A = 9  # (0,0), (0,1), ..., (2,2)
    my_Q_System = Q_System(N_A, N_Symbols)
    cnt = [0, 0, 0]  # tie, p1, p2
    player = 1
    for episode in range(N_episodes):
        # print('===================================')
        # This data could be saved for play 2 as well
        Buff = my_Q_System.play(player)
        my_Q_System.update_Qsa_inplace(Buff, ff=ff, lr=lr)
        win_player = 0 if Buff['r'][-1] == 0 else Buff['P_no'][-1]
        cnt[win_player] += 1
        player = 2 if player == 1 else 1
        if episode % 10 == 0:
            print(episode, cnt)
    print(cnt)

def plot_cnt_trace(cnt_trace):
    N_cnt = len(cnt_trace)
    cnt_d = {'Equal': np.zeros(N_cnt, dtype=int), 'P1': np.zeros(N_cnt, dtype=int), 'P2': np.zeros(N_cnt, dtype=int)}
    for i, cnt in enumerate(cnt_trace):
        cnt_d['Equal'][i] = cnt[0]
        cnt_d['P1'][i] = cnt[1]
        cnt_d['P2'][i] = cnt[2]
    plt.plot(range(N_cnt), cnt_d['Equal'], label='Equal')
    plt.plot(range(N_cnt), cnt_d['P1'], label='Player1 wins')
    plt.plot(range(N_cnt), cnt_d['P2'], label='Player2 wins')
    plt.xlabel('Episode')
    plt.ylabel('Count')
    plt.legend(loc=0)
    plt.title('Learned (P#1) vs. Random (P#2) policies during learning')
    plt.show()

def learning_stage(N_episodes=100, save_flag=True, fig_flag=False):
    ff = 0.9
    lr = 0.01
    N_Symbols = 3  # 0=empty, 1=player1, 2=player2
    N_A = 9  # (0,0), (0,1), ..., (2,2)
    print_cnt = max(1, N_episodes // 10)  # print progress about ten times per run
    my_Q_System = Q_System(N_A, N_Symbols)
    cnt_trace = my_Q_System.learning(N_episodes=N_episodes, ff=ff, lr=lr, print_cnt=print_cnt)
    print('-------------------')
    cnt = cnt_trace[-1]
    print(N_episodes, cnt)
    if save_flag:
        my_Q_System.save()
    if fig_flag:
        plot_cnt_trace(cnt_trace)
    return my_Q_System

def input_default(prompt, default_value, dtype=int):
    answer = input(prompt)
    if answer == '':
        return default_value
    else:
        return dtype(answer)

def main():
    Q1 = input_default('1. Load a trained agent (0) or learn a new agent (1)? (default=0) ', 0, int)
    if Q1 == 0:
        print('Loading the trained agent...')
        Q2 = input_default('2. Do you want to play first? (0=yes, 1=no, default=0) ', 0, int)
        player_human = Q2 + 1
        if player_human == 1:
            print('You=1, Agent=2')
        else:
            print('Agent=1, You=2')
        trained_Q_System = Q_System(None)
        trained_Q_System.load()
        trained_Q_System.play_with_human(player_human)
        # print(len(trained_Q_System.Qsa))
    else:
        print('Starting to learn a new agent...')
        Q2 = input_default('2. How many episodes do you want to learn? (default=10000) ', 10000, int)
        # my_Q_System = learning_stage(N_episodes=Q2, fig_flag=True)
        _ = learning_stage(N_episodes=Q2, fig_flag=True)
        # print(len(my_Q_System.Qsa))


if __name__ == "__main__":
    main()
    # Testing('all')
jskDr commented Mar 15, 2020

The first version of my Tictactoe.
