Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
2018-09-28-classic-code
#!/usr/local/bin/python
# -*- coding: utf-8 -*-
# See: https://gist.github.com/jmhummel/fe7604ab830567d5939b21d3e6c57dfa for results!
import ast
import random
import datetime
import multiprocessing
from multiprocessing import Pool
def rotate(state):
new_state = []
for i in range(0, len(state)):
row = []
for j in xrange(i, -1, -1):
row.append(state[::-1][i-j][j])
new_state.append(row)
return new_state
def swap(a, b, state):
y1, x1 = a
y2, x2 = b
new_state = [row[:] for row in state]
new_state[y1][x1], new_state[y2][x2] = state[y2][x2], state[y1][x1]
return new_state
def heuristic_cost_estimate(neighbor, goal_state):
def score(curr):
# return 0
return 0.5 * len([(i, j) for i, row in enumerate(curr) for j, val in enumerate(row) if val != goal_state[i][j]])
test = [score(neighbor), score(rotate(neighbor))+1, score(rotate(rotate(neighbor)))+1]
estimate = min(test)
return estimate
def get_neighbors(current):
neighbors = {('rotate CW', str(rotate(current))), ('rotate CCW', str(rotate(rotate(current))))}
l = [(i,j) for i, row in enumerate(current) for j, _ in enumerate(row)]
l.sort()
for idx, a in enumerate(l):
for b in l[:idx]:
y1, x1 = a
y2, x2 = b
val1 = current[y1][x1]
val2 = current[y2][x2]
if val1 != val2:
neighbors.add(('swap {} {} and {} {}'.format(val1, a, val2, b), (str(swap(a, b, current)))))
return neighbors
def reconstruct_path(cameFrom, current):
current_str = str(current)
total_path = [('goal', current_str)]
while current_str in cameFrom:
# print current_str
action, current_str = cameFrom[current_str]
total_path.append((action, current_str))
return total_path
def A_Star(start, goal):
# The set of nodes already evaluated
closedSet = set([])
# The set of currently discovered nodes that are not evaluated yet.
# Initially, only the start node is known.
openSet = {str(start)}
# For each node, which node it can most efficiently be reached from.
# If a node can be reached from many nodes, cameFrom will eventually contain the
# most efficient previous step.
cameFrom = {}
# For each node, the cost of getting from the start node to that node.
gScore = {} # map with default value of Infinity
def get_gScore(node):
if str(node) in gScore:
return gScore[str(node)]
else:
return float("inf")
# The cost of going from start to start is zero.
gScore[str(start)] = 0
# For each node, the total cost of getting from the start node to the goal
# by passing by that node. That value is partly known, partly heuristic.
fScore = {} # map with default value of Infinity
def get_fScore(node):
if str(node) in fScore:
return fScore[str(node)]
else:
return float("inf")
# For the first node, that value is completely heuristic.
fScore[str(start)] = heuristic_cost_estimate(start, goal)
while openSet:
current_str = min(openSet, key=get_fScore) # the node in openSet having the lowest fScore[] value
current = ast.literal_eval(current_str)
if current_str == str(goal):
return reconstruct_path(cameFrom, current)
openSet.remove(current_str)
closedSet.add(current_str)
for action_neighbor in get_neighbors(current):
action, neighbor_str = action_neighbor
neighbor = ast.literal_eval(neighbor_str)
if neighbor_str in closedSet:
continue # pass # continue # Ignore the neighbor which is already evaluated.
# The distance from start to a neighbor
tentative_gScore = get_gScore(current) + 1 # dist_between(current, neighbor)
if neighbor_str not in openSet: # Discover a new node
openSet.add(neighbor_str)
elif tentative_gScore >= get_gScore(neighbor):
continue # This is not a better path.
# This path is the best until now. Record it!
cameFrom[neighbor_str] = (action, str(current))
gScore[neighbor_str] = tentative_gScore
fScore[neighbor_str] = get_gScore(neighbor) + heuristic_cost_estimate(neighbor, goal)
def pprint(state, asArray=False, emoji=False):
if asArray:
print '['
for row in state:
print '{:^20},'.format(row)
print ']'
elif emoji:
emojiDict = {"0 ": "🎾", "1 ": "⚽", "8 ": "🎱"}
for idx, row in enumerate(state):
vals = ' '.join(['{:<2}'.format(i) for i in row])
out = '{:^22}'.format(vals)
for key, val in emojiDict.iteritems():
out = out.replace(str(key), val)
print out
else:
print '{:^18}'.format('/-\\')
for idx, row in enumerate(state):
vals = ' '.join([str(i) for i in row])
width = idx*2 + 3
out = '/' + '{:^{width}}'.format(vals, width=width) + '\\'
print '{:^18}'.format(out)
print '{:^18}'.format('/-------------\\')
goal = [[0],
[1,0],
[0,8,1],
[1,0,1,0],
[0,1,1,0,1]]
def reconstruct(elements):
elements = list(elements)
new_state = []
for i in xrange(5):
row = []
for j in xrange(i+1):
row.append(elements.pop(0))
new_state.append(row)
return new_state
def get_random_state():
elements = [el for sublist in goal for el in sublist]
new_state = []
for i in xrange(5):
row = []
for j in xrange(i+1):
idx = random.randrange(len(elements))
row.append(elements[idx])
elements.pop(idx)
new_state.append(row)
return new_state
class UniqueElement:
def __init__(self, value, occurrences):
self.value = value
self.occurrences = occurrences
def perm_unique(elements):
e_set = set(elements)
list_unique = [UniqueElement(i, elements.count(i)) for i in e_set]
u = len(elements)
return perm_unique_helper(list_unique, [0]*u, u-1)
def perm_unique_helper(list_unique, result_list, d):
if d < 0:
yield tuple(result_list)
else:
for i in list_unique:
if i.occurrences > 0:
result_list[d] = i.value
i.occurrences -= 1
for g in perm_unique_helper(list_unique, result_list, d-1):
yield g
i.occurrences += 1
def get_permutations():
elements = [el for sublist in goal for el in sublist]
elements.sort()
return perm_unique(elements)
def worker(s):
start = reconstruct(s)
return s, A_Star(start, goal)
if __name__ == '__main__':
worst = []
totals = {}
count = 0
perms = list(get_permutations())
print "Total pems:", len(perms)
cpu_count = multiprocessing.cpu_count()
print "CPU count:", cpu_count
pool = Pool()
print "Pool size:", pool._processes
for s, result in pool.imap(worker, perms):
count += 1
cost = len(result) - 1
if cost not in totals:
totals[cost] = 0
totals[cost] += 1
if random.random() < 0.001:
print '{} count: {:5}/{} avg: {:<6} totals: {}'.format(datetime.datetime.utcnow().isoformat(), count, len(perms), 1.0 * sum([k*v for k, v in totals.iteritems()]) / count, totals)
if len(result) > len(worst):
worst = result
print len(worst) - 1
pprint(ast.literal_eval(result[-1][1]), emoji=True)
pool.close()
pool.join()
print '{} count: {:5}/{} avg: {:<6} totals: {}'.format(datetime.datetime.utcnow().isoformat(), count, len(perms), sum([k*v for k, v in totals.iteritems()]) / count, totals)
for action, state in reversed(list(worst)):
print action
pprint(ast.literal_eval(state), emoji=True)
@jmhummel

This comment has been minimized.

Copy link
Owner Author

@jmhummel jmhummel commented Sep 30, 2018

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment