Skip to content

Instantly share code, notes, and snippets.

@thehesiod
Created June 7, 2017 22:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save thehesiod/f562d7b998eda92d4806a144f16bcf2f to your computer and use it in GitHub Desktop.
Save thehesiod/f562d7b998eda92d4806a144f16bcf2f to your computer and use it in GitHub Desktop.
asyncio timeout testcase
import asyncio
import aiohttp.web
async def stream_handler(request):
# Without the Content-Type, most (all?) browsers will not render
# partially downloaded content. Note, the response type is
# StreamResponse not Response.
resp = aiohttp.web.StreamResponse(status=200,
reason='OK',
headers={'Content-Type': 'text/html'})
# The StreamResponse is a FSM. Enter it with a call to prepare.
await resp.prepare(request)
while True:
try:
# Technically, subprocess blocks, so this is a dumb call
# to put in an async example. But, it's a tiny block and
# still mocks instantaneous for this example.
resp.write(b"</strong><br>\n")
# Yield to the scheduler so other processes do stuff.
await resp.drain()
# This also yields to the scheduler, but your server
# probably won't do something like this.
await asyncio.sleep(1)
except Exception as e:
# So you can observe on disconnects and such.
print(repr(e))
raise
return resp
async def build_server():
loop = asyncio.get_event_loop()
app = aiohttp.web.Application()
app.router.add_route('GET', "/", stream_handler)
return await loop.create_server(app.make_handler(), 'localhost', 8080)
async def main():
asyncio.ensure_future(build_server())
async with aiohttp.ClientSession(read_timeout=(1, None)) as session:
response = await session.get('http://localhost:8080/')
await response.read()
if __name__ == '__main__':
_loop = asyncio.get_event_loop()
_loop.run_until_complete(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment