Skip to content

Instantly share code, notes, and snippets.

@coolreader18
Created July 14, 2020 20:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save coolreader18/676caf6c84d8abd4a278481bbeb1f14c to your computer and use it in GitHub Desktop.
Save coolreader18/676caf6c84d8abd4a278481bbeb1f14c to your computer and use it in GitHub Desktop.
import a
import functools
# Unique sentinels exchanged between awaitables and `run`: an awaitable yields
# `ready` to receive its resume callbacks and yields `go` once it is parked.
ready, go = object(), object()
def _rejection_to_exception(args):
    """Convert the positional arguments of an error callback into a throwable."""
    if len(args) == 1:
        (reason,) = args
        if isinstance(reason, BaseException):
            return reason
        if isinstance(reason, type) and issubclass(reason, BaseException):
            return reason
    # A plain value, several values, or no value at all cannot be thrown
    # directly, so wrap whatever was given in a RuntimeError.
    return RuntimeError(*args)


def run(coro, *, payload=None, error=False):
    """Advance ``coro`` by one step of the ``ready``/``go`` protocol.

    Args:
        coro: Generator-like object supporting ``send`` and ``throw``.
        payload: Value sent into ``coro``; with ``error=True`` it is thrown
            instead. The callbacks created below pass their positional
            arguments as a tuple.
        error: When true, resume ``coro`` by raising inside it.

    Raises:
        RuntimeError: If ``coro`` yields something other than ``ready`` or
            ``go``.
    """
    if error:
        # The error callback delivers its arguments as a tuple, and a tuple
        # cannot be passed to ``throw`` (it raises TypeError in the caller and
        # the coroutine never sees the failure). Turn it into an exception.
        if isinstance(payload, tuple):
            payload = _rejection_to_exception(payload)
        send = coro.throw
    else:
        send = coro.send
    try:
        cmd = send(payload)
    except StopIteration:
        # The coroutine finished; nothing left to drive.
        return
    if cmd is ready:
        # Hand the coroutine the two callbacks that will resume it later:
        # the first with a result, the second with an error.
        coro.send(
            (
                lambda *args: run(coro, payload=args),
                lambda *args: run(coro, payload=args, error=True),
            )
        )
    elif cmd is go:
        pass
    else:
        raise RuntimeError(f"expected cmd to be ready or go, got {cmd}")
class JSFuture:
    """Awaitable adapter around an object that exposes ``then(done, error)``."""

    def __init__(self, prom):
        self._prom = prom

    def __await__(self):
        # First handshake: yield `ready` and receive the pair of resume
        # callbacks that the driver sends back.
        on_done, on_error = yield ready
        self._prom.then(on_done, on_error)
        # Second handshake: park on `go` until a callback resumes us with the
        # one-element tuple of arguments it was called with.
        (result,) = yield go
        return result
def wrap_prom_func(func):
    """Wrap ``func`` so that its returned promise can be awaited.

    Args:
        func: Callable whose return value is passed to ``JSFuture``.

    Returns:
        An ``async`` function with the same metadata as ``func`` that awaits
        the result of calling it and returns the awaited value.
    """

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        # Propagate the awaited value; without the `return` every caller
        # received None regardless of what the promise resolved to.
        return await JSFuture(func(*args, **kwargs))

    return wrapper
async def timeout(delay):
    """Suspend the caller until a timer registered with ``a.set_timeout`` fires.

    ``delay`` is forwarded unchanged to ``a.set_timeout``.
    """
    timer = a.Promise()
    a.set_timeout(timer.resolve, delay)
    await JSFuture(timer)
async def main(delay):
    """Wait ``delay``, print a first message, wait 1000 more, print a second."""
    for pause, message in ((delay, "hi before"), (1000, "hello")):
        await timeout(pause)
        print(message)
# Start three copies of `main` with different initial delays, then let the
# executor in module `a` drive the pending timers until none remain.
for _initial_delay in (0, 500, 750):
    run(main(_initial_delay))
a.exc.run_until_done()
import time
from collections import deque
import abc
from typing import (
Union,
Tuple,
TypeVar,
Iterable,
Any,
cast,
Coroutine,
Generic,
Optional,
Callable,
)
class ExecutorExit(BaseException):
    """Exception carrying a single ``value`` payload.

    Derives from ``BaseException``, so ``except Exception`` handlers do not
    catch it.
    """

    def __init__(self, value: object) -> None:
        self.value = value
class Executor:
    """Minimal polling scheduler for coroutines that await ``Future`` objects.

    Pending coroutines are kept in ``todo`` together with the future each one
    is waiting on. ``run_until_done`` polls those futures round-robin until
    the queue is empty.
    """

    T = TypeVar("T")

    def __init__(self):
        # (coroutine, future) pairs that are still waiting.
        self.todo = deque()
        # Return value of every finished coroutine, in completion order.
        self.done = []
        # coroutine -> return value, only for coroutines that `run`/`many`
        # are currently waiting on.
        self._results = {}

    def addcoro(self, coro, obj):
        """Resume ``coro`` with ``obj`` and queue it if it is still pending.

        Raises:
            TypeError: If the coroutine yields something that is not a
                ``Future``.
        """
        try:
            fut = coro.send(obj)
        except StopIteration as e:
            self.done.append(e.value)
            if coro in self._results:
                self._results[coro] = e.value
            return
        if not isinstance(fut, Future):
            raise TypeError(f"expected Future, got {type(fut)}")
        self.todo.append((coro, fut))

    def run(self, coro):
        """Run ``coro`` (and anything else queued) to completion.

        Returns the return value of ``coro`` itself, not of whichever
        coroutine happened to finish last.
        """
        (result,) = self.many((coro,))
        return result

    def many(self, coros: Iterable[Coroutine[Any, Any, T]]) -> Iterable[T]:
        """Run all ``coros`` to completion.

        Returns a tuple with one return value per coroutine, in the same
        order as ``coros``. Results of earlier runs and of unrelated queued
        coroutines are not included (they remain available in ``done``).
        """
        coros = tuple(coros)
        # Register before starting anything: a coroutine may finish during
        # its very first `send`.
        for coro in coros:
            self._results[coro] = None
        try:
            for coro in coros:
                self.addcoro(coro, None)
            self.run_until_done()
            return tuple(self._results[coro] for coro in coros)
        finally:
            for coro in coros:
                self._results.pop(coro, None)

    def run_until_done(self):
        """Poll queued futures until no coroutine is left waiting.

        Raises:
            TypeError: If a future's ``poll`` returns neither a bool nor a
                tuple.
        """
        while self.todo:
            coro, fut = self.todo.popleft()
            ret = fut.poll(self)
            if isinstance(ret, bool):
                completed, obj = ret, None
            elif isinstance(ret, tuple):
                completed, obj = ret
            else:
                raise TypeError("Expected bool or (bool, obj)")
            if completed:
                self.addcoro(coro, obj)
            else:
                # Not ready yet: move to the back of the queue and try again.
                self.todo.append((coro, fut))
class Future(metaclass=abc.ABCMeta):
    """Base class for objects a coroutine can ``await`` under ``Executor``."""

    def poll(self, exc: "Executor") -> Union[bool, Tuple[bool, object]]:
        """Report whether the future has completed.

        Subclasses override this and return either a bool or a
        ``(completed, value)`` tuple.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError()

    def __await__(self):
        # Yield this future to whoever is driving the coroutine. The value
        # sent back on resumption becomes the result of the `await`
        # expression (previously it was discarded and `await` gave None).
        return (yield self)
class sleep(Future):
    """Future that completes once ``delay`` seconds have passed.

    The deadline is fixed when the object is constructed.
    """

    def __init__(self, delay: Union[int, float]):
        # Use the monotonic clock: the wall clock (`time.time`) can jump when
        # the system time is adjusted, which would shorten or stretch the wait.
        self._timeout = time.monotonic() + delay

    def poll(self, exc):
        """Return True once the deadline has been reached."""
        return time.monotonic() >= self._timeout
D = TypeVar("D", bound=Union[int, float])


async def main(delay: D) -> D:
    """Sleep three times, printing a labelled message after each, and return ``delay``.

    The first sleep lasts ``delay``; the following two last 2 and 1.
    """
    for pause, label in ((delay, "before"), (2, "after"), (1, "after again")):
        await sleep(pause)
        print(label, delay)
    return delay
# Module-level executor instance; `set_timeout` schedules its timer coroutines
# on it.
exc = Executor()
# Example usage: print(exc.many((main(1), main(2), main(3))))
T = TypeVar("T")
E = TypeVar("E")


class Promise(Generic[T, E]):
    """Callback registry with separate lists for success and error handlers.

    ``resolve`` and ``reject`` call whatever handlers are registered at the
    time of the call; no settled state is stored.
    """

    def __init__(self):
        self._thens = []
        self._errors = []

    def resolve(self, val: T = None):
        """Call every registered success handler with ``val``."""
        for on_success in self._thens:
            on_success(val)

    def reject(self, err: E):
        """Call every registered error handler with ``err``."""
        for on_failure in self._errors:
            on_failure(err)

    def then(
        self,
        done: Optional[Callable[[T], None]] = None,
        err: Optional[Callable[[E], None]] = None,
    ):
        """Register a success and/or an error handler; ``None`` is skipped."""
        for handler, registry in ((done, self._thens), (err, self._errors)):
            if handler is not None:
                registry.append(handler)
def set_timeout(func, timeout):
    """Schedule ``func()`` on the module executor after ``timeout`` milliseconds.

    The timer is a coroutine added to ``exc``; ``func`` is called with no
    arguments once the sleep has completed.
    """
    seconds = timeout / 1000

    async def _fire_after_delay():
        await sleep(seconds)
        func()

    exc.addcoro(_fire_after_delay(), None)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment