Last active
March 9, 2018 15:01
-
-
Save KriPet/4c9719fafe4dd2f38882169bfae84514 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from functools import lru_cache | |
from collections import Counter | |
class BoardState: | |
def __init__(self, dimension, edges, parent=None, label=None, player1=None): | |
self.label = label | |
self.dimension = dimension | |
if parent is None: | |
self.turn = 0 | |
else: | |
self.turn = parent.turn + 1 | |
player1 = not parent.player1 | |
if player1 is None: | |
player1 = False | |
# player1 is true if player 1 is the player who made the last move, i.e. false if it's player 1's turn | |
self.player1 = player1 | |
self.edges = edges # set of (sorted) tuples | |
self.edges.add((-2, -1)) | |
self.parent = parent | |
self.invalid = False | |
# Define variables for later | |
self.minmax = None | |
self.children = None | |
self.filled = None | |
self.score = None | |
def no_descendants(self): | |
descendants = Counter() | |
descendants[self.turn] = 1 | |
if self.children is not None: | |
for child in self.children: | |
descendants.update(child.no_descendants()) | |
return descendants | |
def calculate_score(self): | |
""" | |
Return number of filled squares (unreachable squares) | |
Also remove unreachable edges from self.edges | |
Score defined as player 1 score - player 2 score (P1 wants to maximize, P2 wants to minimize) | |
""" | |
reached_nodes = {-2, -1} # outside node | |
node_queue = {e[1] for e in self.edges - {(-2, -1)} if | |
e[0] in (-2, -1)} # nodes adjacent to outside node (all nodes are sorted) | |
while len(node_queue) > 0: | |
n = node_queue.pop() | |
if n in reached_nodes: | |
continue | |
reached_nodes.add(n) | |
new_nodes = {nn for nn, e in BoardState.connected_edges(self.dimension)[n] if | |
e in self.edges} - reached_nodes - node_queue | |
node_queue.update(new_nodes) | |
self.edges = {(a, b) for a, b in self.edges if a in reached_nodes and b in reached_nodes} | |
self.filled = self.dimension ** 2 - len(reached_nodes) + 2 | |
if self.parent is None: | |
self.score = self.filled | |
elif not self.player1: # Opponent has just played | |
self.score = self.parent.score - (self.filled - self.parent.filled) # Newly won squares removed | |
else: | |
self.score = self.parent.score + (self.filled - self.parent.filled) | |
self.minmax = (self.score - (self.dimension ** 2 - self.filled), | |
self.score + (self.dimension ** 2 - self.filled)) | |
def create_children(self): | |
self.children = [BoardState(self.dimension, self.edges - {e}, self, label=e) for e in self.edges - {(-2, -1)}] | |
return self.children | |
def upgrade_score(self): | |
if len(self.children) == 0: # No more possible moves | |
return | |
if self.player1: # Player 1 has just played. Opponent will pick child with minimum score | |
_max = min(c.minmax[1] for c in self.children) | |
_min = min(c.minmax[0] for c in self.children) | |
# self.minmax = (max(_min, self.minmax[0]), min(_max, self.minmax[1])) | |
self.minmax = (_min, _max) | |
else: # Opponent has played. I will pick child with max score | |
_max = max(c.minmax[1] for c in self.children) | |
_min = max(c.minmax[0] for c in self.children) | |
# self.minmax = (max(_min, self.minmax[0]), min(_max, self.minmax[1])) | |
self.minmax = (_min, _max) | |
if self.parent is not None: | |
self.parent.upgrade_score() | |
@classmethod | |
def new_board(cls, dimension): | |
# nodes: 0..dimension**2-1 | |
edges = {(-2, -1)} | |
for y in range(dimension): | |
for x in range(dimension): | |
node_id = y * dimension + x | |
if y == 0 or y == dimension - 1: | |
edges.add((-1, node_id)) | |
if y != dimension - 1: | |
edges.add((node_id, node_id + dimension)) | |
if x == 0 or x == dimension - 1: | |
edges.add((-2, node_id)) | |
if x != dimension - 1: | |
edges.add((node_id, node_id + 1)) | |
return BoardState(dimension, edges) | |
@staticmethod | |
@lru_cache() | |
def connected_edges(dimension): | |
edges = {-1: set(), -2: set()} | |
for y in range(dimension): | |
for x in range(dimension): | |
node_id = y * dimension + x | |
nbs = set() | |
if x == 0: | |
nbs.add(-2) | |
edges[-2].add((node_id, (-2, node_id))) | |
nbs.add(node_id + 1) | |
elif x == dimension - 1: | |
nbs.add(-2) | |
edges[-2].add((node_id, (-2, node_id))) | |
nbs.add(node_id - 1) | |
else: | |
nbs.add(node_id + 1) | |
nbs.add(node_id - 1) | |
if y == 0: | |
nbs.add(-1) | |
edges[-1].add((node_id, (-1, node_id))) | |
nbs.add(node_id + dimension) | |
elif y == dimension - 1: | |
nbs.add(-1) | |
edges[-1].add((node_id, (-1, node_id))) | |
nbs.add(node_id - dimension) | |
else: | |
nbs.add(node_id + dimension) | |
nbs.add(node_id - dimension) | |
edges[node_id] = {(nn, (min(nn, node_id), max(nn, node_id))) for nn in nbs} | |
return edges | |
bs = BoardState(2, {(-2, -1), (-2, 0), (-2, 1), (-2, 2), (-2, 3), (-1, 0), (-1, 1), (-1, 2), (-1, 3), (0, 1), (0, 2), | |
(1, 3), (2, 3)}) | |
bsf = BoardState(2, {(-2, -1), (-2, 1), (-2, 2), (-2, 3), (-1, 1), (-1, 2), (-1, 3), (1, 3), (2, 3)}) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Game AI -- this is for you to complete | |
""" | |
import requests | |
import time | |
import random | |
from unthreaded_ai import UnthreadedAI | |
from BoardState import BoardState | |
SERVER = "http://127.0.0.1:5000" # this will not change | |
BASE_TEAM_ID = "xyzzy" | |
TEAM_ID = BASE_TEAM_ID | |
TIME_SAFETY_MARGIN = 500 # in ms, try to deliver this amount of time before time runs out | |
def reg(conf): | |
global TEAM_ID | |
# register using a fixed team_id | |
resp = requests.get(SERVER + "/reg/" + TEAM_ID).json() # register 1st team | |
if resp["response"] == "OK": | |
# save which player we are | |
print(resp) | |
print("Registered successfully as Player {}".format(resp["player"])) | |
conf["player"] = resp["player"] | |
return | |
elif resp["response"] == "ERROR": | |
if resp["error_code"] in (1, 2): | |
print("Bad team id: " + TEAM_ID) | |
TEAM_ID = BASE_TEAM_ID + "".join(random.choice("0123456789") for _ in range(5)) | |
return reg(conf) | |
elif resp["error_code"] == 3: | |
print("Game under way") | |
exit(5) | |
else: | |
raise Exception("Unexpected message from server: {}".format(resp["response"])) | |
def compute_for(ai, s): | |
now = time.time() | |
while time.time() - now < s: | |
ai.check_next() | |
def play(conf): | |
status = requests.get(SERVER + "/status").json() # request the status of the game | |
board_size = status['board_size'] | |
ai = UnthreadedAI(BoardState.new_board(board_size), {1: True, 2: False}[conf['player']]) | |
game_over = False | |
while not game_over: | |
compute_for(ai, 0.2) # wait a bit before making a new status request | |
status = requests.get(SERVER + "/status").json() # request the status of the game | |
if status["status_code"] > 300: | |
game_over = True | |
elif status["status_code"] == 200 + conf["player"]: # it's our turn | |
last_move = status["last_move"] | |
if last_move != "": | |
ai.update_with_move_formatted(last_move) | |
time_left = status["time_left"] | |
print("It's our turn ({}ms left)".format(time_left)) | |
if time_left > TIME_SAFETY_MARGIN: | |
compute_time = time_left - TIME_SAFETY_MARGIN | |
print("Computing for {} more ms".format(compute_time)) | |
compute_for(ai, compute_time / 1000) | |
move = ai.get_best_move_formatted() | |
ai.update_with_move_formatted(move) | |
status = requests.get(SERVER + "/move/" + TEAM_ID + "/" + move).json() | |
if status["status_code"] < 400: | |
if status["status_code"] % 10 == conf["player"]: | |
print("We won") | |
else: | |
print("We lost") | |
elif status["status_code"] < 500: | |
if status["status_code"] % 10 == conf["player"]: | |
print("We made an illegal move") | |
else: | |
print("Other player made an illegal move") | |
if status["status_code"] % 10 == conf["player"]: | |
print("We timed out") | |
else: | |
print("Other player timed out") | |
if __name__ == "__main__": | |
configuration = dict() | |
player = reg(configuration) | |
play(configuration) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import heapq | |
from itertools import count | |
class UnthreadedAI: | |
TWEAK_PARAMETER = .5 | |
def __init__(self, initial_state, player1=True): | |
self.player1 = player1 | |
# Set up initial boardstate | |
self.turn = 0 if player1 else 1 | |
initial_state.calculate_score() | |
self.root = initial_state | |
# Set up priorityheap | |
self.p_queue = [] | |
self.heapcount = count(0, 1) | |
self.add_to_queue(self.root) | |
self.check_next() | |
# Define variables for later | |
self.best_child = None | |
def __calc_score(self, bs): | |
"""Smaller is better""" | |
return (bs.minmax[0] + self.TWEAK_PARAMETER * bs.minmax[1]) * (-1 if self.player1 else 1) | |
def add_to_queue(self, bs): | |
sort_tuple = (bs.turn, self.__calc_score(bs), hash("salt" + str(next(self.heapcount))), next(self.heapcount)) | |
heapq.heappush(self.p_queue, (sort_tuple, bs)) | |
def is_descendant_of_root(self, bs): | |
d = bs | |
while d.turn > self.root.turn: | |
d = d.parent | |
return d == self.root | |
def check_next(self): | |
# Checks next BoardState in priorityheap. | |
# Returns True if done, false if no more leaves | |
while True: | |
if len(self.p_queue) == 0: | |
return False # We are done | |
_, bs = heapq.heappop(self.p_queue) | |
if bs is self.root: | |
break | |
elif bs.turn <= self.root.turn: | |
continue | |
elif not self.is_descendant_of_root(bs): | |
continue | |
elif bs.minmax[1] >= 0: # If not, guaranteed loss | |
break | |
children = bs.create_children() | |
for c in children: | |
c.calculate_score() | |
if c.minmax[1] >= 0: # If not, guaranteed loss | |
self.add_to_queue(c) | |
bs.upgrade_score() | |
return True | |
def get_best_move(self): | |
cand_children = [c for c in self.root.children if c.children is not None] | |
if len(cand_children) == 0: | |
print("Might do something stupid!!") | |
self.best_child = sorted(self.root.children, key=self.__calc_score)[0] | |
else: | |
self.best_child = sorted(cand_children, key=self.__calc_score)[0] | |
return self.best_child.label | |
def update_with_move(self, move): | |
print("Turn:", self.turn - (0 if self.player1 else 1), | |
("My move: " if (self.turn + self.player1) % 2 == self.player1 else | |
"My opponent's move:"), move) | |
self.turn += 1 | |
new_root = [c for c in self.root.children if c.label == move][0] | |
new_root.parent = None | |
if new_root.children is None: | |
self.p_queue = [] | |
children = new_root.create_children() | |
for c in children: | |
c.calculate_score() | |
if c.minmax[1] > 0: | |
self.add_to_queue(c) | |
new_root.upgrade_score() | |
self.root = new_root | |
def convert_from_move(self, move): | |
dim = self.root.dimension | |
x, y, border = move.split(",") | |
x = int(x) | |
y = int(y) | |
node_a = y * dim + x | |
if y == 0 and border == "top": | |
node_b = -1 | |
elif y == dim - 1 and border == "bottom": | |
node_b = -1 | |
elif x == 0 and border == "left": | |
node_b = -2 | |
elif x == dim - 1 and border == "right": | |
node_b = -2 | |
elif border == "right": | |
node_b = node_a + 1 | |
elif border == "bottom": | |
node_b = node_a + dim | |
elif border == "left": | |
node_b = node_a - 1 | |
else: # border == "top" | |
node_b = node_a - dim | |
return tuple(sorted((node_a, node_b))) | |
def convert_to_move(self, edge): | |
dim = self.root.dimension | |
y, x = divmod(edge[0], dim) | |
if edge[0] == -1: | |
y, x = divmod(edge[1], dim) | |
if y == 0: | |
border = "top" | |
else: | |
border = "bottom" | |
elif edge[0] == -2: | |
y, x = divmod(edge[1], dim) | |
if x == 0: | |
border = "left" | |
else: | |
border = "right" | |
y, x = divmod(edge[1], dim) | |
elif edge[1] == edge[0] + 1: | |
border = "right" | |
elif edge[1] == edge[0] + dim: | |
border = "bottom" | |
elif edge[1] == edge[0] - 1: | |
border = "left" | |
elif edge[1] == edge[0] - dim: | |
border = "top" | |
else: | |
raise Exception("Can't translate edge {} to move".format(edge)) | |
return ",".join(str(p) for p in (x, y, border)) | |
def update_with_move_formatted(self, move): | |
self.update_with_move(self.convert_from_move(move)) | |
def get_best_move_formatted(self): | |
return self.convert_to_move(self.get_best_move()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment