Created
August 8, 2021 09:27
Star
You must be signed in to star a gist
gimcrack.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import curio | |
import collections | |
import threading | |
class Locals(threading.local): | |
synchrotron = None | |
tlocals = Locals() | |
import sys, os | |
class Synchrotron: | |
def __init__(self, delegate): | |
self._delegate = delegate.__await__() | |
self._companion = None | |
self._companion_trap = None | |
self._failure = None | |
self._trap = None | |
self._value = None | |
self._exc_info = (None, None, None) | |
def __await__(self): | |
return self | |
def __iter__(self): | |
return self | |
def send(self, *args): | |
if self._companion is not None: | |
try: | |
return self._companion.send(*args) | |
except StopIteration as e: | |
self._companion = None | |
if self._exc_info != (None, None, None): | |
_, err, tb = self._exc_info | |
raise err.with_traceback(tb) | |
if (old_trap := self._trap) is not None: | |
# self._value = v | |
self._trap = None | |
return old_trap | |
old_synchrotron, tlocals.synchrotron = tlocals.synchrotron, self | |
trap = None | |
try: | |
try: | |
trap = self._delegate.send(*args) | |
finally: | |
tlocals.synchrotron = old_synchrotron | |
finally: | |
exc_info = sys.exc_info() | |
if (companion_trap := self._companion_trap) is not None: | |
self._trap = trap | |
self._exc_info = exc_info | |
self._companion_trap = None | |
return companion_trap | |
if exc_info != (None, None, None): | |
_, err, tb = exc_info | |
raise err.with_traceback(tb) | |
return trap | |
def throw(self, *args): | |
if self._companion is not None: | |
try: | |
return self._companion.throw(*args) | |
except StopIteration as e: | |
self._companion = None | |
if self._exc_info != (None, None, None): | |
_, err, tb = self._exc_info | |
raise err.with_traceback(tb) | |
if (old_trap := self._trap) is not None: | |
# self._value = v | |
self._trap = None | |
return old_trap | |
old_synchrotron, tlocals.synchrotron = tlocals.synchrotron, self | |
try: | |
try: | |
trap = self._delegate.throw(*args) | |
finally: | |
tlocals.synchrotron = old_synchrotron | |
finally: | |
exc_info = sys.exc_info() | |
if (companion_trap := self._companion_trap) is not None: | |
self._trap = trap | |
self._exc_info = exc_info | |
self._companion_trap = None | |
return companion_trap | |
if exc_info != (None, None, None): | |
_, err, tb = exc_info | |
raise err.with_traceback(tb) | |
return trap | |
def __next__(self): | |
return self.send(None) | |
def close(self): | |
try: | |
if (companion := self._companion) is not None: | |
companion.close() | |
finally: | |
self._delegate.close() | |
async def enable_synchro(async_fn, /, *args, **kwargs): | |
return await Synchrotron(async_fn(*args, **kwargs)) | |
def synchro(async_fn, /, *args, **kwargs): | |
synchrotron = tlocals.synchrotron | |
assert isinstance(synchrotron, Synchrotron) | |
assert synchrotron._companion is None | |
coro = async_fn(*args, **kwargs) | |
trap = coro.send(None) | |
synchrotron._companion = coro | |
synchrotron._companion_trap = trap | |
async def fn(event, msg): | |
await event.wait() | |
print(f"done {msg}") | |
async def companion(event): | |
for i in range(3): | |
await curio.sleep(0.3) | |
print(f"sleep {i}") | |
await event.set() | |
async def set_event(event1, event2): | |
synchro(companion, event1) | |
print("I sleep") | |
await curio.sleep(0) | |
synchro(companion, event2) | |
print("I return") | |
async def main(): | |
event1 = curio.Event() | |
event2 = curio.Event() | |
async with curio.TaskGroup() as tg: | |
await tg.spawn(enable_synchro, fn, event1, "event1") | |
await tg.spawn(enable_synchro, fn, event2, "event2") | |
await tg.spawn(enable_synchro, set_event, event1, event2) | |
curio.run(main) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment