Skip to content

Instantly share code, notes, and snippets.

@amitnabarro
Created December 12, 2017 10:05
Show Gist options
  • Save amitnabarro/a7d54610136669cdcfecf5f26180d74d to your computer and use it in GitHub Desktop.
Save amitnabarro/a7d54610136669cdcfecf5f26180d74d to your computer and use it in GitHub Desktop.
Asyncio Periodic
import asyncio
import aiohttp
import time
# Cooperative stop flag: the polling loop re-checks it before every request,
# so clearing it lets the coroutine finish its current iteration and return.
running = True


async def check_status_periodically(url='https://www.google.com', interval=2):
    """Poll *url* with HTTP HEAD requests until ``running`` becomes false.

    After each request the response status and the local wall-clock time
    (``HH:MM:SS``) are printed. A single ``aiohttp.ClientSession`` is reused
    for all requests.

    Args:
        url: Address to send the HEAD request to.
        interval: Seconds to sleep between two consecutive requests.
    """
    async with aiohttp.ClientSession() as session:
        while running:
            try:
                async with session.head(url) as response:
                    print(response.status, time.strftime("%H:%M:%S"))
            except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
                # A transient network failure must not end the periodic
                # check; report it and try again on the next tick.
                print('request failed:', exc, time.strftime("%H:%M:%S"))
            # Sleep outside the response context so the connection is
            # released back to the session before waiting.
            await asyncio.sleep(interval)
# Create the loop explicitly: relying on asyncio.get_event_loop() to create
# one implicitly is deprecated and fails on recent Python versions.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    loop.create_task(check_status_periodically())
    loop.run_forever()
except KeyboardInterrupt:
    # Ctrl-C: ask the polling coroutine to stop after its current iteration.
    running = False
finally:
    try:
        # asyncio.Task.all_tasks() was removed in Python 3.9;
        # asyncio.all_tasks(loop) is its replacement.
        pending = asyncio.all_tasks(loop)
        # Let the unfinished tasks run to completion so the client session
        # is closed cleanly before the loop goes away.
        loop.run_until_complete(asyncio.gather(*pending))
    finally:
        # Close the loop even if one of the drained tasks raised.
        loop.close()
        print('\nDone')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment