Last active
September 30, 2018 04:39
-
-
Save jmhummel/1093084e191452e5e8aea3b7937f63c5 to your computer and use it in GitHub Desktop.
2018-09-28-classic-code
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/local/bin/python | |
# -*- coding: utf-8 -*- | |
# See: https://gist.github.com/jmhummel/fe7604ab830567d5939b21d3e6c57dfa for results! | |
import ast | |
import random | |
import datetime | |
import multiprocessing | |
from multiprocessing import Pool | |
def rotate(state):
    """Return a rotated copy of the triangular grid ``state``.

    ``state`` is a list of rows in which row ``i`` holds ``i + 1`` values.
    Cell ``k`` of output row ``i`` is taken from ``state[n - 1 - k][i - k]``
    (``n`` being the number of rows), so applying the function three times
    yields the original arrangement again.  The input is not modified.

    Uses ``range`` rather than the Python-2-only ``xrange`` so the function
    runs on both Python 2 and Python 3.
    """
    # Reverse the row order once instead of once per cell.
    flipped = state[::-1]
    return [
        [flipped[i - j][j] for j in range(i, -1, -1)]
        for i in range(len(state))
    ]
def swap(a, b, state):
    """Return a copy of ``state`` with the values at cells ``a`` and ``b`` exchanged.

    ``a`` and ``b`` are ``(row, column)`` pairs.  Each row is copied, so the
    caller's ``state`` is left untouched.
    """
    row_a, col_a = a
    row_b, col_b = b
    first = state[row_a][col_a]
    second = state[row_b][col_b]
    grid = [list(row) for row in state]
    grid[row_a][col_a] = second
    grid[row_b][col_b] = first
    return grid
def heuristic_cost_estimate(neighbor, goal_state):
    """Estimate how many moves separate ``neighbor`` from ``goal_state``.

    Each cell that differs from the goal is charged half a move (one swap
    changes two cells).  The estimate is the smallest of: the state as it is,
    the state rotated once plus one move, and the state rotated twice plus
    one move.
    """
    def mismatch_cost(candidate):
        wrong = 0
        for r, row in enumerate(candidate):
            for c, val in enumerate(row):
                if val != goal_state[r][c]:
                    wrong += 1
        return 0.5 * wrong

    rotated_once = rotate(neighbor)
    rotated_twice = rotate(rotated_once)
    return min(
        mismatch_cost(neighbor),
        mismatch_cost(rotated_once) + 1,
        mismatch_cost(rotated_twice) + 1,
    )
def get_neighbors(current):
    """Return every state reachable from ``current`` in a single move.

    The result is a set of ``(action, state_str)`` pairs where ``state_str``
    is ``str()`` of the resulting grid.  Moves are the two rotations plus a
    swap of every pair of cells holding different values (swapping equal
    values would not change the state, so those pairs are skipped).
    """
    rotated_once = rotate(current)
    moves = set()
    moves.add(('rotate CW', str(rotated_once)))
    moves.add(('rotate CCW', str(rotate(rotated_once))))

    cells = sorted(
        (r, c) for r, row in enumerate(current) for c in range(len(row))
    )
    for later_idx in range(len(cells)):
        a = cells[later_idx]
        val1 = current[a[0]][a[1]]
        # Pair each cell only with the cells that precede it.
        for b in cells[:later_idx]:
            val2 = current[b[0]][b[1]]
            if val1 == val2:
                continue
            label = 'swap {} {} and {} {}'.format(val1, a, val2, b)
            moves.add((label, str(swap(a, b, current))))
    return moves
def reconstruct_path(cameFrom, current):
    """Walk ``cameFrom`` backwards from ``current`` and return the visited steps.

    ``cameFrom`` maps a state string to ``(action, previous_state_string)``.
    The returned list starts with ``('goal', str(current))`` and continues
    with one ``(action, state_string)`` entry per predecessor until a state
    with no recorded predecessor is reached.
    """
    key = str(current)
    steps = [('goal', key)]
    while key in cameFrom:
        previous = cameFrom[key]
        action, key = previous
        steps.append((action, key))
    return steps
def A_Star(start, goal):
    """Search for a cheapest move sequence turning ``start`` into ``goal``.

    States are keyed by ``str()`` of the grid so they can live in sets and
    dicts; every move costs 1.

    Returns the list produced by ``reconstruct_path``: the first entry is
    ``('goal', str(goal))`` and each following entry is
    ``(action, state_string)``, where ``action`` applied to that state gives
    the state of the entry before it.  Returns ``None`` if the open set runs
    empty without reaching the goal.
    """
    infinity = float("inf")
    start_str = str(start)
    goal_str = str(goal)  # loop-invariant, computed once

    # States that have already been expanded.
    closed_set = set()
    # Discovered states that still await expansion.
    open_set = {start_str}
    # state -> (action, predecessor) along the cheapest known route.
    came_from = {}
    # Cheapest known cost from the start to each state.
    g_score = {start_str: 0}
    # g_score plus the heuristic estimate of the remaining cost.
    f_score = {start_str: heuristic_cost_estimate(start, goal)}

    def known_f_score(state_str):
        return f_score.get(state_str, infinity)

    while open_set:
        current_str = min(open_set, key=known_f_score)
        if current_str == goal_str:
            # str(goal) equals current_str here, so goal itself can be passed.
            return reconstruct_path(came_from, goal)

        open_set.remove(current_str)
        closed_set.add(current_str)
        current = ast.literal_eval(current_str)
        tentative_g_score = g_score[current_str] + 1

        for action, neighbor_str in get_neighbors(current):
            if neighbor_str in closed_set:
                continue  # already expanded
            if neighbor_str in open_set:
                if tentative_g_score >= g_score.get(neighbor_str, infinity):
                    continue  # not a better route
            else:
                open_set.add(neighbor_str)

            # Best route so far: record it.  The neighbour is parsed only
            # now, when the heuristic actually needs the grid.
            came_from[neighbor_str] = (action, current_str)
            g_score[neighbor_str] = tentative_g_score
            f_score[neighbor_str] = tentative_g_score + heuristic_cost_estimate(
                ast.literal_eval(neighbor_str), goal)

    return None
def pprint(state, asArray=False, emoji=False): | |
if asArray: | |
print '[' | |
for row in state: | |
print '{:^20},'.format(row) | |
print ']' | |
elif emoji: | |
emojiDict = {"0 ": "🎾", "1 ": "⚽", "8 ": "🎱"} | |
for idx, row in enumerate(state): | |
vals = ' '.join(['{:<2}'.format(i) for i in row]) | |
out = '{:^22}'.format(vals) | |
for key, val in emojiDict.iteritems(): | |
out = out.replace(str(key), val) | |
print out | |
else: | |
print '{:^18}'.format('/-\\') | |
for idx, row in enumerate(state): | |
vals = ' '.join([str(i) for i in row]) | |
width = idx*2 + 3 | |
out = '/' + '{:^{width}}'.format(vals, width=width) + '\\' | |
print '{:^18}'.format(out) | |
print '{:^18}'.format('/-------------\\') | |
# Target arrangement the solver works towards: a triangular grid stored as
# a list of rows, where row i holds i + 1 cells (15 cells in five rows).
goal = [
    [0],
    [1, 0],
    [0, 8, 1],
    [1, 0, 1, 0],
    [0, 1, 1, 0, 1],
]
def reconstruct(elements, rows=5):
    """Arrange a flat sequence into a triangular grid.

    Row ``i`` of the result receives the next ``i + 1`` items of
    ``elements``.  Items beyond the ``rows * (rows + 1) / 2`` that are needed
    are ignored.

    Args:
        elements: any iterable of cell values; it is copied, not modified.
        rows: number of rows to build (defaults to the puzzle's five).

    Raises:
        IndexError: if ``elements`` holds too few items to fill every row.
    """
    flat = list(elements)
    needed = rows * (rows + 1) // 2
    if len(flat) < needed:
        raise IndexError(
            'need {} elements for {} rows, got {}'.format(needed, rows, len(flat)))

    # Slice rows out instead of repeatedly popping the list head, which
    # shifts every remaining item on each call.
    new_state = []
    start = 0
    for row_len in range(1, rows + 1):
        new_state.append(flat[start:start + row_len])
        start += row_len
    return new_state
def get_random_state():
    """Return a random arrangement of the goal's values in the goal's shape.

    The values of the module-level ``goal`` are drawn without replacement,
    one ``random.randrange`` call per cell, and laid out in rows of the same
    lengths as the rows of ``goal``.
    """
    elements = [el for sublist in goal for el in sublist]
    new_state = []
    # Follow the shape of goal instead of hard-coding the row count; this
    # also avoids the Python-2-only xrange.
    for template_row in goal:
        row = []
        for _ in template_row:
            row.append(elements.pop(random.randrange(len(elements))))
        new_state.append(row)
    return new_state
class UniqueElement:
    """A distinct value paired with a mutable count of remaining uses."""

    def __init__(self, value, occurrences):
        # value: the element itself; occurrences: how many copies are left.
        self.value, self.occurrences = value, occurrences
def perm_unique(elements):
    """Return a generator over the distinct permutations of ``elements``.

    Each distinct value is wrapped in a ``UniqueElement`` carrying its
    multiplicity, and the enumeration is delegated to
    ``perm_unique_helper``, which yields tuples.
    """
    counted = []
    for value in set(elements):
        counted.append(UniqueElement(value, elements.count(value)))
    size = len(elements)
    scratch = [0] * size  # working buffer filled in by the helper
    return perm_unique_helper(counted, scratch, size - 1)
def perm_unique_helper(list_unique, result_list, d):
    """Recursively yield permutations, filling ``result_list`` from index ``d`` down.

    ``list_unique`` holds objects with ``value`` and ``occurrences``
    attributes.  A value is placed at position ``d`` only while it still has
    occurrences left; its count is decremented for the recursive call and
    restored afterwards.  Once ``d`` drops below zero the buffer is complete
    and a tuple snapshot of it is yielded.
    """
    if d < 0:
        yield tuple(result_list)
        return

    for candidate in list_unique:
        if candidate.occurrences <= 0:
            continue
        result_list[d] = candidate.value
        candidate.occurrences -= 1
        for permutation in perm_unique_helper(list_unique, result_list, d - 1):
            yield permutation
        candidate.occurrences += 1
def get_permutations():
    """Return a generator over every distinct ordering of the goal's values.

    The rows of the module-level ``goal`` are flattened and sorted before
    being handed to ``perm_unique``.
    """
    flattened = []
    for sublist in goal:
        flattened.extend(sublist)
    return perm_unique(sorted(flattened))
def worker(s):
    """Solve one flat arrangement ``s`` and return ``(s, path)``.

    ``s`` is rebuilt into a triangular grid with ``reconstruct`` and solved
    against the module-level ``goal`` with ``A_Star``.  The script passes
    this function to ``Pool.imap``.
    """
    path = A_Star(reconstruct(s), goal)
    return s, path
def main():
    """Solve every distinct arrangement in parallel and report the results.

    Prints occasional progress lines, a new start state each time a longer
    solution is found, a final summary, and finally the longest solution
    step by step from its start state to the goal.
    """
    def summary_line(done, total, totals):
        # 1.0 * forces true division on Python 2 as well, so the final
        # average is no longer truncated to an integer.
        mean = 1.0 * sum(k * v for k, v in totals.items()) / done
        return '{} count: {:5}/{} avg: {:<6} totals: {}'.format(
            datetime.datetime.utcnow().isoformat(), done, total, mean, totals)

    worst = []
    totals = {}  # solution cost -> number of arrangements with that cost
    count = 0
    perms = list(get_permutations())
    print('Total perms: {}'.format(len(perms)))
    cpu_count = multiprocessing.cpu_count()
    print('CPU count: {}'.format(cpu_count))
    # Size the pool explicitly rather than reading the private
    # pool._processes attribute afterwards.
    pool = Pool(processes=cpu_count)
    print('Pool size: {}'.format(cpu_count))
    try:
        for _, result in pool.imap(worker, perms):
            count += 1
            cost = len(result) - 1
            totals[cost] = totals.get(cost, 0) + 1
            # Sample roughly one progress line per thousand results.
            if random.random() < 0.001:
                print(summary_line(count, len(perms), totals))
            if len(result) > len(worst):
                worst = result
                print(len(worst) - 1)
                # The last path entry holds the start state of this run.
                pprint(ast.literal_eval(result[-1][1]), emoji=True)
    finally:
        # Release the workers even if the loop raised; after a normal run
        # all results have already been consumed.
        pool.terminate()
        pool.join()

    print(summary_line(count, len(perms), totals))
    for action, state in reversed(worst):
        print(action)
        pprint(ast.literal_eval(state), emoji=True)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Results: https://gist.github.com/jmhummel/fe7604ab830567d5939b21d3e6c57dfa