Skip to content

Instantly share code, notes, and snippets.

@bleu48
Created February 15, 2022 08:41
Show Gist options
  • Save bleu48/3925f2ae3c55a182439007978427220e to your computer and use it in GitHub Desktop.
Save bleu48/3925f2ae3c55a182439007978427220e to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
# floodgate専用スクリプト @bleu48 2022.2.15
# 元ネタ https://github.com/ishidakei/python-shogi/blob/master/scripts/csa_usi_bridge.py
from __future__ import print_function, unicode_literals
import re
import subprocess
import os
import sys
import shogi
from shogi import CSA
from shogi.CSA import COLOR_SYMBOLS, PIECE_SYMBOLS, SQUARE_NAMES
from shogi import Move
from timeout_decorator import timeout, TimeoutError
class CSAUSIBrdige:
    """Bridge between a CSA game server and a local USI engine process.

    The engine is started as a child process and driven over its
    stdin/stdout with USI text commands; the server side is handled by
    ``shogi.CSA.TCPProtocol``.  One instance keeps the same engine process
    alive across several games.
    """

    # Interpreters whose ``sys.stdout`` has no ``buffer`` attribute get a
    # codec writer wrapped around stdout using the preferred locale encoding.
    if not hasattr(sys.stdout, 'buffer'):
        import locale
        import codecs
        sys.stdout = codecs.getwriter(
            locale.getpreferredencoding())(sys.stdout)

    # Patterns used to pull fields out of USI "info" / "bestmove" lines.
    # Raw strings: the backslash escapes belong to the regex, not to Python.
    node_re = re.compile(r"nodes +([0-9]+)")
    cp_re = re.compile(r"cp +(\-?[0-9]+)")
    ponder_re = re.compile(r"ponder +([0-9a-zA-Z\*\+]+)")
    bestmove_re = re.compile(r"bestmove +([0-9a-zA-Z\*\+]+)")

    def __init__(self, usi_engine_path):
        """Start the USI engine and bring it to the ``readyok`` state.

        Sends ``usi`` and waits for ``usiok``, forwards every non-blank line
        of ``./usioptions.txt`` (if that file exists) to the engine, then
        sends ``isready`` and waits for ``readyok``.

        Args:
            usi_engine_path: Program (or argument list) handed to
                ``subprocess.Popen`` with ``shell=False``.
        """
        # NOTE(review): stderr is piped but never read anywhere in this
        # class; an engine that writes a lot to stderr may stall once the
        # pipe buffer fills.  Confirm against the engines actually used.
        self.proc = subprocess.Popen(
            usi_engine_path,
            bufsize=0,
            shell=False,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        self.write_usi("usi\n")
        self._wait_for_usi("usiok")

        usi_options_path = './usioptions.txt'
        if os.path.exists(usi_options_path):
            with open(usi_options_path, "r") as options:
                for line in options:
                    # Always terminate each option with exactly one newline
                    # so a file without a trailing newline cannot fuse its
                    # last option with the "isready" that follows.
                    option = line.rstrip("\r\n")
                    if option.strip():
                        self.write_usi(option + "\n")

        self.write_usi("isready\n")
        self._wait_for_usi("readyok")

    def connect(self, host_name, user_name, password):
        """Open a CSA connection to ``host_name`` on port 4081 and log in."""
        print("CONNECTING...")
        self.csa = CSA.TCPProtocol(host_name, 4081)
        self.csa.login(user_name, password)
        print("CONNECTED")

    @timeout(180)
    def match(self):
        """Wait for the server to pair a game and return its game summary.

        Wrapped in ``timeout(180)``; ``game`` catches the resulting
        ``TimeoutError``.
        """
        print("WAITING FOR MATCHING...")
        game_summary = self.csa.wait_match()
        print("MATCHED")
        return game_summary

    def _wait_for_usi(self, token):
        """Read engine lines until one equals ``token`` (ignoring trailing whitespace)."""
        while self.read_usi_line().rstrip() != token:
            pass

    def _send_position(self, start_sfen, moves, ponder=None):
        """Send a USI ``position`` command for ``start_sfen`` plus ``moves`` (and ``ponder``)."""
        played = list(moves)
        if ponder is not None:
            played.append(ponder)
        if played:
            self.write_usi("position sfen {0} moves {1}\n".format(
                start_sfen, " ".join(played)))
        else:
            self.write_usi("position sfen {0}\n".format(start_sfen))

    def _send_go(self, btime, wtime, byoyomi, time_increment, ponder=False):
        """Send ``go`` (or ``go ponder``) with byoyomi or increment time controls."""
        verb = "go ponder" if ponder else "go"
        if byoyomi > 0:
            self.write_usi("{0} btime {1} wtime {2} byoyomi {3}\n".format(
                verb, btime, wtime, byoyomi))
        else:
            self.write_usi("{0} btime {1} wtime {2} binc {3} winc {4}\n".format(
                verb, btime, wtime, time_increment, time_increment))

    def game(self, log=None):
        """Play one game: wait for a match, then relay moves until it ends.

        Returns ``None`` in every case: when matching times out, when the
        engine declares a win or resigns, or when the server reports the
        end of the game.

        Args:
            log: Optional writable text file; move and message lines are
                appended to it via ``show_and_log``.

        Raises:
            ValueError: If the server reports a move for the side that is
                not to move on the local board.
        """
        moves = []
        btime = 900000
        wtime = 900000
        byoyomi = 0
        time_increment = 0

        try:
            game_summary = self.match()
        except TimeoutError:
            print("Time Out for Matching")
            return

        start_sfen = game_summary['summary']['sfen']
        my_color = game_summary['my_color']
        time_control = game_summary['summary']['time']

        # Summary values are scaled by 1000 before being passed to the engine
        # (presumably seconds -> milliseconds; confirm against the server).
        if time_control is not None:
            if time_control['Total_Time'] is not None:
                btime = int(time_control['Total_Time']) * 1000
                wtime = btime
            if time_control['Byoyomi'] is not None:
                byoyomi = int(time_control['Byoyomi']) * 1000
            if time_control['Increment'] is not None:
                time_increment = int(time_control['Increment']) * 1000

        board = shogi.Board(start_sfen)
        for move in game_summary['summary']['moves']:
            # Keep the clocks integral so the "go" command never carries a
            # float such as "899000.0".
            spent = int(float(move['spend_time']) * 1000)
            if move['color'] == 0:
                btime += time_increment - spent
            else:
                wtime += time_increment - spent
            board.push(shogi.Move.from_usi(move['usi']))
            # The engine is always given start_sfen + moves, so moves already
            # played in the summary must be part of that list as well.
            moves.append(move['usi'])

        self.csa.agree()

        # YaneuraOu does not reinitialise on usinewgame, so isready is sent
        # again before every game.
        self.write_usi("isready\n")
        self._wait_for_usi("readyok")
        self.write_usi("usinewgame\n")

        ponder = None      # move the engine is currently pondering on, if any
        ponder_hit = None  # True once "ponderhit" was sent for the current search

        while True:
            print(board)
            if my_color == 0:
                print('B(self):', btime / 1000, 'W:', wtime / 1000)
            else:
                print('B:', btime / 1000, 'W(self):', wtime / 1000)

            if board.turn == my_color:
                nodes = 0
                cp = 0
                ponder = None
                bestmove = None

                # After a ponder hit the engine is already searching this
                # position; only start a fresh search otherwise.
                if ponder_hit is None:
                    self._send_position(start_sfen, moves)
                    self._send_go(btime, wtime, byoyomi, time_increment)

                while True:
                    output = self.read_usi_line()
                    if output.startswith("info "):
                        found = self.node_re.search(output)
                        if found is not None:
                            nodes = int(found.group(1))
                        found = self.cp_re.search(output)
                        if found is not None:
                            cp = int(found.group(1))
                    if output.startswith("bestmove "):
                        found = self.ponder_re.search(output)
                        if found is not None:
                            ponder = found.group(1)
                        if output.startswith("bestmove win"):
                            self.csa.command('%KACHI')
                            return
                        if output.startswith("bestmove resign"):
                            self.csa.resign()
                            return
                        found = self.bestmove_re.search(output)
                        if found is not None:
                            bestmove = found.group(1)
                        break

                if bestmove is not None:
                    next_move = Move.from_usi(bestmove)
                    board.push(next_move)
                    moves.append(bestmove)

                    # The score is negated when playing the second side.
                    if my_color != 0:
                        cp = -cp
                    comment = "'* {0}".format(cp)
                    if ponder is not None:
                        comment = comment + " {0}".format(Move.from_usi(ponder))
                    comment = comment + " #{0}".format(nodes)

                    # A move without an origin square is written with '00'.
                    if next_move.from_square is None:
                        from_square = '00'
                    else:
                        from_square = SQUARE_NAMES[next_move.from_square]
                    command = '{0}{1}{2}{3}'.format(
                        COLOR_SYMBOLS[my_color],
                        from_square,
                        SQUARE_NAMES[next_move.to_square],
                        PIECE_SYMBOLS[board.pieces[next_move.to_square]])

                    line = self.csa.command(command + ',' + comment)
                    while True:
                        # line[:1] instead of line[0]: an empty line must not
                        # raise IndexError.
                        if line[:1] in ('+', '-'):
                            (turn, usi, spend_time, message) = \
                                self.csa.parse_server_message(line, board)
                            if my_color == 0:
                                btime = max(0, btime + time_increment - int(spend_time * 1000))
                            else:
                                wtime = max(0, wtime + time_increment - int(spend_time * 1000))
                            self.show_and_log("SELF MOVE: {0}{1},T{2},{3}".format(
                                "+" if turn == 0 else "-", next_move, spend_time, comment), log)
                            if ponder is not None:
                                # Start pondering on the predicted reply.
                                self._send_position(start_sfen, moves, ponder)
                                self._send_go(btime, wtime, byoyomi,
                                              time_increment, ponder=True)
                            break
                        self.show_and_log("MESSAGE: {0}".format(line), log)
                        # Game-end notifications (check added 2022-02-04).
                        if line.startswith(("#DRAW", "#LOSE", "#WIN", "#CENSORED")):
                            self.write_usi("stop\n")
                            return
                        line = self.csa.read_line()
            else:
                ponder_hit = None
                (turn, usi, spend_time, message) = self.csa.wait_server_message(board)
                if message is not None:
                    self.show_and_log("MESSAGE: {0}".format(
                        CSA.SERVER_MESSAGE_SYMBOLS[message]), log)
                    self.write_usi("stop\n")
                    return

                if turn != board.turn:
                    raise ValueError("Invalid turn")
                if my_color == 1:
                    btime = max(0, btime + time_increment - int(spend_time * 1000))
                else:
                    wtime = max(0, wtime + time_increment - int(spend_time * 1000))
                move = shogi.Move.from_usi(usi)
                self.show_and_log("OPPONENT MOVE: {0}{1},T{2}".format(
                    "+" if turn == 0 else "-", move, spend_time), log)

                if ponder is not None:
                    if ponder != usi:
                        # Ponder miss: abort the search and discard the
                        # bestmove it produces.
                        self.write_usi("stop\n")
                        while not self.read_usi_line().startswith("bestmove "):
                            pass
                    else:
                        self.write_usi("ponderhit\n")
                        ponder_hit = True
                board.push(move)
                moves.append(usi)

    def write_usi(self, str):
        """Send ``str`` to the engine's stdin (UTF-8 encoded) and echo it."""
        self.proc.stdin.write(str.encode("utf-8"))
        self.proc.stdin.flush()  # flush added 2022-02-01
        print("USI> ", str.rstrip())

    def read_usi_line(self):
        """Read one line from the engine's stdout, echo it and return it.

        Raises:
            EOFError: If the engine closed its stdout.  Without this check
                every wait loop in this class would spin forever on ''.
        """
        raw = self.proc.stdout.readline()
        if not raw:
            raise EOFError("USI engine closed its output stream")
        # Undecodable bytes are replaced instead of aborting the game.
        line = raw.decode("utf-8", "replace")
        print("USI< ", line.rstrip())
        return line

    def show_and_log(self, str, log):
        """Print ``str`` and, when ``log`` is not None, append it to ``log``."""
        print(str)
        if log is not None:
            log.write("{0}\n".format(str))
from datetime import datetime, timedelta
import sched
import time
def next_login_time(now):
    """Return the next hh:29 or hh:59 wall-clock time to wait for.

    Minutes 0-28 map to hh:29 of the same hour, minutes 29-58 map to hh:59,
    and minute 59 maps to :29 of the following hour.  The rollover goes
    through ``timedelta`` so that 23:59 moves to 00:29 of the next day
    instead of building an invalid hour 24.

    Args:
        now: The current ``datetime``.

    Returns:
        A ``datetime`` with seconds and microseconds set to zero.
    """
    slot = now.replace(second=0, microsecond=0)
    if now.minute < 29:
        return slot.replace(minute=29)
    if now.minute < 59:
        return slot.replace(minute=59)
    return (slot + timedelta(hours=1)).replace(minute=29)


if __name__ == '__main__':
    # Expected arguments: usi_engine host_name user_name password
    argc = len(sys.argv)
    if argc != 5:
        print('Usage: {0} usi_engine host_name user_name password'.format(
            sys.argv[0]))
        sys.exit(1)

    bridge = CSAUSIBrdige(sys.argv[1])
    try:
        while True:
            # Sleep until the next hh:29 / hh:59 slot, then log in and play
            # one game; repeat until interrupted.
            now = datetime.now()
            comp = next_login_time(now)
            diff = comp - now
            print(datetime.now(), diff)
            scheduler = sched.scheduler(time.time, time.sleep)
            # The scheduled print shows the time captured when it was queued.
            scheduler.enter(diff.seconds, 1, print, (datetime.now(), ))
            scheduler.run()
            print(datetime.now())
            bridge.connect(sys.argv[2], sys.argv[3], sys.argv[4])
            with open("log.txt", "a") as log:
                bridge.game(log)
    except KeyboardInterrupt:
        print('!!FINISH!!')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment