Skip to content

Instantly share code, notes, and snippets.

@TadaoYamaoka
Created July 22, 2023 07:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save TadaoYamaoka/8a45ac93aa0951664043294ffb029d98 to your computer and use it in GitHub Desktop.
Save TadaoYamaoka/8a45ac93aa0951664043294ffb029d98 to your computer and use it in GitHub Desktop.
mate_sfen.py
import argparse
import glob
import os
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor, wait
from threading import Lock
import queue
from cshogi import *
from cshogi import CSA
from cshogi.usi.Engine import Engine
# Command-line interface: a directory tree of CSA records in, a mate list out.
parser = argparse.ArgumentParser()
parser.add_argument("dir")
parser.add_argument("mata_sfen")
parser.add_argument("--engine", default="/work/KomoringHeights/bin/KomoringHeights-by-gcc")
parser.add_argument('--options')
parser.add_argument('--threads', type=int, default=20)
parser.add_argument('--byoyomi', type=int, default=30000)
args = parser.parse_args()

# Guards the shared output file and the progress bar across worker threads.
lock = Lock()

# file_pos is the offset at which each globbed path is cut before it is
# written to the output (the worker uses path[file_pos:]).
# NOTE: a bare relative directory such as "kifu" yields offset 0, so its
# name stays in the written path, whereas "kifu/" or "a/kifu" strip it.
head, tail = os.path.split(args.dir)
if not head:
    file_pos = 0
else:
    # A trailing separator leaves `tail` empty; measure `head` in that case.
    file_pos = (len(args.dir) if tail else len(head)) + 1

# Queue every .csa file below the directory; the workers drain this queue.
files = queue.Queue()
csa_pattern = os.path.join(args.dir, "**", "*.csa")
for path in glob.glob(csa_pattern, recursive=True):
    files.put(path)
total = files.qsize()

pbar = tqdm(total=total)
f = open(args.mata_sfen, "w")
def _replay_as_usi(board, kif):
    """Replay ``kif`` on ``board`` and return its moves as USI strings.

    Raises:
        ValueError: if the record contains a move that is illegal in the
            position reached so far.
    """
    board.set_sfen(kif.sfen)
    usi_moves = []
    for move in kif.moves:
        if not board.is_legal(move):
            raise ValueError("illegal move in game record")
        usi_moves.append(move_to_usi(move))
        board.push(move)
    return usi_moves


def _collect_mates(engine, usi_moves):
    """Search for mates walking backwards from the end of the game.

    Drops 1, 3, 5, ... moves from the end of ``usi_moves`` and asks the
    engine for a mate in each resulting position, stopping at the first
    position for which the engine reports no usable mate.

    Returns:
        A list of ``(position, pv, n)`` tuples, where ``n`` is the number of
        space-separated moves in ``pv``.
    """
    mate_positions = []
    for i in range(1, len(usi_moves), 2):
        moves = usi_moves[:-i]
        # NOTE(review): the position is always expressed relative to
        # "startpos", even though the board was set up from kif.sfen;
        # confirm that no input game starts from another position.
        position = "startpos moves " + " ".join(moves)
        engine.position(moves=moves)
        pv = engine.go_mate(args.byoyomi)
        # "notimplemented" is the third non-move reply the USI protocol
        # allows for "go mate"; it must not be recorded as a one-move mate.
        if pv in ("nomate", "timeout", "notimplemented"):
            break
        mate_positions.append((position, pv, len(pv.split(" "))))
    return mate_positions


def mate():
    """Worker: drain the file queue and record mate positions.

    Starts its own engine process, applies ``--options`` (comma-separated
    ``name:value`` pairs), then repeatedly takes a CSA file path from the
    shared ``files`` queue. For games whose ``endgame`` is ``%TORYO`` it
    writes one ``position,pv,n,file`` line per mate found to the shared
    output file. A file that fails to parse or process is reported with
    ``skip <path>`` and the worker moves on. The progress bar advances once
    per file either way. The engine is always shut down on exit.
    """
    engine = Engine(args.engine)
    try:
        if args.options:
            for option in args.options.split(','):
                # Split on the first colon only so values may contain ':'.
                name, value = option.split(':', 1)
                engine.setoption(name, value)
        engine.isready()
        board = Board()
        while True:
            try:
                path = files.get_nowait()
            except queue.Empty:
                # Queue exhausted: this worker is done.
                break
            try:
                kif = CSA.Parser.parse_file(path)[0]
                if kif.endgame == "%TORYO":
                    usi_moves = _replay_as_usi(board, kif)
                    mate_positions = _collect_mates(engine, usi_moves)
                    if mate_positions:
                        rel_path = path[file_pos:]
                        # Write all rows for one game under a single lock
                        # so that they stay contiguous in the output.
                        with lock:
                            for position, pv, n in mate_positions:
                                f.write(position + "," + pv + "," + str(n) + "," + rel_path + "\n")
            except Exception:
                # Best effort: one bad record must not stop the worker.
                print(f'skip {path}')
            with lock:
                pbar.update(1)
    finally:
        engine.quit()
# Run one `mate` worker per requested thread; every worker drains the shared
# queue until it is empty.
try:
    with ThreadPoolExecutor(max_workers=args.threads) as executor:
        fs = [executor.submit(mate) for _ in range(args.threads)]
        wait(fs)
        # An exception raised inside a worker is stored in its future and
        # would otherwise vanish silently (e.g. the engine failing to start).
        for future in fs:
            error = future.exception()
            if error is not None:
                print(f'worker failed: {error!r}')
finally:
    # Release the progress bar and flush/close the output file even when
    # something above fails.
    pbar.close()
    f.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment