# -*- coding: utf-8 -*-
"""
I. About the script
1. It takes a pgn file and analyzes the games in it one by one.
2. It was developed mainly with the Stockfish uci engine.
Other uci engines can be used provided those engines support
multipv mode
II. Application references and dependencies
1. Developed under python 2.7.11
2. Using python-chess library version 0.14.1
site: http://python-chess.readthedocs.org/en/v0.14.1/
3. Tested under Windows 7, and Linux Mint
III. License notice
This program is free software, you can redistribute it and/or modify
it under the terms of the GPLv3 License as published by the
Free Software Foundation.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License (LICENSE)
along with this program, if not visit https://www.gnu.org/licenses/gpl.html
IV. Release notes
1. Release date: December 15, 2016
2. Files:
a. game_analyzer_v39.14.beta.py
b. chess_game_analyzer.exe
c. runChessGameAnalyzer.bat
d. LICENSE
e. sample_game.pgn
f. Deuterium.exe
g. readme.txt
V. Chess Game Analyzer Development log
A. Notes:
1. Interesting positions are saved in puzzle.epd
2. Existing comments in the original pgn will not be copied to the analyzed game
3. Original pgn file will not be changed by the script
4. Installation of python 2.7.11 or later (not tested though) and
python-chess v0.14.1 on your computer are needed for the script to work
B. Changes:
v39.14.beta
1. Limit max complexity analysis time to 2 minutes or half of user-defined
analysis time whichever is smaller.
2. When the position is complicated, increase the engine analysis time
in proportion to the complexity of the position. This was 3 x user-defined
analysis time when move changes is greater than 2.
3. When move is easy (move_changes <= 0) reduce the engine analysis time to 50%
of the user-defined analysis time or 100ms whichever is bigger.
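The timing rules above can be sketched as follows. This is only an
illustration, not the script's actual code: the helper names are hypothetical,
and the linear scaling for complicated positions is an assumption, since the
notes say only that the time grows with the complexity.

```python
# Illustrative sketch of the v39.14.beta timing rules (all times in ms).
MAX_COMPLEXITY_TIME_MS = 120000  # 2 minutes
MIN_MOVETIME_MS = 100


def complexity_time(movetime_ms):
    # Half of the user-defined time, capped at 2 minutes.
    return min(MAX_COMPLEXITY_TIME_MS, movetime_ms // 2)


def analysis_time(movetime_ms, move_changes):
    if move_changes <= 0:
        # Easy move: 50% of the user-defined time, but at least 100 ms.
        return max(MIN_MOVETIME_MS, movetime_ms // 2)
    # Complicated position: linear growth with move_changes (assumed).
    return movetime_ms * (1 + move_changes)
```

With a user-defined time of 1000 ms, an easy move would be analyzed for 500 ms
under this sketch.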
v39.13.beta
1. Save complex position with movechanges >= 5 in epd format.
2. Fixed adding comment in a variation when that variation has only 1 move.
3. Tune which bad lines are to be shown, engine pv from multipv 2 will only
be written when it is not too bad compared to pv from multipv 1.
v39.12.beta
1. Modify opening of pgn file
v39.11.beta
1. Modify writing of pv2 line
2. Modify condition of writing !! and ! to game move NAG, now also depends
on complexity number. score >= +1.0 and move changes >= 5 for !! and >= 3 for !
3. Remove saving of interesting position as puzzle
4. Use utf-8 encoding, for reading input file and writing to output file.
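As a rough illustration of item 2 above, the choice between !! and ! could be
written as below. The function name is hypothetical, and it is assumed here
that the score is in pawns from the point of view of the side that moved.

```python
# Illustrative sketch of the v39.11.beta rule for the '!!' and '!' NAGs.
def good_move_nag(score, move_changes):
    # score: evaluation in pawns, mover's point of view (assumed).
    # move_changes: complexity number of the position.
    if score >= 1.0:
        if move_changes >= 5:
            return '$3'  # !!
        if move_changes >= 3:
            return '$1'  # !
    return None
```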
v39.10
1. Added comment at the end of a game for number of blunders,
mistakes and dubious moves.
Decisive advantage: v >= 3P
Moderate advantage: v >= 1.5P and v < 3P
Slight advantage: v >= 0.25P and v < 1.5P
Blunder (??): From non-decisive to decisive advantage for opp
Mistakes (?): From non-moderate to moderate advantage for opp
Dubious (?!): From non-slight to slight advantage for opp
2. Change blundermargincp to addvariationmargincp
3. Change only move symbol margin from +3 pawns to +1.5 pawns
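The advantage levels listed in item 1 above can be read as a small
classifier. The sketch below is one possible reading, not the script's own
code: the function names are hypothetical, and it assumes both values are the
opponent's advantage in pawns, before and after the move.

```python
# Illustrative sketch of the v39.10 thresholds (values in pawns).
DECISIVE = 3.0
MODERATE = 1.5
SLIGHT = 0.25


def advantage_level(v):
    # 3 = decisive, 2 = moderate, 1 = slight, 0 = none.
    if v >= DECISIVE:
        return 3
    if v >= MODERATE:
        return 2
    if v >= SLIGHT:
        return 1
    return 0


def classify_move(opp_before, opp_after):
    # A symbol is given only when the opponent reaches a higher level.
    before = advantage_level(opp_before)
    after = advantage_level(opp_after)
    if after > before:
        return {3: '??', 2: '?', 1: '?!'}[after]
    return None
```

For example, a move that takes the opponent from 0.3 to 1.6 pawns would count
as a mistake (?) under this reading.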
v39.9
1. Modify position_nags(), do not return $0 instead return None
2. Verify that we will get search info when trying to get threat move
and threat pv
v39.8
1. Do not add $0 after a game move when there is an analysis to be written
v39.7
1. Correct spelling of possibility
2. Added French translation
3. Fixed double output of result after the game notation if there is comment
"A model game for white, black or white and black".
v39.6.beta
1. Added --cerebellum option, so that the tool will be able to comment in the
book moves of a player
2. Handle properly when the analyzing engine does not return a search info when
told to analyze a position. A comment is added {No search output from Annotator}
3. Modify comment when polyglot book is used
4. When --bookfile <polyglot book filename> and --cerebellum <1> is used,
Only the polyglot book will be probed.
v39.5
1. Added # -*- coding: utf-8 -*- at the top
2. Added --lang option as language option, default is ENG, other value is GER
3. Added german language for comments, note there are unicode
string in the comments for german translation
4. Change comment_key() to get_good_comment()
5. Change comments to list instead of dict except good comment
v39.4
1. Remove empty comment in REASON_COMMENT list
2. Update usage()
v39.3
1. Add model game comment only when last move of a player or both players
are all analyzed
2. Added more alternative comment
3. Added more reason comment
v39.2
1. Create a new function analyze_games()
2. Modify Function names
3. Modify Good comment
v39.1
1. Check if process created has been terminated by communicate()
v39
1. Added Model game comment at the end of every game,
if game has no blunder beyond 50 cp. It can be a model game
for white, for black and for white and black
2. Added player name option, to analyze only the game of specific player
3. Modify condition of parsing pv lines, there should be
"score cp" or "score mate" in the pv line
v38
1. Use command line options instead of interactive user input
v37
1. Copy the original headers completely into analyzedGame.pgn
2. Tested using Stockfish 7 under OS windows 7
3. Calculation of position complexity by "pv move changes"
starts at iteration depth equal to 9
4. Print statements are now converted to function for python 3.0 and later
compatibility
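The "pv move changes" count from item 3 above can be illustrated with a short
sketch. It assumes the engine output has already been reduced to a list of
(depth, best move) pairs in output order; the function name and the sample
data are hypothetical. A record is compared with the one before it once its
depth reaches 10, so the first comparison is against the depth 9 record.

```python
# Illustrative sketch of counting pv move changes.
def count_move_changes(records, start_depth=10):
    # records: list of (depth, best_move) in engine output order.
    changes = 0
    for prev, cur in zip(records, records[1:]):
        if cur[0] >= start_depth and cur[1] != prev[1]:
            changes += 1
    return changes


sample = [(8, 'e2e4'), (9, 'd2d4'), (10, 'd2d4'),
          (11, 'g1f3'), (12, 'e2e4')]
print(count_move_changes(sample))
```

In the sample, the best move changes at depth 11 and again at depth 12, while
the change between depths 8 and 9 is ignored.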
v36.1
1. Upgrade to use Python-chess version 0.13.0
v36
1. Added polyglot book, to not analyze game move if it is in the book.
The book filename should be book.bin, and should be located in the
same directory of this tool.
2. Added option to set blunder margin in cp. If this value is low, expect
more analysis lines to be shown.
3. Modify Annotator tag, added blunder margin in cp
4. Added position NAG (+/-, +- ) to a line "color is threatening" variation
"""
from __future__ import print_function
import sys
sys.path.append('/home/lila/python-chess')
import chess
import codecs
from chess import pgn
import os
import subprocess
import random
from chess import polyglot
import getopt
# Constants
APP_NAME = "Chess Game Analyzer"
APP_VERSION = "39.14.beta"
INF = 32000
MAX_PLY = 128
BAD_SCORE = -INF
ONLY_MOVE_SCORE = 1.5
ANALYSIS_MARGIN = 10.0
GOOD_SCORE = 1.5
WHITE = 1
BLACK = 0
MODEL_GAME_MARGIN = 0.5
WHITE_MATE_THREAT_COMMENT = {'ENG': 'White is threatening mate in',
'FRA': 'Les blancs menacent mat en'.decode('utf8'),
'GER': 'Weiß droht Matt in'.decode('utf8')}
BLACK_MATE_THREAT_COMMENT = {'ENG': 'Black is threatening mate in',
'FRA': 'Les noirs menacent mat en'.decode('utf8'),
'GER': 'Schwarz droht Matt in'.decode('utf8')}
MOVE_FROM_COMMENT = {'ENG': 'Move from',
'FRA': 'Coup de la bibliotheque'.decode('utf8'),
'GER': 'Verschieben von'.decode('utf8')}
BOOK_RECOMMENDS_COMMENT = {'ENG': 'recommends:',
'FRA': 'recommande :'.decode('utf8'),
'GER': 'empfiehlt :'.decode('utf8')}
WHITE_THREAT_COMMENT = {'ENG': 'White is threatening',
'FRA': 'Les blancs menacent'.decode('utf8'),
'GER': 'Weiß droht'.decode('utf8')}
BLACK_THREAT_COMMENT = {'ENG': 'Black is threatening',
'FRA': 'Les noirs menacent'.decode('utf8'),
'GER': 'Schwarz droht'.decode('utf8')}
WHITE_MODEL_COMMENT = {'ENG': 'A Model game for White.',
'FRA': 'Un jeu exemplaire de la part des blancs.'.decode('utf8'),
'GER': 'Ein Modell für White Spiel.'.decode('utf8')}
BLACK_MODEL_COMMENT = {'ENG': 'A Model game for Black.',
'FRA': 'Un jeu exemplaire de la part des noirs.'.decode('utf8'),
'GER': 'Ein Modell für Schwarz-Spiel.'.decode('utf8')}
WHITE_BLACK_MODEL_COMMENT = {'ENG': 'A Model game for White and Black.',
'FRA': 'Un jeu exemplaire de la part des blancs et des noirs.'.decode('utf8'),
'GER': 'Ein Model Spiel für Weiß und Schwarz.'.decode('utf8')}
# Random comment
BAD_COMMENT = ['Not good is',
'But not',
'Bad is',
'Inferior is',
'Not reliable is',
'Incorrect is',
'Unsatisfactory is'
]
REASON_COMMENT = ['due to',
'in view of',
'thanks to',
'considering',
'on the grounds of',
'because of',
'for the reason of',
]
GOOD_COMMENT = {1: 'A nice try could be',
2: 'Better is',
3: 'More accurate is',
4: 'Superior is',
5: 'Excellent is'
}
ALTERNATIVE_COMMENT = ['Also playable is',
'Another interesting line is',
'One that deserves attention is',
'A good alternative is',
'Also sufficient is',
'Worthy of consideration is',
'Also practical is',
'A fine line worthy of consideration is',
'Also capable is',
'Also promising is',
'Another modest line is',
'Another possibility is',
'A good one too is',
'Not to be underestimated is'
]
GER_BAD_COMMENT = ['Nicht gut ist',
'Aber nicht',
'Schlecht ist',
'Schwächer ist',
'Ungeeignet ist',
'Inkorrekt ist',
'Ungenau ist'
]
GER_REASON_COMMENT = ['wegen',
'in Anbetracht von',
'aufgrund von',
'berücksichtigt',
'mit der Begründung',
'infolge von',
'aus dem einfachen Grund',
]
GER_GOOD_COMMENT = {1: 'Ein guter Versuch wäre',
2: 'Besser ist',
3: 'Genauer ist',
4: 'Viel besser ist',
5: 'Exzellent ist'
}
GER_ALTERNATIVE_COMMENT = ['Ebenso spielbar ist',
'Ein interessanter Zug ist',
'Beachtung verdient auch',
'Eine gute Alternative ist',
'Ausreichend ist auch',
'Eine Überlegung wert ist auch',
'Spielbar ist auch',
'Vielversprechend erscheint auch',
'Ausreichend ist auch',
'Chancenreich erscheint',
'Ein anderer solider Zug ist',
'Eine andere Möglichkeit ist',
'Gut wäre auch',
'Nicht zu unterschätzen ist'
]
FRA_BAD_COMMENT = ['Pas acceptable serait',
'Non pas',
'Mauvais serait',
'Inférieur serait',
'Aventureux serait',
'Incorrect serait',
'Insuffisant serait'
]
FRA_REASON_COMMENT = ['en raison de',
'à la vue de',
'grâce à',
'étant donné que',
'parce que',
'à cause de',
'au motif de'
]
FRA_GOOD_COMMENT = {1:'Une bonne démarche serait',
2:'Meilleur serait',
3:'Plus précis serait',
4:'Supérieur serait',
5:'Excellent serait'
}
FRA_ALTERNATIVE_COMMENT = ["Aussi jouable :",
"Une autre ligne intéressante :",
"Sollicite aussi l'attention :",
"Une bonne alternative est",
"Aussi suffisant :",
"Mérite aussi l'attention :",
"Jouable aussi :",
"Une ligne attrayante est",
"Aussi efficace :",
"Aussi prometteur est",
"Une autre ligne convenable :",
"Une autre possibilité est",
"Très bien aussi :",
"À ne pas sous-estimer :"
]
def usage():
""" List of options that can be used """
print('Usage:')
print('appname -f g.pgn --engine Sf7.exe --eoption "Hash value 128, Threads value 1"')
print('\nOptions:')
print('-f or --file <input pgn filename>')
print('--engine <uci engine filename>')
print('--movetime <time in ms per move, default: 1000 ms>')
print('--eoption "<opt_name1> value <opt_value1>, <opt_name2> value <opt_value2>"')
print('--outfile <filename>')
print('--startmove <move number>')
print('--endmove <move number>')
print('--bookfile <polyglot book filename>')
print('--addvariationmargincp <value in centipawn>')
print('--lang <value ENG or GER or FRA>')
print('--cerebellum <0 or 1>')
print('--bookannotationonly <0 or 1>')
print('--player <player name in the game found in either White or Black pgn tag>')
def random_reason(_lang):
""" Returns a string as reason comment """
res = []
if _lang == 'GER':
MY_REASON_COMMENT = GER_REASON_COMMENT
elif _lang == 'FRA':
MY_REASON_COMMENT = FRA_REASON_COMMENT
else:
MY_REASON_COMMENT = REASON_COMMENT
for i in MY_REASON_COMMENT:
res.append(i)
random.shuffle(res)
idx = random.randint(0,len(res)-1)
if _lang == 'GER' or _lang == 'FRA':
return res[idx].decode('utf-8')
return res[idx]
def random_bad(_lang):
""" Returns a string for bad comment """
res = []
if _lang == 'GER':
MY_BAD_COMMENT = GER_BAD_COMMENT
elif _lang == 'FRA':
MY_BAD_COMMENT = FRA_BAD_COMMENT
else:
MY_BAD_COMMENT = BAD_COMMENT
for i in MY_BAD_COMMENT:
res.append(i)
random.shuffle(res)
if _lang == 'GER' or _lang == 'FRA':
return res[0].decode('utf-8')
return res[0]
def get_alternative_comment(com, inc, _lang):
    """ Returns comment based on index inc """
    if inc >= len(com):
        # Randomize again, shuffle in place so the caller's list is updated
        random.shuffle(com)
        inc = 0
    val = com[inc]
    if _lang == 'GER' or _lang == 'FRA':
        val = val.decode('utf-8')
    inc += 1
    return (val, inc)
def random_alternative(_lang):
""" Returns a list of comments in a different order.
Call this once per new game parsed.
"""
res = []
if _lang == 'GER':
MY_ALTERNATIVE_COMMENT = GER_ALTERNATIVE_COMMENT
elif _lang == 'FRA':
MY_ALTERNATIVE_COMMENT = FRA_ALTERNATIVE_COMMENT
else:
MY_ALTERNATIVE_COMMENT = ALTERNATIVE_COMMENT
for i in MY_ALTERNATIVE_COMMENT:
res.append(i)
random.shuffle(res)
return res
def get_good_comment(cvalue, hvalue, side_tomove, _lang):
""" Returns a comment from GOOD_COMMENT and GER_GOOD_COMMENT
based on scores,
cvalue = analyzer score of its bestmove
hvalue = analyzer score of human move
"""
comment_num = 2
if side_tomove == WHITE:
if cvalue - hvalue >= 3.0:
comment_num = 5
# If already losing
if cvalue <= -3.0:
comment_num = 1
elif cvalue - hvalue >= 1.5:
comment_num = 4
if cvalue <= -3.0:
comment_num = 1
elif cvalue - hvalue >= 0.50:
comment_num = 3
if cvalue <= -3.0:
comment_num = 1
else:
if cvalue - hvalue <= -3.0:
comment_num = 5
if cvalue >= 3.0:
comment_num = 1
elif cvalue - hvalue <= -1.5:
comment_num = 4
if cvalue >= 3.0:
comment_num = 1
elif cvalue - hvalue <= -0.5:
comment_num = 3
if cvalue >= 3.0:
comment_num = 1
if _lang == 'GER':
return GER_GOOD_COMMENT[comment_num].decode('utf-8')
elif _lang == 'FRA':
return FRA_GOOD_COMMENT[comment_num].decode('utf-8')
return GOOD_COMMENT[comment_num]
def mate_indicator(d2m):
""" Returns a sentence telling which side mates and in how many moves """
if d2m > 0:
return 'White will mate black in %d moves' % abs(d2m)
elif d2m < 0:
return 'Black will mate white in %d moves' % abs(d2m)
return 'None'
def GetMaxMoveNumber(game):
""" Take a game object and returns max number of moves """
fmvn = 0
while len(game.variations):
fmvn = game.board().fullmove_number
next_node = game.variation(0)
game = next_node
return int(fmvn)
def get_engine_detailed_data(data, side):
""" Will extract score, depth, move and pv from input data """
# data = -0.79/15 20...Qa5 21.Nxd6 Bxa4 22.Bb6 Rxd6 23.Bxa5 Rxd3 24.cxd3 Bxd1
# data = -0.79/15 21.Nxd6 Bxa4 22.Bb6 Rxd6 23.Bxa5 Rxd3 24.cxd3 Bxd1
list_value = data.split(' ')
score_and_depth = list_value[0]
list_sd = score_and_depth.split('/')
eval_value = float(list_sd[0])
# Change sign if side is black
if side == BLACK:
eval_value = -1*eval_value
depth = list_sd[1].strip()
depth = int(depth)
move = list_value[1]
if '...' in move:
move = move.split('.')
move = move[3]
else:
move = move.split('.')
move = move[1]
move = move.strip()
pvar = ' '.join(list_value[1:])
return (eval_value, depth, move, pvar)
def get_score_and_depth(data, side_to_move):
""" Will extract score and depth from data.
The returned score is WPOV
"""
# data = -0.79/15 20... Qa5 21. Nxd6 Bxa4 22. Bb6 Rxd6 23. Bxa5 Rxd3 24. cxd3 Bxd1
d_split = data.split(' ')
left_val = d_split[0]
token = left_val.split('/')
val = float(token[0])
# Change sign from point of view of player
# We did this because we let the engine analyze
# the fen + move of the player
val = -1*val
# Change sign if side is black, because we
# use white POV in the analyzed output pgn file.
# Move NAGS !?, ? and others are easier to append by
# checking whether the value is positive or negative
if side_to_move == BLACK:
val = -1*val
depth = token[1].strip()
depth = int(depth)
return (val, depth)
def save_headers(game, outputFN, engine_name, num_threads, nMoveTime):
""" Print to file the headers of a game including
annotator name or the engine that analyzes the game
"""
# Save headers
for key, value in game.headers.iteritems():
with codecs.open(outputFN, 'a+', 'utf-8') as f:
if key != 'Annotator':
f.write('[%s \"%s\"]\n' %(key, value))
# Write the Annotator last
with codecs.open(outputFN, 'a+', 'utf-8') as f:
f.write('[Annotator "%s (%0.1fs/pos, thread=%d)"]\n\n' %(engine_name,
float(nMoveTime)/1000, num_threads))
def is_number(s):
""" Check if input is a number """
try:
float(s)
return True
except ValueError:
return False
def position_nags(v):
""" Returns the NAGs based on input value v """
nag = None
if abs(v) < 0.25:
nag = "$10" # even =
if v >= 3.0:
nag = "$18" # White has decisive adv +-
elif v >= 1.0:
nag = "$16" # White has moderate adv +/-
elif v >= 0.25:
nag = "$14" # White has slight adv +/=
elif v <= -3.0:
nag = "$19" # Black has decisive adv -+
elif v <= -1.0:
nag = "$17" # Black has moderate adv -/+
elif v <= -0.25:
nag = "$15" # Black has slight adv =/+
assert nag is not None
return nag
def move_nags(s, v1, v2):
""" Returns move NAGs for v2 based on side to move,
engine score v1 and human score v2
"""
nag = None
if (s == WHITE and v1 > -3.0 and v2 <= -3.0) or (s == BLACK and v1 < 3.0 and v2 >= +3.0):
nag = "$4" # ??
elif (s == WHITE and v1 >= +3.0 and v2 < 0.25) or (s == BLACK and v1 <= -3.0 and v2 > -0.25):
nag = "$4" # ??
elif (s == WHITE and v1 > -2.99 and v2 <= -1.0) or (s == BLACK and v1 < +2.99 and v2 >= +1.0):
nag = "$2" # ?
elif (s == WHITE and v1 > -0.99 and v2 <= -0.25) or (s == BLACK and v1 < +0.99 and v2 >= +0.25):
nag = "$6" # ?!
return nag
def one_value_move_nags(s, v):
""" Returns move NAGs based on side s and value v
"""
nag = None
if (s == WHITE and v <= -3.0) or (s == BLACK and v >= +3.0):
nag = "$4" # ??
elif (s == WHITE and v <= -1.0) or (s == BLACK and v >= +1.0):
nag = "$2" # ?
elif (s == WHITE and v <= -0.25) or (s == BLACK and v >= +0.25):
nag = "$6" # ?!
return nag
# Converts the uci pv to san pv
def ucipv_to_sanpv(fen, pv):
    """ Converts uci pv to SAN pv format """
    board = chess.Board(fen)
    side = board.turn
    # Push the uci moves on the board, stop at the first illegal move
    # so that we only pop the moves that were actually pushed
    pushed = 0
    for m in pv.split(' '):
        try:
            board.push_uci(m)
        except ValueError:
            print('Illegal move')
            break
        pushed += 1
    # Pop the moves, and save it in SAN
    pvSan = []
    for i in range(pushed):
        san = board.san(board.pop())
        pvSan.append(san)
    # Reverse it
    pvSan = list(reversed(pvSan))
    # We put number to our pv 1. e4 e5 2. Nf3 ...
    fmvn = int(fen.split(' ')[-1])
    numPv = []
    if side == WHITE:
        for i, m in enumerate(pvSan):
            if i % 2 == 0:  # Even index, white move
                numPv.append(str(fmvn + i // 2) + '.' + m)
            else:
                numPv.append(m)
    # else if side is black
    else:
        for i, m in enumerate(pvSan):
            if i == 0:
                numPv.append(str(fmvn) + '...' + m)
            elif i % 2 != 0:  # Odd index, white move
                numPv.append(str(fmvn + i // 2 + 1) + '.' + m)
            else:
                numPv.append(m)
    return ' '.join(numPv)
def mate_distance_to_value(d):
""" returns value given distance to mate """
value = 0
if d < 0:
value = -2*d - INF
elif d > 0:
value = INF - 2*d + 1
return value
def value_to_mate(value):
""" return number of moves to mate """
d = 0
value = int(value)
if abs(value) < INF - MAX_PLY:
d = 0
else:
if value > 0:
d = (INF - value + 1) / 2
elif value < 0:
d = (-INF-value) / 2
return d
def get_time_key(item):
""" Sort time """
return item[2]
def get_depth_key(item):
""" Sort depth """
return item[0]
def analyze_complexity(engineName, fen, _eng_option, movetimev, multipvv, nshortPv):
""" Position is complex when the engine pv move, changes more than once """
assert multipvv == 1
multipv_num = multipvv
record = []
moveChanges = 0
bestScore = -INF-1
engineIsUsingBook = True
scorev = BAD_SCORE
p = subprocess.Popen(engineName, stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
p.stdin.write("uci\n")
for eline in iter(p.stdout.readline, ''):
eline = eline.strip()
if "uciok" in eline:
break
# Send the engine options except multipv
for n in _eng_option:
if "multipv" in n.lower():
pass
else:
p.stdin.write("setoption name %s\n" %(n))
p.stdin.write("setoption name MultiPV value %d\n" %(multipvv))
p.stdin.write("isready\n")
for rline in iter(p.stdout.readline, ''):
rline = rline.strip()
if "readyok" in rline:
break
p.stdin.write("ucinewgame\n")
p.stdin.write("position fen " + fen + "\n")
p.stdin.write("go movetime " + str(movetimev) + "\n") # mt = movetime in ms
# Parse engine output
for eline in iter(p.stdout.readline, ''):
a = eline.strip()
# Process analysis output if there is depth, score and pv
if "depth" in a and "score" in a and "pv" in a:
engineIsUsingBook = False
b = a.split(' ')
i = b.index("depth")
depthv = int(b[i+1])
# Translate mate to value
if "mate" in a:
i = b.index("score")
d2m = int(b[i+2])
scorev = mate_distance_to_value(d2m)
elif "score cp" in a:
i = b.index("score")
scorev = int(b[i+2])
# Split at pv
i = b.index("pv")
c = b[i+1:]
pvv = "None"
# Shorten pv
lenPv = len(c)
if lenPv >= nshortPv:
cc = b[i+1 : i+1+nshortPv]
d = ' '.join(cc)
pvv = d.strip()
else:
cc = b[i+1:]
d = ' '.join(cc)
pvv = d.strip()
# Save only a single move from the pv
singleMove = pvv.split(' ')
singleMove = singleMove[0]
# Record everything then sort later
record.append([depthv, scorev, singleMove])
if "bestmove" in a:
bestScore = scorev
break
# Quit the engine
p.communicate('quit\n')
p.poll()
if p.returncode is None:
print('Warning!! the process has not terminated yet in analyze_complexity()')
if engineIsUsingBook:
assert moveChanges == 0
return moveChanges, False
# Check the move and score
for i, item in enumerate(record):
# Move changes, starts comparison at iteration depth equal to 9
if i > 0 and item[0] >= 10:
if record[i][2] != record[i-1][2]:
moveChanges += 1
mate = False
if bestScore != -INF-1 and ((bestScore >= INF - MAX_PLY) or (bestScore <= -INF + MAX_PLY)):
mate = True
return (moveChanges, mate)
def max_depth_in_analysis(analysis_data):
""" Find maximum depth in the list"""
max_depth = 0
for item in analysis_data:
if item[0] > max_depth:
max_depth = item[0]
return max_depth
def alter_pv(pv):
""" remove top depth pv """
new_pv = []
for n in pv:
if pv[0][0] == n[0]:
pass
else:
new_pv.append(n)
return new_pv
def good_pv_depth(pv, mpv):
""" returns true if depth of top pvs is the same """
# For 2 pv only
if mpv == 2:
if pv[0][0] == pv[1][0]:
return True
else:
return False
return False
def good_pv_moves(pv):
""" returns true if top pvs moves is not the same, applies only to 2 pv """
pvmove1 = pv[0][4]
pvmove1 = pvmove1.split(' ')
pvmove1 = pvmove1[0]
pvmove2 = pv[1][4]
pvmove2 = pvmove2.split(' ')
pvmove2 = pvmove2[0]
if pvmove1 != pvmove2:
return True
return False
def get_summarized_pv(analysis_data, multipvv):
""" Save the best pv lines return pv list in depth descending order """
final_pv_list = []
max_depth = max_depth_in_analysis(analysis_data)
for i in range(max_depth):
record_depth = []
# Parse the data and save to new list based on depth
for item in analysis_data:
if i+1 == item[0]:
record_depth.append(item)
# Sort time
time_sorted_list = sorted(record_depth, key=get_time_key, reverse=True)
# In temp list only save the pv with high time for multipv 1 and 2
temp_list = []
for j in range(multipvv):
ind = j + 1
# Find the pv with this ind and high time and save it
for n in time_sorted_list:
if ind == n[1]: # multipv 1 or 2
temp_list.append(n)
break
for n in temp_list:
final_pv_list.append(n)
final_pv_list = sorted(final_pv_list, key=get_depth_key, reverse=True)
# [18, 1, 2356, 3, 'e1g1 c8a6 d3a6 a8a6 f3d2 e4d2 c2d2']
# [17, 1, 1920, 12, 'd1e2 g7g5 f4g3 f8f7 e1g1 e4g3 h2g3']
# [17, 2, 1920, 3, 'e1g1 c8a6 d3a6 a8a6 f3d2 e4d2 c2d2']
# [16, 1, 1654, 5, 'e1g1 c8a6 d3a6 a8a6 f3d2 e4d2 d1d2']
# [16, 2, 1654, 0, 'd1e2 g7g5 f4g3 f8f7 e1g1 e4g3 f2g3']
# Filter out same pv move if multipv > 1 at given depth
if multipvv > 1:
# Make sure that multipv is the same as num pvs at same depth
# and pv moves among multipv are not the same
trials = 0
while True and len(final_pv_list) > 1:
if trials >= 3:
pass
trials += 1
if good_pv_depth(final_pv_list, multipvv):
if good_pv_moves(final_pv_list):
break
else:
new_pv = alter_pv(final_pv_list)
final_pv_list = []
final_pv_list = new_pv
else:
new_pv = alter_pv(final_pv_list)
final_pv_list = []
final_pv_list = new_pv
return final_pv_list
def get_cerebellum_book_move(engineName, fen, _eng_option, movetimev, multipvv, nshortPv):
""" Returns a uci move and True/False from stockfish that uses cerebellum book.
If True the bestmove is from cerebellum book """
depth_cnt = 0
# Execute the engine
p = subprocess.Popen(engineName, stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
p.stdin.write("uci\n")
for eline in iter(p.stdout.readline, ''):
eline = eline.strip()
if "uciok" in eline:
break
# Send the engine options except multipv
for n in _eng_option:
if "multipv" in n.lower():
pass
else:
p.stdin.write("setoption name %s\n" %(n))
p.stdin.write("setoption name MultiPV value %d\n" %(multipvv))
p.stdin.write("isready\n")
for rline in iter(p.stdout.readline, ''):
rline = rline.strip()
if "readyok" in rline:
break
p.stdin.write("ucinewgame\n")
p.stdin.write("position fen " + fen + "\n")
p.stdin.write("go movetime " + str(movetimev) + "\n")
# Parse engine output
for eline in iter(p.stdout.readline, ''):
engine_output = eline.strip()
if "depth" in engine_output:
depth_cnt += 1
if "bestmove" in engine_output:
bestmove = engine_output.split()[1]
break
# Quit the engine
p.communicate('quit\n')
p.poll()
if p.returncode is None:
print('Warning!! the process has not terminated yet in get_cerebellum_book_move()')
return bestmove, True if depth_cnt == 0 else False
def analyze_fen(engineName, fen, _eng_option, movetimev, multipvv, nshortPv):
""" This will output engine analysis in a list
and the score returned is side POV.
Returns None if engine does not search
"""
multipv_num = multipvv
record = []
engineIsUsingBook = True
# Execute the engine
p = subprocess.Popen(engineName, stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
p.stdin.write("uci\n")
for eline in iter(p.stdout.readline, ''):
eline = eline.strip()
if "uciok" in eline:
break
# Send the engine options except multipv
for n in _eng_option:
if "multipv" in n.lower():
pass
else:
p.stdin.write("setoption name %s\n" %(n))
# print('setoption name %s\n' %(n))
p.stdin.write("setoption name MultiPV value %d\n" %(multipvv))
p.stdin.write("isready\n")
for rline in iter(p.stdout.readline, ''):
rline = rline.strip()
if "readyok" in rline:
break
p.stdin.write("ucinewgame\n")
p.stdin.write("position fen " + fen + "\n")
# New command so that only 1 depth will be reported
p.stdin.write("go movetime " + str(movetimev) + "\n")
# Parse engine output
for eline in iter(p.stdout.readline, ''):
engine_output = eline.strip()
# Process engine analysis output
if "depth" in engine_output\
and ("score cp" in engine_output\
or "score mate" in engine_output)\
and "time" in engine_output\
and " pv " in engine_output\
and not "upperbound" in engine_output\
and not "lowerbound" in engine_output:
engineIsUsingBook = False
b = engine_output.split(' ')
i = b.index("depth")
depthv = int(b[i+1])
if "multipv" in engine_output:
i = b.index("multipv")
multipvv = int(b[i+1])
else:
multipvv = 1
i = b.index("time")
timev = int(b[i+1])
# Translate mate to value
if "mate" in engine_output:
i = b.index("score")
d2m = int(b[i+2])
scorev = mate_distance_to_value(d2m)
else:
i = b.index("score")
scorev = int(b[i+2])
# Split at pv
i = b.index("pv")
c = b[i+1:]
pvv = "None"
# Shorten pv
lenPv = len(c)
# If score is mate save all pv otherwise use nshortPv
if lenPv >= nshortPv and abs(scorev) < INF-MAX_PLY:
cc = b[i+1 : i+1+nshortPv]
d = ' '.join(cc)
pvv = d.strip()
else:
cc = b[i+1:]
d = ' '.join(cc)
pvv = d.strip()
# Record everything then sort later
record.append([depthv, multipvv, timev, scorev, pvv])
if "bestmove" in engine_output:
break
# Quit the engine
p.communicate('quit\n')
p.poll()
if p.returncode is None:
print('Warning!! the process has not terminated yet in analyze_fen()')
if engineIsUsingBook:
return None
# Save the engine analysis
final_list = get_summarized_pv(record, multipvv)
old_depth = 0
return_list = []
save_cnt = 0
for n in final_list:
scorev = n[3]
# Convert LAN pv to SAN
san_pv = ucipv_to_sanpv(fen, n[4])
analysis_line = ("%+0.2f/%d %s" %(float(n[3])/100, n[0], san_pv))
# Before saving the second pv make sure that the depth of the first pv
# is the same with the depth of the second pv
if save_cnt == 1:
if old_depth == n[0]:
return_list.append(analysis_line)
save_cnt += 1
else:
# Replace the old item
return_list[0] = analysis_line
elif save_cnt == 0:
return_list.append(analysis_line)
save_cnt += 1
if multipvv == 1:
break
if save_cnt == 2:
break
old_depth = n[0]
return return_list
def get_engine_id(enginefn):
""" Returns id name of an engine """
engine_idname = 'Engine'
p = subprocess.Popen(enginefn, stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
p.stdin.write('uci\n')
for eline in iter(p.stdout.readline, ''):
eline = eline.strip()
if 'id name' in eline:
a = eline.split()
engine_idname = ' '.join(a[2:])
elif 'uciok' in eline:
break
# Quit the engine
p.communicate('quit\n')
p.poll()
if p.returncode is None:
print('Warning!! the process has not terminated yet in get_engine_id()')
return engine_idname
def OnlyMove(s, anaMove, gameMove, anaValue1, anaValue2):
""" Returns true if this is the only move that is best and
other moves are bad
ONLY_MOVE_SCORE = 1.5 pawns
"""
if anaMove == gameMove:
if s == WHITE:
if anaValue1 - anaValue2 >= ONLY_MOVE_SCORE\
and anaValue1 >= -0.25 and anaValue1 < 3.0:
return True
else:
assert s == BLACK
if anaValue1 - anaValue2 <= -ONLY_MOVE_SCORE\
and anaValue1 <= +0.25 and anaValue1 > -3:
return True
return False
def analyze_games(argv):
""" argv is a list of option and values
['--file', 'bilbaomast16win.pgn', ...]
"""
# Init
sEngine = None
pgn_file = None
nHash = 32 # Default memory in mb
nThreads = 1
nMoveTime = 1000
nMultiPv = 1
nshortPv = 7
startFmvn = 2
lastFmvn = 200
outputFN = "analyzedGame.pgn"
gameCnt = 0
complexityTime = 1000
flag = 1
option_use_book = 0
book_fn = None
option_add_variation_margin = 0.15 # in cp
e_option = []
eng_option = []
option_player = None
lang = 'ENG' # 'GER', 'FRA'
option_use_cerebellum_book = 0
option_book_anno_only = 0
try:
opts, args = getopt.getopt(argv, "f:", ["file=", "engine=", "movetime=",
"eoption=", "startmove=",
"endmove=", "bookfile=", "addvariationmargincp=",
"outfile=", "player=", "lang=", 'cerebellum=',
'bookannotationonly='])
except getopt.GetoptError as err:
print(str(err))
usage()
sys.exit(2)
for opt, arg in opts:
if opt in ("-f", "--file"):
pgn_file = arg
elif opt == "--outfile":
outputFN = arg
elif opt == "--engine":
sEngine = arg
elif opt == "--player":
option_player = arg
elif opt == "--bookfile":
book_fn = arg
option_use_book = 1
elif opt == "--movetime":
nMoveTime = int(arg)
elif opt == "--startmove":
startFmvn = int(arg)
elif opt == "--endmove":
lastFmvn = int(arg)
elif opt == "--addvariationmargincp":
option_add_variation_margin = int(arg)
elif opt == "--eoption":
e_option = arg.split(',')
elif opt == "--lang":
lang = arg
elif opt == "--cerebellum":
option_use_cerebellum_book = int(arg)
elif opt == "--bookannotationonly":
option_book_anno_only = int(arg)
# Clear the engine option of whitespace chars at beginning and ending
for n in e_option:
n = n.strip()
if 'Threads' in n:
nThreads = n.split(' ')
nThreads = int(nThreads[2])
eng_option.append(n)
# Exit if engine or input pgn file is missing
if sEngine is None:
print('Error!! engine filename was not defined')
usage()
sys.exit(1)
if pgn_file is None:
print('Error!! input pgn filename was not defined')
usage()
sys.exit(1)
engine_id = get_engine_id(sEngine)
option_add_variation_margin = float(option_add_variation_margin)/100.0
# Send warning if book is missing
if option_use_book and not os.path.isfile(book_fn):
print('Warning!! the required book \"%s\" was not found' % book_fn)
option_use_book = 0 # Set to 0
# nMoveTime is in ms
complexityTime = min(120000, max(100, nMoveTime/2))
# Open pgn file for reading
ifo = codecs.open(pgn_file, 'r', 'utf-8')
alt_index = 0
# Read the games in the pgn file one by one
game = chess.pgn.read_game(ifo)
while game != None:
gameCnt += 1
maxMoveNum = GetMaxMoveNumber(game)
Blunder = {}
Mistake = {}
Dubious = {}
Blunder['white'] = 0
Blunder['black'] = 0
Mistake['white'] = 0
Mistake['black'] = 0
Dubious['white'] = 0
Dubious['black'] = 0
if option_book_anno_only:
modelGameWhite = False
modelGameBlack = False
else:
modelGameWhite = True
modelGameBlack = True
# Randomize alternate comment
ALTER_COM = random_alternative(lang)
# Save result header for writing at end of a game
try:
hre = game.headers['Result']
except KeyError:
hre = '*'
wplayer = game.headers['White']
bplayer = game.headers['Black']
event = game.headers['Event']
datev = game.headers['Date']
roundv = game.headers['Round']
# Skip this game if player is not in the game
if option_player is not None and option_player != wplayer\
and option_player != bplayer:
game = chess.pgn.read_game(ifo)
continue
# A model game comment can only be added for analyzed side
if option_player is not None and option_player == wplayer:
modelGameBlack = False
elif option_player is not None and option_player == bplayer:
modelGameWhite = False
# Save headers to output file
save_headers(game, outputFN, engine_id, nThreads,
nMoveTime)
game_node = game
# Loop through the main line moves of this game and comment on them
while len(game_node.variations):
side = game_node.board().turn
fmvn = game_node.board().fullmove_number
fmvn = int(fmvn)
next_node = game_node.variation(0)
move = next_node.move
uci_game_move = str(move)
sanMove = game_node.board().san(move)
strFEN = str(game_node.board().fen())
puzzleEpd = game_node.board().epd()
# Show game num and fen in console
print('Game: %d, maxMoveNum: %d' %(gameCnt, maxMoveNum))
print('FEN: %s' %(strFEN))
print('Player move: %s' %(sanMove))
# Init
threat_depth = 0
threatValue = BAD_SCORE
anaValue = BAD_SCORE
anaValue2 = BAD_SCORE
gameMoveValue = BAD_SCORE
anaPvMove = "None"
isOnlyMove = False
moveChanges = 0
writeAnalyzerBestLine = False
matePos = False
moveIsInPolyglotBook = False
moveIsInCereBook = False
anaPv2Len = 0
if option_player is not None and ((option_player == wplayer and not side)\
or (option_player == bplayer and side)):
with codecs.open(outputFN, 'a', 'utf-8') as f:
if side == WHITE:
f.write('%d. %s ' %(fmvn, game_node.board().san(next_node.move)))
else:
f.write('%s ' %(game_node.board().san(next_node.move)))
game_node = next_node
continue # Parse the next pos in this game
# Probe polyglot book, don't analyze if a game move is in the book
if option_use_book:
bestPolyBookMove = None
polyBookCnt = 0
with chess.polyglot.open_reader(book_fn) as reader:
for entry in reader.find_all(game_node.board()):
polyBookCnt += 1
book_move = str(entry.move())
if polyBookCnt == 1:
bestPolyBookMove = book_move
if book_move == uci_game_move:
moveIsInPolyglotBook = True
break
if moveIsInPolyglotBook:
with codecs.open(outputFN, 'a', 'utf-8') as f:
if side == WHITE:
f.write('%d. %s {%s %s} ' %(fmvn, game_node.board().san(next_node.move),
MOVE_FROM_COMMENT[lang].decode('utf8'), book_fn))
else:
f.write('%d...%s {%s %s} ' %(fmvn, game_node.board().san(next_node.move),
MOVE_FROM_COMMENT[lang].decode('utf8'), book_fn))
game_node = next_node
continue # Parse the next pos in this game
elif bestPolyBookMove is not None:
with codecs.open(outputFN, 'a', 'utf-8') as f:
tempBoard = game_node.board()
tempBoard.push_uci(bestPolyBookMove)
move = tempBoard.pop()
san_move = tempBoard.san(move)
book_comment = '%s %s %s' %(book_fn, BOOK_RECOMMENDS_COMMENT[lang].decode('utf8'), san_move)
if side == WHITE:
f.write('%d. %s {%s} ' %(fmvn, game_node.board().san(next_node.move), book_comment))
else:
f.write('%d...%s {%s} ' %(fmvn, game_node.board().san(next_node.move), book_comment))
game_node = next_node
continue
# Use cerebellum book
elif option_use_cerebellum_book:
moveTimeMs = 100
multiPVNum = 1
pvLenNum = 1
bestmove, validCereBook = get_cerebellum_book_move(sEngine,
strFEN, eng_option, moveTimeMs, multiPVNum, pvLenNum)
if bestmove == uci_game_move and validCereBook:
moveIsInCereBook = True
if moveIsInCereBook:
with codecs.open(outputFN, 'a', 'utf-8') as f:
if side == WHITE:
f.write('%d. %s {%s cerebellum} ' %(fmvn, game_node.board().san(next_node.move), MOVE_FROM_COMMENT[lang].decode('utf8')))
else:
f.write('%d...%s {%s cerebellum} ' %(fmvn, game_node.board().san(next_node.move), MOVE_FROM_COMMENT[lang].decode('utf8')))
game_node = next_node
continue
elif validCereBook:
with codecs.open(outputFN, 'a', 'utf-8') as f:
tempBoard = game_node.board()
tempBoard.push_uci(bestmove)
move = tempBoard.pop()
san_move = tempBoard.san(move)
book_comment = 'Cerebellum %s %s' %(BOOK_RECOMMENDS_COMMENT[lang].decode('utf8'), san_move)
if side == WHITE:
f.write('%d. %s {%s} ' %(fmvn, game_node.board().san(next_node.move), book_comment))
else:
f.write('%d...%s {%s} ' %(fmvn, game_node.board().san(next_node.move), book_comment))
game_node = next_node
continue
# If book annotation only
if option_book_anno_only:
with codecs.open(outputFN, 'a', 'utf-8') as f:
if side == WHITE:
f.write('%d. %s ' %(fmvn, game_node.board().san(next_node.move)))
else:
f.write('%s ' %(game_node.board().san(next_node.move)))
game_node = next_node
continue # Parse the next pos in this game
# Analyze pos if fmvn is within startFmvn and lastFmvn input from user
if fmvn >= startFmvn and fmvn <= lastFmvn:
# (0) Get the score of the game move by running the engine.
# Invert the score after the analysis since we are analyzing fen + move,
# and invert the score if current side is black too
# because we use white POV (point of view) and engine is analyzing at side POV
# Use temp so we will not mess with the current board
tempBoard = game_node.board()
tempBoard.push(move) # make the move on the temp board
# Don't send position to analyze without a legal move
if not game_node.board().is_checkmate()\
and not game_node.board().is_stalemate()\
and not tempBoard.is_checkmate()\
and not tempBoard.is_stalemate():
tFEN = str(tempBoard.fen())
mpv = 1
# Get the score/depth pv <moves> in a list, list[0] = 1st pv,
# The expected return value is,
# "+0.89/11 32. Nc6 Nh5 33. Qf2 Qd1 34. Nb4", for nshortPv = 5
gameMoveAnalysisList = analyze_fen(sEngine,
tFEN,
eng_option,
nMoveTime,
mpv,
nshortPv)
# If engine does not return a search info then just write the move
# This happens when the engine used is using its own book
if gameMoveAnalysisList is None:
with codecs.open(outputFN, 'a', 'utf-8') as f:
if side == WHITE:
f.write('%d. %s {No search output from Annotator} ' %(fmvn, game_node.board().san(next_node.move)))
else:
f.write('%d...%s {No search output from Annotator} ' %(fmvn, game_node.board().san(next_node.move)))
game_node = next_node
continue
gameMoveAnalysis = gameMoveAnalysisList[0]
# The return value is from the point of view of the opponent,
# so we must negate it before comparing with engine analysis score
# gameMoveValue is in pawn unit and is of type float, it is also WPOV
gameMoveValue, gameMoveDepth = get_score_and_depth(gameMoveAnalysis, side)
# Write to console as update
print('Engine analysis of player move: %+0.2f/%d\n'\
%(gameMoveValue, gameMoveDepth))
# Analyze position to get engine recommendation
# (1) Get complexity of the position using multipv 1, searching for
# complexityTime ms (half of nMoveTime, kept between 100 ms and 2 minutes)
if gameMoveValue != BAD_SCORE and ((gameMoveValue >= -0.50 and side == WHITE)\
or (gameMoveValue <= 0.50 and side == BLACK)):
complexityMultiPV = 1
moveChanges, matePos = analyze_complexity(sEngine,
strFEN, eng_option,
complexityTime,
complexityMultiPV, nshortPv)
# Save complex position
if moveChanges >= 5:
with open('complex.epd', 'a') as f:
f.write('%s sm %s; Wpl \"%s\"; Bpl \"%s\"; Eve \"%s\"; Dat \"%s\"; Rou \"%s\"; Moc %d;\n'\
%(puzzleEpd, sanMove, wplayer, bplayer, event, datev, roundv, moveChanges))
# (2) Get the engine analysis when engine is to move in this position
if not game_node.board().is_checkmate()\
and not game_node.board().is_stalemate():
nMultiPv = 2
# Increase engine time when move changes >= 3
# and decrease analysis time when pos is easy
newAllocTime = nMoveTime
if moveChanges >= 3:
newAllocTime = nMoveTime * min((moveChanges-1), 3)
elif moveChanges <= 0:
newAllocTime = max(100, newAllocTime/2)
# If position has mate score then we extend the pv length,
# this is only applicable for pv1
pvLen = nshortPv
if matePos:
pvLen = 200 # nshortPv
analysisList = analyze_fen(sEngine, strFEN, eng_option,
newAllocTime, nMultiPv, pvLen)
# If engine does not return a search info then just write the move
# This happens when the engine used is using its own book
if analysisList is None:
with codecs.open(outputFN, 'a', 'utf-8') as f:
if side == WHITE:
f.write('%d. %s {No search output from Annotator} ' %(fmvn, game_node.board().san(next_node.move)))
else:
f.write('%d...%s {No search output from Annotator} ' %(fmvn, game_node.board().san(next_node.move)))
game_node = next_node
continue
# Get score, depth, and pv of the 1st pv line from multipv
# anaValue is white POV
analysisData = analysisList[0]
anaValue, anaDepth, anaPvMove, anaPv = get_engine_detailed_data(analysisData, side)
# A side loses its model game comment once its move scores worse than
# the engine move by more than MODEL_GAME_MARGIN
if (anaValue - gameMoveValue > MODEL_GAME_MARGIN) and side==WHITE:
modelGameWhite = False
elif (anaValue - gameMoveValue < -MODEL_GAME_MARGIN) and side==BLACK:
modelGameBlack = False
# Get score, depth and pv of the 2nd pv if there is one.
# A multipv search does not always return a 2nd pv
if len(analysisList) > 1:
analysisData2 = analysisList[1]
anaValue2, anaDepth2, anaPvMove2,\
anaPv2 = get_engine_detailed_data(analysisData2, side)
anaPv2List = anaPv2.split(' ')
anaPv2Len = len(anaPv2List)
# If move is singular
isOnlyMove = OnlyMove(side, anaPvMove, sanMove, anaValue, anaValue2)
# (3) Check if analyzer best line is to be appended to the game
# ANALYSIS_MARGIN = 10.0 pawns
if anaPvMove == sanMove or gameMoveValue == BAD_SCORE or anaValue == BAD_SCORE\
or (abs(gameMoveValue) >= ANALYSIS_MARGIN and abs(anaValue) >= ANALYSIS_MARGIN)\
or ((anaValue - gameMoveValue < option_add_variation_margin and side == WHITE) or\
(anaValue - gameMoveValue > -option_add_variation_margin and side == BLACK)):
writeAnalyzerBestLine = False
else:
writeAnalyzerBestLine = True
# Find the threat of the last move of opp by doing a null move
# from this current position. If this value is positive then
# the current side to move is in trouble because by doing
# nothing the opponent gains score. This will also detect initiative
if not game_node.board().is_check() and not game_node.board().is_stalemate():
tempBoardt = game_node.board()
tempBoardt.push(move.null()) # Send null move
tFENt = str(tempBoardt.fen())
nMultiPv = 1
gameMoveThreatList = analyze_fen(sEngine, tFENt, eng_option,\
nMoveTime, nMultiPv, nshortPv)
if gameMoveThreatList is not None:
gameMoveThreat = gameMoveThreatList[0]
# gameMoveThreat = +0.00/20 27.Rc4 b6 28.Rc3 Rh1 29.a4 Rh2+ 30.Kf3
threatPvStr = gameMoveThreat.split(' ')
tpvlen = len(threatPvStr)
# Display odd number of moves in the pv, the first item in threatPvStr is score/depth
if tpvlen >= 3:
if tpvlen%2 == 0:
threatPv = ' '.join(threatPvStr[1:])
else:
threatPv = ' '.join(threatPvStr[1:-1])
else:
threatPv = ' '.join(threatPvStr[1:])
threatEval = threatPvStr[0]
threatEvalSplit = threatEval.split('/')
threatValue = float(threatEvalSplit[0])
threat_depth = int(threatEvalSplit[1])
# (4) (a) Write singular move symbol or (b) alternative bad lines
# or (c) good or very good move symbols to a game move
if not writeAnalyzerBestLine:
with codecs.open(outputFN, 'a', 'utf-8') as f:
# If position is complex
if anaPvMove == sanMove and abs(anaValue) < +6.0\
and abs(anaValue2) < +6.0:
# If moveChanges is high add !! to the gameMoveNag, if low just add !
gameMoveNag = None
if moveChanges >= 5 and abs(gameMoveValue) >= +1.0:
gameMoveNag = '$3'
elif moveChanges >= 3 and abs(gameMoveValue) >= +1.0:
gameMoveNag = '$1'
writeInferiorLine = False
# Write inferior line if pv2 score is not too close and not too far from pv1 score
if (side == WHITE and anaValue - anaValue2 >= +option_add_variation_margin\
and anaValue - anaValue2 <= 1.0) or\
(side == BLACK and anaValue - anaValue2 <= -option_add_variation_margin\
and anaValue - anaValue2 >= -1.0):
writeInferiorLine = True
posNag = position_nags(anaValue2)
gamePosNag = position_nags(gameMoveValue)
pv2MoveNag = one_value_move_nags(side, anaValue2)
if pv2MoveNag is not None:
# Get the move in pv2 and add a NAG
anaPv2Rev = anaPv2.split(' ')
# There must be more than 1 move in pv
if len(anaPv2Rev) >= 2:
pv2_move = anaPv2Rev[0]
pv2_move = pv2_move + ' ' + pv2MoveNag + ' { ' + random_reason(lang) + ' } '
# mvRem should be odd with minimum 1
if (len(anaPv2Rev) - 1) % 2 == 0:
mvRem = ' '.join(anaPv2Rev[1:-1])
else:
mvRem = ' '.join(anaPv2Rev[1:])
newAnaPv2 = pv2_move + ' ' + mvRem
# Get random bad comment and append it before the pv2
badComment = random_bad(lang)
# Write the bad variation, the format depends on the side to move
if gameMoveNag is None:
if side == WHITE:
f.write('%d. %s %s {%+0.2f/%d; %s} (%s %s {%+0.2f/%d}) '\
%(fmvn, sanMove,
gamePosNag, gameMoveValue, gameMoveDepth,
badComment,
newAnaPv2, posNag, anaValue2, anaDepth2))
else:
f.write('%s %s {%+0.2f/%d; %s} (%s %s {%+0.2f/%d}) '\
%(sanMove,
gamePosNag, gameMoveValue, gameMoveDepth,
badComment,
newAnaPv2, posNag, anaValue2, anaDepth2))
else:
if side == WHITE:
f.write('%d. %s %s %s {%+0.2f/%d; %s} (%s %s {%+0.2f/%d}) '\
%(fmvn, sanMove,
gameMoveNag, gamePosNag, gameMoveValue, gameMoveDepth,
badComment,
newAnaPv2, posNag, anaValue2, anaDepth2))
else:
f.write('%s %s %s {%+0.2f/%d; %s} (%s %s {%+0.2f/%d}) '\
%(sanMove,
gameMoveNag, gamePosNag, gameMoveValue, gameMoveDepth,
badComment,
newAnaPv2, posNag, anaValue2, anaDepth2))
# if writing inferior line is not possible
if not writeInferiorLine or pv2MoveNag is None or len(anaPv2Rev) < 2:
if gameMoveNag is None:
if side == WHITE:
f.write('%d. %s ' %(fmvn, sanMove))
else:
f.write('%s ' %(sanMove))
else:
if side == WHITE:
f.write('%d. %s %s ' %(fmvn, sanMove, gameMoveNag))
else:
f.write('%s %s ' %(sanMove, gameMoveNag))
# else if easy move
else:
if isOnlyMove:
assert anaPvMove == sanMove
# $7 = Singular move comment
if side == WHITE:
f.write('%d. %s %s ' %(fmvn, sanMove, "$7"))
else:
f.write('%s %s ' %(sanMove, "$7"))
else: # Write the game move only
if side == WHITE:
f.write('%d. %s ' %(fmvn, sanMove))
else:
f.write('%s ' %(sanMove))
# Else write the pv as suggested by the engine
else:
assert writeAnalyzerBestLine
# Get position NAG for the pv. The pv is the line recommended by the engine
PvPosNag = position_nags(anaValue)
# Get move NAG for game move
assert sanMove != anaPvMove
gameMoveNag = move_nags(side, anaValue, gameMoveValue)
# Get position NAG for position after this game move
gamePosNag = position_nags(gameMoveValue)
# Select a comment based on difference between engine score and game move score
goodComment = get_good_comment(anaValue, gameMoveValue, side, lang)
with codecs.open(outputFN, 'a', 'utf-8') as f:
# If the position after the game move has a mate score, perhaps because
# of a blunder, then show +/-M instead of score/depth
move_score_val = "%+0.2f" % gameMoveValue
posGameMoveComment = str(move_score_val) + '/' + str(gameMoveDepth)
if (int(100*gameMoveValue) >= INF-MAX_PLY) or (int(100*gameMoveValue) <= -INF+MAX_PLY):
assert gameMoveValue != BAD_SCORE
num_mate = value_to_mate(100*gameMoveValue)
assert num_mate != 0
smate = mate_indicator(num_mate)
posGameMoveComment = smate
# If pv1 score is a mate then show +/-M, instead of score/depth
pv1_score_val = "%+0.2f" % anaValue
posPv1Comment = str(pv1_score_val) + '/' + str(anaDepth)
pv1MateScore = False
if (int(100*anaValue) >= INF-MAX_PLY and side == WHITE) or\
(int(100*anaValue) <= -INF+MAX_PLY and side == BLACK):
assert anaValue != BAD_SCORE
num_mate = value_to_mate(100*anaValue)
assert num_mate != 0
smate = mate_indicator(num_mate)
posPv1Comment = smate
pv1MateScore = True
# Break down the pv to get the first move
pv1_split = anaPv.split(' ')
# Get the first move in the pv including the move number
pv1_move = pv1_split[0]
# Insert the pv1_move_nag after the first move
if pv1MateScore:
new_mv = pv1_move + ' ' + '{with mate attack} '
else:
new_mv = pv1_move + ' '
# Reconstruct the pv line
new_anaPv = new_mv + ' '.join(pv1_split[1:])
# Write the game move and pv variation
if side == WHITE:
if pv1MateScore:
if gameMoveNag is None:
f.write('\n%d. %s %s {%s} ({%s} %s %s) '\
%(fmvn, game_node.board().san(next_node.move),
gamePosNag, posGameMoveComment,
goodComment, new_anaPv, PvPosNag))
else:
f.write('\n%d. %s %s %s {%s} ({%s} %s %s) '\
%(fmvn, game_node.board().san(next_node.move), gameMoveNag,
gamePosNag, posGameMoveComment,
goodComment, new_anaPv, PvPosNag))
else:
if gameMoveNag is None:
f.write('\n%d. %s %s {%s} ({%s} %s %s {%s}) '\
%(fmvn, game_node.board().san(next_node.move),
gamePosNag, posGameMoveComment,
goodComment, new_anaPv, PvPosNag, posPv1Comment))
else:
f.write('\n%d. %s %s %s {%s} ({%s} %s %s {%s}) '\
%(fmvn, game_node.board().san(next_node.move), gameMoveNag,
gamePosNag, posGameMoveComment,
goodComment, new_anaPv, PvPosNag, posPv1Comment))
else: # side is black
if pv1MateScore:
if gameMoveNag is None:
f.write('\n%d... %s %s {%s} ({%s} %s %s) '\
%(fmvn, game_node.board().san(next_node.move),
gamePosNag, posGameMoveComment,
goodComment, new_anaPv, PvPosNag))
else:
f.write('\n%d... %s %s %s {%s} ({%s} %s %s) '\
%(fmvn, game_node.board().san(next_node.move), gameMoveNag,
gamePosNag, posGameMoveComment,
goodComment, new_anaPv, PvPosNag))
else:
if gameMoveNag is None:
f.write('\n%d... %s %s {%s} ({%s} %s %s {%s}) '\
%(fmvn, game_node.board().san(next_node.move),
gamePosNag, posGameMoveComment,
goodComment, new_anaPv, PvPosNag, posPv1Comment))
else:
f.write('\n%d... %s %s %s {%s} ({%s} %s %s {%s}) '\
%(fmvn, game_node.board().san(next_node.move), gameMoveNag,
gamePosNag, posGameMoveComment,
goodComment, new_anaPv, PvPosNag, posPv1Comment))
# If the game move is not the same as the pv2 move then write pv2 as a variation,
# depending on the pv2 score and game move score
if anaPvMove2 != sanMove and anaValue2 != BAD_SCORE and anaPv2Len >= 2:
# Get pos nag of pv2
pv2PosNag = position_nags(anaValue2)
# If pv2 score is equal or better than the game move score then write
# it as a playable alternative line
if (side == WHITE and anaValue2 >= gameMoveValue) or\
(side == BLACK and anaValue2 <= gameMoveValue):
if (side == WHITE and anaValue2 >= -ONLY_MOVE_SCORE) or\
(side == BLACK and anaValue2 <= +ONLY_MOVE_SCORE):
# If pv1 showed that this has a mate score then check
# if pv2 is also showing mate score, otherwise cut the pv2 length
# to nshortPv = 7 plies, as we know we extend the pv length
# when there is a mate score from pv1
if (int(100*anaValue2) >= INF-MAX_PLY and side == WHITE) or\
(int(100*anaValue2) <= -INF+MAX_PLY and side == BLACK):
# Convert score to mate number
num_mate = value_to_mate(100*anaValue2)
assert num_mate != 0
smate = mate_indicator(num_mate)
posPv2Comment = smate
com_val, alt_index = get_alternative_comment(ALTER_COM, alt_index, lang)
f.write('\n({ %s } %s %s {%s}) '\
%(com_val, anaPv2, pv2PosNag, posPv2Comment))
else:
# Else if pv2 is not a mate score, reduce the pv length to nshortPv = 7 plies (default)
new_ana_pv2 = anaPv2.split(' ')
# new_ana_pv2 should be odd
if len(new_ana_pv2) % 2 == 0:
new_ana_pv2 = ' '.join(new_ana_pv2[:-1])
else:
new_ana_pv2 = ' '.join(new_ana_pv2[0:])
# new_ana_pv2 = ' '.join(new_ana_pv2[:nshortPv])
com_val, alt_index = get_alternative_comment(ALTER_COM, alt_index, lang)
f.write('\n({ %s } %s %s {%+0.2f/%d}) '\
%(com_val, new_ana_pv2, pv2PosNag, anaValue2, anaDepth2))
# else if pv2MoveScore < gameMoveScore
else:
# Add move nag to the first move of pv2
anaPv2MoveNag = one_value_move_nags(side, anaValue2)
if anaPv2MoveNag is not None:
# new_ana_pv2 = anaPv2.split(' ')
anaPv2List = anaPv2.split(' ')
if len(anaPv2List) >= 2:
# mvRem should be odd with minimum 1
if (len(anaPv2List) - 1) % 2 == 0:
mvRem = ' '.join(anaPv2List[1:-1])
else:
mvRem = ' '.join(anaPv2List[1:])
anaPv2WithReason = anaPvMove2 + ' %s { %s } ' % (anaPv2MoveNag, random_reason(lang))
# Cut 1 ply at end of pv, to emphasize that the other side is the last mover
newAnaPv2 = anaPv2WithReason + mvRem
badComment = random_bad(lang)
f.write('\n({ %s } %s %s {%+0.2f/%d}) '\
%(badComment, newAnaPv2, pv2PosNag, anaValue2, anaDepth2))
# Print the threat pv if score of opponent or last move is good
if False and threatValue > 0.0 and threatValue != BAD_SCORE:
# Translate threatValue to white pov
# Use side == WHITE because we do a null move
wpov_threatValue = threatValue
if side == WHITE:
wpov_threatValue = -1*threatValue
if int(100*threatValue) >= +INF-MAX_PLY:
num_mate = value_to_mate(100*threatValue)
if side == WHITE:
f.write('\n({%s %d} %d. %s %s) '\
%(BLACK_MATE_THREAT_COMMENT[lang], abs(num_mate), fmvn, '--', threatPv))
else:
f.write('\n({%s %d} %d... %s %s) '\
%(WHITE_MATE_THREAT_COMMENT[lang], abs(num_mate), fmvn, '--', threatPv))
else:
posNag = position_nags(wpov_threatValue)
if side == WHITE:
f.write('\n({%s} %d. %s %s %s {%+0.2f/%d}) '\
%(BLACK_THREAT_COMMENT[lang], fmvn, '--',
threatPv, posNag, wpov_threatValue, threat_depth))
else:
f.write('\n({%s} %d... %s %s %s {%+0.2f/%d}) '\
%(WHITE_THREAT_COMMENT[lang], fmvn, '--',
threatPv, posNag, wpov_threatValue, threat_depth))
# Record blunders and mistakes for summary
if anaPvMove != sanMove and writeAnalyzerBestLine:
mnag = move_nags(side, anaValue, gameMoveValue)
# $4=??, $2=?, $6=?!
if side and mnag == '$4':
Blunder['white'] += 1
elif side and mnag == '$2':
Mistake['white'] += 1
elif side and mnag == '$6':
Dubious['white'] += 1
elif not side and mnag == '$4':
Blunder['black'] += 1
elif not side and mnag == '$2':
Mistake['black'] += 1
elif not side and mnag == '$6':
Dubious['black'] += 1
game_node = next_node # Read next position of this game
# Print result at the end of notation
with codecs.open(outputFN, 'a', 'utf-8') as output_fo:
# Add model game comment only when all moves are analyzed
if lastFmvn >= maxMoveNum:
if modelGameWhite and modelGameBlack:
output_fo.write('{%s}\n' %(WHITE_BLACK_MODEL_COMMENT[lang]))
elif modelGameWhite:
output_fo.write('{%s}\n' %(WHITE_MODEL_COMMENT[lang]))
elif modelGameBlack:
output_fo.write('{%s}\n' %(BLACK_MODEL_COMMENT[lang]))
# output_fo.write('{WhiteBlunder: %d, BlackBlunder: %d}\n' %(Blunder['white'], Blunder['black']))
output_fo.write('{WBlunder: %d, WMistake: %d, WDubious: %d, BBlunder: %d, BMistake: %d, BDubious: %d} %s\n\n'\
%(Blunder['white'], Mistake['white'], Dubious['white'],
Blunder['black'], Mistake['black'], Dubious['black'], hre))
game = chess.pgn.read_game(ifo) # Read next game of the pgn file
ifo.close()
print("\nDone!!")
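# --- Illustrative sketch, not part of the original script ---
# The two helpers below restate, in isolation, two steps that analyze_games()
# performs inline: splitting an analysis line such as
# "+0.89/11 32. Nc6 Nh5 33. Qf2" into score, depth and pv, and scaling the
# search time by the number of best move changes (moveChanges).
# Assumptions: the names split_score_depth_pv and allocate_analysis_time are
# hypothetical, and the line format is taken from the comments in
# analyze_games(), not from the real analyze_fen() implementation.
def split_score_depth_pv(analysis_line):
    """Return (score_in_pawns, depth, pv_string) from 'score/depth pv...'."""
    parts = analysis_line.split(' ')
    # The first token is always 'score/depth', the rest is the pv
    score_str, depth_str = parts[0].split('/')
    return float(score_str), int(depth_str), ' '.join(parts[1:])


def allocate_analysis_time(move_time_ms, move_changes):
    """Mirror the time scaling applied before the multipv 2 search."""
    if move_changes >= 3:
        # Complex position: 2x the time for 3 changes, capped at 3x
        return move_time_ms * min(move_changes - 1, 3)
    if move_changes <= 0:
        # Easy position: half the time, but never below 100 ms
        return max(100, move_time_ms // 2)
    # 1 or 2 move changes: keep the user-defined time
    return move_time_ms

# Possible usage, with made-up input values:
#   score, depth, pv = split_score_depth_pv("+0.89/11 32. Nc6 Nh5")
#   search_ms = allocate_analysis_time(1000, 5)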
def main(argv):
""" start """
print(APP_NAME + ' v' + APP_VERSION + '\n')
# argv is a list of option and values
# ['--file', 'bilbaomast16win.pgn',
# '--engine', 'stockfish_120716_x64_modern.exe',
# '--eoption', 'Hash value 128, Threads value 1']
analyze_games(argv)
if __name__ == "__main__":
main(sys.argv[1:])