Asynchronous HTTP requests via coroutines that can execute in parallel

import time
import asyncio
from aiohttp import ClientSession

class AsyncRequests:
    def __init__(self, url, times=1, parallel_items=1, method='get', request=None, debug=False):
        self.executed = 0
        self.request = None
        self.url = url
        self.method = 'get'
        # Apply the method and optional POST payload before the run starts,
        # since producer() is kicked off from the constructor.
        self.set_method(method)
        if request:
            self.build_post_request(request)
        asyncio.run(self.producer(times, parallel_items))

    def __repr__(self):
        return f"<URL: ({self.url}), Exec: ({self.executed})>"
    def build_post_request(self, request):
        """
        Example:
        {
            'header' : {'Content-Type': 'JSON'},
            'body' : {
                'data1' : 'yes'
            }
        }
        """
        if not all(key in request for key in ('header', 'body')):
            print('Provide header and body of post request.')
            return
        self.request = request

    def set_method(self, method):
        if method.lower() in ('get', 'post'):
            self.method = method.lower()
            return True
        print('Provide either GET or POST.')
        return False
    async def do_get(self, session, sslverify=False):
        async with session.get(url=self.url, ssl=sslverify) as response:
            resp = await response.read()
            # Print or process the response here if needed
            return resp

    async def do_post(self, session):
        async with session.post(url=self.url, ssl=False,
                                headers=self.request['header'],
                                data=self.request['body']) as response:
            resp = await response.json()
            return resp
    async def consumer(self, queue):
        # Coroutine consumer: drains one chunk of request slots from the queue
        queueitem = await queue.get()
        start = time.monotonic()
        async with ClientSession() as client:
            for _ in queueitem:
                if self.method == 'get':
                    await self.do_get(client)
                else:
                    await self.do_post(client)
                self.executed += 1
        print("<Coroutine> Parallel item completed in {} minutes".format((time.monotonic() - start) / 60))
        queue.task_done()
    async def producer(self, times=1, parallel_items=1):
        jobs = []
        # Creating async queue
        queue = asyncio.Queue()
        chunk = times // parallel_items
        print(f"times={times}, parallel_items={parallel_items}, chunk={chunk}")
        # Enqueue one chunk of request slots per parallel worker
        for _ in range(parallel_items):
            queue.put_nowait(list(range(chunk)))
        print(queue.qsize())
        # The queue size is equal to the number of chunks to execute,
        # so spawn one consumer task per queued chunk
        for _ in range(queue.qsize()):
            jobs.append(asyncio.create_task(self.consumer(queue=queue)))
        await queue.join()
        await asyncio.gather(*jobs, return_exceptions=True)
        # Telling the workers to cancel, as they already returned a result (on the join phase)
        for job in jobs:
            job.cancel()
        print("<Coroutine> Number of requests: {}".format(self.executed))
if __name__ == '__main__':
    start = time.monotonic()
    req = AsyncRequests("https://google.com", times=100, parallel_items=10)
    end = time.monotonic()
    print("<Task> Ended after {} minutes.".format((end - start) / 60))