Skip to content

Instantly share code, notes, and snippets.

@IlianIliev
Created July 26, 2021 12:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save IlianIliev/9aba04a74a4faddf0749533205d3b001 to your computer and use it in GitHub Desktop.
Save IlianIliev/9aba04a74a4faddf0749533205d3b001 to your computer and use it in GitHub Desktop.
Playing with graceful shutdown
import asyncio
tasks = []
async def f():
print("start f")
# await asyncio.sleep(5)
import time; time.sleep(5)
print("end f")
async def executer():
tasks.append(asyncio.create_task(f()))
await asyncio.gather(*tasks)
async def main():
try:
await executer()
except:
await asyncio.gather(*tasks)
async def complete_tasks(loop):
try:
await asyncio.wait_for(asyncio.gather(*tasks, loop=loop), timeout=10, loop=loop)
except asyncio.TimeoutError:
print("Some tasks were not finished on time")
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
except:
loop.run_until_complete(complete_tasks(loop))
finally:
loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment