Last active
November 27, 2018 15:38
-
-
Save kazkansouh/e0da42009e9879cf5a55bfe03bcd4089 to your computer and use it in GitHub Desktop.
Investigate an issue with aiohttp when using StreamReader as the body of a post resulting in poor performance.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/python3
#
# Demonstrates an issue identified in aiohttp versions 3.2.1 and 3.3.0,
# where the upload between frontend and backend stalls (or goes very
# slowly, sending only 128-byte chunks because the TCP window is
# full). This only happens when both the sender and receiver are in
# the same event loop and lots of small chunks are sent, triggering the
# performance hit. To mitigate this performance hit, the size of the
# chunks should be maximised. The issue is exacerbated in versions of
# aiohttp <= 3.3.0 because the default iterator reads the input
# stream line by line and sends each line as a chunk. When sending
# binary data this is undesirable, because the lines read may be
# either very long or very short.
#
# The issue does not appear in 3.3.1+.
#
# It was mitigated by changing the default iterator for StreamReader
# to address long lines in commit:
# 5afccb1e2f43b390f7d20ac230eab29663d590dc
#
# https://github.com/aio-libs/aiohttp/commit/5afccb1e2f43b390f7d20ac230eab29663d590dc
#
# An alternative workaround is to explicitly specify the iterator for
# the StreamReader that is passed into the post request, thereby
# controlling the chunk sizes.
from aiohttp import web, ClientSession, hdrs, payload | |
import asyncio | |
import argparse | |
def frontendpost(session, destination="localhost:8080"):
    """Return an aiohttp handler that relays a POST body to the backend.

    The returned coroutine streams ``request.content`` to
    ``http://<destination>/backend`` using chunked transfer encoding. It
    forwards the incoming Content-Type and prints the backend's status and
    body. It then answers with a small HTML page whose HTTP status mirrors
    the backend's.

    :param session: client session used for the outgoing request.
    :param destination: ``host:port`` of the backend server.
    :return: coroutine function suitable as a ``/frontend`` route handler.
    """
    backend_url = 'http://{}/backend'.format(destination)

    async def relay(request):
        print("frontend post incoming")
        upstream = request.content
        # un-comment below line as workaround on versions of aiohttp < 3.3.1
        # upstream = payload.AsyncIterablePayload(request.content.iter_any())
        forward_headers = {hdrs.CONTENT_TYPE: request.content_type}
        async with session.post(backend_url,
                                data=upstream,
                                headers=forward_headers,
                                chunked=True) as resp:
            status = resp.status
            print("backend return code: {}".format(status))
            print("backend return text: {}".format(await resp.text()))
            # Only a 200 from the backend counts as success; any other
            # status is passed straight back to the caller.
            if status == 200:
                page = "<html><body>received ok</body></html>\n"
            else:
                page = "<html><body>not processed</body></html>\n"
            return web.Response(text=page,
                                content_type="text/html",
                                status=status)

    return relay
async def backendpost(request):
    """Handle ``/backend``: drain the request body and report its size.

    The body is consumed in reads of at most 4096 bytes and only its length
    is kept. After the stream is exhausted, the Content-Type and total byte
    count are printed and a small HTML acknowledgement is returned.
    """
    print("\nbackend post incoming")
    stream = request.content
    total = 0
    # An empty read signals end of stream.
    chunk = await stream.read(4096)
    while chunk:
        total += len(chunk)
        chunk = await stream.read(4096)
    print("backend has received {} of length {}".format(request.content_type, total))
    return web.Response(text="<html><body>backend received ok</body></html>\n",
                        content_type="text/html")
async def start_server(session, backend, port):
    """Create and start the aiohttp application on ``localhost:<port>``.

    If *backend* is None, this same application also serves ``/backend``,
    and ``/frontend`` forwards to it. Otherwise ``/frontend`` forwards to
    the given ``host:port``.

    :param session: client session the frontend handler uses to forward.
    :param backend: ``host:port`` of an external backend, or None.
    :param port: local TCP port to listen on.
    :return: the started ``web.AppRunner``; the caller must clean it up.
    """
    app = web.Application()
    if backend is not None:
        print("starting server on localhost:{} with backend at {}".format(port, backend))
        app.add_routes([web.post('/frontend', frontendpost(session, backend))])
    else:
        print("starting server on localhost:{} with internal backend".format(port))
        internal = "localhost:{}".format(port)
        app.add_routes([web.post('/frontend', frontendpost(session, internal)),
                        web.post('/backend', backendpost)])
    runner = web.AppRunner(app)
    await runner.setup()
    await web.TCPSite(runner, 'localhost', port).start()
    return runner
async def client(session, port, contents):
    """POST *contents* to ``/frontend`` on ``localhost:<port>``.

    The payload is sent as ``application/octet-stream``. The frontend's
    status code and response text are printed.
    """
    url = 'http://localhost:{}/frontend'.format(port)
    request_headers = {hdrs.CONTENT_TYPE: "application/octet-stream"}
    async with session.post(url, data=contents, headers=request_headers) as response:
        print("frontend return code: {}".format(response.status))
        body = await response.text()
        print("frontend return text: {}".format(body))
def records(size, count):
    """Build a buffer of *count* newline-led records, each *size* bytes.

    Every record is a newline byte (0x0a) followed by ``size - 1`` zero
    bytes. Read line by line, the buffer therefore yields *count* lines of
    exactly *size* bytes.

    :param size: length of each record in bytes; must be >= 1.
    :param count: number of records; must be >= 0.
    :return: a ``bytearray`` of length ``size * count``.
    :raises ValueError: if *size* < 1 or *count* < 0.
    """
    if size < 1:
        raise ValueError("size must be at least 1, got {}".format(size))
    if count < 0:
        raise ValueError("count must be non-negative, got {}".format(count))
    result = bytearray(size * count)
    # Extended-slice assignment sets the first byte of every record in one
    # C-level operation instead of a Python loop over each record. The
    # slice selects exactly `count` positions, matching the right-hand side.
    result[::size] = b'\n' * count
    return result
async def main(backend, port, test):
    """Start the server and either run one test upload or serve forever.

    :param backend: ``host:port`` of an external backend, or None to serve
        ``/backend`` from this process (see ``start_server``).
    :param port: local TCP port for the server.
    :param test: if true, POST a large payload through ``/frontend`` once
        and then shut down. Otherwise, keep serving until cancelled or
        interrupted.
    """
    async with ClientSession() as svrsess:
        runner = await start_server(svrsess, backend, port)
        try:
            if test:
                async with ClientSession() as clisess:
                    # no problems, used as brush cleaner
                    # await client(clisess, port, b'hello')
                    # causes poor performance on versions when
                    # sender/receiver are in same event loop < 3.3.1
                    await client(clisess, port, records(350, 158915))
                    # causes crash for too long max line on versions < 3.3.1
                    # await client(clisess, port, bytes(56177568))
            else:
                print("running server until ctrl-c")
                while True:
                    await asyncio.sleep(60)
        finally:
            # Always release the listening site. This covers an upload that
            # raises and a serve-forever loop that is cancelled, not only
            # the successful test path.
            await runner.cleanup()
# The default setup starts both the frontend and backend servers and then
# performs a transitive HTTP POST from client -> frontend -> backend.
#
# It is possible to use two instances of this script to separate the
# frontend and backend into two different processes, which mitigates
# the performance issue. It is further mitigated by using the correct
# iterator.
if __name__ == "__main__":
    # The text was previously passed positionally, which sets `prog` (the
    # name shown in the usage line) rather than the help description.
    parser = argparse.ArgumentParser(description="aiohttp issue investigator")
    parser.add_argument("--backend",
                        type=str,
                        default=None,
                        help="location of backend to forward requests to, when omitted backend is internalised")
    parser.add_argument("--port",
                        type=int,
                        default=8080,
                        help="port to run server on [default 8080]")
    parser.add_argument("--no-test",
                        dest="test",
                        action="store_false",
                        help="do not run test, only run web server until ctrl-c")
    args = parser.parse_args()
    print("starting main loop")
    # asyncio.get_event_loop() without a running loop is deprecated since
    # Python 3.10 and raises from 3.14, so create and own an explicit loop.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    task = loop.create_task(main(args.backend, args.port, args.test))
    try:
        loop.run_until_complete(task)
    except KeyboardInterrupt:
        # Cancel main() and let it unwind, so its cleanup code and
        # `async with` exits run, instead of destroying the task while it
        # is still pending.
        task.cancel()
        try:
            loop.run_until_complete(task)
        except (asyncio.CancelledError, KeyboardInterrupt):
            pass
    finally:
        loop.close()
    print("main loop ended")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment