Skip to content

Instantly share code, notes, and snippets.

@markjenkins
Created March 15, 2016 16:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save markjenkins/10bf49cad123d503ed47 to your computer and use it in GitHub Desktop.
Save markjenkins/10bf49cad123d503ed47 to your computer and use it in GitHub Desktop.
winninglines
PRED_VAL, PRED_DEPTH = range(2)
# abreviated version of FEN, we don't include the en-pessant unless available
# as for clock times, we're keeping the lowest seen per state, not making
# it unique
START_STATE = 'rnbqkbnr/pppppppp/8/8/8/8/PPPPPPPP/RNBQKBNR w KQkq'
def pawn_move_or_capture(first_state, second_state):
"""determine if the transition from first_state to second_state entailed
a pawn capture
"""
# need to fill this in
return False
class Node(object):
def __init__(self, state_desc, owner):
self.state_desc = state_desc
self.owner = owner
# makes it easy to make a variation using
self.init_successors_and_predecessors()
# only true of start state, every other state is at least 1
self.least_ply_count_to_here = 0
# reset each time there is a pawn move
self.least_ply_since_pawn_move_or_capture = 0
def init_successors_and_predecessors(self):
self.successors = set()
self.predecessors = set()
def __hash__(self):
return hash(self.state_desc)
def add_preds(self, preds):
"""Takes care of the doubly linking by doing the add_successors
of this state to the predecessors
"""
old_least_ply_count_to_here = self.least_ply_count_to_here
# reset each time there is a pawn move
old_least_ply_since_pawn_move_or_capture = \
self.least_ply_since_pawn_move_or_capture
for pred in preds:
# if least_ply_count_to_here hasn't been set yet and we're not
# the start stage (a reason to leave it)
#
# of, if one of the newer predecessors has a lower count
pred_plus = pred.least_ply_count_to_here+1
if ( (self.least_ply_count_to_here == 0 and
not self.is_start_state() and) or
pred_plus < self.least_ply_count_to_here ):
self.least_ply_count_to_here = pred_plus
pawn_move_or_capture_after_pred = (
0 if pawn_move_or_capture(pred, self)
else pred.least_ply_since_pawn_move_or_capture + 1 )
if (pawn_move_or_capture_after_pred <
self.least_ply_since_pawn_move_or_capture):
self.least_ply_since_pawn_move_or_capture = \
pawn_move_or_capture_after_pred
assert( self.least_ply_count_to_here >= 0 )
assert( self.least_ply_since_pawn_move_or_capture >= 0 )
# major assumption (worth testing)
# is that we don't need to do this first before
# grabbing ply counts of successors, in particular, that our
# precessor's ply counts can't possibly drop by way of this
# this state having gained predecessors and them each gaining
# this state as a sucessor
pred.successors.add(self)
# probably more effecient than a bunch of .add operations in
# above loop, more optimization perhaps for underlying store growing
# in a less piece by piece way
self.predecessors.update(preds)
return (old_least_ply_count_to_here,
old_least_ply_since_pawn_move_or_capture)
def is_start_state(self):
return self.state_desc == START_STATE
class GameTree(object):
def claim_node(owner, state_desc):
# find all possible legal succesor states early one
# all_sucessors =
# now, after that we can start db transaction
# start DB transaction, the following reads and computations are
# critical path for transaction
# confirm that state_desc and mirror/inverted equivilent versions of it
# aren't already present, otherwise deny claim
# look up all the states that have this as a successor,
# our prededcedors. Include inversions of this state in that particular
# search
predecessors = set()
# there better be one, or else it's an orhan state, of no interest to us
if len(predecessors) == 0:
# perhaps this case should be handled by an exception?
return # you can't claim a state with no predecessors
new_node = Node(state_desc, owner)
# overall stragegy now
# 1) create links (and establish ply count)
# 2) filter sucessors
# 3) update ply counts of successors
# 4) update status
# 1)
new_node.add_preds(precessors)
# 2)
# filter all_sucessors down to the successors that already exist,
# including, inversions
successors = set()
# 3)
# assumption (worth testing), we don't need to do this before the above,
# there's no way that connecting this to our successors should screw up
# the recursively defined counts in the above
#
# and, with that out of the way, it should be no problem
for successor in sucessors:
(old_least_ply_count_to_here, \
old_least_ply_since_pawn_move_or_capture) = \
successor.add_preds( set( (new_node,) ) )
# reverse the effect of
# old_least_ply_count_to_here had on the score and incorporate new
# sucessor.least_ply_count_to_here
# There's sanity concerns worth being concerned about here again
# too, does the change in sucessor.least_ply_count_to_here
# at all effect new_node.least_ply_count_to_here
# 4)
# we can't rely on the status cache on the way up because
# new_node itself may have created some incoherence in those caches
# byway of some kind of loopback
# But, by doing so, we can rely on the cache on the way down
#
# hopefully some tests bear this all out
already_visited = set()
status_changed = set( (new_node,) )
self.update_status_of_node_by_way_of_sucessors(
new_node, already_visited, status_changed, cache=False)
# one notable thing is that above only goes up and doesn't fix
# predecessors of those above, so we'll be accumulating them to
# adjustment (cache drive) as a stage two
self.update_status_of_nodes_predecessors(new_node, already_visited)
# for each state that was a predesor of sucessors fix them...
def update_status_of_node_by_way_of_sucessors(
self, node, already_visited, status_changed, cache=False):
already_visited.add(node)
if not cache :
for successor in node.sucessors:
if successor not in already_visited:
self.update_status_of_node_by_way_of_sucessors(
successor,
already_visited, status_changed, cache=False)
# set, my current state
#
# establish my new status,
# if I wasn't challanged, but now one of my successors
# is now unchallanged , then Im now challanged by the existnce
# of said successor
#
# if I was challanged, but now, all of my successors are
# challanged, I'm no longer challanged, huzah!
#
# otherwise, no state change
# my_new_status=
# AND update sets in this regard
#
# here's an interesting thing, generally we can rely on the cached
# status of a successor, but not if it has gained a successor
# (directly or indirect) due to the new state being introduce
#
# (remember, my sucessors can actually have me as a sucessor too!)
# if my status changed,
# status_changed.add(node)
# the manner in which this state contributed
# the other owner's score before needs to be removed, and new manner
# applied
#
# of course, not happening if nobbody owns me
if node.owner != None:
pass # do score adjustment
# note, it's up to caller to adjust my predecesors!
def update_status_of_nodes_predecessors(self, node, already_visited):
already_visited.add(node)
for pred in new_node.predecessors:
self.update_status_of_node_by_way_of_sucessors(
pred, use_cached_status_of_sucessors=True)
self.update_status_of_nodes_predecessors(pred, already_visited)
def create_new_position(old_position, backstate, reset_half_count):
return expand_backstates_inposition(
frozenset(), old_position, backstate, reset_half_count)
def add_backstate_to_position(position, old_position, backstate,
reset_half_count):
return position.union(
(backstate, 0 if reset_half_count else back_count+1)
for old_back_state, back_count in old_position )
def own_position(state, position, own_index, score_index):
assert( state not in own_index and state not in score_index )
# @Authors Mark Jenkins <mark@parit.ca>
BOARD, TURN, CASTLE_AND_EN, HALF_CLOCK = range(4)
WHITE_TURN = 'w'
BLACK_TURN = 'b'
def advance_counters(state, no_reset_half_clock=True):
"""Return new fen state tuple with TURN, HALF_CLOCK incremented
Can also reset HALF_CLOCK
"""
return (
state[BOARD],
WHITE_TURN if state[TURN] == BLACK_TURN else BLACK_TURN,
state[CASTLE_AND_EN],
state[HALF_CLOCK] + 1 if no_reset_half_clock else 0,
)
from persistent.set
BACK_POSITIONS, SUB_POSITIONS = xrange(1)
def new_position(origin):
pass
from unittest import TestCase, main
""" ____ 9----------------
/ b |
| 5 7 |
| w null |
| 3 4 6 |
b b b b\ \----null |
1 2 \ null | |
w w | 8 10-w--
O -----------
b
"""
CHOICE_TO_BE_BLACK = ('b', 0)
forward_moves_in_insertion_order = (
( CHOICE_TO_BE_BLACK, ('w', 1), ),
( CHOICE_TO_BE_BLACK, ('w', 2), ),
( ('w', 2), ('b', 3), ),
( ('w', 2), ('b', 4), ),
( ('b', 4), ('w', 5), ),
( ('w', 2), ('b', 6) ),
( ('w', 1), ('b', 9 ) ),
( ('w', 10), ('b', 9) ),
)
forward_moves = {}
backward_moves = {}
def add_allowable_forward_and_backward_moves(moves):
for prev_state, new_state in moves:
if prev_state not in forward_moves:
forward_moves[prev_state] = ()
forward_moves[prev_state] += (new_state,)
if new_state not in backward_moves:
backward_moves[new_state] = ()
backward_moves[new_state] += (prev_state,)
add_allowable_forward_and_backward_moves(forward_moves_in_insertion_order)
add_allowable_forward_and_backward_moves( (
( ('b', 6), ('w', 7) ),
( ('w', 8), ('b', 6), ),
( ('w', 5), ('b', 9) ),
( ('w', 10), ('b', 6) ),
( ('b', 4), ('w', 10), ),
))
class Position(object):
def __init__(self, state, owner, winning=True):
self.state = state
self.owner = owner
self.winning = winning
class Contributor(object):
def __init__(self, name):
self.name = name
self.groups = []
# eventually someday...
#self.drawn_positions = set()
self.defended_positions = set()
self.challanged_positions = set()
def engine_create_base_contributors_for_players():
contributors = {'w': Contributor('w'),
'b': Contributor('b') }
return contributors
def engine_get_base_state():
return CHOICE_TO_BE_BLACK
def engine_get_player_implied_by_state(state):
return state[0]
def engine_get_descendant_states(state):
return () if state not in forward_moves else forward_moves[state]
def engine_get_ancestor_states(state):
return () if state not in backward_moves else backward_moves[state]
def get_all_descendants_of_position(position, state_map):
visited_states = set( (position.state,) )
current_states = set(visited_states)
# ahhh, the avoidance of recursion, got to love it
while len(current_states) > 0:
next_states = set(
desc_state
for desc_state in engine_get_descendant_states(state)
for state in current_states
if desc_state in state_map and desc_state not in visited_states
)
for state in next_states:
yield state
visisted_states.update(next_states)
current_states = next_states
def position_unchallanged(position, state_map):
return all(
decendant_state not in state_map
for decendant_state in
engine_get_descendant_states(position.state) )
def get_unchallanged_descendants_of_position(position, state_map):
return (decendant
for decendant in
get_all_descendants_of_position(position, state_map)
if position_unchallanged(decendant.state) )
def get_positions_challanged_by_position(position, state_map):
return ()
def get_positions_defended_by_position(position, state_map):
return ()
# The fundamental algorithm when a new state is added
#
# first, that means somebody is owning it for the first time.
#
# If the state isn't already challanged:
#
# Get all the states challanged by the state
# update thier owners to be challanged if not already
#
# Get all the states defended by the state
# update thier owners to be defended if not already
# unless of course they are simueltaneously challanged by the
# state, (as above),
#
# If the state is already challanged:
# we should find the unchallanged ancestors and run this algorithm
# on them instead, which in the end we can assert leads to
# the owner of this state listing it as an unchallanged one
# and we should hint at the possible danger some day from this recurssion...
#
# one problem with this approach is that we dn't have anything to
# punush players for contradicting themselves, will have to save that
# until scoring procces...
# best to keep move insertion as simple as possible...
def update_challange_status_of_state_owner(position, state_map, new_owner):
challanged_by = [
state_map[desc_state]
for desc_state in engine_get_descendant_states(position.state)
if desc_state in state_map
]
if len(challanged_by) == 0:
positions_challanged = \
set(get_positions_challanged_by_position(position, state_map))
for challanged_position in positions_challanged:
owner = challanged_position.owner
owner.defended_positions.discard(challanged_position)
owner.challanged_positions.add(challanged_position)
for defended_position in get_positions_defended_by_position(
position, state_map):
if defended_position not in positions_challanged:
owner = defended_position.owner
owner.defended_positions.add(defended_position)
# is this ever useful?
new_owner.challanged_positions.discard(position)
# is it is possible that it could of been in here before
# but we don't need to check, set add takes care of that
new_owner.defended_positions.add(position)
else:
# this is where things get recursive in a way that could be problematic
# at some point..
update_challange_status_of_substates_owners(
get_unchallaged_descendants_of_position(position, state_map) )
def update_challange_status_of_substates_owners(
positions, state_map, new_owner):
for position in positions:
update_challange_status_of_state_owner(position, state_map, new_owner)
def take_ownership_of_state(
state_map, state, contributor, winning=True):
if state in state_map:
raise Exception("State %s is already owned" % state)
if winning:
position = state_map[state] = Position(state, contributor, winning)
update_challange_status_of_substates_owners(
(position,), state_map, contributor )
else:
raise Exception("only winning=True supported right now")
class ZaTest(TestCase):
def setUp(self):
contributors = engine_create_base_contributors_for_players()
base_state = engine_get_base_state()
self.state_map = {}
take_ownership_of_state(
self.state_map, base_state,
contributors[engine_get_player_implied_by_state(base_state)] )
def test_len_own_map(self):
self.assertEquals( 1, len(self.state_map) )
if __name__ == "__main__":
main()
# @Authors Mark Jenkins <mark@parit.ca>
from unittest import main, TestCase
from winninglines.fenstate import advance_counters
class FenTest(TestCase):
def test_advance_counters(self):
self.assertEquals( advance_counters( ('', 'w', '', 0) ),
('', 'b', '', 1) )
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment