"""
Helper module for spawning multiple asynchronous http requests using co-routines & futures,
powered by `asyncio` & `aiohttp` libraries, compatible only with `python 3.5 and above`
referring -
a task: as a individual unit of work, that awaits on response from a http request
a job: as a combined unit of all the tasks, that gathers all responses together
"""
import asyncio
import logging
import time
from abc import abstractmethod
import aiohttp
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
logger.setLevel(level='DEBUG')
class AsyncHttpRequestsBase(object):
"""
    Spawns a batch of asynchronous GET requests for a job, given a list of network-bound URLs
"""
def __init__(self, urls_list=None):
self.urls_list = urls_list
self.job_response = []
@property
def urls_count(self):
        return len(self.urls_list) if self.urls_list else 0
@staticmethod
async def get_task(index_url, url):
"""
Async procedure that does an io bound call and awaits response
:param index_url: process id or task id for which the async call is being done
:param url: url for which the async call is being done
:return: response from a task request
"""
# note execution start time
start = time.time()
logger.info('started task: {} '.format(index_url))
aio_response = None
try:
# io bound GET over http that ensures a future response using await
aio_response = await aiohttp.request('GET', url)
            # TODO: the response could be processed here, as soon as it is available
            # (a process_task_response hook would need `self`; get_task is a staticmethod)
            # json_response = await aio_response.json()
        except aiohttp.ClientError as e:
            # aiohttp raises ClientError subclasses, not werkzeug's HTTPException
            logger.debug('External server error: ' + str(e))
        else:
            # tidbit: the `else` clause runs only when the try clause raises no exception;
            # read the body before closing, so json()/text() can reuse the cached payload later
            await aio_response.read()
            aio_response.close()
logger.info('finished task: {} , process took: {} '.format(index_url, time.time() - start))
return aio_response
async def wait_all_get_tasks(self):
"""
        Awaits completion of all the tasks and
        sets the self.job_response attribute upon completion
"""
        # note when execution started for all the tasks
        start_all = time.time()
        # create a list of sub-tasks, one async procedure per URL
        waited_tasks = [
            asyncio.ensure_future(self.get_task(index_url=i + 1, url=self.urls_list[i]))
            for i in range(self.urls_count)
        ]
        # asyncio.wait returns a (done, pending) tuple of task sets;
        # keep only the completed tasks as the job response
        self.job_response, _ = await asyncio.wait(waited_tasks)
logger.info('finished all tasks!! Entire process took: {} '.format(time.time() - start_all))
def process_task_response(self, aio_response):
# TODO: implement a method (async) to process individual url response
pass
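    # e.g. a subclass might implement it as a coroutine along these lines (hypothetical sketch):
    #     async def process_task_response(self, aio_response):
    #         payload = await aio_response.json()
    #         logger.info('task payload: {}'.format(payload))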
@abstractmethod
def process_job_response(self, get_raw=False):
"""
        An abstract method that must be overridden by derived classes to implement how all of
        the responses are processed; define the override as a coroutine (async def), since
        run_job awaits it on the event loop.
        Recommended: handle exceptions while processing the response in the derived class.
        :param get_raw: if True, return the raw response without processing, so the caller can
            process it outside the class
        e.g. an override might look like:
            response_obj = self.job_response
            if get_raw:
                return response_obj
            # fetch the Date header from the response
            processed_response = response_obj.headers.get('Date')
            return processed_response
"""
raise NotImplementedError
def run_job(self):
if not self.urls_count:
logger.debug('Attribute self.urls_list is empty')
return 'URLs list is empty'
# create an event loop for the job
io_event_loop = asyncio.get_event_loop()
# run & wait until all tasks finish
io_event_loop.run_until_complete(future=self.wait_all_get_tasks())
        # process the responses; derived classes implement this as a coroutine (see above)
        processed = io_event_loop.run_until_complete(future=self.process_job_response())
        # close the event loop
        io_event_loop.close()
        # return the processed response produced by the derived class
        return processed
class AsyncRequestsDatatank(AsyncHttpRequestsBase):
async def process_job_response(self, get_raw=False):
if not self.job_response:
logger.debug('Job response empty, nothing to be processed')
return None
        # upon successful execution, self.job_response holds a set of asyncio.Task objects (futures)
# https://docs.python.org/3/library/asyncio-task.html#asyncio.Task
try:
for task in self.job_response:
# task.result() returns object of type aiohttp.ClientResponse
# http://aiohttp.readthedocs.io/en/stable/client_reference.html#aiohttp.ClientResponse
task_result = task.result()
# logger.info('task_result : ' + str(task_result))
                task_result_json = await task_result.json(content_type='application/json')
                logger.info('task_result_json : ' + str(task_result_json))
        except aiohttp.ClientConnectionError as e:
logger.debug('Failed to process the job response: ' + str(e))
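# Aside: newer aiohttp releases recommend sharing a single ClientSession across
# requests instead of the module-level aiohttp.request() used above. The sketch
# below is a minimal alternative under that assumption; `fetch_all_with_session`
# is a hypothetical helper for illustration, not part of the original class hierarchy.
async def fetch_all_with_session(urls):
    async with aiohttp.ClientSession() as session:
        async def fetch(url):
            # the `async with` block releases the connection back to the pool
            async with session.get(url) as resp:
                return await resp.json()
        # gather preserves input order and propagates the first exception, if any
        return await asyncio.gather(*(fetch(u) for u in urls))
# e.g. results = asyncio.get_event_loop().run_until_complete(fetch_all_with_session(url_list))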
if __name__ == '__main__':
url_list = [
'https://jsonplaceholder.typicode.com/todos/1',
'https://jsonplaceholder.typicode.com/todos/2',
'https://jsonplaceholder.typicode.com/todos/3',
'https://jsonplaceholder.typicode.com/todos/4',
'https://jsonplaceholder.typicode.com/todos/5',
'https://jsonplaceholder.typicode.com/todos/6',
'https://jsonplaceholder.typicode.com/todos/7',
'https://jsonplaceholder.typicode.com/todos/8',
]
job1 = AsyncRequestsDatatank(urls_list=url_list)
job1.run_job()
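    # Note: on Python 3.7+, asyncio.run() could replace the manual event-loop
    # management in run_job(); this gist targets 3.5+, hence get_event_loop().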