Skip to content

Instantly share code, notes, and snippets.

@pruthvi6767
Created November 14, 2019 18:15
Show Gist options
  • Save pruthvi6767/b0bad4894110042e3c768e9ee69fa7a0 to your computer and use it in GitHub Desktop.
Save pruthvi6767/b0bad4894110042e3c768e9ee69fa7a0 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import abc
import sys
import urllib3
from http.client import HTTPSConnection
import json
import asyncio
import time
import logging
import aiohttp.client as async_http_client
# Silence urllib3's InsecureRequestWarning for the whole process.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Root logger: everything from DEBUG upwards is written to stdout.
# setLevel() is the supported way to change a logger's threshold; assigning
# to the ``level`` attribute directly bypasses the logging module's own
# bookkeeping.
logs = logging.getLogger()
logs.setLevel(logging.DEBUG)
logs.addHandler(logging.StreamHandler(stream=sys.stdout))
class MyError(Exception):
    """Application error that carries a human-readable message.

    Args:
        msg: Description of the failure. It is available as ``err.msg`` and
            is also what ``str(err)`` and ``err.args[0]`` report.
    """

    def __init__(self, msg):
        # Forward the message to Exception explicitly: when the error is
        # built with a keyword argument (MyError(msg=...)) nothing else
        # populates ``args``, and str(err) would be an empty string.
        super().__init__(msg)
        self.msg = msg
# Module-level list that collects the per-person results; main() prints it
# once paging has finished.
people = []

# Shared urllib3 connection pool (20 passed as the pool-count argument).
# No timeout is configured on the pool itself; the request in main() passes
# its own per-request timeout.
conn = urllib3.PoolManager(20)
async def get_ships_and_planets_for_person(person):
    """Resolve a person's starship and homeworld URLs into fetched data.

    Args:
        person: Person record (a mapping) with "name", "starships" and
            "homeworld" entries. "starships" is handed to get_urls() and
            "homeworld" to get_starwars_data(), so both are expected to hold
            URLs. The record is updated in place with the resolved values.

    Returns:
        A dict with the keys "name", "homeworld" and "starships".
        "homeworld" is the "name" field of the fetched homeworld document,
        or the unresolved original value when that fetch failed.
        "starships" is whatever get_urls() returned for the starship URLs.
    """
    print("calling ships/person")
    if person["starships"]:
        person["starships"] = await get_urls(person["starships"])
    if person["homeworld"]:
        home = await get_starwars_data(person["homeworld"])
        # get_starwars_data() yields None when the request fails; keep the
        # original value in that case instead of crashing on home["name"].
        if home is not None:
            person["homeworld"] = home["name"]
    return {
        "name": person["name"],
        "homeworld": person["homeworld"],
        "starships": person["starships"],
    }
async def get_starwars_data(url):
    """Fetch *url* with aiohttp and return its decoded JSON body.

    A new ClientSession is opened (and closed) for every call.

    Args:
        url: Address to GET.

    Returns:
        The parsed JSON payload when the server answers with HTTP 200,
        otherwise None. Failures are never raised to the caller: they are
        printed to stdout and written to the module logger.
    """
    try:
        async with async_http_client.ClientSession() as session:
            async with session.get(url) as response:
                if response.status != 200:
                    # Positional message so str(error) is populated no
                    # matter how MyError forwards its arguments.
                    raise MyError(
                        "unexpected HTTP status {} from {}".format(
                            response.status, url))
                return await response.json()
    except Exception as e:
        # Deliberately best-effort: one failed lookup must not abort the
        # whole crawl, so report it and signal the failure with None.
        print("Error reading from {} ".format(url))
        logs.info("Request to %s failed: %r", url, e)
        return None
async def main(start_url):
    """Walk the paginated people listing and print the enriched results.

    Each page is fetched through the module-level urllib3 pool ``conn``.
    Every entry in the page's "results" list is processed concurrently by
    get_ships_and_planets_for_person() and the returned records are added to
    the module-level ``people`` list. Paging follows each page's "next" link
    until it is empty; then the process CPU time, the collected records and
    their count are printed.

    Args:
        start_url: URL of the first page of the listing.

    Raises:
        MyError: If a page request answers with a non-200 status. Stopping
            here avoids re-requesting the same URL in an endless loop.
    """
    while start_url:
        # conn.request() is a blocking call. It only runs between batches,
        # after the previous page's tasks have all been awaited.
        response = conn.request(
            'GET', start_url,
            headers={"Content-Type": "Application/JSON"},
            timeout=60)
        if response.status != 200:
            raise MyError(
                "unexpected HTTP status {} from {}".format(
                    response.status, start_url))

        people_response = json.loads(response.data.decode('utf-8'))

        # gather() schedules every coroutine of the page at once and returns
        # their results in input order, so the records can be stored
        # directly without a done-callback.
        page_people = await asyncio.gather(
            *(get_ships_and_planets_for_person(person)
              for person in people_response['results']))
        people.extend(page_people)

        # A missing or null "next" link ends the loop.
        start_url = people_response.get('next')

    print(time.process_time())
    print(people)
    print(len(people))
def extend_people_result(person):
    """Done-callback that stores a finished task's result in ``people``.

    Args:
        person: The completed asyncio task/future passed in by
            add_done_callback(). Despite the parameter name this is the
            task object, not the person record; the record is its result.
    """
    if person.cancelled():
        # A cancelled task has no result to record.
        return
    error = person.exception()
    if error is not None:
        # Calling result() would re-raise inside the callback; log instead.
        logs.info("Person task failed: %r", error)
        return
    # Append the record itself. Extending the list with the task object adds
    # nothing, because iterating a finished task yields no items.
    people.append(person.result())
async def get_urls(urls) -> list:
    """Fetch every URL in *urls* concurrently and replace it with its data.

    Args:
        urls: Mutable list of URLs. It is updated in place: each entry is
            replaced by what get_starwars_data() returned for it (None for
            a failed request), in the same order.

    Returns:
        The same list object that was passed in, now holding the results.
    """
    # One gather() instead of awaiting each request in turn, so the lookups
    # overlap rather than run back to back. gather() keeps input order.
    urls[:] = await asyncio.gather(*(get_starwars_data(uri) for uri in urls))
    return urls
if __name__ == '__main__':
    # NOTE(review): confirm this host still serves the API before relying
    # on the script; the endpoint is hard-coded.
    start_url = 'https://swapi.co/api/people/'
    # asyncio.run() creates a fresh event loop, runs main() to completion
    # and closes the loop afterwards, replacing the manual
    # get_event_loop()/run_until_complete() pair.
    asyncio.run(main(start_url))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment