Skip to content

Instantly share code, notes, and snippets.

@messa

messa/aiohttp_example.py

Last active Jun 19, 2020
Embed
What would you like to do?
#!/usr/bin/env python3
from aiohttp import ClientSession
import asyncio
import logging
from pprint import pprint
from reprlib import repr as smart_repr
logger = logging.getLogger(__name__)
async def main(*, worker_count=4):
    """Download the sample users concurrently and pretty-print the result.

    Starts ``worker_count`` worker tasks that all drain one shared list of
    URLs and fill one shared ``users`` dict, waits for every worker to
    finish, then prints the collected users.

    Args:
        worker_count: Number of concurrent worker tasks to start.
    """
    logging.basicConfig(level=logging.DEBUG)

    # Users 1..10 in ascending order; workers take URLs from the end of the
    # list, so the highest-numbered user is requested first.
    urls = [
        f'https://reqres.in/api/users/{user_id}'
        for user_id in range(1, 11)
    ]
    users = {}

    workers = [
        asyncio.create_task(retrieve_users(urls, users))
        for _ in range(worker_count)
    ]

    # gather(return_exceptions=True) waits for all workers and hands back any
    # exception a worker died with, so failures are reported instead of being
    # silently left on the finished task objects.
    outcomes = await asyncio.gather(*workers, return_exceptions=True)
    for outcome in outcomes:
        if isinstance(outcome, BaseException):
            logger.error('Worker task failed: %r', outcome)

    pprint(users)
async def retrieve_users(urls, users):
    """Worker: take URLs off the shared list until none are left.

    One ``ClientSession`` is opened for the lifetime of the worker. Each URL
    popped from the end of ``urls`` is passed to ``retrieve_user``; a failure
    for a single URL is logged and the worker moves on to the next one.

    Args:
        urls: Mutable list of URLs; consumed (emptied) by the workers.
        users: Dict passed through to ``retrieve_user`` to receive results.
    """
    async with ClientSession() as session:
        while True:
            if not urls:
                break
            # No await between the emptiness check and pop(), so other
            # workers on the same event loop cannot empty the list in between.
            target = urls.pop()
            try:
                await retrieve_user(session, target, users)
            except Exception as exc:
                logger.error('Failed to download user from %s: %r', target, exc)
async def retrieve_user(session, url, users):
    """Fetch one user document from ``url`` and store it in ``users``.

    The response body is parsed as JSON; the object under its ``'data'`` key
    is stored in ``users`` keyed by that object's ``'id'``.

    Args:
        session: Client session used to issue the GET request.
        url: Address of the user resource.
        users: Dict updated in place with the downloaded user.

    Raises:
        Whatever ``response.raise_for_status()`` raises for an error status,
        and ``KeyError`` if the payload lacks ``'data'`` or ``'id'``.
    """
    logger.debug('Downloading %s', url)
    async with session.get(url) as resp:
        resp.raise_for_status()
        payload = await resp.json()
    logger.debug('Downloaded %s -> %s', url, smart_repr(payload))
    user = payload['data']
    users[user['id']] = user
# Script entry point: run the main coroutine on a fresh event loop.
if __name__ == '__main__':
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment