Created
January 28, 2018 15:49
-
-
Save catern/be870e9e725572e9a128e2c97f324c80 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
from typing import Callable, Any | |
class Callcc: | |
"""The awaitable class that callcc returns | |
The bulk of the work here is done by trampoline created by the | |
start() function. We just yield up the callback, and the | |
trampoline actually does the call. | |
""" | |
def __init__(self, func): | |
self.func = func | |
@staticmethod | |
def yield_up(value): | |
return (yield value) | |
def __await__(self): | |
return self.yield_up(self.func) | |
def callcc(func: Callable[[Callable[[Any], Any]], Any]): | |
"""The almighty call-with-current-continuation, now in coroutine form. | |
This function returns an awaitable object, so it must be called | |
like this: | |
retval = await callcc(some_func) | |
Then if some_func looks like this: | |
def some_func(f): | |
... | |
f(42) | |
... | |
Then your execution will resume with retval == 42. | |
Note that this only works when called from coroutines launched | |
with `start`, which is also in this module. | |
Warning: The coroutines provided by this function are SINGLE-USE! | |
You must not call them twice! | |
""" | |
return Callcc(func) | |
class LinearVariable: | |
def __init__(self): | |
"""A class which linearly holds a single value. | |
The value inside this class can be set exactly once, and gotten exactly once. | |
The goal of this class is to allow function A (which can't use | |
callcc) to call function B (which returns its value via | |
invoking a callback). | |
That scenario happens whenever normal code wants to call | |
coroutine code. Coroutines return values by invoking a | |
continuation callback. | |
If we pass LinearVariable.set as the return value callback for | |
a coroutine (specifically, as the return_value_cb argument to | |
the start function) then we can get the return value of the | |
coroutine from normal code. | |
This is also useful as a way to return a value from an | |
asynchronous generator. | |
""" | |
self.value = None | |
self.been_set = False | |
self.been_got = False | |
def set(self, value): | |
if self.been_set: | |
raise Exception("can't set LinearVariable twice") | |
self.value = value | |
self.been_set = True | |
def get(self): | |
if not self.been_set: | |
raise Exception("can't get value of unset LinearVariable") | |
if self.been_got: | |
raise Exception("can't get value of LinearVariable twice") | |
val = self.value | |
self.been_got = True | |
self.value = None | |
return val | |
def allow_one_call(f, msg): | |
"Wraps a function to allow only one call, throwing an exception on subsequence calls" | |
been_called = False | |
def wrapped(*args, **kwargs): | |
nonlocal been_called | |
if been_called: | |
raise Exception(msg) | |
been_called = True | |
return f(*args, **kwargs) | |
return wrapped | |
def start(coroutine, return_cb=None, exception_cb=None): | |
"""Starts a coroutine running. | |
return_cb: A callback which will receive the return value of | |
the coroutine. | |
If the coroutine makes a blocking call before yielding for the | |
first time, start() will block. However, if the coroutine is | |
written to not use blocking operations, or at least doesn't call | |
them before its first yield, start() will return immediately | |
(after reaching the first yield). | |
In either case, this function will return None. | |
When the coroutine finishes executing, its return value will be | |
passed to return_cb, if supplied. If return_cb is not | |
supplied, the coroutine's return value will be discarded. | |
""" | |
def trampoline(value): | |
try: | |
f = coroutine.send(value) | |
except StopIteration as e: | |
if return_cb: | |
return_cb(e.value) | |
return | |
except Exception as e: | |
if exception_cb: | |
exception_cb(e) | |
return | |
else: | |
raise | |
f(allow_one_call(trampoline, "Continuations produced by callcc must only be called once")) | |
trampoline(None) | |
# this, I guess, is something like what an event loop should look like | |
# we'll require all IO goes through the framework. | |
# we'll need to have a wrapper for pathlib.Path | |
# then we'll explicitly pass it in, all the way. | |
# then things will automatically be able to switch between async and not-async | |
# not that that's useful... | |
# but, they'll be able to run multiple in the same process, which is nice. | |
# yeah this is the right way to make it clean and nice. | |
# COROUTINES! | |
async def event_loop(): | |
# this is some kind of global mutable variable that things re-enter | |
waiters = [] | |
while True: | |
happened_events = await callcc(selector.register_callback_for(waiter.event for waiter in waiters)) | |
# then at the top-level, we do selector.block_and_callback() to feed events back in. | |
# in a loop, if we so choose. | |
for event in happening_events: | |
event.dispatch() | |
# if there are no more waiters in this event loop, we're done, get out | |
if len(waiters) == 0: | |
break | |
class LinearChannel: | |
def __init__(self): | |
self._cb = None | |
import collections | |
self._pending = collections.deque() | |
def send(self, value): | |
if self._cb is not None: | |
cb = self._cb | |
self._cb = None | |
cb(value) | |
else: | |
self._pending.append(value) | |
async def recv(self): | |
if self._cb is not None: | |
raise Exception("shared access to LinearChannel") | |
if len(self._pending) != 0: | |
return self._pending.popleft() | |
def register_cb(cb): | |
self._cb = cb | |
return (await callcc(register_cb)) | |
def pop_pending(self): | |
while len(self._pending) != 0: | |
yield self._pending.popleft() | |
class MonitorMultipleCoroutines: | |
def __init__(self): | |
self.chan = LinearChannel() | |
async def gimme_some_event(self): | |
return (await self.chan.recv()) | |
def gimme_pending_events(self): | |
yield from self.chan.pop_pending() | |
def add_and_start(self, coroutine_object, tag=None): | |
start(coroutine_object, | |
return_cb=lambda val: self.chan.send((tag, val, None)), | |
exception_cb=lambda e: self.chan.send((tag, None, e)) | |
) | |
class ToplevelMonitor: | |
def __init__(self): | |
self.monitor = MonitorMultipleCoroutines() | |
def check(self): | |
for tag, ret, exc in self.monitor.gimme_pending_events(): | |
if exc: | |
print("process", tag, "threw", exc) | |
else: | |
print("process", tag, "returned", ret) | |
def add_and_start(self, coroutine_object, tag=None): | |
self.monitor.add_and_start(coroutine_object, tag=tag) | |
async def reader(chan, return_on, tag="got val"): | |
while True: | |
val = await chan.recv() | |
print(tag, val) | |
if val == return_on: | |
print("returning", val) | |
return val | |
class Channel: | |
def __init__(self): | |
self.cbs = [] | |
def send(self, value): | |
cbs = self.cbs | |
self.cbs = [] | |
for cb in cbs: | |
cb(value) | |
def register_callback(self, cb): | |
self.cbs.append(cb) | |
async def recv(self): | |
return (await callcc(self.register_callback)) | |
def fd_reader(obj): | |
async def read(): | |
buf = os.read(obj.fileno(), 4096) | |
if len(buf) == 0: | |
raise Exception(f"got eof on {obj.fileno()}") | |
else: | |
return buf | |
return read | |
def fd_writer(obj): | |
async def write(data): | |
# print("about to write to", obj.fileno(), file=sys.stderr) | |
sent = os.write(obj.fileno(), data) | |
if sent != len(data): | |
raise Exception(f"partial write to {obj.fileno()}") | |
return write | |
import inspect | |
def store_generator_return_value(f): | |
"""Decorates a generator function to return an iterable object which stores the generator's return value | |
Works on both regular and async generators! | |
""" | |
if inspect.isasyncgenfunction(f): | |
def wrapped(*args, **kwargs): | |
return StoreAsyncGeneratorReturnValue(f(*args, **kwargs)) | |
return wrapped | |
elif inspect.isgeneratorfunction(f): | |
def wrapped(*args, **kwargs): | |
return StoreGeneratorReturnValue(f(*args, **kwargs)) | |
return wrapped | |
else: | |
raise TypeError("passed function is not a generator or async generator:", f) | |
class StoreGeneratorReturnValue: | |
def __init__(self, gen): | |
self.var = LinearVariable() | |
self.gen = gen | |
def get(self): | |
return self.var.get() | |
def __iter__(self): | |
return self | |
def __next__(self): | |
if self.var.been_set: | |
raise StopIteration() | |
try: | |
return self.gen.__next__() | |
except StopIteration as e: | |
self.var.set(e.value) | |
raise | |
class StoreAsyncGeneratorReturnValue: | |
def __init__(self, gen): | |
self.var = LinearVariable() | |
self.gen = gen | |
def get(self): | |
return self.var.get() | |
def __aiter__(self): | |
return self | |
async def __anext__(self): | |
if self.var.been_set: | |
raise StopAsyncIteration() | |
try: | |
return (await self.gen.__anext__()) | |
except StopAsyncIterationWithValue as e: | |
self.var.set(e.value) | |
raise StopAsyncIteration | |
except StopAsyncIteration: | |
self.var.set(None) | |
raise StopAsyncIteration | |
class StopAsyncIterationWithValue(Exception): | |
def __init__(self, value): | |
self.value = value | |
# What if I want a single generator to generate two streams? | |
# that's not the real issue. | |
# the real issue is, how do I iterate over two async streams? | |
# I want get whichever one returns a value first. | |
# and dispatch on that. | |
# dispatch is easy enough. | |
# but essentially I __anext__ them all, passing the appropriate dispatch continuations | |
# and then my function returns, having tail-called itself out of existence. | |
# I think I grasped something interesting | |
# Python generators use yield to make a "callback" into a for loop that is iterating over your generator. | |
# The old style of doing coroutines in Python "used up" the generator language feature, using "yield" instead as a callback into the event loop. | |
# But this was annoying, so they added a new coroutine interface, essentially the same as the generator interface, | |
# which offered a dedicated channel through which you can callback into the event loop. | |
# Then they added support for using the old generator interface at the same time as the new coroutine interface, | |
# so you could once again use yield to callback into for loops. | |
# The problem is that each of these kinds of special callbacks are language-level features in Python. | |
# what you really want is a generic way to make these callbacks. | |
# Essentially, what would it look like to generate multiple streams? | |
def example(f, g): | |
f(1) | |
g("foo") | |
f(2) | |
g("bar") | |
f(3) | |
f(4) | |
# And what behavior would you want? | |
# Basically, you'd want f to block until it requests another value from you. | |
# No, wait! | |
# If I want to be able to callcc... | |
# Essentially, I want functions I call to be able to send the value I pass in, to somewhere up stream, | |
# and then block or something? | |
# Ok, how would we implement generators using callcc? | |
# Essentially, it would look like this... | |
def example_generator(callback: Callable[[int, Any], None]): | |
for value in range(5): | |
callcc(lambda k: callback(value, k)) | |
# Then, in callback... | |
def example_user(): | |
def process(value, cont): | |
print(value) | |
cont() | |
example_generator(process) | |
# Basically, callcc mode is very strong. | |
# So... | |
# What am I using yield to implement, when I use it to implement callcc? | |
# I'm calling a callback, passing it some value and also my current continuation. | |
# When I use yield to implement callcc, the callback is simple: | |
# It just applies the value to my current continuation. | |
# can I turn example_user into a more for-loopy style? | |
# using some kind of adapter? | |
def example_for_user(): | |
class IterationWrapper: | |
def __init__(self, gen_cont): | |
self.gen_cont = gen_cont | |
def __iter__(self): | |
return self | |
def __next__(self): | |
var = LinearVariable() | |
def bind(my_cont): | |
def set(value, their_cont): | |
my_cont(value) | |
self.gen_cont = their_cont | |
self.gen_cont(set) | |
return callcc(bind) | |
pass | |
# okay. | |
# so I think I clearly should be using callbacks in the stderr log thing. | |
# if those callbacks are async, they can be transformed into a for-loop straightforwardly using callcc magic. | |
# callbacks... | |
# the only remaining issue with that is exceptions. | |
# callbacks are painful with exceptions, because what if the code in the middle binds a handler? | |
# but I think it's unimportant | |
# also, in this concrete use case, it's unimportant. | |
# wait a second. | |
# if I do it with callbacks, I lose out theoretically. | |
# I want a type where... | |
# i can detect the end explicitly | |
# oh hmm but i don't have dispatching overhead with callbacks! | |
# so the choice is | |
# request -> (errlog -> ()) -> result | |
# or | |
# request -> Stream errlog result | |
# where Stream is: | |
# () -> ((errlog, Stream errlog result) | result) | |
# the latter seems more specific. | |
# let's rewrite the latter... | |
# request -> ((errlog, Stream errlog result) | result) | |
# request -> () -> errlog, (() -> errlog, (() -> errlog, (() -> Stream errlog result | result) | result) | result) | |
# compare: | |
# (elem -> ()) -> () | |
# Stream elem = More (() -> elem) | End | |
# which desugars anyway to | |
# (elem, Stream elem) | |
# (elem, (elem, Stream elem)) | |
# can I map the state machine around to that? | |
# this is really sweet and all, but... | |
# what if I want to actually have a generator? | |
# oh! at that time, I can use the coroutine thing :) | |
def agen_return(value=None): | |
"""Return a value while inside an asynchronous generator. | |
It's not yet possible to return a value inside an asynchronous | |
generator using a return statement, so this should be used | |
instead. | |
We can't just directly raise StopAsyncIteration inside the | |
asynchronous generator, because that's prevented by the Python | |
runtime. (and translated into a RuntimeError) | |
""" | |
raise StopAsyncIterationWithValue(value) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment