Skip to content

Instantly share code, notes, and snippets.

@f0t0n
Created August 22, 2016 19:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save f0t0n/f5bd3e41a85e70504ce3d249f5e8d14f to your computer and use it in GitHub Desktop.
Save f0t0n/f5bd3e41a85e70504ce3d249f5e8d14f to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
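"""Example of aiohttp.web.Application.on_startup signal handlers.

Registers two quick handlers and one handler that starts long-running
listener loops which poll this application's own /redis and /zeromq routes.
"""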
import asyncio
import aiohttp
from aiohttp.web import Application, run_app, Response
async def fake_redis_msg(request):
    """Reply with a fixed plain-text body standing in for a Redis message.

    :param request: the incoming request; it is not inspected.
    :return: a ``Response`` whose text is the canned Redis message.
    """
    body = 'Received fake Redis message...'
    return Response(text=body)
async def fake_zmq_msg(request):
    """Reply with a fixed plain-text body standing in for a ZeroMQ message.

    :param request: the incoming request; it is not inspected.
    :return: a ``Response`` whose text is the canned ZeroMQ message.
    """
    body = 'Received fake ZeroMQ message...'
    return Response(text=body)
async def get_fake_message(where, base_url='http://127.0.0.1:8080'):
    """Fetch ``<base_url>/<where>`` with a GET request and return its body.

    :param where: path segment appended to ``base_url`` (e.g. ``'redis'``).
    :param base_url: scheme, host and port of the server to query;
        defaults to ``http://127.0.0.1:8080``.
    :return: the response body decoded to ``str``.
    """
    # No ``loop=`` argument: the session picks up the current event loop on
    # its own, so this helper does not have to reach for a module-level
    # ``app`` object (and the deprecated ``loop`` parameter is avoided).
    async with aiohttp.ClientSession() as session:
        # ``async with`` releases the connection once the body is read,
        # even when reading the body raises.
        async with session.get('{}/{}'.format(base_url, where)) as res:
            return await res.text()
async def quick_1(app):
    """Print the marker line ``quck_1`` five times, then return.

    :param app: the application passed by the startup signal; not used.
    """
    marker = 'quck_1'
    # One newline-joined write yields the same five output lines.
    print('\n'.join([marker] * 5))
async def quick_2(app):
    """Print the marker line ``quck_2`` five times, then return.

    :param app: the application passed by the startup signal; not used.
    """
    marker = 'quck_2'
    # One newline-joined write yields the same five output lines.
    print('\n'.join([marker] * 5))
async def listen_to_redis(app):
    """Poll the fake Redis endpoint forever, printing every message.

    Each round prints a status line, fetches ``'redis'`` through
    ``get_fake_message``, prints the result and sleeps half a second.
    The coroutine never returns on its own.

    :param app: the application passed by the caller; not used here.
    """
    # Short initial pause -- presumably to let the server start accepting
    # connections before the first request; confirm against the caller.
    await asyncio.sleep(0.01)
    while True:
        print('Listening to Redis...')
        message = await get_fake_message('redis')
        print(message)
        await asyncio.sleep(0.5)
async def listen_to_zeromq(app):
    """Poll the fake ZeroMQ endpoint forever, printing every message.

    Each round prints a status line, fetches ``'zeromq'`` through
    ``get_fake_message``, prints the result and sleeps half a second.
    The coroutine never returns on its own.

    :param app: the application passed by the caller; not used here.
    """
    # Short initial pause -- presumably to let the server start accepting
    # connections before the first request; confirm against the caller.
    await asyncio.sleep(0.01)
    while True:
        print('Listening to ZeroMQ...')
        message = await get_fake_message('zeromq')
        print(message)
        await asyncio.sleep(0.5)
async def run_all_long_running(app):
    """Start the Redis and ZeroMQ listeners as background work.

    Both listeners loop forever, so awaiting them here would keep this
    ``on_startup`` handler -- and with it the application's startup --
    from ever completing.  They are scheduled on the running event loop
    instead and the handler returns right away.

    :param app: the application being started; it is passed to each
        listener, and the aggregate future is stored under
        ``app['long_running_tasks']`` so a strong reference is kept and
        the listeners can be cancelled later.
    :return: the aggregate future wrapping both listeners (not awaited).
    """
    # ``asyncio.gather`` no longer accepts ``loop=`` (removed in Python
    # 3.10); called from a coroutine it schedules on the running loop.
    listeners = asyncio.gather(listen_to_redis(app), listen_to_zeromq(app))
    app['long_running_tasks'] = listeners
    return listeners
async def init(loop):
    """Build the example application with its routes and startup handlers.

    :param loop: event loop forwarded to ``Application``.
    :return: the application, with GET routes ``/redis`` and ``/zeromq``
        and the three ``on_startup`` handlers registered in order.
    """
    application = Application(loop=loop)

    get_routes = (
        ('/redis', fake_redis_msg),
        ('/zeromq', fake_zmq_msg),
    )
    for path, handler in get_routes:
        application.router.add_get(path, handler)

    # Registration order is the order the handlers were appended originally.
    for startup_handler in (quick_1, quick_2, run_all_long_running):
        application.on_startup.append(startup_handler)

    return application
# Script entry: build the application on the default event loop, then hand
# it to aiohttp's run_app.
loop = asyncio.get_event_loop()
# init() is a coroutine, so it is driven to completion on the loop to obtain
# the configured Application; `app` is bound at module level.
app = loop.run_until_complete(init(loop))
run_app(app)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment