Skip to content

Instantly share code, notes, and snippets.

@melvinkcx
Created November 18, 2022 04:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save melvinkcx/0c1e2088ea3cb692098eca6c835818a2 to your computer and use it in GitHub Desktop.
Save melvinkcx/0c1e2088ea3cb692098eca6c835818a2 to your computer and use it in GitHub Desktop.
run_with_wait doesn't cancel but wait for all tasks to be completed when the main task is completed, unlike asyncio.run()
from asyncio import coroutines, events, tasks
def run_with_wait(main, *, debug=None):
"""
Difference from asyncio.run():
- asyncio.run() cancels all tasks when `main` is completed.
any tasks created using `asyncio.create_task()` will hence be cancelled.
- run_with_wait() waits for all tasks to be completed using
`run_until_complete()` and `asyncio.tasks.gather()`
"""
if events._get_running_loop() is not None:
raise RuntimeError(
"asyncio.run() cannot be called from a running event loop")
if not coroutines.iscoroutine(main):
raise ValueError("a coroutine was expected, got {!r}".format(main))
loop = events.new_event_loop()
try:
events.set_event_loop(loop)
if debug is not None:
loop.set_debug(debug)
return loop.run_until_complete(main)
finally:
try:
_wait_for_all_tasks(loop)
loop.run_until_complete(loop.shutdown_asyncgens())
loop.run_until_complete(loop.shutdown_default_executor())
finally:
events.set_event_loop(None)
loop.close()
def _wait_for_all_tasks(loop):
all_tasks = tasks.all_tasks(loop)
loop.run_until_complete(tasks.gather(*all_tasks, return_exceptions=True))
for task in all_tasks:
if task.exception() is not None:
loop.call_exception_handler({
'message': 'unhandled exception during asyncio.run() shutdown',
'exception': task.exception(),
'task': task,
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment