Skip to content

Instantly share code, notes, and snippets.

@bmbouter
Created October 18, 2018 20:25
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bmbouter/222b79bf453f8f9a77069e86e3b65ac8 to your computer and use it in GitHub Desktop.
Save bmbouter/222b79bf453f8f9a77069e86e3b65ac8 to your computer and use it in GitHub Desktop.
import asyncio
import logging
import os
import signal
from aiohttp import web
log = logging.getLogger(__name__)
class Handler:
def __init__(self):
self.ready_to_close = False
async def close_200_OK_via_signal(self, request):
log.warning('pid={pid}, client connected, now going to sleep'.format(pid=os.getpid()))
response = web.StreamResponse()
await response.prepare(request)
while not self.ready_to_close:
await response.write(b'0' * 1024)
await asyncio.sleep(0.2)
await response.write_eof()
return response
handler = Handler()
async def my_web_app(*args, **kwargs):
app = web.Application()
app.add_routes([web.get('/', handler.close_200_OK_via_signal)])
return app
def usr_signal(signum, frame):
handler.ready_to_close = True
signal.signal(signal.SIGUSR1, usr_signal)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment