Skip to content

Instantly share code, notes, and snippets.

@tarekziade
Created December 2, 2022 20:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tarekziade/782069e1955794d4692882f91f420adf to your computer and use it in GitHub Desktop.
Save tarekziade/782069e1955794d4692882f91f420adf to your computer and use it in GitHub Desktop.
Python asyncio cancellable sleeps
# taken from https://stackoverflow.com/a/37211337
def _make_sleep():
async def sleep(delay, result=None, *, loop=None):
coro = asyncio.sleep(delay, result=result)
task = asyncio.ensure_future(coro, loop=loop)
sleep.tasks.add(task)
try:
return await task
except asyncio.CancelledError:
return result
finally:
sleep.tasks.remove(task)
sleep.tasks = set()
sleep.cancel_all = lambda: sum(task.cancel() for task in sleep.tasks)
return sleep
cancellable_sleep = _make_sleep()
# how to use =>
await cancellable_sleep(60)
# calling this will immediatly cancel all sleeps
cancellable_sleep.cancel_all()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment