Last active
April 3, 2020 16:24
-
-
Save FilippoLeone850/aeee81006cd1cba9728758811c81512f to your computer and use it in GitHub Desktop.
Asynchronous HTTP requests via coroutines that can execute in parallel
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import asyncio | |
from aiohttp import ClientSession | |
class AsyncRequests:
    """Send a batch of HTTP requests to a single URL from asyncio workers.

    Constructing an instance immediately runs the whole batch to completion
    through ``asyncio.run``; when the constructor returns, ``executed`` holds
    the number of requests that finished without raising.

    Attributes:
        executed: Count of requests completed so far.
        request: Dict with ``'header'`` and ``'body'`` used by POST requests,
            or ``None`` until ``build_post_request`` succeeds.
        url: Target URL for every request.
        method: ``'get'`` or ``'post'`` (always stored lowercase).
    """

    _ALLOWED_METHODS = ('get', 'post')
    _REQUIRED_REQUEST_KEYS = ('header', 'body')

    def __init__(self, url, times=None, parallel_items=None, debug=False):
        """Run ``times`` requests against ``url`` using ``parallel_items`` workers.

        Args:
            url: URL every request is sent to.
            times: Total number of requests; a falsy value runs a single
                request with a single worker.
            parallel_items: Number of concurrent workers; a falsy value
                means one worker.
            debug: Accepted for backward compatibility; nothing reads it.
        """
        self.executed = 0
        self.request = None
        self.url = url
        self.method = 'get'
        if times:
            asyncio.run(self.producer(times, parallel_items or 1))
        else:
            # Without a request count, parallel_items is ignored as well.
            asyncio.run(self.producer())

    def __repr__(self):
        return f"<URL: ({self.url}), Exec: ({self.executed})>"

    def build_post_request(self, request):
        """Validate and store the header/body used by POST requests.

        Example:
            {
                'header' : {'Content-Type': 'JSON'},
                'body' : {
                    'data1' : 'yes'
                }
            }

        Args:
            request: Mapping that must contain ``'header'`` and ``'body'``;
                top-level string keys are matched case-insensitively.

        Returns:
            True when the request was stored, False (after printing a hint)
            when a required key is missing.
        """
        # Lowercase only the top-level keys; nested header/body stay untouched.
        normalized = {
            key.lower() if isinstance(key, str) else key: value
            for key, value in request.items()
        }
        if not all(key in normalized for key in self._REQUIRED_REQUEST_KEYS):
            print('Provide header and body of post request.')
            return False
        self.request = normalized
        return True

    def set_method(self, method):
        """Select the HTTP method, case-insensitively.

        Args:
            method: ``'get'`` or ``'post'`` in any letter case.

        Returns:
            True when the method was accepted, False (after printing a hint)
            otherwise; the previous method is kept on rejection.
        """
        normalized = method.lower()
        if normalized in self._ALLOWED_METHODS:
            # Stored lowercase because consumer() compares against 'get'.
            self.method = normalized
            return True
        print('Provide either GET or POST.')
        return False

    async def do_get(self, session, sslverify=False):
        """GET ``self.url`` on ``session`` and return the raw response body.

        ``sslverify`` is forwarded as the ``ssl`` argument of ``session.get``.
        """
        async with session.get(url=self.url, ssl=sslverify) as response:
            resp = await response.read()
            # Print response here
            return resp

    async def do_post(self, session):
        """POST to ``self.url`` on ``session`` and return the decoded JSON reply.

        Reads ``self.request['header']`` and ``self.request['body']``, so
        ``build_post_request`` must have stored a request beforehand.
        """
        async with session.post(url=self.url, ssl=False, headers=self.request['header'], data=self.request['body']) as response:
            resp = await response.json()
            return resp

    async def consumer(self, queue):
        """Take one chunk from ``queue`` and issue one request per element.

        The chunk is always marked done, even when a request raises, so that
        anything waiting on ``queue.join()`` cannot block forever.
        """
        queueitem = await queue.get()
        start = time.monotonic()
        try:
            async with ClientSession() as client:
                for _ in queueitem:
                    if self.method == 'get':
                        await self.do_get(client)
                    else:
                        await self.do_post(client)
                    self.executed += 1
            # Elapsed time is reported in minutes (seconds / 60).
            print("<Coroutine> Parallel item completed in {}".format((time.monotonic() - start) / 60))
        finally:
            queue.task_done()

    @staticmethod
    def _split_work(times, parallel_items):
        """Return per-worker request counts that sum to exactly ``times``.

        At most ``parallel_items`` workers are used; any remainder is spread
        one request at a time over the first workers.

        Raises:
            ValueError: If ``parallel_items`` is smaller than 1.
        """
        if parallel_items < 1:
            raise ValueError('parallel_items must be at least 1.')
        if times <= 0:
            return []
        workers = min(times, parallel_items)
        base, extra = divmod(times, workers)
        return [base + 1 if index < extra else base for index in range(workers)]

    async def producer(self, times=1, parallel_items=1):
        """Queue the work, run one consumer per chunk and wait for them all.

        Args:
            times: Total number of requests to issue.
            parallel_items: Maximum number of concurrent consumers.

        Raises:
            ValueError: If ``parallel_items`` is smaller than 1.
        """
        # Validate before the diagnostic print below divides by parallel_items.
        sizes = self._split_work(times, parallel_items)
        # Creating async queue
        queue = asyncio.Queue()
        print(f"times={times}, parallel_items={parallel_items}, result={times//parallel_items}")
        for size in sizes:
            queue.put_nowait(list(range(size)))
        print(queue.qsize())
        # One consumer per queued chunk; each consumer takes exactly one chunk.
        jobs = [asyncio.create_task(self.consumer(queue=queue)) for _ in sizes]
        # return_exceptions=True keeps one failing worker from aborting the rest.
        results = await asyncio.gather(*jobs, return_exceptions=True)
        for outcome in results:
            if isinstance(outcome, BaseException):
                print("<Coroutine> Parallel item failed: {!r}".format(outcome))
        print("<Coroutine> Numbers of requests: {}".format(self.executed))
if __name__ == '__main__':
    # Time the complete batch: constructing AsyncRequests runs every request
    # before the constructor returns.
    began = time.monotonic()
    runner = AsyncRequests("https://google.com", times=100, parallel_items=10)
    elapsed_minutes = (time.monotonic() - began) / 60
    print("<Task> Ended after {} minutes.".format(elapsed_minutes))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment