Skip to content

Instantly share code, notes, and snippets.

@wowkin2
Last active April 23, 2018 14:01
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save wowkin2/73b6bcec5634ca1ceeb2fa0604cb630b to your computer and use it in GitHub Desktop.
Save wowkin2/73b6bcec5634ca1ceeb2fa0604cb630b to your computer and use it in GitHub Desktop.
Benchmark of blocking and non-blocking HTTP requests using an aiohttp session, plain requests, and requests run in an asyncio executor.
import time
import asyncio
import concurrent.futures
import requests
# Endpoints of the local benchmark server, one per request strategy.
URL_SESSION = 'http://localhost:8061/session'
URL_REQUESTS = 'http://localhost:8061/requests'
URL_REQUESTS_EXECUTOR = 'http://localhost:8061/requests_executor'
# Number of requests sent to an endpoint in a single benchmark run.
N = 30
# Size of the client-side thread pool that issues the requests.
WORKERS = 20
def task(url):
    """Issue one blocking GET request to *url* and measure how long it takes.

    :param url: address of the endpoint to request
    :return: elapsed time of the request, in seconds
    """
    # perf_counter() is monotonic, so the measured interval cannot be skewed
    # by system clock adjustments the way a time.time() difference can.
    started = time.perf_counter()
    requests.get(url)
    return time.perf_counter() - started
async def runner(url):
    """Send N concurrent requests to *url* and print timing statistics.

    The blocking ``task`` calls are dispatched to a thread pool with WORKERS
    threads and awaited together; the total elapsed time and the average
    per-request time are printed.

    :param url: server endpoint to benchmark
    """
    loop = asyncio.get_event_loop()
    # perf_counter() is monotonic, which makes it the right clock for
    # measuring an interval.
    start = time.perf_counter()
    with concurrent.futures.ThreadPoolExecutor(max_workers=WORKERS) as executor:
        futures = [loop.run_in_executor(executor, task, url) for _ in range(N)]
        # gather() already returns the per-request durations as a list.
        durations = await asyncio.gather(*futures)
    total = time.perf_counter() - start
    # Guard the division so a run with no requests reports 0 instead of raising.
    average = sum(durations) / len(durations) if durations else 0
    print("Took %s seconds (%s average seconds/req.) to do %s requests" % (total, average, N))
def main():
    """Benchmark the three server endpoints one after another."""
    benchmarks = (
        ('Aiohttp session:', URL_SESSION),
        ('Requests in Executor:', URL_REQUESTS_EXECUTOR),
        ('Simple Requests:', URL_REQUESTS),
    )
    # Create the loop explicitly: relying on asyncio.get_event_loop() to
    # create one implicitly is deprecated in recent Python versions.
    loop = asyncio.new_event_loop()
    try:
        for title, url in benchmarks:
            print(title)
            loop.run_until_complete(runner(url))
    finally:
        # Release the loop's resources even if a benchmark fails.
        loop.close()


if __name__ == '__main__':
    main()
import datetime
from aiohttp import web, ClientSession
import asyncio
import concurrent.futures
import requests
# Value substituted into the upstream URL below; presumably the response
# delay in seconds -- confirm against the httpbin documentation.
DELAY = 5
# Upstream URL requested by the benchmark views.
URL_DELAY = 'https://httpbin.org/delay/%s' % DELAY
async def run_in_custom_executor(request, foo, *args):
    """Run a blocking function in the application's custom executor.

    :param request: aiohttp request; its ``app['custom_executor']`` entry
        supplies the executor
    :param foo: blocking function to run in the executor
    :param args: optional positional arguments for the blocking function
    :return: result from the blocking function
    """
    # Inside a coroutine this returns the loop that is currently running,
    # so the deprecated ``request.app.loop`` attribute is not needed.
    loop = asyncio.get_event_loop()
    # run_in_executor() already returns an awaitable future; wrapping it in
    # ensure_future() adds nothing.
    return await loop.run_in_executor(request.app['custom_executor'], foo, *args)
async def time_view(request):
    """Serve an auto-refreshing HTML page that shows the current UTC time.

    Useful for watching whether the server keeps answering while the other
    endpoints are being exercised.
    """
    now = datetime.datetime.utcnow()
    page = (
        '<meta http-equiv="refresh" content="0.5" >\n'
        '<h1 style="text-align: center">Current time: %s</h1>'
    ) % now
    return web.Response(content_type="text/html", body=page)
async def init_http_client(app):
    """Create the shared aiohttp client session and store it on the app.

    :param app: aiohttp application; the session is kept under ``app['session']``
    """
    # This coroutine runs inside the server's event loop, so the session
    # binds to that loop on its own; passing ``loop=`` (and reading
    # ``app.loop``) is deprecated in aiohttp.
    app['session'] = ClientSession()
async def release_http_client(app):
    """Close the shared aiohttp client session stored on the app.

    :param app: aiohttp application holding the session under ``app['session']``
    """
    # ClientSession.close() is a coroutine: without ``await`` it never runs
    # and the session's connections are left open.
    await app['session'].close()
async def session_view(request):
    """Fetch the delayed upstream URL asynchronously via the shared aiohttp session.

    :param request: aiohttp request; its app provides the session under
        ``app['session']``
    :return: plain "ok" response once the upstream body has been read
    """
    # ``async with`` releases the connection even if reading or decoding
    # the body fails.
    async with request.app['session'].get(URL_DELAY) as response:
        # The decoded payload is not needed; reading it completes the request.
        await response.json()
    return web.Response(body="ok")
def _task():
    """Blocking helper: GET the delayed upstream URL and decode its JSON body."""
    # The decoded payload is discarded; only the round trip matters here.
    requests.get(URL_DELAY).json()
async def requests_view(request):
    """Handle the request by calling the blocking ``_task`` helper directly.

    Nothing is awaited here, so the coroutine does not yield while the
    helper runs.
    """
    _task()
    reply = web.Response(body="ok")
    return reply
async def requests_executor_view(request):
    """Handle the request by running the blocking ``_task`` helper in the executor.

    The handler awaits the executor call instead of calling the helper
    inline, so the server can keep handling other incoming requests
    in the meantime.
    """
    pending = run_in_custom_executor(request, _task)
    await pending
    return web.Response(body="ok")
def create_app(_=None):
    """Build the aiohttp application with its routes, hooks and executor.

    :param _: unused placeholder argument
    :return: the configured ``web.Application``
    """
    app = web.Application()
    # Thread pool stored on the app for views that offload blocking calls.
    app['custom_executor'] = concurrent.futures.ThreadPoolExecutor(max_workers=5)

    routes = (
        ('/', time_view),
        ('/session', session_view),
        ('/requests', requests_view),
        ('/requests_executor', requests_executor_view),
    )
    for path, handler in routes:
        app.router.add_get(path, handler)

    # The shared HTTP client session lives for the lifetime of the app.
    app.on_startup.append(init_http_client)
    app.on_cleanup.append(release_http_client)
    return app


if __name__ == '__main__':
    web.run_app(create_app(), port=8061)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment