Skip to content

Instantly share code, notes, and snippets.

@cheery
Last active January 24, 2022 19:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cheery/5f7812d7b2d6ad41c3ca2a790b9c69d6 to your computer and use it in GitHub Desktop.
Save cheery/5f7812d7b2d6ad41c3ca2a790b9c69d6 to your computer and use it in GitHub Desktop.
from rpython.rlib.rstacklet import StackletThread
from rpython.rlib.objectmodel import specialize
import core
class SThread(StackletThread):
    """StackletThread with one extra slot, ``cont``.

    ``cont`` holds the Continuation that is currently being started:
    ``Continuation.init`` stores it there and the wrapped callback picks it
    up (and resets the slot to None) inside the new stacklet.
    """

    def __init__(self, config):
        StackletThread.__init__(self, config)
        # Nothing is being launched right after construction.
        self.cont = None
def get_sthread():
    """Return the SThread of the current execution context.

    The SThread is created on first use from the context's config and then
    cached on the execution context.
    """
    context = core.get_ec()
    if context.sthread:
        return context.sthread
    context.sthread = SThread(context.config)
    return context.sthread
# Continuation represents either live or empty continuation.
# It is never 'blank', assuming you call 'init' immediately after positioning the continuation.
class Continuation(object):
    """A stacklet handle paired with a ``gate`` flag.

    Attributes:
      sthread -- the SThread obtained from get_sthread() at construction.
      h       -- the stacklet handle; starts as the null handle.
      gate    -- False until the wrapped callback opens it; a continuation
                 whose gate is closed is reported as empty.
    """

    def __init__(self):
        self.sthread = sthread = get_sthread()
        self.h = sthread.get_null_handle()
        self.gate = False

    def is_empty(self):
        """Return True if the handle is empty or the gate is closed."""
        return self.sthread.is_empty_handle(self.h) or not self.gate

    def init(self, callback):
        """Start a new stacklet running *callback* and keep the handle it yields.

        The continuation is published through ``sthread.cont`` so that the
        wrapped callback can find it; only one launch may be pending at a time
        (asserted).
        """
        sthread = self.sthread
        assert sthread.cont is None
        sthread.cont = self
        self.h = sthread.new(callback)

    def switch(self):
        """Switch to this continuation and store the handle returned on resume.

        Asserts that the continuation is not empty.
        """
        # NOTE(review): fetches the SThread again via get_sthread() instead of
        # using self.sthread -- presumably the same object; confirm.
        sthread = get_sthread()
        assert not self.is_empty(), "not so fatal error: empty switch continuation"
        self.h = sthread.switch(self.h)

    @staticmethod
    def wrapped_callback(callback):
        """Decorator adapting ``callback(cont) -> cont`` to the ``(head, arg)``
        callback form passed to ``sthread.new``.

        The wrapper takes the pending continuation out of ``sthread.cont``,
        stores *head* in it, opens its gate and calls *callback* with it.  The
        continuation returned by *callback* must be non-empty (asserted); its
        gate is closed again and its handle is returned.
        """
        def _wrapped_callback(head, arg):
            sthread = get_sthread()
            # Claim the continuation published by init() and clear the slot.
            this, sthread.cont = sthread.cont, None
            this.h = head
            this.gate = True
            cont = callback(this)
            assert not cont.is_empty(), "fatal error: empty return continuation"
            cont.gate = False
            return cont.h
        # Keep the original function name on the wrapper.
        _wrapped_callback.__name__ = callback.__name__
        return _wrapped_callback
from rpython.rlib.rthread import ThreadLocalReference
from continuations import Continuation
class ExecutionContext:
    """Mutable interpreter state stored in the thread-local slot ``g.ec``.

    Every field except ``config`` starts out as None and is filled in by the
    code that launches continuations.
    """

    def __init__(self, config):
        self.config = config
        # Created lazily by continuations.get_sthread().
        self.sthread = None

        # Hand-off slots read by the continuation launcher.
        # Startup function is of type ((a ⊸ -) ⊸ -)
        self.startup_function = None
        # Delayed -object.
        self.return_argument = None

        # Start in scheduler.
        self.scheduler = None
# Container for module-wide state; currently only the execution-context slot.
class GlobalState:
    # Thread-local reference holding the ExecutionContext; written by
    # init_executioncontext() and read by get_ec().
    ec = ThreadLocalReference(ExecutionContext, loop_invariant=True)
# The single GlobalState instance used by this module.
g = GlobalState()
def init_executioncontext(*args):
    """Build an ExecutionContext from *args* and store it in ``g.ec``."""
    g.ec.set(ExecutionContext(*args))
def get_ec():
    """Return the ExecutionContext stored in ``g.ec``.

    If the slot does not hold an ExecutionContext, a message is written to
    stderr and an assertion fails.
    """
    context = g.ec.get()
    if isinstance(context, ExecutionContext):
        return context
    # Failure path: nothing usable was installed in the slot.
    import os
    os.write(2, "Threads don't support get_ec now.\n")
    assert False, "failure"
import os
import core
import interpreter
import inference
from ukanren import (walk, Term, Var, Const, var_to_const, const_to_var,
eq, empty, fresh, disj, conj)
import parser
def greeting_procedure(arg):
    """Write "Hello" to stdout and return *arg* unchanged."""
    message = "Hello\n"
    os.write(1, message)
    return arg
# Built-in procedure "greeting": both its input and output type are the term "1".
greeting = interpreter.Procedure("greeting", greeting_procedure,
    Term("1", []), Term("1", []))
class MicroModule(object):
    """Flat namespace: an ordered list of named entries.

    Later entries shadow earlier ones with the same name.  A fresh module
    contains only the built-in ``greeting`` procedure.
    """

    def __init__(self):
        self.entries = [greeting]

    def lookup(self, name):
        """Return the most recently added entry called *name*, or None."""
        position = len(self.entries) - 1
        while position >= 0:
            candidate = self.entries[position]
            if candidate.name == name:
                return candidate
            position -= 1
        return None
def _read_source(filename):
    """Read the whole file *filename* and return its contents decoded as UTF-8.

    The file is read in chunks until end-of-file, so inputs larger than one
    chunk are not truncated, and the descriptor is always closed.
    """
    fd = os.open(filename, os.O_RDONLY, 0o777)
    try:
        chunks = []
        while True:
            chunk = os.read(fd, 64000)
            if len(chunk) == 0:
                break
            chunks.append(chunk)
    finally:
        os.close(fd)
    return "".join(chunks).decode('utf-8')


def entry_point(config):
    """Return the program entry function ``__impl__(argv)`` for *config*.

    ``__impl__``:
      1. installs an execution context and creates a MicroModule;
      2. if exactly one file name was given, parses the file, prints its
         type declarations and definitions, then type-checks each definition
         and installs it in the module (definitions that fail inference are
         removed again and reported on stderr);
      3. looks up ``main`` and checks its type against
         ``1 ⊗ (1 ⊸ ⊥) ⊸ ⊥``; on success it is launched in a new
         continuation, otherwise both types are printed on stderr.

    Returns 1 if ``main`` is not installed, 0 otherwise.
    """
    def __impl__(argv):
        core.init_executioncontext(config)
        module = MicroModule()
        if len(argv) == 2:
            source = _read_source(argv[1])
            t = parser.Tokenizer(parser.Parser())
            for char in source:
                parser.read_char(t, char)
            output = parser.read_done(t)
            if output is None:
                assert False
            output = parser.file_to_list(output, [])
            typedecls = parser.list_to_typedecls(output)
            decls = parser.list_to_decls(output)
            for name in typedecls:
                os.write(1, name + " : " + typedecls[name].to_str() + "\n")
            for name, term in decls:
                os.write(1, name + " = " + term.to_str() + "\n")
            for name, term in decls:
                state = empty
                # handle self-references: the declaration is installed with
                # fresh type variables before its own body is translated.
                inp, state = fresh(state)
                outp, state = fresh(state)
                decl = interpreter.Declaration(module, name, [], inp, outp)
                module.entries.append(decl)
                decl.body = interpreter.decl_to_block(term, module, [])
                expr = inference.InferBlock(decl.body, inp, outp)
                if name in typedecls:
                    # The declared type must agree with the inferred one.
                    expr = conj(expr,
                        eq(typedecls[name], Term("⊸", [inp, outp])))
                nstates = expr.go(state)
                if len(nstates) == 1:
                    nstate = nstates[0]
                    # Freeze the solved type: remaining variables become constants.
                    table = {}
                    decl.inp, table = var_to_const(walk(inp, nstate[0]), table)
                    decl.outp, table = var_to_const(walk(outp, nstate[0]), table)
                    typename = Term("⊸", [decl.inp, decl.outp]).to_str()
                    os.write(1, name + " : " + typename + "\n")
                else:
                    module.entries.remove(decl)
                    os.write(2, name + " not installed due to type error\n")
        # dummy function to let inference engine figure out the pieces.
        #func1 = [
        #    i.Op("split", [[i.Op("exch", [])], [i.Op("exch", [])]]),
        #    i.Op("cur", [[i.Op("exch", [])]]),
        #    i.Op("alt", [[i.Op("exch", [])], [i.Op("exch", [])]]),
        #    i.Op("make", [[i.Op("exch", [])], [i.Op("exch", [])], [i.Op("exch", [])]]),
        #    ]
        #body = [i.Op("app", []), i.Op("exch", []), i.Op("split", [[greeting], []])]

        # Type required of an entry point.
        inp = Term("⊗", [Term("1", []),
            Term("⊸", [Term("1", []), Term("⊥", [])])])
        outp = Term("⊥", [])
        main = module.lookup("main")
        if main is None:
            os.write(2, "main not installed\n")
            return 1
        state = empty
        table = {}
        i, table, state = const_to_var(main.inp, table, state)
        o, table, state = const_to_var(main.outp, table, state)
        expr = conj( eq(inp, i), eq(outp, o) )
        nstates = expr.go(state)
        if len(nstates) == 1:
            #expr = inference.InferBlock(body, inp, outp)
            #nstates = expr.go(state)
            #if len(nstates) == 1:
            #    nstate = nstates[0]
            #    table = {}
            #    inp, table = var_to_const(walk(inp, nstate[0]), table)
            #    outp, table = var_to_const(walk(outp, nstate[0]), table)
            #    main = i.Declaration(None, "main", body, inp, outp)
            func = interpreter.Function(interpreter.unit, [main], main)
            ec = core.get_ec()
            ec.startup_function = func
            ec.return_argument = interpreter.Delayed()
            cont = core.Continuation()
            ec.return_argument.waiting = cont
            ec.scheduler = cont
            cont.init(interpreter.launch_new_continuation)
        else:
            os.write(2, "'main' type does not match an entry point\n")
            os.write(2, "  " + Term("⊸", [main.inp, main.outp]).to_str() + "\n")
            os.write(2, "  " + Term("⊸", [inp, outp]).to_str() + "\n")
        exit_status = 0
        os.write(1, "Bye\n")
        return exit_status
    return __impl__
def target(driver, args):
    """Translation target hook.

    Names the executable "lever2", forces the required translation options on
    the driver's config and returns ``(entry function, None)``.
    """
    driver.exe_name = "lever2"
    config = driver.config
    force_config(config)
    main_function = entry_point(config)
    return main_function, None
def force_config(config):
    """Switch on the ``continuation`` and ``thread`` translation options."""
    options = config.translation
    options.continuation = True
    options.thread = True
# Script mode: build a translation config, force the required options and run
# the entry function directly with the process arguments; its return value is
# used as the exit status.
if __name__=="__main__":
    from rpython.config.translationoption import get_combined_translation_config
    import sys
    config = get_combined_translation_config(translating=True)
    force_config(config)
    sys.exit(entry_point(config)(sys.argv))
from ukanren import (walk, Term, Var, Const, Combinator, const_to_var,
eq, empty, fresh, disj, conj, unit)
import interpreter
class InferBlock(Combinator):
    """Goal stating that *block* maps a value of type *inp* to type *outp*.

    The block is traversed in reverse, which is the order eval_block runs it.
    Each element takes the current input type to a fresh intermediate type
    ``midp`` that becomes the input of the next element; the last intermediate
    type is unified with *outp*.
    """

    def __init__(self, block, inp, outp):
        self.block = block
        self.inp = inp
        self.outp = outp

    def go(self, state):
        """Build the conjunction of per-element goals and run it on *state*."""
        block = self.block
        inp = self.inp
        outp = self.outp
        puzzle = unit
        for op in reversed(block):
            midp, state = fresh(state)
            if isinstance(op, interpreter.Op):
                puzzle = conj(puzzle, CheckOp(op, inp, midp))
            elif isinstance(op, interpreter.Declaration):
                # Instantiate the declaration's type with fresh variables.
                table = {}
                a, table, state = const_to_var(op.inp, table, state)
                b, table, state = const_to_var(op.outp, table, state)
                # The call's result is the *intermediate* type.  Unifying it
                # with the block's final `outp` (as was done before) left
                # `midp` unconstrained and broke the chain for any block in
                # which something runs after the call.
                puzzle = conj(puzzle, conj( eq(inp, a), eq(midp, b)))
            elif isinstance(op, interpreter.Procedure):
                table = {}
                a, table, state = const_to_var(op.inp, table, state)
                b, table, state = const_to_var(op.outp, table, state)
                puzzle = conj(puzzle, conj( eq(inp, a), eq(midp, b)))
            else:
                assert False
            inp = midp
        # Whatever the last element produced is the block's output.
        puzzle = conj(puzzle, eq(inp, outp))
        return puzzle.go(state)
class CheckOp(Combinator):
    """Goal stating that primitive *op* maps type *inp* to type *outp*.

    The typing rule is selected by the op's signature ("name/arity").  An op
    whose signature has no rule here contributes no constraint.
    """

    def __init__(self, op, inp, outp):
        self.op = op
        self.inp = inp
        self.outp = outp

    def go(self, state):
        """Build the constraints for this op and run them on *state*."""
        op = self.op
        inp = self.inp
        outp = self.outp
        opname = op.get_signature()
        puzzle = unit
        if opname == 'split/2':
            # a ⊗ b  ->  c ⊗ d, with block 0 : a -> c and block 1 : b -> d
            a, state = fresh(state)
            b, state = fresh(state)
            c, state = fresh(state)
            d, state = fresh(state)
            puzzle = conj(puzzle,
                conj( eq(inp, Term("⊗", [a, b])),
                      eq(outp, Term("⊗", [c, d]))))
            puzzle = conj(puzzle,
                conj( InferBlock(op.args[0], a, c),
                      InferBlock(op.args[1], b, d)))
        elif opname == 'assl/0':
            # a ⊗ (b ⊗ c)  ->  (a ⊗ b) ⊗ c
            a, state = fresh(state)
            b, state = fresh(state)
            c, state = fresh(state)
            puzzle = conj(puzzle,
                conj( eq(inp, Term("⊗", [a, Term("⊗", [b, c])])),
                      eq(outp, Term("⊗", [Term("⊗", [a,b]), c]))))
        elif opname == 'assr/0':
            # (a ⊗ b) ⊗ c  ->  a ⊗ (b ⊗ c)
            # This is the mirror image of assl and matches what the
            # interpreter does for 'assr'.  The rule used to be a copy of
            # assl's, so well-typed programs could fail at run time.
            a, state = fresh(state)
            b, state = fresh(state)
            c, state = fresh(state)
            puzzle = conj(puzzle,
                conj( eq(inp, Term("⊗", [Term("⊗", [a,b]), c])),
                      eq(outp, Term("⊗", [a, Term("⊗", [b, c])]))))
        elif opname == 'insl/0':
            # x  ->  1 ⊗ x
            puzzle = conj(puzzle, eq(outp, Term("⊗", [Term("1", []), inp])))
        elif opname == 'dell/0':
            # 1 ⊗ x  ->  x
            puzzle = conj(puzzle, eq(inp, Term("⊗", [Term("1", []), outp])))
        elif opname == 'exch/0':
            # a ⊗ b  ->  b ⊗ a
            a, state = fresh(state)
            b, state = fresh(state)
            puzzle = conj(puzzle,
                conj( eq(inp, Term("⊗", [a, b])),
                      eq(outp, Term("⊗", [b, a]))))
        elif opname == 'app/0':
            # (a ⊸ r) ⊗ a  ->  r
            a, state = fresh(state)
            puzzle = conj(puzzle,
                eq(inp, Term("⊗", [Term("⊸", [a, outp]), a])))
        elif opname == 'cur/1':
            # x  ->  a ⊸ b, with block 0 : x ⊗ a -> b
            a, state = fresh(state)
            b, state = fresh(state)
            puzzle = conj(puzzle,
                InferBlock(op.args[0], Term("⊗", [inp, a]), b))
            puzzle = conj(puzzle,
                eq(outp, Term("⊸", [a, b])))
        elif opname == 'inl/0':
            # x  ->  x + a
            a, state = fresh(state)
            puzzle = conj(puzzle, eq(outp, Term("+", [inp, a])))
        elif opname == 'inr/0':
            # x  ->  a + x
            a, state = fresh(state)
            puzzle = conj(puzzle, eq(outp, Term("+", [a, inp])))
        elif opname == 'alt/2':
            # a + b  ->  r, with block 0 : a -> r and block 1 : b -> r
            a, state = fresh(state)
            b, state = fresh(state)
            puzzle = conj(puzzle,
                eq(inp, Term("+", [a, b])))
            puzzle = conj(puzzle,
                conj( InferBlock(op.args[0], a, outp),
                      InferBlock(op.args[1], b, outp)))
        elif opname == 'read/0':
            # !x  ->  x
            puzzle = conj(puzzle, eq(inp, Term("!", [outp])))
        elif opname == 'kill/0':
            # !a  ->  1
            a, state = fresh(state)
            puzzle = conj(puzzle, eq(inp, Term("!", [a])))
            puzzle = conj(puzzle, eq(outp, Term("1", [])))
        elif opname == 'copy/0':
            # !a  ->  !a ⊗ !a
            a, state = fresh(state)
            ta = Term("!", [a])
            puzzle = conj(puzzle, eq(inp, ta))
            puzzle = conj(puzzle, eq(outp, Term("⊗", [ta, ta])))
        elif opname == 'make/3':
            # x  ->  !a, with read : x -> a, kill : x -> 1, copy : x -> x ⊗ x
            a, state = fresh(state)
            puzzle = conj(puzzle,
                eq(outp, Term("!", [a])))
            puzzle = conj(puzzle,
                conj( conj( InferBlock(op.args[0], inp, a),
                            InferBlock(op.args[1], inp, Term("1", []))),
                      InferBlock(op.args[2], inp, Term("⊗", [inp, inp]))))
        #elif opname == 'absurd/0':
        #    pass
        elif opname == 'callcc/0':
            # (r ⊸ ⊥) ⊸ ⊥  ->  r
            puzzle = conj(puzzle,
                eq(inp, Term("⊸", [
                    Term("⊸", [outp, Term("⊥", [])]),
                    Term("⊥", [])])))
        return puzzle.go(state)
from rpython.rlib import rvmprof
import core
def decl_to_block(term, namespace, output):
    """Translate a parsed definition *term* into a block, appending to *output*.

    * ``a . b`` translates ``a`` and then ``b`` into the same block;
    * ``id`` contributes nothing;
    * a known primitive becomes an Op whose arguments are translated into
      blocks of their own;
    * anything else is looked up by name in *namespace* (an unknown name
      fails an assertion).

    Returns *output*.
    """
    signature = term.get_signature()
    if signature == "./2":
        output = decl_to_block(term.args[0], namespace, output)
        return decl_to_block(term.args[1], namespace, output)
    if signature == "id/0":
        return output
    if signature in opnames:
        sub_blocks = [decl_to_block(arg, namespace, []) for arg in term.args]
        output.append(Op(term.name, sub_blocks))
        return output
    entry = namespace.lookup(term.name)
    if entry is None:
        assert False
    output.append(entry)
    return output
class Command:
    """Base class of the elements a block may contain."""
class Op(Command):
    """A primitive operation together with its argument blocks."""

    def __init__(self, opname, args):
        self.opname = opname
        self.args = args
        # Only name/arity combinations listed in `opnames` are valid.
        assert self.get_signature() in opnames

    def get_signature(self):
        """Return "<opname>/<number of arguments>"."""
        arity = len(self.args)
        return self.opname + "/" + str(arity)
class Declaration(Command):
    """A named definition: its body block plus input and output types."""

    def __init__(self, module, name, body, inp, outp):
        self.name = name
        self.module = module
        self.body = body
        # Type of the definition is inp ⊸ outp.
        self.inp = inp
        self.outp = outp
class Procedure(Command):
    """A named built-in: eval_block applies ``callback`` to the argument."""

    def __init__(self, name, callback, inp, outp):
        self.name = name
        self.callback = callback
        # Type of the procedure is inp ⊸ outp.
        self.inp = inp
        self.outp = outp
class Arg:
    """Base class of the run-time values handled by the evaluator."""
class Decl(Arg):
    """Wrapper around a declaration, used as the rvmprof code object."""

    def __init__(self, declaration):
        self.declaration = declaration
class Cons(Arg):
    """Pair value with components ``a`` and ``b``."""

    def __init__(self, a, b):
        self.a, self.b = a, b
class Inl(Arg):
    """Left alternative carrying the value ``a``."""

    def __init__(self, a):
        self.a = a
class Inr(Arg):
    """Right alternative carrying the value ``a``."""

    def __init__(self, a):
        self.a = a
class Unit(Arg):
    """The unit value; it carries no data."""


# Shared unit instance used by the evaluator.
unit = Unit()
class Function(Arg):
    """Closure value.

    ``a`` is the captured value, ``body`` the block to run and
    ``declaration`` the declaration passed along when the body is evaluated.
    """

    def __init__(self, a, body, declaration):
        self.a = a
        self.body = body
        self.declaration = declaration
#class Continuation(Arg):
# def __init__(self, cont):
# self.cont = cont
class Copier(Arg):
    """Value built by ``make``: a payload plus three blocks.

    The blocks ``read``, ``kill`` and ``copy`` are run on the payload ``a``
    by the operations of the same names.
    """

    def __init__(self, a, read, kill, copy):
        self.a = a
        self.read, self.kill, self.copy = read, kill, copy
class Delayed(Arg):
    """Slot for a value that is delivered later.

    ``result`` holds the delivered value and ``waiting`` the continuation to
    resume on delivery; both start out as None.
    """

    def __init__(self):
        self.result = None
        self.waiting = None
# Signatures ("name/arity") of all primitive operations.  Op.__init__ asserts
# membership and decl_to_block uses the list to tell primitives apart from
# names that must be looked up in the namespace.
opnames = [
    'split/2',
    'assl/0',
    'assr/0',
    'insl/0',
    'dell/0',
    'exch/0',
    'app/0',
    'cur/1',
    'inl/0',
    'inr/0',
    'alt/2',
    'read/0',
    'kill/0',
    'copy/0',
    'make/3',
    #'absurd/0', # TODO. not sure about the rules.
    'callcc/0' ]
# rvmprof stuff, to see if it wakes up continuations.
def get_declaration_name(decl):
    """Return the profiler name of *decl*: "py:" followed by the declaration name."""
    declaration = decl.declaration
    return "py:%s" % declaration.name


rvmprof.register_code_object_class(Decl, get_declaration_name)
def get_code(block, arg, declaration):
    """Return the code object for an eval_block call: a Decl wrapping *declaration*."""
    code = Decl(declaration)
    return code
@rvmprof.vmprof_execute_code("pypy", get_code, Arg)
def eval_block(block, arg, declaration):
    """Run *block* on *arg* and return the resulting value.

    Elements are applied from the last to the first, each one receiving the
    previous result.  Declarations run their own body, procedures call their
    callback and ops go through ``eval``; anything else is skipped.
    """
    position = len(block) - 1
    while position >= 0:
        f = block[position]
        position -= 1
        if isinstance(f, Declaration):
            arg = eval_block(f.body, arg, f)
        elif isinstance(f, Procedure):
            arg = f.callback(arg)
        elif isinstance(f, Op):
            arg = eval(f, arg, declaration)
    return arg
def eval(f, arg, declaration):
    """Apply the primitive operation *f* to *arg* and return the result.

    *declaration* is passed on to every nested eval_block call and stored in
    closures created by ``cur``.  The shape of *arg* expected by each
    operation is asserted.
    """
    assert isinstance(f, Op)
    opname = f.opname

    if opname == "split":
        # Run one block on each half of a pair.
        assert isinstance(arg, Cons)
        first = eval_block(f.args[0], arg.a, declaration)
        second = eval_block(f.args[1], arg.b, declaration)
        return Cons(first, second)

    if opname == 'assl':
        # a, (b, c)  ->  (a, b), c
        assert isinstance(arg, Cons) and isinstance(arg.b, Cons)
        return Cons(Cons(arg.a, arg.b.a), arg.b.b)

    if opname == 'assr':
        # (a, b), c  ->  a, (b, c)
        assert isinstance(arg, Cons) and isinstance(arg.a, Cons)
        return Cons(arg.a.a, Cons(arg.a.b, arg.b))

    if opname == 'insl':
        return Cons(unit, arg)

    if opname == 'dell':
        assert isinstance(arg, Cons) and isinstance(arg.a, Unit)
        return arg.b

    if opname == 'exch':
        assert isinstance(arg, Cons)
        return Cons(arg.b, arg.a)

    if opname == 'app':
        # Left component is the callee, right component its argument.
        assert isinstance(arg, Cons)
        if isinstance(arg.a, Function):
            return eval_block(arg.a.body, Cons(arg.a.a, arg.b), arg.a.declaration)
        if isinstance(arg.a, Delayed):
            return delayed_value(arg.a, arg.b)
        assert False

    if opname == 'cur':
        return Function(arg, f.args[0], declaration)

    if opname == 'inl':
        return Inl(arg)

    if opname == 'inr':
        return Inr(arg)

    if opname == 'alt':
        # Pick the block matching the alternative that is present.
        if isinstance(arg, Inl):
            return eval_block(f.args[0], arg.a, declaration)
        if isinstance(arg, Inr):
            return eval_block(f.args[1], arg.a, declaration)
        assert False

    if opname == 'read':
        assert isinstance(arg, Copier)
        return eval_block(arg.read, arg.a, declaration)

    if opname == 'kill':
        assert isinstance(arg, Copier)
        killed = eval_block(arg.kill, arg.a, declaration)
        assert isinstance(killed, Unit)
        return killed

    if opname == 'copy':
        # The copy block yields a pair of payloads; each is wrapped again.
        assert isinstance(arg, Copier)
        pair = eval_block(arg.copy, arg.a, declaration)
        assert isinstance(pair, Cons)
        return Cons(Copier(pair.a, arg.read, arg.kill, arg.copy),
                    Copier(pair.b, arg.read, arg.kill, arg.copy))

    if opname == 'make':
        return Copier(arg, f.args[0], f.args[1], f.args[2])

    if opname == 'absurd':
        assert False

    if opname == 'callcc':
        # Hand the function and a fresh Delayed to a new continuation, then
        # pick up whatever was delivered into the Delayed.
        assert isinstance(arg, Function) or isinstance(arg, Delayed)
        ec = core.get_ec()
        ec.startup_function = arg
        ec.return_argument = ret = Delayed()
        cont = core.Continuation()
        ec.return_argument.waiting = cont
        cont.init(launch_new_continuation)
        result, ret.result = ret.result, None
        assert ret.waiting is None
        assert result is not None
        return result
# Body of a freshly started continuation.  Takes the startup function and the
# Delayed return slot out of the execution context (clearing both fields) and
# runs the function with the Delayed as its argument.
@core.Continuation.wrapped_callback
def launch_new_continuation(cont):
    ec = core.get_ec()
    func, ec.startup_function = ec.startup_function, None
    delayed, ec.return_argument = ec.return_argument, None
    arg = delayed
    if isinstance(func, Function):
        # Closure: run its body on the pair (captured value, delayed).
        eval_block(func.body, Cons(func.a, arg), func.declaration)
    elif isinstance(func, Delayed):
        # The "function" is itself a Delayed: deliver the new Delayed into it.
        assert func.result is None
        assert func.waiting is not None
        func.result = arg
    # NOTE(review): the original indentation was lost; these statements are
    # assumed to run after either branch -- confirm against the real file.
    # Resume whoever is registered as waiting on the return slot.
    waiting, delayed.waiting = delayed.waiting, None
    waiting.switch()
    return cont
def delayed_value(delayed, value):
    """Deliver *value* into *delayed* and switch away.

    The Delayed must not already hold a result (asserted).  Control goes to
    the continuation registered as waiting; when there is none, the scheduler
    continuation is taken from the execution context and switched to instead.
    """
    assert delayed.result is None
    delayed.result = value
    target = delayed.waiting
    delayed.waiting = None
    if target is not None:
        target.switch()
    else:
        ec = core.get_ec()
        scheduler = ec.scheduler
        ec.scheduler = None
        scheduler.switch()
def wait_on_delayed(delayed):
    """Return the result of *delayed*, suspending first if it is not there yet.

    When no result is available, the scheduler continuation is taken from the
    execution context, registered as the one waiting on *delayed* (nobody else
    may be waiting -- asserted) and switched to.  After being resumed, the
    result stored in *delayed* is returned.  Previously this path fell off the
    end of the function and returned None.
    """
    if delayed.result is None:
        assert delayed.waiting is None
        ec = core.get_ec()
        scheduler, ec.scheduler = ec.scheduler, None
        delayed.waiting = scheduler
        scheduler.switch()
        # Resumed: delayed_value() stores the result before switching back.
    return delayed.result
# -*- coding: utf-8 -*-
import ukanren
from ukanren import Term
def file_to_list(term, output):
    """Flatten the parser's left-nested file term into a list of statements.

    A file is either ``[]`` or ``__(rest, stmt)``.  The statements are
    appended to *output* in source order and *output* is returned.  Any other
    term fails an assertion.
    """
    collected = []
    node = term
    # Walk down the spine; statements are met last-to-first.
    while node.name == "__":
        collected.append(node.args[1])
        node = node.args[0]
    assert node.name == "[]"
    collected.reverse()
    for statement in collected:
        output.append(statement)
    return output
def list_to_typedecls(filelist):
    """Collect the ``typedecl`` statements of *filelist* into a dict.

    Keys are the declared names and values the declared type terms; a later
    declaration of the same name replaces an earlier one.
    """
    typedecls = {}
    for statement in filelist:
        if statement.name != "typedecl":
            continue
        declared_name = statement.args[0].name
        typedecls[declared_name] = statement.args[1]
    return typedecls
def list_to_decls(filelist):
    """Return ``(name, body)`` pairs for the ``decl`` statements, in source order."""
    return [(statement.args[0].name, statement.args[1])
            for statement in filelist
            if statement.name == "decl"]
# top → file
#
# file → file stmt
# | ()
#
# stmt → id : dot
# | id = dot
#
# dot → pop . dot
# | pop
#
# pop → plus ⊸ pop
# | plus
#
# plus → tensor + plus
# | tensor
#
# tensor → shout ⊗ tensor
# | shout
#
# shout → ! shout
# | call
#
# call → call ( dot )
# | ( dot )
# | id
# Parsing table for the grammar above.  state[i] maps a symbol to a pair
# (action, n) as interpreted by Parser.step:
#   action 0 -- push the current state and move to state n
#               (used for both tokens and reduced nonterminals);
#   action 1 -- reduce by rule n, an index into `rstate`.
# Quoted keys such as u'"("' are punctuation tokens, u'id' is an identifier,
# other keys are nonterminals and u"" is the end-of-input symbol sent by
# read_done().
state = [ { u"": (1, 3), u'file': (0, 2), u'id': (1, 3), u'top': (0, 1)},
{ u"": (1, 0)},
{ u"": (1, 1), u'id': (0, 4), u'stmt': (0, 3)},
{ u"": (1, 2), u'id': (1, 2)},
{ u'":"': (0, 6), u'"="': (0, 5)},
{ u'"!"': (0, 8),
u'"("': (0, 9),
u'call': (0, 12),
u'dot': (0, 15),
u'id': (0, 14),
u'plus': (0, 13),
u'pop': (0, 10),
u'shout': (0, 11),
u'tensor': (0, 7)},
{ u'"!"': (0, 8),
u'"("': (0, 9),
u'call': (0, 12),
u'dot': (0, 16),
u'id': (0, 14),
u'plus': (0, 13),
u'pop': (0, 10),
u'shout': (0, 11),
u'tensor': (0, 7)},
{ u"": (1, 11),
u'")"': (1, 11),
u'"+"': (0, 17),
u'"."': (1, 11),
u'"⊸"': (1, 11),
u'id': (1, 11)},
{ u'"!"': (0, 8),
u'"("': (0, 9),
u'call': (0, 12),
u'id': (0, 14),
u'shout': (0, 18)},
{ u'"!"': (0, 8),
u'"("': (0, 9),
u'call': (0, 12),
u'dot': (0, 19),
u'id': (0, 14),
u'plus': (0, 13),
u'pop': (0, 10),
u'shout': (0, 11),
u'tensor': (0, 7)},
{ u"": (1, 7), u'")"': (1, 7), u'"."': (0, 20), u'id': (1, 7)},
{ u"": (1, 13),
u'")"': (1, 13),
u'"+"': (1, 13),
u'"."': (1, 13),
u'"⊗"': (0, 21),
u'"⊸"': (1, 13),
u'id': (1, 13)},
{ u"": (1, 15),
u'"("': (0, 22),
u'")"': (1, 15),
u'"+"': (1, 15),
u'"."': (1, 15),
u'"⊗"': (1, 15),
u'"⊸"': (1, 15),
u'id': (1, 15)},
{ u"": (1, 9),
u'")"': (1, 9),
u'"."': (1, 9),
u'"⊸"': (0, 23),
u'id': (1, 9)},
{ u"": (1, 18),
u'"("': (1, 18),
u'")"': (1, 18),
u'"+"': (1, 18),
u'"."': (1, 18),
u'"⊗"': (1, 18),
u'"⊸"': (1, 18),
u'id': (1, 18)},
{ u"": (1, 5), u'id': (1, 5)},
{ u"": (1, 4), u'id': (1, 4)},
{ u'"!"': (0, 8),
u'"("': (0, 9),
u'call': (0, 12),
u'id': (0, 14),
u'plus': (0, 24),
u'shout': (0, 11),
u'tensor': (0, 7)},
{ u"": (1, 14),
u'")"': (1, 14),
u'"+"': (1, 14),
u'"."': (1, 14),
u'"⊗"': (1, 14),
u'"⊸"': (1, 14),
u'id': (1, 14)},
{ u'")"': (0, 25)},
{ u'"!"': (0, 8),
u'"("': (0, 9),
u'call': (0, 12),
u'dot': (0, 26),
u'id': (0, 14),
u'plus': (0, 13),
u'pop': (0, 10),
u'shout': (0, 11),
u'tensor': (0, 7)},
{ u'"!"': (0, 8),
u'"("': (0, 9),
u'call': (0, 12),
u'id': (0, 14),
u'shout': (0, 11),
u'tensor': (0, 27)},
{ u'"!"': (0, 8),
u'"("': (0, 9),
u'call': (0, 12),
u'dot': (0, 28),
u'id': (0, 14),
u'plus': (0, 13),
u'pop': (0, 10),
u'shout': (0, 11),
u'tensor': (0, 7)},
{ u'"!"': (0, 8),
u'"("': (0, 9),
u'call': (0, 12),
u'id': (0, 14),
u'plus': (0, 13),
u'pop': (0, 29),
u'shout': (0, 11),
u'tensor': (0, 7)},
{ u"": (1, 10),
u'")"': (1, 10),
u'"."': (1, 10),
u'"⊸"': (1, 10),
u'id': (1, 10)},
{ u"": (1, 17),
u'"("': (1, 17),
u'")"': (1, 17),
u'"+"': (1, 17),
u'"."': (1, 17),
u'"⊗"': (1, 17),
u'"⊸"': (1, 17),
u'id': (1, 17)},
{ u"": (1, 6), u'")"': (1, 6), u'id': (1, 6)},
{ u"": (1, 12),
u'")"': (1, 12),
u'"+"': (1, 12),
u'"."': (1, 12),
u'"⊸"': (1, 12),
u'id': (1, 12)},
{ u'")"': (0, 30)},
{ u"": (1, 8), u'")"': (1, 8), u'"."': (1, 8), u'id': (1, 8)},
{ u"": (1, 16),
u'"("': (1, 16),
u'")"': (1, 16),
u'"+"': (1, 16),
u'"."': (1, 16),
u'"⊗"': (1, 16),
u'"⊸"': (1, 16),
u'id': (1, 16)}]
# Reduction rules, indexed by rule number: (left-hand side, number of
# right-hand-side symbols to pop).  Rule 0 has the empty left-hand side;
# reducing it makes Parser.step store the result as its output.  The rule
# numbers match the cases in reduction().
rstate = [
    (u"", 1),
    (u'top', 1),
    (u'file', 2),
    (u'file', 0),
    (u'stmt', 3),
    (u'stmt', 3),
    (u'dot', 3),
    (u'dot', 1),
    (u'pop', 3),
    (u'pop', 1),
    (u'plus', 3),
    (u'plus', 1),
    (u'tensor', 3),
    (u'tensor', 1),
    (u'shout', 2),
    (u'shout', 1),
    (u'call', 4),
    (u'call', 3),
    (u'call', 1)]
def get_action(top, sym):
    """Return the table entry ``(action, n)`` for symbol *sym* in state *top*.

    Raises KeyError when the state has no entry for *sym*.
    """
    row = state[top]
    return row[sym]
class Tokenizer(object):
    """Tokenizer state used by read_char() and read_done().

    ``state`` is the scanner state (starts at 0), ``text`` the identifier
    characters collected so far and ``parser`` the object receiving tokens.
    """

    def __init__(self, parser):
        self.parser = parser
        self.text = u""
        self.state = 0
class Parser(object):
    """Table-driven parser fed one token at a time through step().

    ``stack`` holds ``(state, object)`` pairs, ``top`` is the current state
    and ``output`` receives the finished parse once rule 0 is reduced.
    """

    def __init__(self):
        self.stack = []
        self.top = 0
        self.output = None

    def step(self, name, value):
        """Feed the token *name* with text *value* to the parser.

        Pending reductions are performed first and then the token is pushed
        as a zero-argument Term.  If the start rule is reduced instead, the
        parser is reset and the result is left in ``self.output``.
        """
        assert isinstance(name, unicode)
        self.output = None
        if name == u'"⊥"':
            # ⊥ is handled exactly like an identifier token.
            self.step(u"id", value)
            return
        #import os
        #os.write(2, "debug: " + name.encode('utf-8') + ": " + value.encode('utf-8') + "\n")
        action = get_action(self.top, name)
        while action[0] == 1:
            rule = action[1]
            lhs, count = rstate[rule]
            # Pop the right-hand side; popping also restores the older state.
            objects = []
            for _ in range(count):
                self.top, obj = self.stack.pop()
                objects.append(obj)
            objects.reverse()
            robject = reduction(rule, objects)
            if lhs == u"":
                # Start rule reduced: publish the result and reset.
                self.stack = []
                self.top = 0
                self.output = robject
                return
            goto = get_action(self.top, lhs)
            assert goto[0] != 1
            self.stack.append((self.top, robject))
            self.top = goto[1]
            action = get_action(self.top, name)
        token = ukanren.Term(value.encode('utf-8'), [])
        self.stack.append((self.top, token))
        self.top = action[1]
def reduction(rule, args):
    """Build the parse result for grammar rule number *rule*.

    *args* holds the objects of the rule's right-hand side in source order.
    Rule numbers index ``rstate``; an unknown rule fails an assertion.
    """
    if rule == 0: # top
        return args[0]
    if rule == 1: # top → file
        return args[0]
    if rule == 2: # file → file stmt
        return Term("__", args)
    if rule == 3: #      | ()
        return Term("[]", [])
    if rule == 4: # stmt → id : dot
        return Term("typedecl", [args[0], args[2]])
    if rule == 5: #      | id = dot
        return Term("decl", [args[0], args[2]])
    if rule == 6: # dot → pop . dot
        return Term(".", [args[0], args[2]])
    if rule == 7: #     | pop
        return args[0]
    if rule == 8: # pop → plus ⊸ pop
        return Term("⊸", [args[0], args[2]])
    if rule == 9: #     | plus
        return args[0]
    if rule == 10: # plus → tensor + plus
        return Term("+", [args[0], args[2]])
    if rule == 11: #      | tensor
        return args[0]
    if rule == 12: # tensor → shout ⊗ tensor
        return Term("⊗", [args[0], args[2]])
    if rule == 13: #        | shout
        return args[0]
    if rule == 14: # shout → ! shout
        return Term("!", [args[1]])
    if rule == 15: #       | call
        term = args[0]
        # A parenthesised expression may already have been turned into a
        # Const (e.g. "($1)"); such a value has no name/args and is passed
        # through untouched instead of crashing on the attribute access.
        if isinstance(term, Term):
            # "$<digits>" with no arguments denotes a type constant.
            if len(term.args) == 0 and term.name.startswith("$"):
                digits = term.name[1:len(term.name)]
                if digits.isdigit():
                    return ukanren.Const(int(digits))
        return term
    if rule == 16: # call → call ( dot )
        callee = args[0]
        # Only a Term can take an additional argument.
        assert isinstance(callee, Term)
        callee.args.append(args[2])
        return callee
    if rule == 17: #      | ( dot )
        return args[1]
    if rule == 18: #      | id
        return args[0]
    assert False
# Express every item with ukanren Term
# foo : x ⊸ x ⊗ y ⊗ _
# foo = split () () . exch
# Single-character punctuation tokens.  read_char() passes each of them to the
# parser under the token name formed by wrapping the character in double quotes.
recognized_symbols = [
    u"(", u")", u"⊗", u"⊸", u"!", u"+", u"⊥", u"=", u":", u"." ]
def _emit_identifier(t):
    """Send the buffered identifier text to the parser and clear the buffer."""
    t.parser.step(u"id", t.text)
    t.text = u""


def read_char(t, char):
    """Advance tokenizer *t* by one unicode character.

    Scanner states:
      0  -- between tokens;
      1  -- inside an identifier (characters accumulate in ``t.text``);
      20 -- inside a ``#`` comment, which runs to the end of the line.

    Whitespace (space, tab, newline and carriage return) separates tokens.
    Carriage return is included so that files with CRLF line endings do not
    end up with a stray CR attached to identifiers.  Characters listed in
    ``recognized_symbols`` are tokens of their own; every other character is
    part of an identifier.

    Raises ReadError if the tokenizer is in an unknown state.
    """
    assert isinstance(char, unicode)
    #import os
    #os.write(2, "debug-char: " + char.encode('utf-8') + "\n")
    whitespace = [u" ", u"\t", u"\n", u"\r"]
    if t.state == 0:
        if char in whitespace:
            t.state = 0
        elif char == u'#':
            t.state = 20
        elif char in recognized_symbols:
            t.parser.step(u'"' + char + u'"', char)
        else:
            t.text += char
            t.state = 1
    elif t.state == 1: # retrieving characters.
        if char in whitespace:
            _emit_identifier(t)
            t.state = 0
        elif char == u'#':
            _emit_identifier(t)
            t.state = 20
        elif char in recognized_symbols:
            # The symbol ends the identifier and is a token itself.
            _emit_identifier(t)
            t.state = 0
            t.parser.step(u'"' + char + u'"', char)
        else:
            t.text += char
            t.state = 1
    elif t.state == 20:
        if char == u"\n":
            t.state = 0
        else:
            t.state = 20
    else:
        raise ReadError(char)
def read_done(t):
    """Finish tokenizing and return the parser's output.

    A pending identifier is flushed first, then the end-of-input token
    (empty name) is sent.  The identifier buffer is cleared after flushing,
    as read_char() does, so a reused tokenizer does not prepend stale text to
    its next identifier.
    """
    if t.state == 1:
        t.parser.step(u"id", t.text)
        t.text = u""
    t.state = 0
    t.parser.step(u"", u"")
    return t.parser.output
class ReadError(Exception):
    """Raised by read_char(); carries the character being read."""

    def __init__(self, char):
        self.char = char

    def rep(self):
        """Return a message naming the character's code point in hex."""
        codepoint = ord(self.char)
        return "When trying to read char 0x%x" % codepoint
def to_digit(char):
    """Return the offset of *char*'s code point from that of '0'."""
    zero = ord('0')
    return ord(char) - zero
class TermOrVar(object):
    """Common base of Term, Var and Const."""

    def to_str(self, rbp=0):
        """Subclasses override this; the base version fails an assertion."""
        assert False
class Term(TermOrVar):
    """Compound term: a name plus a list of argument terms."""

    def __init__(self, name, args):
        self.name = name
        self.args = args

    def to_str(self, rbp=0):
        """Render the term, adding parentheses where binding power requires.

        The operators ``.``, ``⊸``, ``+`` and ``⊗`` print infix and associate
        to the right, ``!`` prints as a prefix, and every other term prints
        as its name followed by its arguments, each in parentheses.  *rbp* is
        the binding power demanded by the context; a term that binds less
        tightly than that is wrapped in parentheses.
        """
        sig = self.get_signature()
        level = get_rbp(sig)
        if rbp > level:
            return "(" + self.to_str() + ")"
        if len(self.args) == 0:
            return self.name
        if sig == "./2" or sig == "⊸/2" or sig == "+/2" or sig == "⊗/2":
            # The left operand must bind strictly tighter than the operator.
            left = self.args[0].to_str(level + 1)
            right = self.args[1].to_str(level)
            return left + " " + self.name + " " + right
        if sig == "!/1":
            return self.name + self.args[0].to_str(level + 1)
        pieces = [self.name]
        for arg in self.args:
            pieces.append("(" + arg.to_str() + ")")
        return " ".join(pieces)

    def get_signature(self):
        """Return "<name>/<number of arguments>"."""
        arity = len(self.args)
        return self.name + "/" + str(arity)
def get_rbp(sig):
    """Return the binding power used when printing a term with signature *sig*.

    The operators bind from loosest to tightest in the order ``.``, ``⊸``,
    ``+``, ``⊗``, ``!``; any other signature gets the highest value, 60.
    """
    powers = {
        "./2": 10,
        "⊸/2": 20,
        "+/2": 30,
        "⊗/2": 40,
        "!/1": 50,
    }
    return powers.get(sig, 60)
class Var(TermOrVar):
    """Logic variable identified by ``key``."""

    def __init__(self, key):
        self.key = key

    def to_str(self, rbp=0):
        """Return "#" followed by the key."""
        label = str(self.key)
        return "#" + label
class Const(TermOrVar):
    """Type constant identified by ``key``; unifies only with an equal constant."""

    def __init__(self, key):
        self.key = key

    def to_str(self, rbp=0):
        """Return "$" followed by the key."""
        label = str(self.key)
        return "$" + label
def var_to_const(term, table):
    """Replace every Var and Const in *term* by a consecutively numbered Const.

    *table* maps the printed form of each replaced item to its new Const, so
    repeated occurrences receive the same constant.  Returns
    ``(new_term, table)``.
    """
    if isinstance(term, Term):
        converted = []
        for arg in term.args:
            new_arg, table = var_to_const(arg, table)
            converted.append(new_arg)
        return Term(term.name, converted), table
    if isinstance(term, Var) or isinstance(term, Const):
        label = term.to_str()
        replacement = table.get(label, None)
        if replacement is None:
            # Number the constants in order of first appearance.
            replacement = Const(len(table))
            table[label] = replacement
        return replacement, table
    assert False
def const_to_var(term, table, state):
    """Instantiate *term*: replace each Const by a fresh variable.

    *table* maps constant keys to the variables allocated for them, so the
    same constant always becomes the same variable.  Variables already in the
    term are kept as they are.  Returns ``(new_term, table, state)``.
    """
    if isinstance(term, Term):
        converted = []
        for arg in term.args:
            new_arg, table, state = const_to_var(arg, table, state)
            converted.append(new_arg)
        return Term(term.name, converted), table, state
    if isinstance(term, Const):
        replacement = table.get(term.key, None)
        if replacement is None:
            replacement, state = fresh(state)
            table[term.key] = replacement
        return replacement, table, state
    if isinstance(term, Var):
        # The namespace entries may contain variables.
        return term, table, state
    assert False
def walk(term, subst):
    """Return *term* with all bindings of *subst* applied.

    Bound variables are replaced by their (walked) values, compound terms are
    rebuilt with walked arguments, and constants and unbound variables are
    returned as they are.
    """
    if isinstance(term, Const):
        return term
    if isinstance(term, Term):
        return Term(term.name, [walk(arg, subst) for arg in term.args])
    if isinstance(term, Var):
        bound = subst.get(term.key, None)
        if bound is None:
            return term
        return walk(bound, subst)
    assert False
def extS(key, term, subst):
    """Return a copy of *subst* extended with ``key -> term``.

    Returns None when the variable *key* occurs inside *term* (occurs check).
    *subst* itself is not modified.
    """
    if occurs(key, term, subst):
        return None
    extended = subst.copy()
    extended[key] = term
    return extended
class Combinator:
    """Base class of goals; ``go(state)`` returns a list of result states."""

    def go(self, state):
        """Subclasses override this; the base version fails an assertion."""
        assert False
class eq(Combinator):
    """Goal that succeeds when the two terms unify."""

    def __init__(self, t1, t2):
        self.t1 = t1
        self.t2 = t2

    def go(self, state):
        """Return a one-element list with the extended state, or [] on failure."""
        subst, counter = state
        unified = unify(self.t1, self.t2, subst)
        if unified is not None:
            return [(unified, counter)]
        return []
def unify(t1, t2, subst):
    """Unify *t1* and *t2* under *subst*.

    Returns the (possibly extended) substitution, or None if the terms do not
    unify.  A compound term and a constant never unify; this is reported as
    an ordinary failure, so a type mismatch involving a rigid constant is
    treated as a type error instead of aborting on an assertion.
    """
    t1 = walk(t1, subst)
    t2 = walk(t2, subst)
    #import os
    #os.write(1, t1.to_str() + " == " + t2.to_str() + "\n")
    if isinstance(t1, Var) and isinstance(t2, Var) and t1.key == t2.key:
        return subst
    elif isinstance(t1, Var):
        return extS(t1.key, t2, subst)
    elif isinstance(t2, Var):
        return extS(t2.key, t1, subst)
    elif isinstance(t1, Term) and isinstance(t2, Term):
        if t1.name == t2.name and len(t1.args) == len(t2.args):
            # Unify argument by argument, threading the substitution through.
            for i in range(0, len(t1.args)):
                subst = unify(t1.args[i], t2.args[i], subst)
                if subst is None:
                    return None
            return subst
        else:
            return None
    elif isinstance(t1, Const) and isinstance(t2, Const):
        if t1.key == t2.key:
            return subst
        else:
            return None
    elif isinstance(t1, Term) and isinstance(t2, Const):
        return None
    elif isinstance(t1, Const) and isinstance(t2, Term):
        return None
    else:
        assert False
def occurs(key, term, subst):
    """Return True if the variable *key* occurs in *term* under *subst*."""
    resolved = walk(term, subst)
    if isinstance(resolved, Const):
        return False
    if isinstance(resolved, Var):
        return resolved.key == key
    if isinstance(resolved, Term):
        for arg in resolved.args:
            if occurs(key, arg, subst):
                return True
        return False
    assert False
# Initial state: (substitution, next variable number).  The dict is shared by
# every use of `empty`; extS() copies a substitution before extending it.
empty = (dict(), 0)
def fresh(state):
    """Allocate a new variable.

    Returns ``(Var, new_state)``; the new state has the same substitution and
    a variable counter increased by one.
    """
    subst, counter = state
    variable = Var(counter)
    return variable, (subst, counter + 1)
class Unit(Combinator):
    """Goal that always succeeds and leaves the state unchanged."""

    def __init__(self):
        pass

    def go(self, state):
        """Return a list containing only *state*."""
        results = [state]
        return results


# Shared instance, used as the neutral starting point when building conjunctions.
unit = Unit()
class disj(Combinator):
    """Disjunction: the results of both goals run on the same state."""

    def __init__(self, g1, g2):
        self.g1 = g1
        self.g2 = g2

    def go(self, state):
        """Return the results of ``g1`` followed by the results of ``g2``."""
        first = self.g1.go(state)
        second = self.g2.go(state)
        return first + second
class conj(Combinator):
    """Conjunction: ``g2`` is run on every state produced by ``g1``."""

    def __init__(self, g1, g2):
        self.g1 = g1
        self.g2 = g2

    def go(self, state):
        """Return all states produced by ``g2`` for each result of ``g1``."""
        combined = []
        for intermediate in self.g1.go(state):
            for final in self.g2.go(intermediate):
                combined.append(final)
        return combined
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment