Skip to content

Instantly share code, notes, and snippets.

@kazkansouh
Last active November 27, 2018 15:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kazkansouh/e0da42009e9879cf5a55bfe03bcd4089 to your computer and use it in GitHub Desktop.
Save kazkansouh/e0da42009e9879cf5a55bfe03bcd4089 to your computer and use it in GitHub Desktop.
Investigate an issue with aiohttp when using StreamReader as the body of a post resulting in poor performance.
#! /usr/bin/python3
#
# Demonstrates an issue identified on aiohttp version 3.2.1, 3.3.0,
# where the upload between frontend and backend stalls (or goes very
# slowly by only sending chunks of 128 bytes due to tcp window being
# full). This only happens when both the sender and receiver are in
# the same event loop and lots of small chunks are sent triggering the
# performance hit. To mitigate this performance hit, the size of the
# chunks should be maximised. The issue is exacerbated in versions of
# aiohttp <= 3.3.0 as the default iterator used tries to read the
# input stream line by line and send those lines as chunks. When
# sending binary data, this can be undesirable (either long or short
# lines are read).
#
# Issue does not appear on 3.3.1+
#
# This has been mitigated by changing the default iterator for
# StreamReader to address long lines in commit:
# 5afccb1e2f43b390f7d20ac230eab29663d590dc
#
# https://github.com/aio-libs/aiohttp/commit/5afccb1e2f43b390f7d20ac230eab29663d590dc
#
# Alternative workaround entails directly specifying the iterator that
# is passed into the post request for the StreamReader. Hence,
# defining the chunk sizes.
from aiohttp import web, ClientSession, hdrs, payload
import asyncio
import argparse
def frontendpost(session, destination="localhost:8080"):
    """Build the /frontend POST handler that proxies its body to the backend.

    session     -- shared aiohttp ClientSession used for the outbound request
    destination -- host:port of the backend service to forward to
    Returns the async request handler (a closure over session/destination).
    """
    async def aux(request):
        print("frontend post incoming")
        # Stream the incoming body straight through as the outbound payload.
        body = request.content
        # un-comment below line as workaround on versions of aiohttp < 3.3.1
        # body = payload.AsyncIterablePayload(request.content.iter_any())
        headers = {hdrs.CONTENT_TYPE: request.content_type}
        async with session.post('http://{}/backend'.format(destination),
                                data=body,
                                headers=headers,
                                chunked=True) as resp:
            print("backend return code: {}".format(resp.status))
            print("backend return text: {}".format(await resp.text()))
            # Same response shape either way; only the message text differs.
            if resp.status == 200:
                text = "<html><body>received ok</body></html>\n"
            else:
                text = "<html><body>not processed</body></html>\n"
            return web.Response(text=text,
                                content_type="text/html",
                                status=resp.status)
    return aux
async def backendpost(request):
    """Drain the streamed request body in 4 KiB chunks and acknowledge.

    Counts the total number of bytes received so the transfer rate / size
    can be observed on the console.
    """
    print("\nbackend post incoming")
    stream = request.content
    total = 0
    while True:
        chunk = await stream.read(4096)
        if not chunk:
            # read() yields b'' once the body is exhausted.
            break
        total += len(chunk)
    ct = request.content_type
    print("backend has received {} of length {}".format(ct, total))
    return web.Response(text="<html><body>backend received ok</body></html>\n",
                        content_type="text/html")
async def start_server(session, backend, port):
    """Start the web application on localhost:port and return its AppRunner.

    When backend is None the /backend route is hosted in-process (same event
    loop as /frontend); otherwise /frontend forwards to the external backend.
    The caller is responsible for awaiting runner.cleanup().
    """
    app = web.Application()
    if backend is None:
        print("starting server on localhost:{} with internal backend".format(port))
        app.add_routes([
            web.post('/frontend',
                     frontendpost(session, "localhost:{}".format(port))),
            web.post('/backend', backendpost),
        ])
    else:
        print("starting server on localhost:{} with backend at {}".format(port, backend))
        app.add_routes([web.post('/frontend', frontendpost(session, backend))])
    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, 'localhost', port)
    await site.start()
    return runner
async def client(session, port, contents):
    """POST contents to the local /frontend endpoint and print the reply."""
    url = 'http://localhost:{}/frontend'.format(port)
    headers = {hdrs.CONTENT_TYPE: "application/octet-stream"}
    async with session.post(url, data=contents, headers=headers) as resp:
        print("frontend return code: {}".format(resp.status))
        print("frontend return text: {}".format(await resp.text()))
def records(size, count):
    """Return a bytearray of `count` records, each `size` bytes long.

    Every record starts with a newline (0x0A) and is otherwise zero-filled,
    simulating many short "lines" so a line-based stream iterator produces
    lots of tiny chunks.
    """
    data = bytearray(size * count)
    # One newline at the start of each record; extended-slice assignment
    # writes all of them in a single C-level pass.
    data[::size] = b"\n" * count
    return data
async def main(backend, port, test):
    """Start the server and, unless disabled, drive the reproduction test.

    With test False the server simply runs until interrupted; with test True
    a large multi-record body is pushed through /frontend -> /backend and
    the server is torn down afterwards.
    """
    async with ClientSession() as svrsess:
        runner = await start_server(svrsess, backend, port)
        if not test:
            # Serve-forever mode: never returns normally.
            print("running server until ctrl-c")
            while True:
                await asyncio.sleep(60)
        async with ClientSession() as clisess:
            # no problems, used as brush cleaner
            # await client(clisess, port, b'hello')
            # causes poor performance on versions when
            # sender/receiver are in same event loop < 3.3.1
            await client(clisess, port, records(350, 158915))
            # causes crash for too long max line on versions < 3.3.1
            # await client(clisess, port, bytes(56177568))
        await runner.cleanup()
# default setup will load up the front and backend servers and then
# perform a transitive http post from client -> frontend -> backend.
#
# possible to use two instances of this script to separate out the
# front and back ends into two different processes, where the
# performance issue is mitigated. this is further mitigated by using
# the correct iterator.
if __name__ == "__main__":
    # Entry point: parse CLI options, then run the async main() to completion.
    # Fix: the original passed the description as the first positional
    # argument of ArgumentParser, which actually sets `prog` (the program
    # name shown in usage), not the description.
    parser = argparse.ArgumentParser(description="aiohttp issue investigator")
    parser.add_argument("--backend",
                        type=str,
                        default=None,
                        help="location of backend to forward requests to, when omitted backend is internalised")
    parser.add_argument("--port",
                        type=int,
                        default=8080,
                        help="port to run server on [default 8080]")
    # store_false is the idiomatic form of store_const(const=False,
    # default=True); args.test defaults to True and flips on --no-test.
    # Also fixes the "web sever" typo in the help text.
    parser.add_argument("--no-test",
                        dest="test",
                        action="store_false",
                        help="do not run test, only run web server until ctrl-c")
    args = parser.parse_args()
    print("starting main loop")
    try:
        # NOTE: asyncio.run(...) is the modern (3.7+) equivalent of this call.
        asyncio.get_event_loop().run_until_complete(main(args.backend,
                                                         args.port,
                                                         args.test))
    except KeyboardInterrupt:
        pass
    print("main loop ended")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment