Skip to content

Instantly share code, notes, and snippets.

@graingert
Created January 28, 2022 16:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save graingert/80ab56e3ec2d9631ca9dca4b0a0528d0 to your computer and use it in GitHub Desktop.
Save graingert/80ab56e3ec2d9631ca9dca4b0a0528d0 to your computer and use it in GitHub Desktop.
from __future__ import annotations
import sys
import asyncio
import concurrent.futures
import contextlib
class _Client:
def __init__(self, loop):
self._loop = loop
def run(self, fn, /, *args, **kwargs):
task = None
loop = self._loop
async def wrap():
nonlocal task
task = asyncio.current_task()
return await fn(*args, **kwargs)
err = None
fut = asyncio.run_coroutine_threadsafe(wrap(), loop)
try:
while True:
try:
return fut.result()
except KeyboardInterrupt as e:
err = None
loop.call_soon_threadsafe(task.cancel)
except asyncio.CancelledError:
if err is not None:
raise err
raise
@contextlib.contextmanager
def client():
with concurrent.futures.ThreadPoolExecutor(1) as pool:
loop = None
started = concurrent.futures.Future()
async def main():
event = asyncio.Event()
started.set_result((asyncio.get_running_loop(), event))
await event.wait()
finished = pool.submit(asyncio.run, main())
for fut in concurrent.futures.as_completed((started, finished)):
if fut is started:
loop, event = started.result()
try:
yield _Client(loop)
finally:
loop.call_soon_threadsafe(event.set)
if fut is finished:
assert finished.result() is None
assert finished.result() is None
def main():
async def example():
print("press Ctrl+C 10 times")
for i in range(1, 11):
try:
await asyncio.Event().wait()
except asyncio.CancelledError:
print(f"cancelled {i=}")
with client() as c:
c.run(example)
if __name__ == "__main__":
sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment