Created
March 15, 2016 16:42
-
-
Save markjenkins/10bf49cad123d503ed47 to your computer and use it in GitHub Desktop.
winninglines
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# NOTE(review): presumably tuple indices for a predecessor's value and
# depth; nothing in the code shown here reads them -- confirm before use.
PRED_VAL, PRED_DEPTH = 0, 1

# Abbreviated version of FEN: the en passant field is included only when
# one is available.  As for clock times, we keep the lowest seen per state
# rather than letting them make the state unique.
START_STATE = 'rnbqkbnr/pppppppp/8/8/8/8/PPPPPPPP/RNBQKBNR w KQkq'
def pawn_move_or_capture(first_state, second_state):
    """Tell whether going from first_state to second_state was a pawn
    move or a capture.

    Placeholder: the detection has not been written yet, so every
    transition is reported as False.
    """
    # TODO: need to fill this in
    detected = False
    return detected
class Node(object):
    """One game state in a doubly linked graph of states.

    Attributes set by __init__:
      state_desc -- the state description; __hash__ is based on it
      owner -- the owner value handed to the constructor
      successors, predecessors -- sets of linked nodes
      least_ply_count_to_here -- lowest (predecessor count + 1) seen so
          far by add_preds; 0 until a predecessor has been linked
      least_ply_since_pawn_move_or_capture -- lowest count of plies since
          a pawn move or capture seen so far by add_preds
    """

    def __init__(self, state_desc, owner):
        self.state_desc = state_desc
        self.owner = owner
        # kept as its own method; the original note says this makes it
        # easy to make a variation
        self.init_successors_and_predecessors()
        # only true of start state, every other state is at least 1
        self.least_ply_count_to_here = 0
        # reset each time there is a pawn move
        self.least_ply_since_pawn_move_or_capture = 0

    def init_successors_and_predecessors(self):
        """Give this node fresh, empty successor and predecessor sets."""
        self.successors = set()
        self.predecessors = set()

    def __hash__(self):
        # NOTE(review): __eq__ is not overridden, so two Node objects with
        # the same state_desc hash alike but still compare unequal.
        return hash(self.state_desc)

    def add_preds(self, preds):
        """Link preds in as predecessors and refresh both ply counters.

        Takes care of the double linking: this node is added to the
        successors of every predecessor, and preds are added to this
        node's predecessors.

        Returns a tuple (least_ply_count_to_here,
        least_ply_since_pawn_move_or_capture) holding the values from
        before the call.
        """
        old_least_ply_count_to_here = self.least_ply_count_to_here
        old_least_ply_since_pawn_move_or_capture = \
            self.least_ply_since_pawn_move_or_capture

        # 0 is a legitimate "plies since pawn move or capture" value, so
        # unlike least_ply_count_to_here it cannot double as a "not set
        # yet" marker.  Treat the counter as unset while a non-start node
        # has no predecessors; otherwise the initial 0 could never be
        # replaced because no candidate is ever smaller than it.
        pawn_count_unset = (
            not self.predecessors and not self.is_start_state())

        for pred in preds:
            # take the predecessor's count if least_ply_count_to_here
            # hasn't been set yet and we're not the start state (a reason
            # to leave it at 0), or if one of the newer predecessors has a
            # lower count
            pred_plus = pred.least_ply_count_to_here + 1
            if ((self.least_ply_count_to_here == 0 and
                 not self.is_start_state()) or
                    pred_plus < self.least_ply_count_to_here):
                self.least_ply_count_to_here = pred_plus

            pawn_move_or_capture_after_pred = (
                0 if pawn_move_or_capture(pred, self)
                else pred.least_ply_since_pawn_move_or_capture + 1)
            if (pawn_count_unset or
                    pawn_move_or_capture_after_pred <
                    self.least_ply_since_pawn_move_or_capture):
                self.least_ply_since_pawn_move_or_capture = \
                    pawn_move_or_capture_after_pred
                pawn_count_unset = False

            assert self.least_ply_count_to_here >= 0
            assert self.least_ply_since_pawn_move_or_capture >= 0

            # major assumption (worth testing) is that we don't need to do
            # this first before grabbing ply counts of predecessors; in
            # particular, that our predecessors' ply counts can't possibly
            # drop by way of this state having gained predecessors and
            # them each gaining this state as a successor
            pred.successors.add(self)

        # probably more efficient than a bunch of .add operations in the
        # above loop; the underlying store can grow in a less
        # piece-by-piece way
        self.predecessors.update(preds)

        return (old_least_ply_count_to_here,
                old_least_ply_since_pawn_move_or_capture)

    def is_start_state(self):
        """Return True when this node's state_desc equals START_STATE."""
        return self.state_desc == START_STATE
class GameTree(object):
    """Sketch of how a newly claimed state is linked into the state graph.

    Much of the work (database transaction, legality checks, status and
    score computation) is still only outlined in comments; the code wires
    up the traversal order those steps are meant to follow.
    """

    def claim_node(self, owner, state_desc):
        """Claim state_desc for owner and link it into the graph.

        Returns None.  Because the predecessor lookup is not implemented
        yet (the set is always empty), the method currently returns
        before creating a node.
        """
        # find all possible legal successor states early on
        # all_successors =

        # now, after that we can start db transaction

        # start DB transaction, the following reads and computations are
        # critical path for transaction

        # confirm that state_desc and mirror/inverted equivalent versions
        # of it aren't already present, otherwise deny claim

        # look up all the states that have this as a successor, our
        # predecessors.  Include inversions of this state in that
        # particular search
        predecessors = set()

        # there better be one, or else it's an orphan state, of no
        # interest to us
        if not predecessors:
            # perhaps this case should be handled by an exception?
            return  # you can't claim a state with no predecessors

        new_node = Node(state_desc, owner)

        # overall strategy now
        # 1) create links (and establish ply count)
        # 2) filter successors
        # 3) update ply counts of successors
        # 4) update status

        # 1)
        new_node.add_preds(predecessors)

        # 2)
        # filter all_successors down to the successors that already
        # exist, including inversions
        successors = set()

        # 3)
        # assumption (worth testing): we don't need to do this before the
        # above, there's no way that connecting this to our successors
        # should screw up the recursively defined counts in the above
        #
        # and, with that out of the way, it should be no problem
        for successor in successors:
            (old_least_ply_count_to_here,
             old_least_ply_since_pawn_move_or_capture) = \
                successor.add_preds(set((new_node,)))
            # reverse the effect old_least_ply_count_to_here had on the
            # score and incorporate the new
            # successor.least_ply_count_to_here

            # There are sanity concerns worth being concerned about here
            # again too: does the change in
            # successor.least_ply_count_to_here at all affect
            # new_node.least_ply_count_to_here?

        # 4)
        # we can't rely on the status cache on the way up because
        # new_node itself may have created some incoherence in those
        # caches by way of some kind of loopback
        # But, by doing so, we can rely on the cache on the way down
        #
        # hopefully some tests bear this all out
        already_visited = set()
        status_changed = set((new_node,))
        self.update_status_of_node_by_way_of_sucessors(
            new_node, already_visited, status_changed, cache=False)

        # one notable thing is that the above only walks successors and
        # doesn't fix predecessors of the nodes visited, so those are
        # adjusted (cache driven) as a stage two
        self.update_status_of_nodes_predecessors(
            new_node, already_visited, status_changed)

        # for each state that was a predecessor of successors fix them...

    def update_status_of_node_by_way_of_sucessors(
            self, node, already_visited, status_changed, cache=False):
        """Visit node and, unless cache is true, its unvisited successors.

        node is added to already_visited.  With cache false, every
        successor not yet in already_visited is visited first
        (depth first).  The status and score computations are still
        placeholders, so status_changed is only passed along.
        """
        already_visited.add(node)
        if not cache:
            for successor in node.successors:
                if successor not in already_visited:
                    self.update_status_of_node_by_way_of_sucessors(
                        successor,
                        already_visited, status_changed, cache=False)

        # set my current state
        #
        # establish my new status:
        # if I wasn't challenged, but one of my successors is now
        # unchallenged, then I'm now challenged by the existence of said
        # successor
        #
        # if I was challenged, but now all of my successors are
        # challenged, I'm no longer challenged, huzzah!
        #
        # otherwise, no state change
        # my_new_status=
        # AND update sets in this regard
        #
        # here's an interesting thing: generally we can rely on the
        # cached status of a successor, but not if it has gained a
        # successor (directly or indirectly) due to the new state being
        # introduced
        #
        # (remember, my successors can actually have me as a successor
        # too!)

        # if my status changed,
        # status_changed.add(node)

        # the manner in which this state contributed to the other owner's
        # score before needs to be removed, and the new manner applied
        #
        # of course, not happening if nobody owns me
        if node.owner is not None:
            pass  # do score adjustment

        # note, it's up to the caller to adjust my predecessors!

    def update_status_of_nodes_predecessors(
            self, node, already_visited, status_changed=None):
        """Walk node's predecessors recursively, visiting each one once.

        node is added to already_visited.  Every predecessor that is not
        in already_visited yet is visited with cache=True and then has
        its own predecessors walked the same way; the visited check is
        what ends the walk on cyclic graphs.

        status_changed is optional; a fresh set is used when omitted.
        """
        if status_changed is None:
            status_changed = set()
        already_visited.add(node)
        for pred in node.predecessors:
            if pred in already_visited:
                continue
            self.update_status_of_node_by_way_of_sucessors(
                pred, already_visited, status_changed, cache=True)
            self.update_status_of_nodes_predecessors(
                pred, already_visited, status_changed)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def create_new_position(old_position, backstate, reset_half_count):
    """Build a position from scratch out of old_position and backstate.

    Starts from an empty frozenset and delegates to
    add_backstate_to_position, returning whatever that returns.
    """
    # NOTE(review): the original called expand_backstates_inposition,
    # which is not defined anywhere in this file; add_backstate_to_position
    # takes the same four arguments and is assumed to be the intended
    # helper -- confirm.
    return add_backstate_to_position(
        frozenset(), old_position, backstate, reset_half_count)
def add_backstate_to_position(position, old_position, backstate,
                              reset_half_count):
    """Return position united with one (backstate, count) pair per entry
    of old_position.

    old_position is iterated as (back state, back count) pairs.  The new
    count is 0 when reset_half_count is true, otherwise the entry's back
    count plus one; the entry's own back state is not used.
    """
    new_entries = set()
    for _ignored_back_state, back_count in old_position:
        if reset_half_count:
            new_count = 0
        else:
            new_count = back_count + 1
        new_entries.add((backstate, new_count))
    return position.union(new_entries)
def own_position(state, position, own_index, score_index):
    """Check that state is in neither own_index nor score_index.

    Only the sanity assertion exists so far; position is not used yet.
    """
    already_indexed = state in own_index or state in score_index
    assert not already_indexed
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# @Authors Mark Jenkins <mark@parit.ca> | |
# Indices of the fields inside a FEN state tuple.
BOARD, TURN, CASTLE_AND_EN, HALF_CLOCK = 0, 1, 2, 3

# Values stored in the TURN field.
WHITE_TURN = 'w'
BLACK_TURN = 'b'
def advance_counters(state, no_reset_half_clock=True):
    """Return a new FEN state tuple for the next ply.

    The board and the castle/en passant fields are copied unchanged.
    The turn becomes WHITE_TURN when it was BLACK_TURN and BLACK_TURN
    otherwise.  The half clock is incremented by one, or set to 0 when
    no_reset_half_clock is false.
    """
    if state[TURN] == BLACK_TURN:
        next_turn = WHITE_TURN
    else:
        next_turn = BLACK_TURN

    if no_reset_half_clock:
        next_half_clock = state[HALF_CLOCK] + 1
    else:
        next_half_clock = 0

    return (state[BOARD], next_turn, state[CASTLE_AND_EN], next_half_clock)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from persistent.set | |
# Two distinct index constants.  The original unpacked xrange(1), which
# yields a single value for two targets (ValueError) and does not exist on
# Python 3; range(2) works on both.
BACK_POSITIONS, SUB_POSITIONS = range(2)
def new_position(origin):
    """Placeholder: not implemented yet, returns None for any origin."""
    return None
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from unittest import TestCase, main | |
""" ____ 9---------------- | |
/ b | | |
| 5 7 | | |
| w null | | |
| 3 4 6 | | |
b b b b\ \----null | | |
1 2 \ null | | | |
w w | 8 10-w-- | |
O ----------- | |
b | |
""" | |
# The base state of the toy game graph drawn in the diagram above.
CHOICE_TO_BE_BLACK = ('b', 0)

# (previous state, new state) pairs, listed in insertion order.
forward_moves_in_insertion_order = (
    (CHOICE_TO_BE_BLACK, ('w', 1)),
    (CHOICE_TO_BE_BLACK, ('w', 2)),
    (('w', 2), ('b', 3)),
    (('w', 2), ('b', 4)),
    (('b', 4), ('w', 5)),
    (('w', 2), ('b', 6)),
    (('w', 1), ('b', 9)),
    (('w', 10), ('b', 9)),
)

# state -> tuple of states reachable from it / tuple of states leading to it
forward_moves = {}
backward_moves = {}


def add_allowable_forward_and_backward_moves(moves):
    """Record every (previous state, new state) pair of moves in both the
    forward_moves and backward_moves tables.

    Each table maps a state to a tuple that grows in the order the pairs
    are seen.
    """
    for prev_state, new_state in moves:
        forward_moves[prev_state] = (
            forward_moves.get(prev_state, ()) + (new_state,))
        backward_moves[new_state] = (
            backward_moves.get(new_state, ()) + (prev_state,))


add_allowable_forward_and_backward_moves(forward_moves_in_insertion_order)

# further allowable moves that are not part of the insertion order above
add_allowable_forward_and_backward_moves((
    (('b', 6), ('w', 7)),
    (('w', 8), ('b', 6)),
    (('w', 5), ('b', 9)),
    (('w', 10), ('b', 6)),
    (('b', 4), ('w', 10)),
))
class Position(object):
    """Plain record of a state, its owner and a winning flag."""

    def __init__(self, state, owner, winning=True):
        self.state, self.owner, self.winning = state, owner, winning
class Contributor(object):
    """A named participant with its groups and the sets of positions
    currently counted as defended or challanged."""

    def __init__(self, name):
        self.name = name
        self.groups = list()
        # both sets start out empty
        self.defended_positions = set()
        self.challanged_positions = set()
        # eventually someday...
        #self.drawn_positions = set()
def engine_create_base_contributors_for_players():
    """Return a dict with a fresh Contributor for 'w' and for 'b', keyed
    by those same names."""
    return dict(
        (player, Contributor(player))
        for player in ('w', 'b'))
def engine_get_base_state():
    """Return the base state, which is CHOICE_TO_BE_BLACK."""
    base_state = CHOICE_TO_BE_BLACK
    return base_state
def engine_get_player_implied_by_state(state):
    """Return the player part of state, i.e. its first element."""
    player = state[0]
    return player
def engine_get_descendant_states(state):
    """Return the tuple of states listed for state in forward_moves, or an
    empty tuple when state has no entry."""
    return forward_moves.get(state, ())
def engine_get_ancestor_states(state):
    """Return the tuple of states listed for state in backward_moves, or
    an empty tuple when state has no entry."""
    return backward_moves.get(state, ())
def get_all_descendants_of_position(position, state_map):
    """Yield every state reachable from position.state through states that
    are present in state_map.

    The walk is breadth first and iterative.  Each state is yielded at
    most once, position.state itself is never yielded, and the walk does
    not continue through states that are missing from state_map.
    """
    visited_states = set((position.state,))
    current_states = set(visited_states)
    # ahhh, the avoidance of recursion, got to love it
    while current_states:
        # the outer loop over current_states has to come first so that
        # `state` is bound before its descendants are looked up
        next_states = set(
            desc_state
            for state in current_states
            for desc_state in engine_get_descendant_states(state)
            if desc_state in state_map and desc_state not in visited_states
        )
        for state in next_states:
            yield state
        visited_states.update(next_states)
        current_states = next_states
def position_unchallanged(position, state_map):
    """Return True when none of the direct descendant states of
    position.state is present in state_map."""
    return not any(
        child_state in state_map
        for child_state in engine_get_descendant_states(position.state))
def get_unchallanged_descendants_of_position(position, state_map):
    """Return a generator over the Position objects of all descendants of
    position that are themselves unchallanged.

    get_all_descendants_of_position yields states that are in state_map,
    so each one is looked up there to obtain the Position object that
    position_unchallanged (and the callers of this function) expect.
    """
    return (
        state_map[descendant_state]
        for descendant_state in
        get_all_descendants_of_position(position, state_map)
        if position_unchallanged(state_map[descendant_state], state_map))
def get_positions_challanged_by_position(position, state_map):
    """Placeholder: always returns an empty tuple for now."""
    return tuple()
def get_positions_defended_by_position(position, state_map):
    """Placeholder: always returns an empty tuple for now."""
    return tuple()
# The fundamental algorithm when a new state is added
#
# First, that means somebody is owning it for the first time.
#
# If the state isn't already challenged:
#
# Get all the states challenged by the state and
# update their owners to be challenged if not already.
#
# Get all the states defended by the state and
# update their owners to be defended if not already,
# unless of course they are simultaneously challenged by the
# state (as above).
#
# If the state is already challenged:
# we should find the unchallenged descendants and run this algorithm
# on them instead, which in the end we can assert leads to
# the owner of this state listing it as an unchallenged one,
# and we should hint at the possible danger some day from this recursion...
#
# One problem with this approach is that we don't have anything to
# punish players for contradicting themselves; that will have to wait
# until the scoring process...
# Best to keep move insertion as simple as possible...
def update_challange_status_of_state_owner(position, state_map, new_owner):
    """Update the defended/challanged sets affected by position.

    When no direct descendant state of position.state is in state_map:
      * every position returned by get_positions_challanged_by_position
        is moved from its owner's defended set to its challanged set;
      * every position returned by get_positions_defended_by_position
        that is not also challanged is added to its owner's defended set;
      * position is moved into new_owner's defended set.

    Otherwise the same update is run on the unchallanged descendants of
    position, with state_map and new_owner passed through.
    """
    challanged_by = [
        state_map[desc_state]
        for desc_state in engine_get_descendant_states(position.state)
        if desc_state in state_map
    ]

    if not challanged_by:
        positions_challanged = set(
            get_positions_challanged_by_position(position, state_map))
        for challanged_position in positions_challanged:
            owner = challanged_position.owner
            owner.defended_positions.discard(challanged_position)
            owner.challanged_positions.add(challanged_position)

        for defended_position in get_positions_defended_by_position(
                position, state_map):
            if defended_position not in positions_challanged:
                owner = defended_position.owner
                owner.defended_positions.add(defended_position)

        # is this ever useful?
        new_owner.challanged_positions.discard(position)
        # it is possible that it could have been in here before,
        # but we don't need to check, set add takes care of that
        new_owner.defended_positions.add(position)
    else:
        # this is where things get recursive in a way that could be
        # problematic at some point..
        # NOTE(review): new_owner is passed through unchanged, so the
        # descendants end up in new_owner's sets rather than their own
        # owners' -- confirm that this is intended.
        update_challange_status_of_substates_owners(
            get_unchallanged_descendants_of_position(position, state_map),
            state_map, new_owner)
def update_challange_status_of_substates_owners(
        positions, state_map, new_owner):
    """Run update_challange_status_of_state_owner on each of positions in
    turn, with the same state_map and new_owner every time."""
    for substate_position in positions:
        update_challange_status_of_state_owner(
            substate_position, state_map, new_owner)
def take_ownership_of_state(
        state_map, state, contributor, winning=True):
    """Register contributor as the owner of state and update statuses.

    A new Position is stored in state_map under state and the challange
    status update is run for it.

    Raises Exception when state is already in state_map, or when winning
    is false (only winning=True is supported right now).
    """
    if state in state_map:
        # states are tuples such as ('b', 0); wrap in a 1-tuple so that
        # %-formatting does not try to spread the state over several
        # placeholders and fail with TypeError
        raise Exception("State %s is already owned" % (state,))

    if winning:
        position = state_map[state] = Position(state, contributor, winning)
        update_challange_status_of_substates_owners(
            (position,), state_map, contributor)
    else:
        raise Exception("only winning=True supported right now")
class ZaTest(TestCase):
    """Checks the state map right after the base state has been claimed."""

    def setUp(self):
        # claim the base state for the player that state implies
        contributors = engine_create_base_contributors_for_players()
        base_state = engine_get_base_state()
        self.state_map = {}
        take_ownership_of_state(
            self.state_map, base_state,
            contributors[engine_get_player_implied_by_state(base_state)])

    def test_len_own_map(self):
        # assertEqual rather than the deprecated assertEquals alias,
        # which was removed in Python 3.12
        self.assertEqual(1, len(self.state_map))
if __name__ == "__main__": | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# @Authors Mark Jenkins <mark@parit.ca> | |
from unittest import main, TestCase | |
from winninglines.fenstate import advance_counters | |
class FenTest(TestCase):
    """Tests for winninglines.fenstate."""

    def test_advance_counters(self):
        # with the default arguments the turn flips from 'w' to 'b' and
        # the half clock goes from 0 to 1; assertEqual replaces the
        # deprecated assertEquals alias, which was removed in Python 3.12
        self.assertEqual(advance_counters(('', 'w', '', 0)),
                         ('', 'b', '', 1))
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment