Skip to content

Instantly share code, notes, and snippets.

@mateon1
Last active December 25, 2022 15:32
Show Gist options
  • Save mateon1/b63eabc371ac35e2a14a9c5ce37413bc to your computer and use it in GitHub Desktop.
Save mateon1/b63eabc371ac35e2a14a9c5ce37413bc to your computer and use it in GitHub Desktop.
Closed Position Set (CPS) TM decider
from bbconv import *
from collections import deque
# M - machine, N - segment length, s - initial state, p in [0..N) - initial position, t - tape state encoded as an integer
# returns ((p, t, s), limit) - final position, tape state and head state, plus the remaining step budget
def segrun(M, N, s, p, t, limit=-1):
    """Run machine ``M`` inside a finite tape segment until something stops it.

    Args:
        M: transition table; ``M[state][symbol]`` yields ``(write, dir, next)``.
            A truthy ``write`` sets the cell to 1, a truthy ``dir`` moves the
            head one cell right (otherwise left), a negative ``next`` halts.
        N: segment length in cells; valid head positions are ``0 <= p < N``.
        s: initial head state.
        p: initial head position.
        t: tape contents as an integer, bit ``i`` being the cell at position ``i``.
        limit: step budget.  It is decremented once per step and the run stops
            when it reaches exactly 0, so a negative value never triggers.

    Returns:
        ``((p, t, s), limit)``: the final position, tape and state, plus the
        remaining budget.  The run ends when the head leaves the segment
        (``p < 0`` or ``p >= N``), when a configuration repeats (``p`` still
        inside the segment), when a halting transition is read (``s < 0``; the
        halting transition's write and move are not applied), or when the
        budget hits 0.
    """
    seen = set()
    while 0 <= p < N:
        # Detect loops.  A tuple key cannot collide, unlike packing s and p
        # into fixed-width bit fields, which aliases configurations as soon
        # as s >= 16 or p >= 256.
        key = (t, s, p)
        if key in seen:
            break
        seen.add(key)
        limit -= 1
        if limit == 0:
            break
        # Perform forward transition
        w, d, s = M[s][(t >> p) & 1]
        if s < 0:
            break
        if w:
            t |= 1 << p
        else:
            t &= ~(1 << p)
        p += 1 if d else -1
    return (p, t, s), limit
# M - machine, N - atom size, T - segment size (always 3*N in savask)
# returns (False, limit) if the machine runs forever, or (None, limit) if it can't decide
def decide(M, N, T, P=None, limit=999999999, loud=False, savaskcert=False):
    """Try to prove that machine ``M`` never halts via a Closed Position Set.

    The tape is observed through a window ("segment") of ``T`` cells, stored
    as an integer whose bit ``i`` is the cell at position ``i``.  Each queued
    window configuration is simulated with ``segrun``.  When the head leaves
    the window, the window is shifted by one atom (``N`` cells): the atom
    dropping off the far side becomes an edge in that side's graph, and every
    atom the near side's graph allows is tried as the newly exposed cells.
    The search ends when the queue is empty, i.e. the set is closed.

    Args:
        M: transition table, ``M[state][symbol] -> (write, dir, next)``.
        N: atom size in cells.
        T: segment size in cells.
        P: number of cells of the window edge used as graph key (defaults
            to ``N``).
        limit: work budget, consumed by simulation steps and graph insertions.
        loud: verbosity; ``>= 1`` prints the outcome, ``>= 2`` also new graph
            edges, ``>= 3`` also every simulated position.
        savaskcert: when true, print a one-line summary of both graphs and all
            visited positions, prefixed with ``stdstr(M)``.

    Returns:
        ``(False, limit)`` if a closed set was found without reaching a
        halting transition (the machine runs forever); ``(None, limit)`` if a
        halting transition is reachable inside the set; ``(None, 0)`` if the
        budget ran out.
    """
    if P is None: P = N
    assert N > 0 # N=1 is useless, but valid
    assert P <= N # Larger prefixes are theoretically possible, but current implementation is not sound with them
    assert T >= N + P
    # leftg / rightg: prefix at the window edge -> atoms that may lie beyond it
    leftg = {0: {0}}
    rightg = {0: {0}}
    queue = deque()
    queue.append((T-N, 0, 0)) # (pos, tape, state) tuple
    SEG_MASK = (1<<T)-1
    ATOM_MASK = (1<<N)-1
    PFX_MASK = (1<<P)-1
    # finleft / finright: edge prefix of the start tape -> positions whose run
    # left the window on that side, stored as (p, t, s, tn, sn)
    finleft = {}
    finright = {}
    loops = set()
    # atomcopiesX[a]: prefixes that inherit every edge later added to a
    atomcopiesl = {}
    atomcopiesr = {}
    seen = set() # avoid duplicating work

    # The two helpers below are defined once here rather than on every queue
    # iteration; they only touch state that already exists at this point.
    def addrightatom(a, b):
        """Add edge a -> b to the right graph and re-queue affected positions."""
        nonlocal limit
        toadd = [(a, b)]
        while toadd:
            a, b = toadd.pop()
            oset = rightg.get(a, set())
            if b in oset: continue
            limit -= 1
            rightg[a] = oset | {b}
            if loud >= 2: print("right edge: " + prettytape(a, P) + " -> " + prettytape(b, N))
            for ia in atomcopiesr.get(a, set()): toadd.append((ia, b))
            # queue affected positions
            for (rp, rt, rs, rtn, rsn) in finright.get(a, set()):
                rtnn = (rtn | (b << T)) >> N
                queue.append((T-N, rtnn, rsn))

    def addleftatom(a, b):
        """Add edge a -> b to the left graph and re-queue affected positions."""
        nonlocal limit
        toadd = [(a, b)]
        while toadd:
            a, b = toadd.pop()
            oset = leftg.get(a, set())
            if b in oset: continue
            limit -= 1
            leftg[a] = oset | {b}
            if loud >= 2: print("left edge: " + prettytape(a, P) + " -> " + prettytape(b, N))
            for ia in atomcopiesl.get(a, set()): toadd.append((ia, b))
            # queue affected positions
            for (rp, rt, rs, rtn, rsn) in finleft.get(a, set()):
                rtnn = ((rtn << N) | b) & SEG_MASK
                queue.append((N-1, rtnn, rsn))

    while queue:
        p, t, s = queue.popleft()
        if (p, t, s) in seen: continue
        seen.add((p, t, s))
        if loud >= 3: print("Unqueued: ", prettytape(t, T, p, s))
        res, limit = segrun(M, T, s, p, t, limit)
        if limit <= 0: return None, 0
        pn, tn, sn = res
        if loud >= 3: print("Sim result: ", prettytape(tn, T, pn, sn) + (" *HALT*" if sn < 0 else " *LOOP*" if 0 <= pn < T else ""))
        if sn < 0:
            if loud >= 1:
                print("Found halt in position set!")
                print(":UNDECIDED:")
            return None, limit
        if 0 <= pn < T:
            # Head never left the window: the run loops inside the segment.
            loops.add((p, t, s, pn, tn, sn))
            continue
        elif pn < 0:
            origpfx = t & PFX_MASK
            finleft.setdefault(origpfx, set())
            finleft[origpfx].add((p, t, s, tn, sn))
        else:
            origpfx = t >> (T - P)
            finright.setdefault(origpfx, set())
            finright[origpfx].add((p, t, s, tn, sn))
        # Queue up this state according to current graph transitions
        if pn < 0:
            origpfx = t & PFX_MASK
            for a in leftg[origpfx]:
                tnn = ((tn << N) | a) & SEG_MASK
                queue.append((N-1, tnn, sn))
        else:
            origpfx = t >> (T - P)
            for a in rightg[origpfx]:
                tnn = (tn | (a << T)) >> N
                queue.append((T-N, tnn, sn))
        # See if shifting the tape extends the left/right graph
        # if so, queue up the new transitions for all matching positions in set
        if pn < 0:
            origpfx = t >> (T-P)
            newatom = tn >> (T-N)
            newpfx = tn >> (T-P)
            midpfx = (tn >> (T-N-P)) & PFX_MASK
            #    xxxxxxx xxxx A>orig    ... (implied rightg[orig] entries)
            #       v     v     v
            # <D xxxxxxx mida newa
            #                  ^--- shifted out
            # New graph edge: mida -> newa
            #                 newa -> everything in orig set
            if midpfx not in rightg or newatom not in rightg[midpfx]:
                addrightatom(midpfx, newatom)
            if origpfx != newpfx:
                rightg.setdefault(newpfx, set())
                newatomnewedges = rightg[origpfx] - rightg[newpfx]
                atomcopiesr.setdefault(origpfx, set())
                atomcopiesr[origpfx] |= {newpfx}
                for a in newatomnewedges:
                    addrightatom(newpfx, a)
        else:
            # Mirror image of the case above, for the left graph.
            origpfx = t & PFX_MASK
            newatom = tn & ATOM_MASK
            newpfx = tn & PFX_MASK
            midpfx = (tn >> N) & PFX_MASK
            if midpfx not in leftg or newatom not in leftg[midpfx]:
                addleftatom(midpfx, newatom)
            if origpfx != newpfx:
                leftg.setdefault(newpfx, set())
                newatomnewedges = leftg[origpfx] - leftg[newpfx]
                atomcopiesl.setdefault(origpfx, set())
                atomcopiesl[origpfx] |= {newpfx}
                for a in newatomnewedges:
                    addleftatom(newpfx, a)
    if loud >= 1:
        print("Ran out of positions to visit, have a closed set")
        print("Left graph:")
        for k, v in sorted(leftg.items()):
            print("\t", prettytape(k, P) + ": {" + ",".join(sorted([prettytape(vv, N) for vv in v])) + "}")
        print("Right graph:")
        for k, v in sorted(rightg.items()):
            print("\t", prettytape(k, P) + ": {" + ",".join(sorted([prettytape(vv, N) for vv in v])) + "}")
        print("Left pos:")
        for (p, t, s, tn, sn) in sorted([m for k in finleft.values() for m in k], key=lambda x: (x[2], x[1], x[0], x[3], x[4])):
            print("\t", prettytape(t, T, p, s), "-->", prettytape(tn, T, -1, sn))
        if not finleft: print(" None")
        print("Right pos:")
        for (p, t, s, tn, sn) in sorted([m for k in finright.values() for m in k], key=lambda x: (x[2], x[1], x[0], x[3], x[4])):
            print("\t", prettytape(t, T, p, s), "-->", prettytape(tn, T, T, sn))
        if not finright: print(" None")
        print("Looping pos:")
        for (p, t, s, pn, tn, sn) in sorted([m for m in loops], key=lambda x: (x[2], x[1], x[0], x[5], x[4], x[3])):
            print("\t", prettytape(t, T, p, s), "-->", prettytape(tn, T, pn, sn), "*LOOPS*")
        if not loops: print(" None")
        print(":NONHALTING:")
    if savaskcert:
        lefts = []
        for a, v in sorted(leftg.items()):
            for b in sorted(v):
                lefts.append(prettytape(a, P) + " " + prettytape(b, N))
        rights = []
        for a, v in sorted(rightg.items()):
            for b in sorted(v):
                rights.append(prettytape(a, P) + " " + prettytape(b, N))
        tapes = []
        for (p, t, s, _, _) in sorted((m for end in [finleft, finright, {0: [(p, t, s, tn, sn) for (p, t, s, pn, tn, sn) in loops]}] for k in end.values() for m in k), key=lambda x: (x[2], x[1], x[0], x[3], x[4])):
            tapes.append(prettytape(t, T, p, s))
        print(stdstr(M), "Result %d %s %d %s %d %s" % (len(lefts), " ".join(lefts), len(rights), " ".join(rights), len(tapes), " ".join(tapes)))
    return False, limit
def prettytape(t, N, p=None, s=None):
    """Render tape integer ``t`` as a string of at least ``N`` binary digits.

    Position 0 is printed first (the least significant bit comes first).
    When both ``p`` and ``s`` are given, a head marker is inserted: ``<X``
    after cell ``p`` if ``p`` is in the first half of the tape, otherwise
    ``X>`` before cell ``p``, where ``X`` is ``chr(ord("A") + s)``.
    """
    cells = bin(t)[2:].zfill(N)[::-1]
    if p is None or s is None:
        return cells
    state = chr(s + ord("A"))
    if p < N // 2:
        cut = p + 1
        return cells[:cut] + "<" + state + cells[cut:]
    return cells[:p] + state + ">" + cells[p:]
def rdecide(M, limit=500000):
    """Search (segment, atom) sizes until ``decide`` proves ``M`` non-halting.

    ``M`` may be a rule table or an integer index, which is resolved with
    ``read_short``.  Segment sizes 2..255 are tried in order, each with atom
    sizes 1..segment//2, all sharing one work budget.

    Returns:
        ``(False, segment, atom, used_budget)`` on success,
        ``(None, segment, atom)`` when the budget is exhausted, or
        ``(None, 256, 0, limit)`` when no parameter pair decided the machine.
    """
    if isinstance(M, int):
        M = read_short(M)[0]
    budget = limit
    candidates = ((seg, atom) for seg in range(2, 256) for atom in range(1, seg // 2 + 1))
    for seg, atom in candidates:
        verdict, limit = decide(M, atom, seg, limit=limit)
        if limit == 0:
            return (None, seg, atom)
        if verdict is False:
            return (False, seg, atom, budget - limit)
    return (None, 256, 0, limit)
import multiprocessing, sys
def prettyscan(M, T=16, N=8, P=None, limit=10000000):
    """Print a grid showing which (atom, segment) sizes decide machine ``M``.

    One row is printed per atom size 1..N and one character per segment size
    1..T: ``x`` for a parameter pair that is skipped as invalid, ``?`` when
    ``decide`` exhausted its budget, ``.`` when it could not decide and ``#``
    when it proved the machine non-halting.  ``M`` may be an integer index,
    which is resolved with ``read_short``.
    """
    if isinstance(M, int):
        M = read_short(M)[0]
    for atom in range(1, N + 1):
        for seg in range(1, T + 1):
            if P is None:
                invalid = seg < 2 * atom
            else:
                invalid = seg < atom + P or atom < P
            if invalid:
                sys.stdout.write("x")
                sys.stdout.flush()
                continue
            verdict, remaining = decide(M, atom, seg, P, limit=limit)
            if remaining == 0:
                sys.stdout.write("?")
            elif verdict is None:
                sys.stdout.write(".")
            elif verdict is False:
                sys.stdout.write("#")
            else:
                print("ERROR, don't know how to interpret result", verdict)
                return
            sys.stdout.flush()
        print()
def wrap(p):
    """Pool worker: run ``decide`` on one machine with fixed parameters.

    ``p`` is an ``(N, T, index)`` triple; the machine is loaded with
    ``read_short(index)``.  A progress dot is printed for every index that is
    a multiple of ``88664064 // 500``.  Returns the verdict from ``decide``.
    """
    atom, seg, index = p
    step = 88664064 // 500
    if not index % step:
        print(".", end="")
        sys.stdout.flush()
    verdict, remaining = decide(read_short(index)[0], atom, seg, limit=1000000000)
    assert remaining > 0
    return verdict
def wrap2(i):
    """Pool worker: bitmask of which (N, T) parameter pairs decide machine ``i``.

    Bit ``k`` of the result is set when the ``k``-th pair, enumerated with
    ``T`` in 3..16 and ``N`` in 1..T//2, makes ``decide`` return ``False``.
    If the cheapest setting (N=1, T=2) already decides the machine, all 63
    bits are reported as set without running the rest.
    """
    if i % (88664064 // 1000) == 0:
        print(".", end="")
        sys.stdout.flush()
    M = read_short(i)[0]
    if decide(M, 1, 2)[0] is False:
        # cheaty, but saves so much compute. Verified empirically and seems obviously true that if 1,2 decides everything else does too
        return (1 << 63) - 1
    params = [(atom, seg) for seg in range(3, 17) for atom in range(1, seg // 2 + 1)]
    mask = 0
    for bit, (atom, seg) in enumerate(params):
        verdict, remaining = decide(M, atom, seg, limit=1000000000)
        assert remaining > 0
        if verdict is False:
            mask |= 1 << bit
    return mask
if __name__ == "__main__":
    # Script entry point: evaluate every machine in the short database with
    # wrap2 on a 20-process pool, then count how often each result occurs.
    # NOTE(review): shortdata is presumably provided by `from bbconv import *`
    # at the top of the file -- confirm.
    # ts and allres are only referenced by the commented-out experiment below.
    ts = [(i, j) for j in range(3, 17) for i in range(1,j//2+1)]
    allres = {}
    with multiprocessing.Pool(20) as pl:
        # for t in [(1,i) for i in range(2,11)] + [(2,i) for i in range(4,11)] + [(3,i) for i in range(6,10)] + [(4,8), (4,9), (5,10)]:
        #     print("Running",t)
        #     allres[t] = pl.map(wrap, (t+(i,) for i in range(len(shortdata)//8-1)))
        #     print()
        # print("Computing %d params for all machines" % len(ts))
        # One 63-bit result mask per machine; records are 8 bytes each and the
        # first 8 bytes of shortdata are a header.
        machres = pl.map(wrap2, range(len(shortdata)//8-1))
    # Terminate the line of progress dots printed by the workers.
    print()
    #print("Loading results from file...")
    #with open("res63.bin", "rb") as f:
    #    machres = list(struct.unpack("<"+"Q"*88664064, f.read()))
    #print("Done!")
    import collections
    # Histogram of result masks across all machines.
    ctr = collections.Counter(machres)
    #[i for i in sorted(collections.Counter(machres).items(),key=lambda v:(v[1],v))[::-1]]
    #for i, m in enumerate(machres):
    #    if ctr[m] <= 1:
    #        print("---")
    #        print("Rare machine!", i, stdstr(read_short(i)[0]))
    #        prettyscan(i, 20, 10, 500000)
    #print("---")
import struct
import mmap
def find(l, x):
    """Return the index of the first occurrence of ``x`` in ``l``, or -1.

    Uses a single ``index`` call instead of a membership test followed by a
    second scan.
    """
    try:
        return l.index(x)
    except ValueError:
        return -1
# Inverse-permutation lookup used by runorm's sanity check: for a permutation
# index i, rupermute(rupermute(rules, i), permuteback[i]) is expected to give
# back the original rules.  The table has 24 entries.
permuteback = [0, 1, 2, 4, 3, 5, 6, 7, 12, 18, 13, 19, 8, 10, 14, 20, 16, 22, 9, 11, 15, 21, 17, 23]
def rupermute(rules, n):
    """Return ``rules`` with states 2.. reordered by permutation number ``n``.

    States 0 and 1 stay in place (the first transition is assumed to be 1RB).
    ``n`` is decoded digit by digit in the factorial number system to choose
    the order of the remaining states; ``n == 0`` is the identity.  Every
    transition target is renumbered to match; targets that are not a state
    index (halt ``-1``, unknown ``None``) become ``-1``.

    Returns a new list of tuples of ``(write, dir, next)`` tuples.
    """
    m = list(range(2, len(rules)))
    pidx = [0, 1]  # 1RB assumed
    inv = []
    for i in range(1, len(m) + 1):
        n, k = divmod(n, i)
        inv.append(k)
    while m:
        pidx.append(m.pop(inv.pop()))
    # old state index -> new state index, built once instead of scanning
    # pidx for every transition
    newidx = {}
    for new, old in enumerate(pidx):
        newidx.setdefault(old, new)

    def fixup(r):
        return tuple((ri[0], ri[1], newidx.get(ri[2], -1)) for ri in r)

    return [fixup(rules[i]) for i in pidx]
def ruswap(rules, a, b):
    """Return ``rules`` with states ``a`` and ``b`` exchanged.

    Both the rows and every transition target are renumbered.  Targets that
    are not a state index (halt ``-1``, unknown ``None``) become ``-1``.
    Returns a new list of tuples of ``(write, dir, next)`` tuples.
    """
    pidx = list(range(len(rules)))
    pidx[a] = b
    pidx[b] = a
    # old state index -> new state index, built once instead of scanning
    # pidx for every transition
    newidx = {}
    for new, old in enumerate(pidx):
        newidx.setdefault(old, new)

    def fixup(r):
        return tuple((ri[0], ri[1], newidx.get(ri[2], -1)) for ri in r)

    return [fixup(rules[i]) for i in pidx]
# Defines lexical order on rules
def rulekey(rules):
    """Build the sort key that defines the lexical order on rule tables.

    All transitions except the very first are flattened in order; the key is
    the list of their target states followed by their (write, dir) pairs.
    """
    flat = [trans for row in rules for trans in row]
    del flat[0]  # first transition is fixed, don't bother comparing
    targets = [trans[2] for trans in flat]
    actions = []
    for trans in flat:
        actions.append(trans[0])
        actions.append(trans[1])
    return targets + actions
def runorm(ru):
    """Bring a rule table into canonical normal form.

    Steps: skip leading states that only write 0 on a blank tape, mirror the
    machine so the first transition moves right, rename states so the first
    transition is ``1RB``, then pick the state permutation with the smallest
    ``rulekey``.

    Args:
        ru: list of rows of ``(write, dir, next)`` transitions; halting
            transitions are expected to be ``(1, True, -1)``.

    Returns:
        ``(rules, perm_index)``: the normalized table and the index of the
        permutation that was applied (0 when none was needed).  Machines that
        never leave their start state, or that never write a 1 on a blank
        tape, collapse to a trivial one-state table with index 0.
    """
    # Skip initial tape shuffling before writing 1
    skipped = 0
    while not ru[0][0][0]:
        s = ru[0][0][2]
        # s == 0 is a self-loop; more skips than states means the machine
        # cycles through zero-writing states forever without writing a 1.
        if s == 0 or skipped >= len(ru):
            return [((0, True, 0), (1, True, -1))], 0  # 0RA1RH
        ru = ruswap(ru, 0, s)
        skipped += 1
    # Rule is now 1?? ??? ...
    # Mirror rule to always go right first
    if not ru[0][0][1]:
        ru = [tuple((t[0], not t[1], t[2]) if t[2] >= 0 else (1, True, -1) for t in r) for r in ru]
    # Rule is now 1R? ??? ...
    if ru[0][0][2] != 1:
        s = ru[0][0][2]
        if s == 0:
            return [((1, True, 0), (1, True, -1))], 0  # 1RA1RH
        if s > 1:
            ru = ruswap(ru, 1, s)
    # Rule is now 1RB ??? ...
    assert ru[0][0][0]
    assert ru[0][0][1]
    assert ru[0][0][2] == 1
    for t in (t for r in ru for t in r):
        if t[2] == -1:
            assert t[0] == 1 and t[1] == True, t
    # Choose smallest permutation to achieve canonical lexical normal form (as defined by rulekey)
    N = [1, 1, 2, 6, 24, 120, 720][len(ru) - 2]
    mi = 0
    mp = ru
    mk = rulekey(ru)
    for i in range(1, N):
        p = rupermute(ru, i)
        k = rulekey(p)
        # Round-trip sanity check; the inverse table only covers the first
        # len(permuteback) permutation indices.
        if i < len(permuteback):
            assert rupermute(p, permuteback[i]) == ru
        if k < mk:
            mi = i
            mp = p
            mk = k
    return mp, mi
def ruleparse(s, norm=True, partial=False):
    """Parse a two-symbol machine written as ``1RB1LA_0LA1RZ``-style text.

    Spaces and underscores are ignored; every six characters describe one
    state as two ``<write><dir><next>`` triples.

    Args:
        s: machine description.
        norm: pass the result through ``runorm`` (not allowed with ``partial``).
        partial: keep unreadable fields as ``None`` instead of treating the
            transition as a halt.

    Returns:
        A list of ``((w0, d0, s0), (w1, d1, s1))`` rows.
    """
    assert not (partial and norm)
    s = s.replace(" ", "").replace("_", "")
    assert len(s) % 6 == 0
    N = len(s) // 6

    def strict(tok):
        # Any target outside 0..N-1 turns the whole transition into a halt.
        nxt = ord(tok[2]) - ord("A")
        if not (0 <= nxt < N):
            return (1, True, -1)
        return (ord(tok[0]) - ord("0"), tok[1].upper() != "L", nxt)

    def loose(tok):
        # str.find returns -1 for an unknown character, which selects the
        # trailing None of each lookup list.
        w = [0, 1, None]["01".find(tok[0])]
        d = [False, True, None]["LR".find(tok[1].upper())]
        nxt = ord(tok[2]) - ord("A")
        # NOTE(review): the upper bound is inclusive here, unlike the strict
        # parser -- confirm this is intended.
        if not (0 <= nxt <= N):
            nxt = -1 if tok[2] in "!HXZ" else None
        return (w, d, nxt)

    decode = loose if partial else strict
    ru = [(decode(s[i:i + 3]), decode(s[i + 3:i + 6])) for i in range(0, len(s), 6)]
    if norm:
        ru, _ = runorm(ru)
    return ru
def largeparse(s, norm=True, partial=False):
    """Parse a multi-symbol machine written as ``_``-separated state rows.

    Each row holds one ``<write><dir><next>`` triple per tape symbol; all rows
    must have the same length.  Spaces are ignored.

    Args:
        s: machine description, e.g. ``"1RB1LA_0LA1RZ"``.
        norm: pass the result through ``runorm`` (not allowed with ``partial``).
        partial: keep unreadable fields as ``None``; a target that is not a
            valid state becomes ``-1`` when written as one of ``!HXZ`` and
            ``None`` otherwise.

    Returns:
        A list with one tuple of ``(write, dir, next)`` triples per state.
    """
    assert not (partial and norm)
    s = s.replace(" ", "")
    s = s.split("_")
    N = len(s)
    M = len(s[0]) // 3
    assert all(len(t) == 3 * M for t in s)
    ru = []
    for row in s:
        ts = []
        for j in range(M):
            wc, dc, sc = row[3 * j], row[3 * j + 1], row[3 * j + 2]
            s0 = ord(sc) - ord("A")
            if not partial:
                # Any target outside 0..N-1 turns the transition into a halt.
                if 0 <= s0 < N:
                    ts.append((ord(wc) - ord("0"), dc.upper() != "L", s0))
                else:
                    ts.append((1, True, -1))
            else:
                # str.find returns -1 for an unknown character, which selects
                # the trailing None of each lookup list.
                w0 = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, None]["0123456789".find(wc)]
                d0 = [False, True, None]["LR".find(dc.upper())]
                # NOTE(review): the upper bound is inclusive here, unlike the
                # non-partial branch -- confirm this is intended.
                if not (0 <= s0 <= N):
                    # Look at this transition's own state character (the
                    # original indexed the list of rows by mistake).
                    s0 = None if sc not in "!HXZ" else -1
                ts.append((w0, d0, s0))
        ru.append(tuple(ts))
    if norm:
        ru, _ = runorm(ru)
    return ru
def rustr(rules, ass=None):
    """Format a rule table as one string per state, e.g. ``"1RB 1LA"``.

    Unknown (``None``) fields are shown as ``?``; a target that is not a valid
    state index is shown as ``Z``.  ``ass`` is accepted but not used.
    """
    def sym(alphabet, x):
        return "?" if x is None else alphabet[x]

    def label(i):
        # None must be tested first: comparing it with an int raises TypeError.
        if i is None:
            return "?"
        return chr(ord("A") + i) if 0 <= i < len(rules) else "Z"

    return [" ".join([sym("0123456789", w) + sym("LR", d) + label(s) for (w, d, s) in r]) for r in rules]
def stdstr(rules):
    """Format a rule table as a single string, e.g. ``"1RB1LA_0LA1RZ"``.

    Rows are joined with ``_``.  Unknown (``None``) fields are shown as ``?``;
    a target that is not a valid state index is shown as ``Z``.
    """
    def sym(alphabet, x):
        return "?" if x is None else alphabet[x]

    def label(i):
        # None must be tested first: comparing it with an int raises TypeError.
        if i is None:
            return "?"
        return chr(ord("A") + i) if 0 <= i < len(rules) else "Z"

    return "_".join(["".join([sym("0123456789", w) + sym("LR", d) + label(s) for (w, d, s) in r]) for r in rules])
# --- #
def read_wide(i=-1, data=None):
    """Read machine ``i`` from the wide database and normalize it.

    Records are 30 bytes long and start after a 30-byte header; each state
    uses six bytes: (write, direction, next) for symbol 0, then the same for
    symbol 1.  A ``next`` byte of 0 marks a halting transition; otherwise the
    target state is ``next - 1``.  A direction byte of 0 means right.

    Args:
        i: record index; the default ``-1`` reads the header bytes.
        data: buffer to read from; defaults to the module-level ``widedata``.

    Returns:
        Whatever ``runorm`` returns for the decoded table.
    """
    if data is None:
        data = widedata
    # Read from the buffer that was asked for (the original always used
    # widedata, silently ignoring the data argument).
    b = struct.unpack_from("B" * 30, data, i * 30 + 30)
    return runorm([
        (([0, 1][b[k]], [True, False][b[k + 1]], b[k + 2] - 1) if b[k + 2] else (1, True, -1),
         ([0, 1][b[k + 3]], [True, False][b[k + 4]], b[k + 5] - 1) if b[k + 5] else (1, True, -1))
        for k in range(0, 30, 6)
    ])
def read_short(i=-1, data=None):
    """Decode record ``i`` of the short database into ``(rules, n)``.

    Records are two little-endian 32-bit words stored after an 8-byte header.
    The first word holds ten 3-bit target states (value 7 meaning ``-1``);
    the second holds, per state, four bits (write0, dir0, write1, dir1) in its
    low 20 bits and the extra number ``n`` above them.

    Args:
        i: record index; the default ``-1`` reads the header bytes.
        data: buffer to read from; defaults to the module-level ``shortdata``.
    """
    if data is None:
        data = shortdata
    st, pk = struct.unpack_from("<II", data, i * 8 + 8)
    targets = [0, 1, 2, 3, 4, 5, 6, -1]
    rules = []
    for k in range(5):
        flags = pk >> (4 * k)
        nxt = st >> (6 * k)
        on_zero = (flags & 1, bool(flags & 2), targets[nxt & 7])
        on_one = ((flags >> 2) & 1, bool(flags & 8), targets[(nxt >> 3) & 7])
        rules.append((on_zero, on_one))
    return (rules, pk >> 20)
def write_short(rules, n, data=None, pos=-1):
    """Encode ``rules`` and ``n`` as one 8-byte short-database record.

    The record is two little-endian 32-bit words: 3-bit target states (``-1``
    stored as 7) in the first, and per-state (write0, dir0, write1, dir1)
    bits plus ``n`` shifted left by 20 in the second.

    Args:
        rules: rows of ``((w0, d0, s0), (w1, d1, s1))``.
        n: extra number stored above bit 20 of the second word.
        data: writable buffer; when given, the record is stored at byte
            offset ``pos * 8 + 8`` and nothing is returned.
        pos: record index used together with ``data``.

    Returns:
        A new 8-byte ``bytearray`` when ``data`` is ``None``, else ``None``.
    """
    st = 0
    pk = n << 20
    for i, (r0, r1) in enumerate(rules):
        pair = (r0[2] & 7) | ((r1[2] & 7) << 3)
        st |= pair << (6 * i)
        nibble = r0[0] | (int(r0[1]) << 1) | (r1[0] << 2) | (int(r1[1]) << 3)
        pk |= nibble << (4 * i)
    if data is not None:
        struct.pack_into("<II", data, pos * 8 + 8, st, pk)
        return None
    record = bytearray(8)
    struct.pack_into("<II", record, 0, st, pk)
    return record
# Memory-map the wide seed database; read_wide uses it as its default source.
with open("all_5_states_undecided_machines_with_global_header", "rb") as widedbfile:
    widedata = mmap.mmap(widedbfile.fileno(), 0, access=mmap.ACCESS_READ)
import sys
if __name__ == "__main__" and sys.argv[1:] == ["convert"]:  # Convert
    # Rebuild the short database from the wide one: an 8-byte header whose
    # first four bytes hold the machine count, then one 8-byte record per
    # normalized machine.
    nmach = struct.unpack_from(">I", widedata, 8)[0]
    shortdata = bytearray(8)
    struct.pack_into("<I", shortdata, 0, nmach)
    for i in range(nmach):
        if i % 50000 == 0:
            print(" %d / %d " % (i, nmach), end="\r")
            sys.stdout.flush()
        ru, n = read_wide(i)
        shortdata += write_short(ru, n)
    print("\nDone!")
    # The file must be opened for writing; mode "rb" made f.write() fail.
    with open("seeddb_short_norm.bin", "wb") as f:
        f.write(shortdata)
# Memory-map the short database; read_short uses it as its default source.
with open("seeddb_short_norm.bin", "rb") as shortdbfile:
    shortdata = mmap.mmap(shortdbfile.fileno(), 0, access=mmap.ACCESS_READ)
# Memory-map the index file of undecided machines.
with open("bb5_undecided_index", "rb") as f:
    undecided = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment