Created
July 29, 2022 15:24
-
-
Save graingert/b75049f6d3a27caee02d31304fdf5490 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
__all__ = 'run', | |
import functools | |
import asyncio | |
from asyncio import coroutines | |
from asyncio import events | |
from asyncio import tasks | |
import contextlib | |
@contextlib.contextmanager | |
def runner(*, debug=None): | |
if events._get_running_loop() is not None: | |
raise RuntimeError( | |
"asyncio.run() cannot be called from a running event loop") | |
loop = events.new_event_loop() | |
try: | |
events.set_event_loop(loop) | |
if debug is not None: | |
loop.set_debug(debug) | |
yield loop | |
finally: | |
try: | |
_cancel_all_tasks(loop) | |
loop.run_until_complete(loop.shutdown_asyncgens()) | |
loop.run_until_complete(loop.shutdown_default_executor()) | |
finally: | |
events.set_event_loop(None) | |
loop.close() | |
def _cancel_all_tasks(loop): | |
to_cancel = tasks.all_tasks(loop) | |
if not to_cancel: | |
return | |
for task in to_cancel: | |
task.cancel() | |
loop.run_until_complete(tasks.gather(*to_cancel, return_exceptions=True)) | |
for task in to_cancel: | |
if task.cancelled(): | |
continue | |
if task.exception() is not None: | |
loop.call_exception_handler({ | |
'message': 'unhandled exception during asyncio.run() shutdown', | |
'exception': task.exception(), | |
'task': task, | |
}) | |
@contextlib.contextmanager | |
def runner_context(): | |
with runner() as loop: | |
send_future = loop.create_future() | |
recieve_future = loop.create_future() | |
async def main(): | |
nonlocal send_future, recieve_future | |
while True: | |
fn = await send_future | |
try: | |
v = await fn() | |
except BaseException as e: | |
recieve_future.set_exception(e) | |
else: | |
recieve_future.set_result(v) | |
del v | |
send_future = asyncio.get_running_loop().create_future() | |
recieve_future = asyncio.get_running_loop().create_future() | |
main_task = loop.create_task(main()) | |
async def _wrap_awaitable(v): | |
return await v | |
def run(async_fn, /, *args, **kwargs): | |
send_future.set_result(functools.partial(async_fn, *args, **kwargs)) | |
return loop.run_until_complete(_wrap_awaitable(recieve_future)) | |
yield run | |
import contextvars | |
contextvar = contextvars.ContextVar("contextvar", default=42) | |
async def demo(): | |
print(f"inner {contextvar.get()=}") | |
contextvar.set("demo1") | |
async def demo2(): | |
print(f"inner2 {contextvar.get()=}") | |
contextvar.set("demo2") | |
async def demo3(): | |
print(f"inner3 {contextvar.get()=}") | |
contextvar.set("demo3") | |
with runner_context() as run: | |
print(f"outer {contextvar.get()=}") | |
run(demo) | |
print(f"outer {contextvar.get()=}") | |
run(demo2) | |
print(f"outer {contextvar.get()=}") | |
run(demo3) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment