Created
February 13, 2024 08:31
-
-
Save pnck/0e8b2c5acd0985c72c86567579846bb1 to your computer and use it in GitHub Desktop.
HGAME2024 WEEK2 华容道 solver
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import numpy as np | |
import json | |
# TCP port of the challenge server; interpolated into every API URL built in
# this file.
port = 31541
class NotMoveable(RuntimeWarning):
    """Raised when a requested move cannot be performed on the board."""


class Maze:
    """A 5x4 sliding-block board stored as a numpy array of cell codes.

    Cell codes:
        0 - empty cell
        1 - continuation cell of a multi-cell piece (never a key cell)
        2 - 1x1 piece
        3 - key (top) cell of a vertical 2x1 piece
        4 - key (left) cell of a horizontal 1x2 piece
        5 - key (top-left) cell of the 2x2 main piece

    Directions: 1 = up, 2 = right, 3 = down, 4 = left.

    Every successful move is appended to ``steps`` as
    ``((key_row, key_col), direction)`` using the key cell's position
    before the move.
    """

    ROWS = 5
    COLS = 4

    # direction code -> (row delta, column delta)
    _DELTAS = {1: (-1, 0), 2: (0, 1), 3: (1, 0), 4: (0, -1)}
    # key cell code -> (height, width) of the piece anchored at that key cell
    _SHAPES = {2: (1, 1), 3: (2, 1), 4: (1, 2), 5: (2, 2)}

    def _shift(self, pos, direction, code):
        """Slide the piece whose key cell is at ``pos`` one cell in ``direction``.

        Raises NotMoveable (leaving the board untouched) when ``pos`` does not
        hold the key cell ``code``, the direction is unknown, the piece would
        leave the board, or a destination cell is occupied.
        """
        r, c = int(pos[0]), int(pos[1])
        rows, cols = self._m.shape
        if not (0 <= r < rows and 0 <= c < cols) or self._m[r, c] != code:
            raise NotMoveable
        delta = self._DELTAS.get(direction)
        if delta is None:
            raise NotMoveable
        h, w = self._SHAPES[code]
        nr, nc = r + delta[0], c + delta[1]
        # Explicit bounds checks: numpy would silently wrap negative indices
        # to the opposite edge of the board.
        if r + h > rows or c + w > cols:
            raise NotMoveable  # malformed layout: piece sticks out of the board
        if nr < 0 or nc < 0 or nr + h > rows or nc + w > cols:
            raise NotMoveable
        old = self._m[r:r + h, c:c + w]
        piece = old.copy()
        # Lift the piece first so the overlap between the old and the new
        # footprint does not count as an obstacle.
        old[...] = 0
        new = self._m[nr:nr + h, nc:nc + w]
        if new.any():
            old[...] = piece  # blocked: put the piece back
            raise NotMoveable
        new[...] = piece
        self._record_step((r, c), direction)

    def _mov_main(self, pos, direction):
        """Move the 2x2 main piece whose key cell (5) is at ``pos``."""
        self._shift(pos, direction, 5)

    def _mov_small(self, pos, direction):
        """Move the 1x1 piece (2) at ``pos``."""
        self._shift(pos, direction, 2)

    def _mov_vertical(self, pos, direction):
        """Move the vertical 2x1 piece whose key cell (3) is at ``pos``."""
        self._shift(pos, direction, 3)

    def _mov_horizontal(self, pos, direction):
        """Move the horizontal 1x2 piece whose key cell (4) is at ``pos``."""
        self._shift(pos, direction, 4)

    def _find_key_and_move(self, pos, direction):
        """Move the piece that owns the continuation cell (1) at ``pos``.

        The key cell is looked up above / left of / diagonally up-left of
        ``pos``; the move is then delegated to the mover for that key code.
        Raises NotMoveable when no key cell is found.
        """
        r, c = int(pos[0]), int(pos[1])
        if self._m[r, c] != 1:
            raise NotMoveable
        #   5 3
        #   4 1
        candidates = (
            (-1, 0, 3),   # bottom cell of a vertical piece
            (0, -1, 4),   # right cell of a horizontal piece
            (-1, 0, 5),   # bottom-left cell of the main piece
            (0, -1, 5),   # top-right cell of the main piece
            (-1, -1, 5),  # bottom-right cell of the main piece
        )
        for dr, dc, code in candidates:
            kr, kc = r + dr, c + dc
            # Skip off-board neighbours instead of letting numpy wrap around.
            if kr >= 0 and kc >= 0 and self._m[kr, kc] == code:
                return self._Mov_Actions[code](self, (kr, kc), direction)
        raise NotMoveable

    # cell code -> unbound mover; 0 (empty) is deliberately absent
    _Mov_Actions = {
        1: _find_key_and_move,
        2: _mov_small,
        3: _mov_vertical,
        4: _mov_horizontal,
        5: _mov_main,
    }

    def _record_step(self, pos, direction):
        """Append a move to the history as plain Python ints."""
        self._steps.append(
            ((int(pos[0]), int(pos[1])), int(direction))
        )  # avoid numpy.int64, which is not json serializable

    def move(self, pos, direction):
        """Move the piece covering ``pos`` one cell in ``direction``.

        ``pos`` may be any cell of the piece. Returns ``self`` so calls can be
        chained. Raises NotMoveable when ``pos`` is off the board or empty, or
        when the move is impossible; the board is left unchanged in that case.
        """
        try:
            r, c = int(pos[0]), int(pos[1])
        except (IndexError, KeyError):
            raise NotMoveable from None
        rows, cols = self._m.shape
        if not (0 <= r < rows and 0 <= c < cols):
            raise NotMoveable
        action = self._Mov_Actions.get(int(self._m[r, c]))
        if action is None:
            raise NotMoveable
        action(self, (r, c), direction)
        if self.echo:
            self.show()
        return self

    def __init__(self, layout, gameId, echo=False):
        """Build a board from ``layout``, an iterable of 20 cell codes.

        ``layout`` is typically a 20-character digit string in row-major
        order. ``gameId`` is stored as-is. With ``echo`` set, the board is
        printed on construction and after every successful move.
        """
        self.layout = layout
        self.gameId = gameId
        cells = [int(ch) for ch in layout]
        self._m = np.array(cells, dtype=np.byte).reshape(self.ROWS, self.COLS)
        self._steps = []
        self.echo = echo
        if echo:
            print(f"layout: {self.layout}\n{self._m}")

    @property
    def current(self):
        """The current board as a row-major digit string."""
        return "".join(str(n) for n in self._m.flatten().tolist())

    @property
    def steps(self):
        """The list of moves recorded so far."""
        return self._steps

    def show(self):
        """Print the current board."""
        print(f"-- CURRENT STATE --\n{self._m}")

    def copy(self):
        """Return an independent board with the same state and move history.

        The copy's ``layout`` attribute is the current state string, not the
        layout this board was originally built from.
        """
        m = type(self)(self.current, self.gameId, echo=self.echo)
        m._steps = list(self.steps)
        return m
def get_maze():
    """Request a new game from the server and wrap it in a Maze.

    Reads ``layout`` and ``gameId`` from the JSON reply of ``/api/newgame``.
    """
    url = f"http://106.14.57.14:{port}/api/newgame"
    # A timeout keeps one unresponsive request from hanging this worker
    # forever.
    response = requests.get(url, timeout=30)
    data = response.json()
    print(data)
    return Maze(data["layout"], data["gameId"])
def search(ms, max_depth=20, depth=0, states=None, debug=False, goal=(3, 1)):  # bfs
    """Breadth-first search for a board whose main piece key cell is at ``goal``.

    Args:
        ms: list of Maze boards forming the current frontier.
        max_depth: give up once the frontier depth exceeds this value.
        depth: depth of ``ms`` (0 for the starting position).
        states: mapping of already seen ``Maze.current`` strings to boards;
            created and seeded from ``ms`` when missing or empty.
        debug: print progress information while searching.
        goal: ``(row, col)`` the key cell of the main piece (code 5) must reach.

    Returns:
        The first Maze found with the main piece at ``goal`` (its ``steps``
        hold the moves that led there), or None when no new state can be
        reached.

    Raises:
        RecursionError: when ``depth`` exceeds ``max_depth``.
    """
    frontier = list(ms)
    if not states:
        states = {}
        for maze in frontier:
            states.setdefault(maze.current, maze)
    goal = tuple(goal)

    while True:
        if depth > max_depth:
            raise RecursionError(f"Too many steps, give up at {depth}")
        upcoming = []
        for maze in frontier:
            key_cells = list(zip(*np.where(maze._m == 5)))
            # Exactly one main piece is expected; anything else never matches.
            if len(key_cells) == 1:
                main_at = (int(key_cells[0][0]), int(key_cells[0][1]))
            else:
                main_at = None
            if debug:
                print(f"steps: {len(maze.steps)} main at: {main_at}")
                maze.show()
            if main_at == goal:
                print(f"({maze.gameId}) found solution, taking {depth} steps")
                return maze

            rows, cols = maze._m.shape
            attempted = 0
            for zr, zc in zip(*np.where(maze._m == 0)):  # search around empty cells
                zr, zc = int(zr), int(zc)
                # (neighbour cell, direction that pushes it into the empty cell)
                neighbours = (
                    ((zr - 1, zc), 3),
                    ((zr, zc - 1), 2),
                    ((zr + 1, zc), 1),
                    ((zr, zc + 1), 4),
                )
                fresh = 0
                for (r, c), direction in neighbours:
                    # Skip off-board and empty neighbours before paying for a copy.
                    if not (0 <= r < rows and 0 <= c < cols) or maze._m[r, c] == 0:
                        continue
                    try:
                        candidate = maze.copy().move((r, c), direction)
                    except NotMoveable:
                        continue
                    attempted += 1
                    state = candidate.current
                    if state not in states:
                        states[state] = candidate
                        upcoming.append(candidate)
                        fresh += 1
                if debug:
                    print(f"near {(zr, zc)} possibles: {fresh}/{attempted}")

        if not upcoming:
            if debug:
                print(f"Cannot move further at depth {depth}")
            return None
        if debug:
            print(f"next depth: {depth + 1}, tries: {len(upcoming)}, recorded: {len(states)}")
        frontier = upcoming
        depth += 1
def LetsRockNRoll(ALL_SOLUTIONS=None):
    """Play one game against the server, solving each round's layout.

    Args:
        ALL_SOLUTIONS: optional dict mapping a layout string to its list of
            steps; known layouts are submitted without searching again and
            newly solved layouts are added to it.

    Returns:
        The (possibly extended) solutions dict.

    Raises:
        SystemExit: when the server answers with a status other than
            ``"next"`` or ``"lose"``; the message carries the full reply.
    """
    if ALL_SOLUTIONS is None:
        ALL_SOLUTIONS = {}
    print(f"Use cached {len(ALL_SOLUTIONS)} solutions")
    M = get_maze()

    def build_steps(steps):
        # The server addresses cells by row-major index on the 4-column board.
        return [
            {"position": int(p[0] * 4 + p[1]), "direction": int(d)}
            for p, d in steps
        ]

    def submit(gameId, solution):
        url = f"http://106.14.57.14:{port}/api/submit/{gameId}"
        response = requests.post(url, json=build_steps(solution), timeout=30)
        return response.json()

    result = {"status": "next", "game_stage": {"layout": M.layout, "round": 0}}
    round_no = 0
    while result["status"] == "next":
        stage = result.get("game_stage") or {}
        if "layout" not in stage:
            print(f"({M.gameId}) reply carries no next layout: {result}")
            break
        layout = stage["layout"]
        round_no = stage.get("round", round_no)

        steps = ALL_SOLUTIONS.get(layout)
        if steps is not None:
            print(f"({M.gameId}) Use previous solution of {layout}")
        else:
            try:
                solution = search([Maze(layout, M.gameId)], max_depth=200)
            except Exception as e:
                print(e)
                solution = None
            if solution is None:
                # Retrying the same layout would fail the same way forever.
                print(f"({M.gameId}) No solution for {layout}, giving up this game")
                break
            steps = solution.steps
            ALL_SOLUTIONS[layout] = steps

        try:
            result = submit(M.gameId, steps)
        except Exception as e:
            # Abandon this game instead of resubmitting in a tight loop.
            print(e)
            break

    print(f"Game {M.gameId} finished in {round_no} rounds, status: {result['status']}")
    if result["status"] not in ("next", "lose"):
        raise SystemExit(f"Finished! status => {result}")
    return ALL_SOLUTIONS
if __name__ == "__main__":
    # Usage: script [cache_file [workers]]
    # Repeatedly plays games in worker processes, merging every worker's
    # solved layouts into a JSON cache file between rounds.
    from sys import argv
    from concurrent.futures import ProcessPoolExecutor as Pool, ALL_COMPLETED, wait

    fname = "solutions.json"
    # Default matches the two workers this script has always started; the
    # second command-line argument now actually controls the pool size.
    threads = 2
    if len(argv) > 1:
        fname = argv[1]
    if len(argv) > 2:
        threads = int(argv[2])
    print(f"Using cache file {fname}, solver: {threads}")

    def save_solutions(solutions):
        print(f"Saving {len(solutions)} solutions")
        with open(fname, "w") as out:
            json.dump(solutions, out)

    ALL_SOLUTIONS = {}
    try:
        with open(fname, "r") as f:
            ALL_SOLUTIONS = json.load(f)
        print(f"Loaded {len(ALL_SOLUTIONS)} solutions")
    except (OSError, ValueError) as e:
        # Missing or corrupt cache: start empty, but say so.
        print(f"No usable cache in {fname} ({e})")

    try:
        with Pool(max_workers=threads) as p:
            while True:
                # Fresh list every round so finished futures are not
                # re-waited and re-merged.
                futures = [
                    p.submit(LetsRockNRoll, ALL_SOLUTIONS) for _ in range(threads)
                ]
                wait(futures, return_when=ALL_COMPLETED)
                failure = None
                for fut in futures:
                    err = fut.exception()
                    if err is None:
                        ALL_SOLUTIONS.update(fut.result())
                    elif failure is None:
                        failure = err
                # Persist the successful workers' results before propagating
                # a failure (including the SystemExit that ends the run).
                save_solutions(ALL_SOLUTIONS)
                if failure is not None:
                    raise failure
    except SystemExit as fin:
        print(fin)
    except KeyboardInterrupt:
        pass
    finally:
        save_solutions(ALL_SOLUTIONS)
# assert search([Maze("35131111202032231411", 0, echo=True)], max_depth=200, debug=True) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment