Skip to content

Instantly share code, notes, and snippets.

@graingert
Created August 8, 2021 09:27
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save graingert/00798d4d60ca080f1b1080e5d311b757 to your computer and use it in GitHub Desktop.
gimcrack.py
import curio
import collections
import threading
class Locals(threading.local):
    """Thread-local namespace holding the currently active Synchrotron."""

    # Class-level default: a thread that has never stored a value reads None.
    synchrotron = None
# Per-thread slot that Synchrotron fills with itself while it is stepping its
# delegate, and that synchro() reads to find the Synchrotron to attach to.
tlocals = Locals()
import sys, os
class Synchrotron:
    """Awaitable wrapper that lets synchronous code start a companion coroutine.

    The wrapped awaitable (the *delegate*) is driven through ``send``/``throw``
    like any coroutine.  While the delegate is being stepped, this instance is
    published in ``tlocals.synchrotron`` so that ``synchro()`` can attach a
    companion coroutine by setting ``_companion`` and ``_companion_trap``.
    When a companion is attached, its traps are handed to the caller first;
    once it finishes, whatever the delegate produced in the meantime (a trap
    or an exception, including the ``StopIteration`` that carries its return
    value) is delivered.
    """

    def __init__(self, delegate):
        """Wrap *delegate*; its ``__await__()`` iterator is what gets driven."""
        self._delegate = delegate.__await__()
        # Companion coroutine currently being driven, or None.
        self._companion = None
        # First trap of a freshly attached companion, not yet forwarded.
        self._companion_trap = None
        self._failure = None  # not read by this class; kept for compatibility
        # Trap the delegate yielded during the step that attached a companion.
        self._trap = None
        self._value = None  # not read by this class; kept for compatibility
        # (type, value, traceback) the delegate raised during the step that
        # attached a companion; all None when nothing is parked.
        self._exc_info = (None, None, None)

    def __await__(self):
        """Return self: the instance is its own await-iterator."""
        return self

    def __iter__(self):
        """Return self, completing the iterator protocol."""
        return self

    def _resume(self, method, args):
        """Advance the companion if one is attached, otherwise the delegate.

        *method* is ``"send"`` or ``"throw"`` and *args* are forwarded to it
        unchanged.  Returns the next trap to hand to the caller and raises
        whatever the delegate raised (``StopIteration`` on completion).
        """
        if self._companion is not None:
            try:
                return getattr(self._companion, method)(*args)
            except StopIteration:
                # The companion is done: deliver what the delegate produced
                # while the companion had priority.
                self._companion = None
                parked, self._exc_info = self._exc_info, (None, None, None)
                if parked != (None, None, None):
                    _, err, tb = parked
                    raise err.with_traceback(tb)
                if (old_trap := self._trap) is not None:
                    self._trap = None
                    return old_trap
                # Nothing was parked; fall through and resume the delegate
                # with the same arguments.

        old_synchrotron, tlocals.synchrotron = tlocals.synchrotron, self
        # Both outcomes start out empty so that either can be parked below
        # regardless of how the delegate step ends.
        trap = None
        exc_info = (None, None, None)
        try:
            trap = getattr(self._delegate, method)(*args)
        except BaseException as exc:
            # Captured explicitly instead of via sys.exc_info(): when the
            # delegate merely yields, sys.exc_info() may report an exception
            # being handled by a caller further up the stack, which would
            # then be mistaken for a failure of the delegate.
            exc_info = (type(exc), exc, exc.__traceback__)
        finally:
            tlocals.synchrotron = old_synchrotron

        if (companion_trap := self._companion_trap) is not None:
            # A companion was attached during this step.  Park the delegate's
            # outcome and hand out the companion's first trap instead.
            self._trap = trap
            self._exc_info = exc_info
            self._companion_trap = None
            return companion_trap
        if exc_info != (None, None, None):
            _, err, tb = exc_info
            raise err.with_traceback(tb)
        return trap

    def send(self, *args):
        """Send a value into the companion (if attached) or the delegate."""
        return self._resume("send", args)

    def throw(self, *args):
        """Throw an exception into the companion (if attached) or the delegate."""
        return self._resume("throw", args)

    def __next__(self):
        """Advance one step; equivalent to ``send(None)``."""
        return self.send(None)

    def close(self):
        """Close the companion, if any, and then always close the delegate."""
        try:
            if (companion := self._companion) is not None:
                companion.close()
        finally:
            self._delegate.close()
async def enable_synchro(async_fn, /, *args, **kwargs):
    """Call ``async_fn(*args, **kwargs)`` and await it wrapped in a Synchrotron.

    Returns whatever awaiting the Synchrotron evaluates to.
    """
    delegate = async_fn(*args, **kwargs)
    wrapper = Synchrotron(delegate)
    outcome = await wrapper
    return outcome
def synchro(async_fn, /, *args, **kwargs):
    """Start ``async_fn(*args, **kwargs)`` as the active Synchrotron's companion.

    Called synchronously from code running inside a Synchrotron delegate.
    The new coroutine is advanced to its first suspension point and, together
    with the trap it yielded there, recorded on the Synchrotron found in
    ``tlocals.synchrotron``.

    Raises:
        RuntimeError: if no Synchrotron is active on this thread, or the
            active one already has a companion.
    """
    synchrotron = tlocals.synchrotron
    # Explicit raises rather than assert, which is stripped under -O.
    if not isinstance(synchrotron, Synchrotron):
        raise RuntimeError("synchro() called outside of a running Synchrotron")
    if synchrotron._companion is not None:
        raise RuntimeError("the running Synchrotron already has a companion")
    coro = async_fn(*args, **kwargs)
    try:
        trap = coro.send(None)
    except StopIteration:
        # The coroutine completed without ever suspending, so there is
        # nothing left to drive.  Letting StopIteration escape into the
        # calling coroutine would surface there as a RuntimeError.
        return
    synchrotron._companion = coro
    synchrotron._companion_trap = trap
async def fn(event, msg):
    """Wait for *event*, then print a completion line tagged with *msg*."""
    pending = event.wait()
    await pending
    print(f"done {msg}")
async def companion(event):
    """Demo companion: sleep three times, reporting each, then set *event*."""
    rounds, pause = 3, 0.3
    for i in range(rounds):
        await curio.sleep(pause)
        print(f"sleep {i}")
    # Only after all the sleeps is the waiter on *event* released.
    await event.set()
async def set_event(event1, event2):
    """Demo delegate: start a companion for each event from synchronous calls."""
    # Plain (non-awaited) call: starts companion(event1) and attaches it to
    # the Synchrotron this coroutine is running under.
    synchro(companion, event1)
    print("I sleep")
    # Suspend once before attaching the next companion; synchro() asserts
    # that no companion is attached when it is called.
    await curio.sleep(0)
    synchro(companion, event2)
    print("I return")
async def main():
    """Demo entry point: two waiters plus the task that releases them."""
    first, second = curio.Event(), curio.Event()
    async with curio.TaskGroup() as group:
        # One waiter per event, each running under its own Synchrotron.
        for label, evt in (("event1", first), ("event2", second)):
            await group.spawn(enable_synchro, fn, evt, label)
        # The task whose companions eventually set both events.
        await group.spawn(enable_synchro, set_event, first, second)
# Run the demo under curio; this executes whenever the module is loaded.
curio.run(main)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment