Skip to content

Instantly share code, notes, and snippets.

@KriPet
Last active March 9, 2018 15:01
Show Gist options
  • Save KriPet/4c9719fafe4dd2f38882169bfae84514 to your computer and use it in GitHub Desktop.
Save KriPet/4c9719fafe4dd2f38882169bfae84514 to your computer and use it in GitHub Desktop.
from functools import lru_cache
from collections import Counter
class BoardState:
    """One position of the edge-removal game, stored as a graph.

    The squares of a ``dimension`` x ``dimension`` board are the nodes
    ``0 .. dimension**2 - 1`` in row-major order.  The outside of the board is
    modelled by two pseudo-nodes: ``-1`` (reached through a top or bottom
    border) and ``-2`` (reached through a left or right border).  An edge is a
    sorted ``(low, high)`` tuple of node ids and a move removes one edge.  A
    square counts as *filled* once it can no longer be reached from the
    outside nodes.

    States form a tree through ``parent`` / ``children``; ``label`` is the edge
    whose removal produced this state from its parent.
    """

    # Permanent link between the two outside pseudo-nodes; never a legal move.
    OUTSIDE_EDGE = (-2, -1)

    def __init__(self, dimension, edges, parent=None, label=None, player1=None):
        """Create a state.

        :param dimension: side length of the board.
        :param edges: iterable of sorted edge tuples still present.  It is
            copied, so the caller's collection is never modified.
        :param parent: state this one was reached from, or ``None`` for a root.
        :param label: the edge removed to get here from ``parent``.
        :param player1: only used for a root; for any other state the value is
            derived from the parent.  ``None`` means ``False``.
        """
        self.label = label
        self.dimension = dimension
        if parent is None:
            self.turn = 0
        else:
            self.turn = parent.turn + 1
            player1 = not parent.player1
        if player1 is None:
            player1 = False
        # player1 is true if player 1 is the player who made the last move,
        # i.e. false if it's player 1's turn.
        self.player1 = player1
        # Work on a private copy: the original added OUTSIDE_EDGE straight
        # into the caller's set.
        self.edges = set(edges)
        self.edges.add(self.OUTSIDE_EDGE)
        self.parent = parent
        self.invalid = False
        # Filled in later by calculate_score() / create_children().
        self.minmax = None
        self.children = None
        self.filled = None
        self.score = None

    def no_descendants(self):
        """Return a Counter mapping turn number -> number of states in this subtree.

        The state itself is included in the count for its own turn.
        """
        per_turn = Counter({self.turn: 1})
        for child in self.children or ():
            per_turn.update(child.no_descendants())
        return per_turn

    def calculate_score(self):
        """
        Compute ``filled``, ``score`` and ``minmax`` for this state.

        ``filled`` is the number of filled squares (squares unreachable from
        the outside nodes).  Edges between unreachable nodes are also removed
        from ``self.edges``.

        Score is defined as player 1 score - player 2 score (P1 wants to
        maximize, P2 wants to minimize).  ``minmax`` is the (lowest, highest)
        score still possible if all remaining squares go to one player.

        For a non-root state the parent must already have been scored.
        """
        adjacency = BoardState.connected_edges(self.dimension)
        reached = {-2, -1}  # the outside pseudo-nodes
        # Squares directly adjacent to the outside (edges are sorted, so the
        # outside node is always the first element).
        frontier = [b for a, b in self.edges if a in (-2, -1) and b not in reached]
        while frontier:
            node = frontier.pop()
            if node in reached:
                continue
            reached.add(node)
            frontier.extend(
                neighbour
                for neighbour, edge in adjacency[node]
                if edge in self.edges and neighbour not in reached
            )
        self.edges = {(a, b) for a, b in self.edges if a in reached and b in reached}

        squares = self.dimension ** 2
        # `reached` contains the two outside nodes, hence the + 2.
        self.filled = squares - len(reached) + 2
        if self.parent is None:
            self.score = self.filled
        else:
            newly_filled = self.filled - self.parent.filled
            if self.player1:  # Player 1 has just played and wins the new squares
                self.score = self.parent.score + newly_filled
            else:  # Opponent has just played: newly won squares are subtracted
                self.score = self.parent.score - newly_filled
        remaining = squares - self.filled
        self.minmax = (self.score - remaining, self.score + remaining)

    def create_children(self):
        """Create, store and return one child state per removable edge.

        The children are not scored; call ``calculate_score()`` on each.
        """
        state_type = type(self)
        self.children = [
            state_type(self.dimension, self.edges - {edge}, self, label=edge)
            for edge in self.edges - {self.OUTSIDE_EDGE}
        ]
        return self.children

    def upgrade_score(self):
        """Recompute ``minmax`` from the children and propagate to the ancestors.

        Does nothing when the state has no children, either because no move is
        left or because it has not been expanded yet.
        """
        if not self.children:
            return
        # Player 1 has just played -> the opponent moves next and picks the
        # child with the minimum score; otherwise player 1 picks the maximum.
        pick = min if self.player1 else max
        self.minmax = (
            pick(child.minmax[0] for child in self.children),
            pick(child.minmax[1] for child in self.children),
        )
        if self.parent is not None:
            self.parent.upgrade_score()

    @classmethod
    def new_board(cls, dimension):
        """Return the starting state of a ``dimension`` x ``dimension`` board.

        Every square is connected to its four neighbours; squares on the
        border are connected to outside node ``-1`` (top/bottom) and/or
        ``-2`` (left/right).
        """
        last = dimension - 1
        edges = {cls.OUTSIDE_EDGE}
        for y in range(dimension):
            for x in range(dimension):
                node_id = y * dimension + x
                if y in (0, last):
                    edges.add((-1, node_id))
                if y != last:
                    edges.add((node_id, node_id + dimension))
                if x in (0, last):
                    edges.add((-2, node_id))
                if x != last:
                    edges.add((node_id, node_id + 1))
        return cls(dimension, edges)

    @staticmethod
    @lru_cache()
    def connected_edges(dimension):
        """Return ``{node: {(neighbour, edge), ...}}`` for a full board.

        ``edge`` is the sorted tuple joining ``node`` and ``neighbour``.  The
        result is cached per dimension and shared between callers, so it must
        be treated as read-only.
        """
        last = dimension - 1
        adjacency = {-1: set(), -2: set()}
        for y in range(dimension):
            for x in range(dimension):
                node_id = y * dimension + x
                neighbours = set()
                if x in (0, last):
                    neighbours.add(-2)
                    adjacency[-2].add((node_id, (-2, node_id)))
                # Only real squares are neighbours; on a 1x1 board the
                # original also registered non-existent squares here.
                if x > 0:
                    neighbours.add(node_id - 1)
                if x < last:
                    neighbours.add(node_id + 1)
                if y in (0, last):
                    neighbours.add(-1)
                    adjacency[-1].add((node_id, (-1, node_id)))
                if y > 0:
                    neighbours.add(node_id - dimension)
                if y < last:
                    neighbours.add(node_id + dimension)
                adjacency[node_id] = {
                    (nb, (min(nb, node_id), max(nb, node_id))) for nb in neighbours
                }
        return adjacency
# Sample 2x2 positions (handy for manual experiments).
# `bs` holds every edge of an untouched 2x2 board.
bs = BoardState(
    2,
    {
        (-2, -1),
        (-2, 0), (-2, 1), (-2, 2), (-2, 3),  # left/right borders
        (-1, 0), (-1, 1), (-1, 2), (-1, 3),  # top/bottom borders
        (0, 1), (0, 2), (1, 3), (2, 3),  # edges between squares
    },
)
# `bsf` is the same board with every edge touching square 0 removed.
bsf = BoardState(
    2,
    {
        (-2, -1),
        (-2, 1), (-2, 2), (-2, 3),
        (-1, 1), (-1, 2), (-1, 3),
        (1, 3), (2, 3),
    },
)
"""
Game AI -- this is for you to complete
"""
import requests
import time
import random
from unthreaded_ai import UnthreadedAI
from BoardState import BoardState
# Base URL of the game server; every request below is built from it.
SERVER = "http://127.0.0.1:5000" # this will not change
# Preferred team id; reg() appends random digits to it when the server rejects the id.
BASE_TEAM_ID = "xyzzy"
# Team id actually in use; reassigned by reg() and sent with every move.
TEAM_ID = BASE_TEAM_ID
# Milliseconds kept in reserve: play() stops computing this long before the reported time_left runs out.
TIME_SAFETY_MARGIN = 500 # in ms, try to deliver this amount of time before time runs out
def reg(conf):
    """Register with the game server and store our player number in ``conf``.

    On success ``conf["player"]`` is set to the player number returned by the
    server.  If the server rejects the team id (error codes 1 and 2), a new id
    made of ``BASE_TEAM_ID`` plus five random digits is stored in the global
    ``TEAM_ID`` and registration is retried; retries are not limited.

    :param conf: dict that receives the ``"player"`` entry.
    :raises SystemExit: with code 5 when a game is already under way (error code 3).
    :raises Exception: when the server answers with an unknown response or
        an unknown error code.
    """
    global TEAM_ID
    # Retry in a loop rather than recursively, so repeated rejections cannot
    # exhaust the recursion limit.
    while True:
        resp = requests.get(SERVER + "/reg/" + TEAM_ID).json()
        outcome = resp["response"]
        if outcome == "OK":
            # save which player we are
            print(resp)
            print("Registered successfully as Player {}".format(resp["player"]))
            conf["player"] = resp["player"]
            return
        if outcome != "ERROR":
            raise Exception("Unexpected message from server: {}".format(outcome))

        error_code = resp["error_code"]
        if error_code in (1, 2):
            print("Bad team id: " + TEAM_ID)
            TEAM_ID = BASE_TEAM_ID + "".join(random.choice("0123456789") for _ in range(5))
            continue
        if error_code == 3:
            print("Game under way")
            # The exit() builtin is only installed by the site module;
            # raising SystemExit always works and has the same effect.
            raise SystemExit(5)
        # Previously an unknown error code fell through silently and the
        # missing conf["player"] only surfaced later as a KeyError.
        raise Exception("Unexpected error code from server: {}".format(error_code))
def compute_for(ai, s):
    """Let ``ai`` search for ``s`` seconds.

    ``ai.check_next()`` is called repeatedly until the time is up.  Once it
    returns ``False`` (nothing left to explore) the rest of the interval is
    slept away instead of spinning, so the call still lasts ``s`` seconds but
    no longer burns CPU.

    :param ai: object with a ``check_next()`` method.
    :param s: duration in seconds.
    """
    # A monotonic clock is immune to system clock adjustments.
    deadline = time.monotonic() + s
    while time.monotonic() < deadline:
        if ai.check_next() is False:
            time.sleep(max(0.0, deadline - time.monotonic()))
            return
def play(conf):
    """Play one game against the server and print how it ended.

    The server is polled for its status; between polls the AI keeps
    searching.  When it is our turn, the opponent's last move (if any) is fed
    to the AI, the AI computes until ``TIME_SAFETY_MARGIN`` ms before the
    reported ``time_left`` expires, and the best move found is submitted.

    Status codes as used here: ``200 + player`` means it is that player's
    turn and anything above 300 ends the game.  For final codes the last
    digit is compared with our player number; codes below 400 are reported
    as a win/loss, codes below 500 as an illegal move and anything else as a
    timeout.

    :param conf: dict with our player number (1 or 2) under ``"player"``.
    """
    status = requests.get(SERVER + "/status").json()  # request the status of the game
    board_size = status['board_size']
    ai = UnthreadedAI(BoardState.new_board(board_size), {1: True, 2: False}[conf['player']])

    game_over = False
    while not game_over:
        compute_for(ai, 0.2)  # keep searching a bit before making a new status request
        status = requests.get(SERVER + "/status").json()  # request the status of the game
        if status["status_code"] > 300:
            game_over = True
        elif status["status_code"] == 200 + conf["player"]:  # it's our turn
            last_move = status["last_move"]
            if last_move != "":
                ai.update_with_move_formatted(last_move)
            time_left = status["time_left"]
            print("It's our turn ({}ms left)".format(time_left))
            if time_left > TIME_SAFETY_MARGIN:
                compute_time = time_left - TIME_SAFETY_MARGIN
                print("Computing for {} more ms".format(compute_time))
                compute_for(ai, compute_time / 1000)
            move = ai.get_best_move_formatted()
            ai.update_with_move_formatted(move)
            status = requests.get(SERVER + "/move/" + TEAM_ID + "/" + move).json()

    final_code = status["status_code"]
    about_us = final_code % 10 == conf["player"]
    if final_code < 400:
        print("We won" if about_us else "We lost")
    elif final_code < 500:
        print("We made an illegal move" if about_us else "Other player made an illegal move")
    else:
        # NOTE(review): the original had no `else:` here, so a timeout message
        # was printed after other results as well.  Treating the remaining
        # codes (>= 500) as timeouts is inferred from the messages - confirm
        # against the server's status code table.
        print("We timed out" if about_us else "Other player timed out")
if __name__ == "__main__":
    # reg() reports its result through the dict (it returns None), so its
    # return value is not kept.
    configuration = {}
    reg(configuration)
    play(configuration)
import heapq
from itertools import count
class UnthreadedAI:
    """Single-threaded, incremental explorer of the game tree.

    States waiting to be expanded are kept in a heap.  Every call to
    ``check_next()`` expands one state, so the caller decides how much time
    is spent searching.  The board-state objects must provide ``turn``,
    ``minmax``, ``parent``, ``children``, ``label``, ``dimension``,
    ``create_children()``, ``calculate_score()`` and ``upgrade_score()``.
    """

    # Weight of the upper score bound (minmax[1]) relative to the lower
    # bound (minmax[0]) when ranking states.
    TWEAK_PARAMETER = .5

    def __init__(self, initial_state, player1=True):
        """Score ``initial_state``, make it the root and expand it once.

        :param initial_state: the starting board state.
        :param player1: True if this AI plays as player 1.
        """
        self.player1 = player1
        self.best_child = None
        # Set up initial boardstate
        self.turn = 0 if player1 else 1
        initial_state.calculate_score()
        self.root = initial_state
        # Set up priorityheap
        self.p_queue = []
        self.heapcount = count(0, 1)
        self.add_to_queue(self.root)
        self.check_next()

    def __calc_score(self, bs):
        """Heuristic rank of a state from this AI's point of view. Smaller is better."""
        return (bs.minmax[0] + self.TWEAK_PARAMETER * bs.minmax[1]) * (-1 if self.player1 else 1)

    def add_to_queue(self, bs):
        """Push ``bs`` on the heap.

        States are ordered by turn (earlier turns first), then by heuristic
        score, then by a hash of a salted counter value (presumably to break
        ties in a scrambled order), and finally by a counter value that is
        unique per entry, so two states are never compared directly.
        """
        sort_tuple = (bs.turn, self.__calc_score(bs), hash("salt" + str(next(self.heapcount))), next(self.heapcount))
        heapq.heappush(self.p_queue, (sort_tuple, bs))

    def is_descendant_of_root(self, bs):
        """Return True if ``bs`` is the current root or lies below it."""
        ancestor = bs
        while ancestor.turn > self.root.turn:
            ancestor = ancestor.parent
        return ancestor is self.root

    def _pop_expandable(self):
        """Pop heap entries until one is worth expanding; None if the heap runs dry."""
        while self.p_queue:
            _, bs = heapq.heappop(self.p_queue)
            if bs is self.root:
                return bs
            if bs.turn <= self.root.turn:
                continue  # stale entry from an earlier turn
            if not self.is_descendant_of_root(bs):
                continue  # belongs to a branch that was not played
            if bs.minmax[1] >= 0:  # If not, guaranteed loss
                return bs
        return None

    def _expand(self, bs):
        """Create and score the children of ``bs``, queue the viable ones, update bounds."""
        for child in bs.create_children():
            child.calculate_score()
            if child.minmax[1] >= 0:  # If not, guaranteed loss
                self.add_to_queue(child)
        bs.upgrade_score()

    def check_next(self):
        """Expand the next state on the heap.

        :return: True if a state was expanded, False if nothing is left to expand.
        """
        bs = self._pop_expandable()
        if bs is None:
            return False
        self._expand(bs)
        return True

    def get_best_move(self):
        """Pick the best child of the root, remember it in ``best_child`` and return its label.

        Children that have been expanded are preferred; if none has been,
        the choice is made among all children.
        """
        explored = [c for c in self.root.children if c.children is not None]
        if explored:
            candidates = explored
        else:
            print("Might do something stupid!!")
            candidates = self.root.children
        # min() returns the first of equally ranked candidates, like sorted()[0].
        self.best_child = min(candidates, key=self.__calc_score)
        return self.best_child.label

    def update_with_move(self, move):
        """Advance the root to the child reached by removing edge ``move``.

        :param move: sorted edge tuple, as stored in the children's ``label``.
        :raises ValueError: if no child of the root carries that label.
        """
        # Validate before touching any state, so a bad move leaves the AI intact.
        new_root = next((c for c in self.root.children if c.label == move), None)
        if new_root is None:
            raise ValueError("Move {} is not available from the current position".format(move))
        print("Turn:", self.turn - (0 if self.player1 else 1),
              ("My move: " if (self.turn + self.player1) % 2 == self.player1 else
               "My opponent's move:"), move)
        self.turn += 1
        new_root.parent = None  # cut off the part of the tree that was not played
        if new_root.children is None:
            # Nothing below the new root has been explored, so every queued
            # entry is stale: start over from the new root's children.  The
            # original used `> 0` here but `>= 0` in check_next(); both now
            # share the same rule.
            self.p_queue = []
            self._expand(new_root)
        self.root = new_root

    def convert_from_move(self, move):
        """Translate a ``"x,y,border"`` move string into a sorted edge tuple.

        ``border`` is one of ``top``, ``bottom``, ``left``, ``right`` and names
        a side of the square at column ``x``, row ``y``.
        """
        dim = self.root.dimension
        x, y, border = move.split(",")
        x = int(x)
        y = int(y)
        node_a = y * dim + x
        if (border == "top" and y == 0) or (border == "bottom" and y == dim - 1):
            node_b = -1  # side on the top/bottom border of the board
        elif (border == "left" and x == 0) or (border == "right" and x == dim - 1):
            node_b = -2  # side on the left/right border of the board
        elif border == "right":
            node_b = node_a + 1
        elif border == "bottom":
            node_b = node_a + dim
        elif border == "left":
            node_b = node_a - 1
        else:  # border == "top"
            node_b = node_a - dim
        return tuple(sorted((node_a, node_b)))

    def convert_to_move(self, edge):
        """Translate an edge tuple into a ``"x,y,border"`` move string.

        :raises Exception: if the two nodes of ``edge`` are not adjacent.
        """
        dim = self.root.dimension
        first, second = edge[0], edge[1]
        if first == -1:
            y, x = divmod(second, dim)
            border = "top" if y == 0 else "bottom"
        elif first == -2:
            y, x = divmod(second, dim)
            border = "left" if x == 0 else "right"
        else:
            y, x = divmod(first, dim)
            if second == first + 1:
                border = "right"
            elif second == first + dim:
                border = "bottom"
            elif second == first - 1:
                border = "left"
            elif second == first - dim:
                border = "top"
            else:
                raise Exception("Can't translate edge {} to move".format(edge))
        return ",".join(str(p) for p in (x, y, border))

    def update_with_move_formatted(self, move):
        """Like ``update_with_move`` but takes a ``"x,y,border"`` string."""
        self.update_with_move(self.convert_from_move(move))

    def get_best_move_formatted(self):
        """Like ``get_best_move`` but returns a ``"x,y,border"`` string."""
        return self.convert_to_move(self.get_best_move())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment