Created
August 11, 2023 17:13
-
-
Save AndyGrant/8d607b11b0d4bf85acd61502d30176cb to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/python3 | |
import argparse | |
import chess | |
import chess.pgn | |
from subprocess import Popen, PIPE, call | |
from multiprocessing import Process, Queue | |
class Engine():
    """Thin line-oriented wrapper around a UCI engine running as a subprocess.

    Commands are written to the engine's stdin and replies are read back one
    line at a time from its stdout.
    """

    def __init__(self, binary):
        """Launch `binary` through the shell and perform the initial handshake.

        Args:
            binary: Command used to start the engine.
        """
        self.engine = Popen([binary], stdin=PIPE, stdout=PIPE, universal_newlines=True, shell=True)
        # NOTE(review): 'isready' is sent before 'uci' here; the UCI protocol
        # describes 'uci' as the first command -- confirm the target engine
        # tolerates this order. Whatever the engine prints in reply to 'uci'
        # is not read here; the next uci_ready() call discards it while
        # waiting for 'readyok'.
        self.uci_ready()
        self.write_line('uci\n')

    def write_line(self, line):
        """Send `line` (caller supplies the trailing newline) and flush it."""
        self.engine.stdin.write(line)
        self.engine.stdin.flush()

    def read_line(self):
        """Return the next line of engine output with trailing whitespace removed.

        Raises:
            EOFError: If the engine has closed its stdout (e.g. it exited or
                never started). Without this, the wait loops below would
                spin forever on the empty strings readline() returns at EOF.
        """
        line = self.engine.stdout.readline()
        if not line:
            raise EOFError('engine closed its output stream')
        return line.rstrip()

    def uci_newgame(self):
        """Send 'ucinewgame' to the engine."""
        self.write_line('ucinewgame\n')

    def uci_ready(self):
        """Send 'isready' and block until 'readyok' is read.

        Every line read before 'readyok' is discarded.
        """
        self.write_line('isready\n')
        while self.read_line() != 'readyok':
            pass

    def uci_search(self, fen, string):
        """Search the position `fen` using the UCI 'go' arguments in `string`.

        Returns:
            All output lines up to and including the 'bestmove' line.
        """
        self.uci_ready()
        self.write_line('position fen %s\ngo %s\n' % (fen, string))
        return list(self.uci_bestmove())

    def uci_bestmove(self):
        """Yield engine output lines, ending with the 'bestmove' line."""
        while True:
            line = self.read_line()
            yield line
            if line.startswith('bestmove'):
                break
def generator_from_fens(fens):
    """Yield each non-empty line of the file `fens`, trailing whitespace removed.

    Blank lines (for example a trailing empty line at the end of the file)
    are skipped so that no empty position string is produced.

    Args:
        fens: Path of a text file with one FEN per line.
    """
    with open(fens) as fin:
        for line in fin:
            fen = line.rstrip()
            if fen:
                yield fen
def enqueue_fens(generator, batch_size, queue):
    """Move up to `batch_size` items from `generator` onto `queue`.

    Returns:
        The number of items actually enqueued; this is less than
        `batch_size` only when `generator` ran out first.
    """
    exhausted = object()  # unique marker returned by next() once the generator is empty
    for sent in range(batch_size):
        item = next(generator, exhausted)
        if item is exhausted:
            return sent
        queue.put(item)
    return batch_size
def process_fens(inqueue, outqueue, engine, search):
    """Worker loop: analyse FENs from `inqueue` and put results on `outqueue`.

    Each result is a tuple `(fen, data)` where `data` is a list of
    `(first_pv_move, score_text)` pairs, one per engine output line that
    carries both a score and a principal variation.

    Args:
        inqueue: Queue of FEN strings to analyse.
        outqueue: Queue receiving the `(fen, data)` results.
        engine: Command used to start the engine (passed to `Engine`).
        search: Arguments for the UCI 'go' command.

    This function never returns; the caller is responsible for stopping
    the process it runs in.
    """
    uci = Engine(engine)
    # NOTE(review): 'Minimal' is not a standard UCI option; presumably it
    # limits the engine's info output -- confirm against the engine in use.
    uci.write_line('setoption name Minimal value true\n')
    uci.write_line('setoption name MultiPV value 5\n')

    while True:
        uci.uci_newgame()
        fen = inqueue.get()
        analysis = uci.uci_search(fen, search)

        data = []
        # The last line is the 'bestmove' line, which is not parsed.
        for line in analysis[:-1]:
            # Lines lacking a score or a pv (e.g. 'info string ...') would
            # make the splits below raise IndexError and kill the worker.
            if ' pv ' not in line or ' score ' not in line:
                continue
            first_move = line.split(' pv ')[1].split()[0]
            score = line.split(' score ')[1].split(' nodes ')[0]
            data.append((first_move, score))

        outqueue.put((fen, data))
if __name__ == '__main__':

    # Command line: input file, engine command, 'go' arguments, worker count.
    p = argparse.ArgumentParser(
        formatter_class=lambda prog:
            argparse.ArgumentDefaultsHelpFormatter(prog, max_help_position=10)
    )
    p.add_argument('--fens', help='Input Fen File', required=True)
    p.add_argument('--engine', help='Engine Binary', required=True)
    p.add_argument('--search', help='UCI go command', required=True)
    p.add_argument('--workers', help='# of workers', required=True, type=int)
    args = p.parse_args()

    inqueue = Queue()
    outqueue = Queue()

    # Keep the Process objects (start() returns None) so they can be
    # stopped once all results have been collected.
    workers = [
        Process(
            target=process_fens,
            args=(inqueue, outqueue, args.engine, args.search),
        ) for _ in range(args.workers)
    ]
    for worker in workers:
        worker.start()

    generator = generator_from_fens(args.fens)
    N = 1024

    try:
        while True:

            # Enqueue and collect up to N results
            enqueued = enqueue_fens(generator, N, inqueue)
            for _ in range(enqueued):
                print(outqueue.get())

            # Terminate if we enqueued less than N fens. Testing the count
            # itself (not the loop variable) also covers an empty batch.
            if enqueued < N:
                break
    finally:
        # The workers loop forever waiting on inqueue; without stopping
        # them explicitly the interpreter would block at exit joining them.
        for worker in workers:
            worker.terminate()
        for worker in workers:
            worker.join()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment