Skip to content

Instantly share code, notes, and snippets.

@pnck
Created February 13, 2024 08:31
Show Gist options
  • Save pnck/0e8b2c5acd0985c72c86567579846bb1 to your computer and use it in GitHub Desktop.
Save pnck/0e8b2c5acd0985c72c86567579846bb1 to your computer and use it in GitHub Desktop.
HGAME2024 WEEK2 华容道 solver
import requests
import numpy as np
import json
port = 31541
class NotMoveable(RuntimeWarning):
    """Raised when a requested block move cannot be carried out on the board."""
class Maze:
    """A 5x4 sliding-block (Huarong Dao / Klotski) board.

    Cell codes stored in ``_m``:

    * ``0`` - empty cell
    * ``1`` - non-key cell of a multi-cell block
    * ``2`` - 1x1 block
    * ``3`` - key (top) cell of a block 2 rows high and 1 column wide
    * ``4`` - key (left) cell of a block 1 row high and 2 columns wide
    * ``5`` - key (top-left) cell of the 2x2 main block

    Direction codes: ``1`` up, ``2`` right, ``3`` down, ``4`` left.
    """

    # (row delta, column delta) for every direction code.
    _DELTAS = {1: (-1, 0), 2: (0, 1), 3: (1, 0), 4: (0, -1)}

    # Offsets of every cell of a block relative to its key cell (key first).
    _SHAPES = {
        2: ((0, 0),),
        3: ((0, 0), (1, 0)),
        4: ((0, 0), (0, 1)),
        5: ((0, 0), (0, 1), (1, 0), (1, 1)),
    }

    def __init__(self, layout, gameId, echo=False):
        """Build a board from ``layout``.

        Args:
            layout: 20 cell codes in row-major order (e.g. a digit string).
            gameId: opaque identifier stored on the instance.
            echo: when true, print the board now and after every move.
        """
        self.layout = layout
        self.gameId = gameId
        # Convert each item explicitly instead of relying on implicit
        # str -> int8 casting inside NumPy.
        self._m = np.array([int(ch) for ch in layout], dtype=np.byte).reshape(5, 4)
        self._steps = []
        self.echo = echo
        if echo:
            print(f"layout: {self.layout}\n{self._m}")

    def _in_bounds(self, r, c):
        """Return True when (r, c) is a real cell of the board.

        Negative values are rejected explicitly because NumPy would
        otherwise treat them as indices counted from the opposite edge.
        """
        rows, cols = self._m.shape
        return 0 <= r < rows and 0 <= c < cols

    def _slide(self, pos, direction, kind):
        """Move the block of type ``kind`` whose key cell is ``pos`` one step.

        The board is only modified after every check has passed, so a
        rejected move leaves both the board and the step list untouched.

        Raises:
            NotMoveable: wrong block type at ``pos``, unknown direction,
                the block would leave the board, or a target cell is occupied.
        """
        r, c = int(pos[0]), int(pos[1])
        if not self._in_bounds(r, c) or self._m[r, c] != kind:
            raise NotMoveable
        delta = self._DELTAS.get(direction)
        if delta is None:
            raise NotMoveable
        dr, dc = delta

        old_cells = [(r + orow, c + ocol) for orow, ocol in self._SHAPES[kind]]
        new_cells = [(rr + dr, cc + dc) for rr, cc in old_cells]
        if not all(self._in_bounds(rr, cc) for rr, cc in old_cells + new_cells):
            raise NotMoveable
        own = set(old_cells)
        # Cells the block already occupies may be re-used; all others must be empty.
        if any(cell not in own and self._m[cell] != 0 for cell in new_cells):
            raise NotMoveable

        for cell in old_cells:
            self._m[cell] = 0
        for cell in new_cells:
            self._m[cell] = 1
        self._m[new_cells[0]] = kind  # key cell is listed first in _SHAPES
        self._record_step((r, c), direction)

    def _mov_main(self, pos, direction):
        """Slide the 2x2 main block whose key cell is at ``pos``."""
        self._slide(pos, direction, 5)

    def _mov_small(self, pos, direction):
        """Slide the 1x1 block at ``pos``."""
        self._slide(pos, direction, 2)

    def _mov_vertical(self, pos, direction):
        """Slide the 2-high block whose key (top) cell is at ``pos``."""
        self._slide(pos, direction, 3)

    def _mov_horizontal(self, pos, direction):
        """Slide the 2-wide block whose key (left) cell is at ``pos``."""
        self._slide(pos, direction, 4)

    def _find_key_and_move(self, pos, direction):
        """Move the block that owns the non-key cell at ``pos``.

        The key cell is looked for above (code 3), to the left (code 4),
        then above / left / upper-left for the main block (code 5).

        Raises:
            NotMoveable: ``pos`` is not a non-key cell, no key cell is
                found, or the owning block cannot move in ``direction``.
        """
        r, c = int(pos[0]), int(pos[1])
        if not self._in_bounds(r, c) or self._m[r, c] != 1:
            raise NotMoveable
        #   5 3
        #   4 1
        candidates = (
            ((-1, 0), 3),
            ((0, -1), 4),
            ((-1, 0), 5),
            ((0, -1), 5),
            ((-1, -1), 5),
        )
        for (dr, dc), kind in candidates:
            kr, kc = r + dr, c + dc
            if self._in_bounds(kr, kc) and self._m[kr, kc] == kind:
                return self._slide((kr, kc), direction, kind)
        raise NotMoveable

    # Cell code -> handler used by move().
    _Mov_Actions = {
        1: _find_key_and_move,
        2: _mov_small,
        3: _mov_vertical,
        4: _mov_horizontal,
        5: _mov_main,
    }

    def _record_step(self, pos, direction):
        """Append (key position, direction) to the step list as plain ints."""
        self._steps.append(
            ((int(pos[0]), int(pos[1])), int(direction))
        )  # avoid numpy.int64, which is not json serializable

    def move(self, pos, direction):
        """Move the block covering ``pos`` one cell in ``direction``.

        ``pos`` may be any cell of the block; the recorded step always
        uses the block's key cell.

        Returns:
            self, so calls can be chained.

        Raises:
            NotMoveable: ``pos`` is off the board or empty, or the move
                is not possible.
        """
        try:
            r, c = int(pos[0]), int(pos[1])
        except (IndexError, KeyError):
            raise NotMoveable from None
        if not self._in_bounds(r, c):
            raise NotMoveable
        action = self._Mov_Actions.get(int(self._m[r, c]))
        if action is None:  # empty cell: nothing to move
            raise NotMoveable
        action(self, (r, c), direction)
        if self.echo:
            self.show()
        return self

    @property
    def current(self):
        """The board as a 20-character string of cell codes, row-major."""
        return "".join(str(n) for n in self._m.flatten().tolist())

    @property
    def steps(self):
        """The list of ((row, col), direction) steps applied so far."""
        return self._steps

    def show(self):
        """Print the current board."""
        print(f"-- CURRENT STATE --\n{self._m}")

    def copy(self):
        """Return an independent board in the same state with the same steps.

        The copy's ``layout`` attribute is the current state, not the
        layout this board was created from.
        """
        m = Maze(self.current, self.gameId, echo=self.echo)
        m._steps = list(self._steps)
        return m
def get_maze():
    """Ask the server for a new game and wrap its layout in a Maze.

    The decoded JSON reply is printed before the Maze is built from its
    "layout" and "gameId" fields.
    """
    reply = requests.get(f"http://106.14.57.14:{port}/api/newgame")
    payload = reply.json()
    print(payload)
    layout, game_id = payload["layout"], payload["gameId"]
    return Maze(layout, game_id)
def search(ms, max_depth=20, depth=0, states=None, debug=False):  # bfs
    """Breadth-first search for a board whose main block key cell is at (3, 1).

    Args:
        ms: list of boards forming the current frontier.
        max_depth: deepest level that may be expanded.
        depth: level of the boards in ``ms``.
        states: mapping of board string -> board for every state already
            seen; created from ``ms`` when empty or None.
        debug: print progress information.

    Returns:
        The first board found with the main block at (3, 1), or None when
        the frontier runs dry.

    Raises:
        RecursionError: when ``depth`` exceeds ``max_depth``.
    """
    # (row offset, col offset, direction): the neighbour of an empty cell at
    # that offset is asked to slide in that direction, i.e. into the empty cell.
    neighbour_moves = ((-1, 0, 3), (0, -1, 2), (1, 0, 1), (0, 1, 4))

    if not states:
        # Seed with every starting board so several start boards also work.
        states = {m.current: m for m in ms}

    frontier = ms
    while True:
        if depth > max_depth:
            raise RecursionError(f"Too many steps, give up at {depth}")
        _next = []
        for maze in frontier:
            rows, cols = np.where(maze._m == 5)
            main_at = (int(rows[0]), int(cols[0])) if rows.size else ()
            if debug:
                print(f"steps: {len(maze.steps)} main at: {main_at}")
                maze.show()
            if main_at == (3, 1):
                print(f"({maze.gameId}) found solution, taking {depth} steps")
                return maze

            empties = [(int(r), int(c)) for r, c in zip(*np.where(maze._m == 0))]
            possible = []
            for zr, zc in empties:  # search around empty cells
                for dr, dc, direction in neighbour_moves:
                    try:
                        possible.append(
                            maze.copy().move((zr + dr, zc + dc), direction)
                        )
                    except NotMoveable:
                        pass

            _count = 0
            for n in possible:
                key = n.current
                if key not in states:
                    states[key] = n
                    _next.append(n)
                    _count += 1
            if debug:
                print(f"near {empties} possibles: {_count}/{len(possible)}")

        if not _next:
            if debug:
                print(f"Cannot move further at depth {depth}")
            return None
        if debug:
            print(
                f"next depth: {depth + 1}, tries: {len(_next)}, recorded: {len(states)}"
            )
        frontier = _next
        depth += 1
def LetsRockNRoll(ALL_SOLUTIONS=None):
    """Play one game against the server, solving round after round.

    Args:
        ALL_SOLUTIONS: mapping of layout string -> list of steps used as a
            cache; new solutions are added to it. A fresh dict is used
            when omitted.

    Returns:
        The (possibly extended) solution cache.

    Raises:
        SystemExit: when the server reports a status other than "next"
            or "lose".
    """
    if ALL_SOLUTIONS is None:
        ALL_SOLUTIONS = {}
    print(f"Use cached {len(ALL_SOLUTIONS)} solutions")
    M = get_maze()

    def build_steps(steps):
        # The server takes the flat cell index (row * 4 + col) as position.
        return [
            {"position": int(p[0] * 4 + p[1]), "direction": int(d)}
            for p, d in steps
        ]

    def submit(gameId, solution):
        url = f"http://106.14.57.14:{port}/api/submit/{gameId}"
        response = requests.post(url, json=build_steps(solution))
        return response.json()

    result = {"status": "next", "game_stage": {"layout": M.layout, "round": 0}}
    while result["status"] == "next":
        stage = result["game_stage"]
        layout = stage["layout"]
        round_no = stage["round"]
        try:
            if layout in ALL_SOLUTIONS:
                print(f"({M.gameId}) Use previous solution of {layout}")
                result = submit(M.gameId, ALL_SOLUTIONS[layout])
            else:
                solution = search([Maze(layout, M.gameId)], max_depth=200)
                if solution is None:
                    raise RuntimeError(f"no solution found for {layout}")
                ALL_SOLUTIONS[layout] = solution.steps
                result = submit(M.gameId, solution.steps)
        except Exception as e:
            print(e)
            # `result` is unchanged here, so looping again would retry the
            # very same layout forever; give up on this game instead.
            break
    print(f"Game {M.gameId} finished in {round_no} rounds, status: {result['status']}")
    if result["status"] not in ("next", "lose"):
        raise SystemExit(f"Finished! status => {result}")
    return ALL_SOLUTIONS
if __name__ == "__main__":
    # Usage: script [cache file] [number of parallel solvers]
    from sys import argv
    from concurrent.futures import ProcessPoolExecutor as Pool, ALL_COMPLETED, wait

    fname = argv[1] if len(argv) > 1 else "solutions.json"
    # The pool size used to be hard-coded to 2 regardless of this value;
    # 2 stays the default so a run without arguments behaves as before.
    threads = int(argv[2]) if len(argv) > 2 else 2
    print(f"Using cache file {fname}, solver: {threads}")

    ALL_SOLUTIONS = {}
    try:
        with open(fname, "r") as f:
            ALL_SOLUTIONS = json.load(f)
        print(f"Loaded {len(ALL_SOLUTIONS)} solutions")
    except (OSError, ValueError) as e:
        # Missing or unreadable cache: start empty, but say so.
        print(f"No usable cache in {fname}: {e}")

    try:
        with Pool(max_workers=threads) as p:
            while True:
                # A fresh list per round, so finished futures are not kept
                # and re-processed forever.
                futures = [
                    p.submit(LetsRockNRoll, ALL_SOLUTIONS) for _ in range(threads)
                ]
                wait(futures, return_when=ALL_COMPLETED)

                failure = None
                for fut in futures:
                    err = fut.exception()
                    if err is None:
                        ALL_SOLUTIONS.update(fut.result())
                    elif failure is None:
                        failure = err

                # Save what the successful workers found before propagating
                # a worker's exception (including its SystemExit).
                print(f"Saving {len(ALL_SOLUTIONS)} solutions")
                with open(fname, "w") as out:
                    out.write(json.dumps(ALL_SOLUTIONS))
                if failure is not None:
                    raise failure
    except SystemExit as fin:
        print(fin)
    except KeyboardInterrupt:
        pass

    print(f"Saving {len(ALL_SOLUTIONS)} solutions")
    with open(fname, "w") as out:
        out.write(json.dumps(ALL_SOLUTIONS))
# assert search([Maze("35131111202032231411", 0, echo=True)], max_depth=200, debug=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment