Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@graingert
Created July 29, 2022 15:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save graingert/b75049f6d3a27caee02d31304fdf5490 to your computer and use it in GitHub Desktop.
Save graingert/b75049f6d3a27caee02d31304fdf5490 to your computer and use it in GitHub Desktop.
__all__ = 'run',
import functools
import asyncio
from asyncio import coroutines
from asyncio import events
from asyncio import tasks
import contextlib
@contextlib.contextmanager
def runner(*, debug=None):
if events._get_running_loop() is not None:
raise RuntimeError(
"asyncio.run() cannot be called from a running event loop")
loop = events.new_event_loop()
try:
events.set_event_loop(loop)
if debug is not None:
loop.set_debug(debug)
yield loop
finally:
try:
_cancel_all_tasks(loop)
loop.run_until_complete(loop.shutdown_asyncgens())
loop.run_until_complete(loop.shutdown_default_executor())
finally:
events.set_event_loop(None)
loop.close()
def _cancel_all_tasks(loop):
to_cancel = tasks.all_tasks(loop)
if not to_cancel:
return
for task in to_cancel:
task.cancel()
loop.run_until_complete(tasks.gather(*to_cancel, return_exceptions=True))
for task in to_cancel:
if task.cancelled():
continue
if task.exception() is not None:
loop.call_exception_handler({
'message': 'unhandled exception during asyncio.run() shutdown',
'exception': task.exception(),
'task': task,
})
@contextlib.contextmanager
def runner_context():
with runner() as loop:
send_future = loop.create_future()
recieve_future = loop.create_future()
async def main():
nonlocal send_future, recieve_future
while True:
fn = await send_future
try:
v = await fn()
except BaseException as e:
recieve_future.set_exception(e)
else:
recieve_future.set_result(v)
del v
send_future = asyncio.get_running_loop().create_future()
recieve_future = asyncio.get_running_loop().create_future()
main_task = loop.create_task(main())
async def _wrap_awaitable(v):
return await v
def run(async_fn, /, *args, **kwargs):
send_future.set_result(functools.partial(async_fn, *args, **kwargs))
return loop.run_until_complete(_wrap_awaitable(recieve_future))
yield run
import contextvars
contextvar = contextvars.ContextVar("contextvar", default=42)
async def demo():
print(f"inner {contextvar.get()=}")
contextvar.set("demo1")
async def demo2():
print(f"inner2 {contextvar.get()=}")
contextvar.set("demo2")
async def demo3():
print(f"inner3 {contextvar.get()=}")
contextvar.set("demo3")
with runner_context() as run:
print(f"outer {contextvar.get()=}")
run(demo)
print(f"outer {contextvar.get()=}")
run(demo2)
print(f"outer {contextvar.get()=}")
run(demo3)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment