This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*-
# Script dedicated to floodgate @bleu48 2022.2.15
# Based on https://github.com/ishidakei/python-shogi/blob/master/scripts/csa_usi_bridge.py
from __future__ import print_function, unicode_literals | |
import re | |
import subprocess | |
import os | |
import sys | |
import shogi | |
from shogi import CSA | |
from shogi.CSA import COLOR_SYMBOLS, PIECE_SYMBOLS, SQUARE_NAMES | |
from shogi import Move | |
from timeout_decorator import timeout, TimeoutError | |
class CSAUSIBrdige:
    """Relay one game at a time between a CSA server and a local USI engine.

    The bridge starts the engine as a subprocess, speaks USI to it over
    stdin/stdout, and speaks the CSA protocol to the server through
    ``shogi.CSA.TCPProtocol``.  All engine traffic is echoed to stdout with
    ``USI> `` / ``USI< `` prefixes.
    """

    # When sys.stdout has no ``buffer`` attribute (presumably Python 2),
    # wrap it with a writer for the locale's preferred encoding.
    if not hasattr(sys.stdout, 'buffer'):
        import locale
        import codecs
        sys.stdout = codecs.getwriter(
            locale.getpreferredencoding())(sys.stdout)

    # Patterns used to pull search statistics and moves out of engine output.
    node_re = re.compile(r"nodes +([0-9]+)")
    cp_re = re.compile(r"cp +(\-?[0-9]+)")
    ponder_re = re.compile(r"ponder +([0-9a-zA-Z\*\+]+)")
    bestmove_re = re.compile(r"bestmove +([0-9a-zA-Z\*\+]+)")

    # Server lines that end the game while we wait for our move to be echoed.
    _GAME_END_PREFIXES = ("#DRAW", "#LOSE", "#WIN", "#CENSORED")

    def __init__(self, usi_engine_path):
        """Start the USI engine and bring it to the ``readyok`` state.

        Args:
            usi_engine_path: Command passed to ``subprocess.Popen``.

        Raises:
            EOFError: If the engine closes its stdout before answering.
        """
        # NOTE(review): stderr is piped but never read anywhere in this
        # class; an engine that writes a lot to stderr may eventually block
        # on a full pipe -- confirm and consider redirecting it.
        self.proc = subprocess.Popen(
            usi_engine_path,
            bufsize=0,
            shell=False,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        self.write_usi("usi\n")
        self._wait_usi("usiok")

        # Optional engine settings: every non-blank line of the file is
        # forwarded to the engine as one command.
        usi_options_path = './usioptions.txt'
        if os.path.exists(usi_options_path):
            with open(usi_options_path, "r") as options:
                for line in options:
                    command = line.strip()
                    if command:
                        # Always terminate with exactly one newline so a
                        # file without a trailing newline cannot glue its
                        # last option onto the following "isready".
                        self.write_usi(command + "\n")

        self.write_usi("isready\n")
        self._wait_usi("readyok")

    def connect(self, host_name, user_name, password):
        """Open a CSA connection to ``host_name`` on port 4081 and log in."""
        print("CONNECTING...")
        self.csa = CSA.TCPProtocol(host_name, 4081)
        self.csa.login(user_name, password)
        print("CONNECTED")

    @timeout(180)
    def match(self):
        """Wait for the server to propose a game and return its summary.

        The ``timeout`` decorator limits the wait to 180 seconds.
        """
        print("WAITING FOR MATCHING...")
        game_summary = self.csa.wait_match()
        print("MATCHED")
        return game_summary

    def game(self, log=None):
        """Play one game from matching until the game is over.

        Args:
            log: Optional writable text file; move and message lines are
                appended to it via ``show_and_log``.

        Returns:
            None.  Returns early (after printing a notice) when matching
            times out.

        Raises:
            ValueError: If the server reports a move for the wrong side.
            EOFError: If the engine closes its stdout during the game.
        """
        moves = []
        btime = 900000
        wtime = 900000
        byoyomi = 0
        time_increment = 0
        try:
            game_summary = self.match()
        except TimeoutError:
            print("Time Out for Matching")
            return

        summary = game_summary['summary']
        start_sfen = summary['sfen']
        my_color = game_summary['my_color']
        time_info = summary['time']
        if time_info is not None:
            # Values are multiplied by 1000 for the engine's millisecond
            # clocks; assumes the server's time unit is seconds -- TODO
            # confirm (Time_Unit is not inspected here).
            if time_info.get('Total_Time') is not None:
                btime = int(time_info['Total_Time']) * 1000
                wtime = btime
            if time_info.get('Byoyomi') is not None:
                byoyomi = int(time_info['Byoyomi']) * 1000
            if time_info.get('Increment') is not None:
                time_increment = int(time_info['Increment']) * 1000

        # Replay moves that are already part of the game summary.  They must
        # also be recorded in ``moves``; otherwise the "position" command
        # sent to the engine would not match ``board``.
        board = shogi.Board(start_sfen)
        for move in summary['moves']:
            # Keep the clocks integral: they are formatted into "go".
            spent_ms = int(float(move.get('spend_time') or 0) * 1000)
            if move['color'] == 0:
                btime = max(0, btime + time_increment - spent_ms)
            else:
                wtime = max(0, wtime + time_increment - spent_ms)
            board.push(shogi.Move.from_usi(move['usi']))
            moves.append(move['usi'])

        self.csa.agree()

        # YaneuraOu does not reinitialize on usinewgame, so send isready
        # before every game as well.
        self.write_usi("isready\n")
        self._wait_usi("readyok")
        self.write_usi("usinewgame\n")

        ponder = None
        ponder_hit = None
        while True:
            print(board)
            if my_color == 0:
                print('B(self):', btime / 1000, 'W:', wtime / 1000)
            else:
                print('B:', btime / 1000, 'W(self):', wtime / 1000)

            if board.turn == my_color:
                nodes = 0
                cp = 0
                ponder = None
                bestmove = None
                if ponder_hit is None:
                    # No ponder search is running: start a normal search.
                    self.write_usi(self._position_command(start_sfen, moves))
                    self.write_usi(self._go_command(
                        btime, wtime, byoyomi, time_increment))

                # Collect search info until the engine announces its move.
                while True:
                    output = self.read_usi_line()
                    if output[0:5] == "info ":
                        found = self.node_re.search(output)
                        if found is not None:
                            nodes = int(found.group(1))
                        found = self.cp_re.search(output)
                        if found is not None:
                            cp = int(found.group(1))
                    if output[0:9] == "bestmove ":
                        found = self.ponder_re.search(output)
                        if found is not None:
                            ponder = found.group(1)
                        if output[0:12] == "bestmove win":
                            self.csa.command('%KACHI')
                            return
                        if output[0:15] == "bestmove resign":
                            self.csa.resign()
                            return
                        found = self.bestmove_re.search(output)
                        if found is not None:
                            bestmove = found.group(1)
                        break

                if bestmove is not None:
                    next_move = Move.from_usi(bestmove)
                    board.push(next_move)
                    moves.append(bestmove)

                    # The engine's score is negated when we are not color 0.
                    if my_color != 0:
                        cp = -cp
                    comment = "'* {0}".format(cp)
                    if ponder is not None:
                        comment = comment + \
                            " {0}".format(Move.from_usi(ponder))
                    comment = comment + " #{0}".format(nodes)

                    # A move without an origin square is sent with '00'.
                    if next_move.from_square is None:
                        from_square = '00'
                    else:
                        from_square = SQUARE_NAMES[next_move.from_square]
                    command = '{0}{1}{2}{3}'.format(
                        COLOR_SYMBOLS[my_color],
                        from_square,
                        SQUARE_NAMES[next_move.to_square],
                        PIECE_SYMBOLS[board.pieces[next_move.to_square]])
                    line = self.csa.command(command + ',' + comment)

                    # Read server lines until our own move is echoed back.
                    while True:
                        # Slice instead of indexing so an empty line does
                        # not raise IndexError.
                        if line[0:1] in ('+', '-'):
                            (turn, usi, spend_time, message) = \
                                self.csa.parse_server_message(line, board)
                            if my_color == 0:
                                btime = max(0, btime + time_increment
                                            - int(spend_time * 1000))
                            else:
                                wtime = max(0, wtime + time_increment
                                            - int(spend_time * 1000))
                            self.show_and_log(
                                "SELF MOVE: {0}{1},T{2},{3}".format(
                                    "+" if turn == 0 else "-",
                                    next_move, spend_time, comment),
                                log)
                            if ponder is not None:
                                # Think on the opponent's time, assuming the
                                # predicted reply has been played.
                                self.write_usi(self._position_command(
                                    start_sfen, moves + [ponder]))
                                self.write_usi(self._go_command(
                                    btime, wtime, byoyomi, time_increment,
                                    ponder=True))
                            break
                        self.show_and_log("MESSAGE: {0}".format(line), log)
                        # Game-end check (added 2022.2.4).
                        if any(line[0:len(prefix)] == prefix
                               for prefix in self._GAME_END_PREFIXES):
                            self.write_usi("stop\n")
                            return
                        line = self.csa.read_line()
            else:
                ponder_hit = None
                (turn, usi, spend_time, message) = \
                    self.csa.wait_server_message(board)
                if message is not None:
                    self.show_and_log(
                        "MESSAGE: {0}".format(
                            CSA.SERVER_MESSAGE_SYMBOLS[message]),
                        log)
                    self.write_usi("stop\n")
                    return

                if turn != board.turn:
                    raise ValueError("Invalid turn")
                # It is the opponent's clock that is charged here.
                if my_color == 1:
                    btime = max(0, btime + time_increment
                                - int(spend_time * 1000))
                else:
                    wtime = max(0, wtime + time_increment
                                - int(spend_time * 1000))
                move = shogi.Move.from_usi(usi)
                self.show_and_log(
                    "OPPONENT MOVE: {0}{1},T{2}".format(
                        "+" if turn == 0 else "-", move, spend_time),
                    log)
                if ponder is not None:
                    if ponder != usi:
                        # Wrong prediction: abort the ponder search and
                        # discard the bestmove it produces.
                        self.write_usi("stop\n")
                        while self.read_usi_line()[0:9] != "bestmove ":
                            pass
                    else:
                        self.write_usi("ponderhit\n")
                        ponder_hit = True
                board.push(move)
                moves.append(usi)

    @staticmethod
    def _position_command(start_sfen, moves):
        """Return the USI ``position`` command for ``start_sfen`` + moves."""
        if not moves:
            return "position sfen {0}\n".format(start_sfen)
        return "position sfen {0} moves {1}\n".format(
            start_sfen, " ".join(moves))

    @staticmethod
    def _go_command(btime, wtime, byoyomi, time_increment, ponder=False):
        """Return the USI ``go`` (or ``go ponder``) command for the clocks."""
        head = "go ponder" if ponder else "go"
        if byoyomi > 0:
            return "{0} btime {1} wtime {2} byoyomi {3}\n".format(
                head, btime, wtime, byoyomi)
        return "{0} btime {1} wtime {2} binc {3} winc {4}\n".format(
            head, btime, wtime, time_increment, time_increment)

    def _wait_usi(self, expected):
        """Read engine lines until one equals ``expected`` (right-stripped)."""
        while self.read_usi_line().rstrip() != expected:
            pass

    def write_usi(self, str):
        """Send ``str`` to the engine's stdin (UTF-8) and echo it to stdout."""
        self.proc.stdin.write(str.encode("utf-8"))
        self.proc.stdin.flush()  # added 2022.2.1
        print("USI> ", str.rstrip())

    def read_usi_line(self):
        """Read one line from the engine, echo it, and return it.

        Raises:
            EOFError: If the engine's stdout is at end of file.  Without
                this, every loop waiting for an engine reply would spin
                forever on empty strings.
        """
        raw = self.proc.stdout.readline()
        if not raw:
            raise EOFError("USI engine closed its output")
        line = raw.decode("utf-8")
        print("USI< ", line.rstrip())
        return line

    def show_and_log(self, str, log):
        """Print ``str`` and, when ``log`` is not None, append it to ``log``."""
        print(str)
        if log is not None:
            log.write("{0}\n".format(str))
from datetime import datetime, timedelta | |
import sched | |
import time | |
def next_login_time(now):
    """Return the next hh:29 or hh:59 wall-clock time used for logging in.

    Minutes 0-28 map to hh:29 of the same hour, minutes 29-58 to hh:59 of
    the same hour, and minute 59 to 29 minutes past the following hour.
    The roll-over uses ``timedelta`` so that 23:59 correctly yields 00:29
    of the next day instead of an invalid hour 24.

    Args:
        now: The current ``datetime``.

    Returns:
        A ``datetime`` with seconds and microseconds set to zero.
    """
    top_of_hour = now.replace(minute=0, second=0, microsecond=0)
    if now.minute < 29:
        return top_of_hour.replace(minute=29)
    if now.minute < 59:
        return top_of_hour.replace(minute=59)
    return top_of_hour + timedelta(hours=1, minutes=29)


if __name__ == '__main__':
    if len(sys.argv) != 5:
        print('Usage: {0} usi_engine host_name user_name password'.format(
            sys.argv[0]))
        sys.exit(1)
    bridge = CSAUSIBrdige(sys.argv[1])
    try:
        while True:
            # Sleep until the next login slot (presumably one minute before
            # the server's half-hourly game start -- confirm), then connect
            # and play one game.
            now = datetime.now()
            comp = next_login_time(now)
            diff = comp - now
            print(datetime.now(), diff)
            scheduler = sched.scheduler(time.time, time.sleep)
            # The scheduled action only prints the time it was queued at;
            # the scheduler is used here as an interruptible sleep.
            scheduler.enter(diff.seconds, 1, print, (datetime.now(), ))
            scheduler.run()
            print(datetime.now())
            bridge.connect(sys.argv[2], sys.argv[3], sys.argv[4])
            with open("log.txt", "a") as log:
                bridge.game(log)
    except KeyboardInterrupt:
        print('!!FINISH!!')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment