Skip to content

Instantly share code, notes, and snippets.

@inaz2
Last active September 7, 2023 14:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save inaz2/cfb3376144c6f03c7b611a30d6605f2e to your computer and use it in GitHub Desktop.
Save inaz2/cfb3376144c6f03c7b611a30d6605f2e to your computer and use it in GitHub Desktop.
Emulating fake algebraic effects in Python 3
import itertools
from dataclasses import dataclass
from typing import Any, Callable
# Signature of an effect handler.  Continuation.__call__ invokes it with either
# an ``(Effect, Continuation)`` tuple (an effect is pending) or a ``Value``
# (no effects are left); it may return any result, or ``Eff.UNMATCHED``.
Handler = Callable[[Any], Any]
@dataclass(frozen=True, match_args=True)
class Effect:
expr: tuple
@dataclass(frozen=True, match_args=True)
class Value:
x: tuple
class Eff:
    """Front end of the fake algebraic-effect emulation.

    An ``Eff`` owns one ``Continuation`` bound to a handler.  Used as a
    context manager it yields the continuation's ``perform`` so the ``with``
    body can record effects; when the body finishes normally the recorded
    effects are run through the handler and the outcome is stored in ``ret``.
    """

    # Sentinel a handler returns to signal "this pattern is not mine".
    UNMATCHED = object()

    def __init__(self, handler: "Handler") -> None:
        # Annotations are quoted because Continuation is defined further
        # down in the file.
        self.k: "Continuation" = Continuation(handler)
        self.ret: Any = None

    @classmethod
    def handle(cls, handler: "Handler") -> "Eff":
        """Create an ``Eff`` that interprets effects with ``handler``."""
        return cls(handler)

    @classmethod
    def perform(cls, *args) -> "Eff":
        """Record a single effect, evaluate it with no real handler, and
        return the evaluated ``Eff``.

        The stand-in handler matches nothing, so what happens to the effect
        is decided entirely by the continuation's unmatched-effect path.
        """
        eff = cls.handle(lambda pat: Eff.UNMATCHED)
        eff.k.perform(*args)
        eff.evaluate()
        return eff

    def __str__(self) -> str:
        handler = self.k.handler
        # Not every callable has __name__ (functools.partial objects,
        # instances defining __call__); fall back to repr for those.
        name = getattr(handler, "__name__", None)
        if name is None:
            name = repr(handler)
        return "Eff(handler={!r})".format(name)

    def __enter__(self) -> Callable[..., None]:
        """Return the function the ``with`` body uses to record effects."""
        return self.k.perform

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Evaluate the recorded effects unless the body raised.

        When the ``with`` body failed, the effect list is incomplete, so it
        is not run; the exception propagates unchanged (``None`` is returned).
        """
        if exc_type is not None:
            return
        self.evaluate()

    def evaluate(self) -> Any:
        """Run the continuation, remember the result in ``ret``, return it."""
        self.ret = self.k()
        return self.ret
class Continuation:
    """A list of recorded effects plus the handler that interprets them.

    ``effects`` is the queue of effects still to be handled; ``results``
    accumulates every argument the continuation has been resumed with.
    Calling the continuation never mutates it: each call works on a copy, so
    a handler may resume the same continuation several times.
    """

    def __init__(self, handler: "Handler") -> None:
        self.handler: "Handler" = handler
        self.results: list[Any] = []
        self.effects: list[Effect] = []

    def __str__(self) -> str:
        return "Continuation(results={!r})".format(self.results)

    def perform(self, *args) -> None:
        """Record an effect made of ``args``; nothing is executed yet."""
        self.effects.append(Effect(args))

    def dup(self) -> "Continuation":
        """Return a copy sharing the handler but with its own lists."""
        k = Continuation(self.handler)
        k.results = self.results[:]
        k.effects = self.effects[:]
        return k

    def __call__(self, *args) -> Any:
        """Resume with ``args`` and handle the next pending effect.

        * No effect left: the handler receives ``Value(args)`` and its result
          is returned.
        * Otherwise the handler receives ``(effect, k)`` where ``k`` is the
          rest of the computation.  Any result other than ``Eff.UNMATCHED``
          is returned as is.
        * On ``Eff.UNMATCHED`` the effect is executed directly when its first
          element is callable (called with the remaining elements), and the
          rest of the computation is then resumed without arguments.
        """
        k = self.dup()
        k.results.extend(args)

        if not k.effects:
            return k.handler(Value(args))

        effect = k.effects.pop(0)
        ret = k.handler((effect, k))
        if ret is not Eff.UNMATCHED:
            return ret

        # Guard against an effect recorded with no arguments at all, which
        # would otherwise raise IndexError on expr[0].
        if effect.expr and callable(effect.expr[0]):
            func, *func_args = effect.expr
            func(*func_args)
        return k()
# https://www.eff-lang.org/try/
def eff_printing() -> None:
Eff.perform(print, "Hello, world!\n")
def should_ignore(*args):
return print("should_ignore passed:", *args)
def handler1(pat):
match pat:
case Effect([f, msg]), k if f == print:
print("I see you tried to print " + msg + ". Not so fast!\n")
case _:
return Eff.UNMATCHED
with Eff.handle(handler1) as perform:
perform(print, "A")
perform(should_ignore, "X")
perform(print, "B")
perform(print, "C")
perform(print, "D")
def handler2(pat):
match pat:
case Effect([f, msg]), k if f == print:
print("I see you tried to print " + msg + ". Okay, you may.\n")
k()
case _:
return Eff.UNMATCHED
with Eff.handle(handler2) as perform:
perform(print, "A")
perform(should_ignore, "X")
perform(print, "B")
perform(print, "C")
perform(print, "D")
def collect(effect):
match effect:
case Value(x):
return (x, "")
case Effect([f, msg]), k if f == print:
result, msgs = k()
return (result, msg + msgs)
case _:
return Eff.UNMATCHED
eff = Eff.handle(collect)
with eff as perform:
perform(print, "A")
perform(should_ignore, "X")
perform(print, "B")
perform(print, "C")
perform(print, "D")
print("collected = {}".format(eff.ret))
def eff_non_determinism() -> None:
def choose_true(pat):
match pat:
case Effect(["let_if_decide", f]), k:
return k(f(True))
case Effect(["evaluate", f]), k:
return f(*k.results)
eff = Eff.handle(choose_true)
with eff as perform:
perform("let_if_decide", lambda b: 10 if b else 20)
perform("let_if_decide", lambda b: 0 if b else 5)
perform("evaluate", lambda x, y: x - y)
print("choose_true: x - y = {}".format(eff.ret))
def choose_max(pat):
match pat:
case Effect(["let_if_decide", f]), k:
return max(k(f(True)), k(f(False)))
case Effect(["evaluate", f]), k:
return f(*k.results)
eff = Eff.handle(choose_max)
with eff as perform:
perform("let_if_decide", lambda b: 10 if b else 20)
perform("let_if_decide", lambda b: 0 if b else 5)
perform("evaluate", lambda x, y: x - y)
print("choose_max: x - y = {}".format(eff.ret))
def choose_all(pat):
match pat:
case Value(x):
return [x[0]]
case Effect(["let_if_decide", f]), k:
return k(f(True)) + k(f(False))
case Effect(["evaluate", f]), k:
# NOTE: require applying Continuation to pass the result to Value(x) handler
return k(f(*k.results))
eff = Eff.handle(choose_all)
with eff as perform:
perform("let_if_decide", lambda b: 10 if b else 20)
perform("let_if_decide", lambda b: 0 if b else 5)
perform("evaluate", lambda x, y: x - y)
print("choose_all: x - y = {}".format(eff.ret))
# FIXME: not yet ported -- finding Pythagorean triples
# FIXME: not yet ported -- eight queens problem
# Run both demos only when executed as a script, not on import.
if __name__ == "__main__":
    eff_printing()
    eff_non_determinism()
$ python3 --version
Python 3.10.12
$ python3 fake_eff.py
Hello, world!
I see you tried to print A. Not so fast!
I see you tried to print A. Okay, you may.
should_ignore passed: X
I see you tried to print B. Okay, you may.
I see you tried to print C. Okay, you may.
I see you tried to print D. Okay, you may.
should_ignore passed: X
collected = ((), 'ABCD')
choose_true: x - y = 10
choose_max: x - y = 20
choose_all: x - y = [10, 5, 20, 15]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment