Skip to content

Instantly share code, notes, and snippets.

@charles-l
Last active February 1, 2020 21:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save charles-l/6e222cef50d2a55e0d0adea6013cb5f1 to your computer and use it in GitHub Desktop.
Save charles-l/6e222cef50d2a55e0d0adea6013cb5f1 to your computer and use it in GitHub Desktop.
from typing import Callable, Any
from dataclasses import dataclass
class _CpsA: # (x, k) -> ()
@staticmethod
def lift(f) -> '_CpsA':
return _CpsA(lambda x, k: k(f(x)))
def __init__(self, cps: Callable[[Any, Callable], None]):
self.cps = cps
def next(self, g):
g = CpsA(g)
# Basically cons-ing onto the next function functiong
# (we build the stack backwards so when we pass `x` in during the run it gets
# sent through the entire stack)
# In other words, CPS can be viewed as turning a sequence of funcalls into a linked list
return _CpsA(lambda x, k: self.cps(x, lambda y: g.cps(y, k)))
def run(self, x):
return self.cps(x, lambda y: y)
def CpsA(f):
if isinstance(f, _CpsA): # identity
return f
if callable(f): # function
return _CpsA.lift(f) # lifted into CpsA
assert False, f
class SimpleEventA(_CpsA):
def __init__(self, eventname):
self.eventname = eventname
def _f(target, k):
def handler(event):
# might have to try/catch unbind
target.unbind(eventname, handler)
k((target, event))
target.bind(eventname, handler)
super().__init__(_f)
def AsyncA(f):
if isinstance(f, _AsyncA):
return f
if callable(f): # function
return _AsyncA.lift(f)
assert False, f
@dataclass
class Repeat:
value: Any
@dataclass
class Done:
value: Any
class _AsyncA: # (x, p, k) -> ()
def __init__(self, cps):
self.acps = cps
def next(self, g):
g = AsyncA(g)
def inner(x, p, k):
self.acps(x, p, lambda y, q: g.acps(y, q, k))
return _AsyncA(inner)
def run(self, x, p=None) -> 'ProgressA':
p = p if p else ProgressA()
self.acps(x, p, lambda y, p: y)
return p
def repeat(self):
def repeatf(x, p, k):
def inner_repeatf(y, q):
if isinstance(y, Repeat):
repeatf(y.value, q, k)
elif isinstance(y, Done):
k(y.value, q)
else:
raise TypeError("Unknown type to repeat on", type(y))
self.acps(x, p, inner_repeatf)
return _AsyncA(repeatf)
def product(self, g):
g = AsyncA(g)
def productf(x, p, k):
c = [2]
out = [None, None]
def set_out(i, y):
out[i] = y
# barrier
c[0] -= 1
if c[0] == 0:
k((out[0], out[1]), p)
self.next(lambda y1: set_out(0, y1)).run(x[0], p)
g.next(lambda y2: set_out(1, y2)).run(x[1], p)
return _AsyncA(productf)
def or_(self, g):
g = AsyncA(g)
def or_f(x, p, k):
prog = [ProgressA(), ProgressA()]
def end(i):
prog[i].cancel()
prog[i] = None
prog[0].next(lambda _: end(1)).run(None)
prog[1].next(lambda _: end(0)).run(None)
def cancel():
if prog[0]: prog[0].cancel()
if prog[1]: prog[1].cancel()
def join(y, q):
p.advance(cancel)
k(y, q)
p.add_canceller(cancel)
self.acps(x, prog[0], join)
g.acps(x, prog[1], join)
return _AsyncA(or_f)
@staticmethod
def lift(f: Callable[[Any], Any]):
return _AsyncA(lambda x, p, k: k(f(x), p))
def ConstA(x):
return AsyncA(lambda _: x)
class ProgressA(_AsyncA):
def __init__(self):
self.cancellers = []
self.observers = []
super().__init__(lambda x, p, k: self.observers.append(lambda y: k(y, p)))
def add_canceller(self, canceller):
self.cancellers.append(canceller)
def advance(self, canceller):
try:
i = self.cancellers.index(canceller)
del self.cancellers[i]
except:
pass
while self.observers:
self.observers.pop()(None)
def cancel(self):
while self.cancellers:
self.cancellers.pop()()
class EventA(_AsyncA):
def __init__(self, eventname):
self.eventname = eventname
def inner(target, p, k):
def cancel():
# might have to try/catch unbind
target.unbind(eventname, handler)
def handler(event):
p.advance(cancel)
cancel()
k((target, event), p)
p.add_canceller(cancel)
target.bind(eventname, handler)
super().__init__(inner)
import unittest
from arrowlets import *
class Dummy:
def __init__(self):
self.event_bindings = {}
def bind(self, event, f):
if event in self.event_bindings:
self.event_bindings[event].add(f)
else:
self.event_bindings[event] = {f}
def unbind(self, event, f):
if event not in self.event_bindings or f not in self.event_bindings[event]:
return
self.event_bindings[event].remove(f)
def trigger(self, event):
assert event in self.event_bindings, f'"{event}" event not found'
# we construct a list to force a copy of this to be created
# so each function gets triggered, but we may remove it in the
# middle of iteration
for f in list(self.event_bindings[event]):
f(event)
class TestDummy(unittest.TestCase):
def test_dummy(self):
d = Dummy()
flag = [False]
def f(_):
flag[0] = True
d.bind('event', f)
d.trigger('event')
self.assertTrue(flag[0])
flag[0] = False
d.unbind('event', f)
d.trigger('event')
self.assertFalse(flag[0])
class ArrowletsTest(unittest.TestCase):
def test_CpsA(self):
add1 = lambda x: x + 1
add2 = CpsA(add1).next(CpsA(add1))
self.assertEqual(add2.run(1), 3)
def test_SimpleEventA(self):
nclicks = [0]
def clickTargetA(x):
target, event = x
nclicks[0] += 1
return target
d = Dummy()
SimpleEventA('click').next(clickTargetA).run(d)
d.trigger('click')
self.assertEqual(nclicks[0], 1)
nclicks = [0]
SimpleEventA('click').next(clickTargetA)\
.next(SimpleEventA('click'))\
.next(clickTargetA).run(d)
d.trigger('click')
d.trigger('click') # unbinds after two clicks
d.trigger('click')
self.assertEqual(nclicks[0], 2)
def test_EventA(self):
def clickTargetA(x):
target, event = x
nclicks[0] += 1
return target
target = Dummy()
###
nclicks = [0]
p = EventA('click')\
.next(clickTargetA)\
.next(EventA('click'))\
.next(clickTargetA)\
.run(target)
target.trigger('click')
target.trigger('click')
self.assertEqual(nclicks[0], 2)
###
nclicks = [0]
p = EventA('click')\
.next(clickTargetA)\
.next(EventA('click'))\
.next(clickTargetA)\
.run(target)
target.trigger('click') # nclicks++
p.cancel() # cancel the chain
self.assertEqual(nclicks[0], 1)
target.trigger('click') # nclicks shouldn't increment here
self.assertEqual(nclicks[0], 1)
###
nclicks = [0]
t = Dummy()
t.text = 'nothing here'
p = EventA('click')\
.next(clickTargetA)\
.next(EventA('click'))\
.next(clickTargetA)\
.run(t)
def f(_):
t.text = 'clicked!'
p.next(f).run(None)
self.assertEqual(t.text, 'nothing here')
self.assertEqual(nclicks[0], 0)
t.trigger('click')
self.assertEqual(nclicks[0], 1)
self.assertEqual(t.text, 'clicked!')
t.trigger('click')
self.assertEqual(nclicks[0], 2)
self.assertEqual(t.text, 'clicked!')
def test_ConstA(self):
ConstA(1).next(lambda x: self.assertEqual(x, 1)).run(2)
def test_repeat(self):
def bubblesort(x):
lst, i, j = x
if j + 1 < i:
if lst[j] > lst[j + 1]:
lst[j], lst[j+1] = lst[j+1], lst[j]
return Repeat((lst, i, j+1))
elif i > 0:
return Repeat((lst, i-1, 0))
else:
return Done(lst)
bubblesortA = AsyncA(bubblesort).repeat()
lst = [1,3,2,0,4]
bubblesortA.run((lst, 4, 0))
self.assertEqual(lst, [0,1,2,3,4])
def test_product(self):
target1 = Dummy()
target2 = Dummy()
def clickTargetsA(a):
t1, t2 = a[0][0], a[1][0]
t1.text = 'clicked a'
t2.text = 'clicked b'
EventA('click').product(EventA('click'))\
.next(clickTargetsA)\
.run((target1, target2))
target1.trigger('click')
target2.trigger('click')
self.assertEqual(target1.text, 'clicked a')
self.assertEqual(target2.text, 'clicked b')
def test_or(self):
def WriteA(s):
def f(x):
target, event = x
target.text = s
return target
return f
heads_elem = Dummy()
tails_elem = Dummy()
heads_elem.text = ''
tails_elem.text = ''
heads = ConstA(heads_elem)
tails = ConstA(tails_elem)
heads.next(EventA('click')).next(WriteA('I win!'))\
.or_(tails.next(EventA('click')).next(WriteA('You lose!'))).run(None)
heads_elem.trigger('click')
self.assertEqual(heads_elem.text, 'I win!')
self.assertEqual(tails_elem.text, '')
###
heads_elem.text = ''
tails_elem.text = ''
heads.next(EventA('click')).next(WriteA('I win!'))\
.or_(tails.next(EventA('click')).next(WriteA('You lose!'))).run(None)
tails_elem.trigger('click')
self.assertEqual(heads_elem.text, '')
self.assertEqual(tails_elem.text, 'You lose!')
def test_dragndrop(self):
target = Dummy()
proxy = Dummy()
proxy.target = target
proxy.status = 'dropped'
def DragElementA(target):
# hack because I don't care anymore...
return ConstA(target).next(lambda _: proxy)
def setupA(x):
proxy, event = x
proxy.status = 'setup'
return proxy
def dragA(x):
proxy, event = x
proxy.status = 'dragging'
return proxy
def dropA(x):
proxy, event = x
proxy.status = 'dropped'
return proxy
def cancelA(x):
proxy, event = x
proxy.status = 'cancelled'
return proxy
dragOrDropA = (
(EventA('mousemove').next(dragA)).next(Repeat)\
.or_((EventA('mouseup').next(dropA).next(Done)))).repeat()
dragDropOrCancelA = (
(EventA('mousemove').next(dragA).next(dragOrDropA))\
.or_((EventA('mouseup').next(cancelA))))
dragAndDropA = (
(EventA('mousedown').next(setupA).next(dragDropOrCancelA)))
DragElementA('dragtarget').next(dragAndDropA).run(None)
proxy.trigger('mousedown')
proxy.trigger('mousemove')
self.assertEqual(proxy.status, 'dragging')
if __name__ == '__main__':
unittest.main()

TBH, I still don't fully understand Arrows. I can sort of understand the control flow, but it's hard to follow in a dynamically typed language, and error-prone to implement new arrows.

I tried to use type annotations to help, but I couldn't figure out how to represent the Arrow typeclass. Protocols didn't work, so I'm not sure if it's possible...

This approach is very anti-python IMHO. Since python doesn't support multiline anonymous lambdas, there are a lot of nested inner functions (with bad names). Also, the implementaion in the paper uses a lot of mutation of free variables, which python doesn't support, so that's why there are single-element lists sprinkled throughout the code.

I'm going to try to find an alternative approach that's more pythonic. I don't see how this approach will scale since it's already difficult for me to debug.

Also, it's probably a terrible idea to use this library for anything real. Python doesn't have tail-call optimizations so the stack will just grow indefinitely (particularly an issue for repeat) since CPS isn't implemented with a trampoline.

@charles-l
Copy link
Author

TODO: maybe add a demo of drag and drop using TK.

@charles-l
Copy link
Author

other experiments can be found here: https://git.sr.ht/~nch/glue

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment