Skip to content

Instantly share code, notes, and snippets.

@kissgyorgy
Created July 6, 2022 12:16
Show Gist options
  • Save kissgyorgy/d6e4b860b9e7de783465793f622979c3 to your computer and use it in GitHub Desktop.
Save kissgyorgy/d6e4b860b9e7de783465793f622979c3 to your computer and use it in GitHub Desktop.
Python: shield a coroutine from cancellation and wait until completely finished
import asyncio
from typing import Coroutine
async def force_complete(coro: Coroutine):
"""Shield the Coroutine from cancellation and block the calling coroutine until it's fully completed."""
task = asyncio.create_task(coro)
try:
await asyncio.shield(task)
except asyncio.CancelledError:
await task
raise
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment