Skip to content

Instantly share code, notes, and snippets.

@rcarmo
Forked from gdamjan/aiohttp-server.py
Created February 25, 2017 22:04
Show Gist options
  • Star 7 You must be signed in to star a gist
  • Fork 4 You must be signed in to fork a gist
  • Save rcarmo/3f0772f2cbe0612b699dcbb839edabeb to your computer and use it in GitHub Desktop.
Save rcarmo/3f0772f2cbe0612b699dcbb839edabeb to your computer and use it in GitHub Desktop.
Example: asyncio and aiohttp, handling longpoll, eventsource and websocket requests with a queue and background workers
from aiohttp import web
from threading import Thread
import asyncio
import time, uuid
loop = asyncio.get_event_loop()
def long_blocking_thing(sleep):
time.sleep(sleep)
return 42
async def worker(q):
await q.put(b'coroutine: hello')
for i in range(1,11):
await asyncio.sleep(1)
await q.put(b'coroutine: ping %d' % i)
await q.put(b'The end!')
await q.put(None)
def start_background_work(q=None):
if q is None:
q = asyncio.Queue()
def _thread():
# you can run non-asyncio code in a separate thread
# but use run_coroutine_threadsafe to fill in the queue
result = long_blocking_thing(5)
asyncio.run_coroutine_threadsafe(q.put(b'thread: hello %d' % result), loop)
Thread(target=_thread).start()
loop.create_task(worker(q))
return q
# stupid session implementation
session_store = {}
async def longpoll(request):
response = web.Response()
session_key = request.cookies.get('aio-session')
if session_key is None:
session_key = str(uuid.uuid4())
response.set_cookie('aio-session', session_key)
q = session_store.get(session_key)
if q is None:
# new session start some background work
q = start_background_work()
session_store[session_key] = q
msg = await q.get()
if msg is None:
# coroutines are done now
del session_store[session_key]
return web.Response(status=204)
response.body = msg + b'\n' # newline for curl :)
return response
async def sse_handler(request):
if request.headers.get('accept') != 'text/event-stream':
return web.Response(status=406)
stream = web.StreamResponse()
stream.headers['Content-Type'] = 'text/event-stream'
stream.headers['Cache-Control'] = 'no-cache'
stream.headers['Connection'] = 'keep-alive'
stream.enable_chunked_encoding()
await stream.prepare(request)
q = start_background_work()
while True:
msg = await q.get()
if msg is None:
break
stream.write(b"data: %s\r\n\r\n" % msg)
await stream.write_eof()
return stream
async def websocket_handler(request):
ws = web.WebSocketResponse()
await ws.prepare(request)
q = start_background_work()
while True:
msg = await q.get()
# or done, pending = await asyncio.wait([q.get(), ws.receive()], return_when=asyncio.FIRST_COMPLETED)
if msg is None:
break
else:
ws.send_bytes(msg)
await ws.close()
return ws
app = web.Application()
app.router.add_route('GET', '/', longpoll)
app.router.add_route('GET', '/ws', websocket_handler)
app.router.add_route('GET', '/sse', sse_handler)
async def init(loop):
handler = app.make_handler()
srv = await loop.create_server(handler, '0.0.0.0', 8080)
print('serving on', srv.sockets[0].getsockname())
return srv
def main():
loop.run_until_complete(init(loop))
try:
loop.run_forever()
except KeyboardInterrupt:
pass
main()
import aiohttp
import asyncio
async def main():
session = aiohttp.ClientSession()
while True:
r = await session.get("http://localhost:8080/")
if r.status == 204:
r.close()
break
print(await r.text(), end='')
session.close()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
import aiohttp
import asyncio
import sys
async def main():
headers = {'accept': 'text/event-stream'}
r = await aiohttp.get("http://localhost:8080/sse", headers=headers)
while True:
msg = await r.content.readline()
if not msg:
break
print(msg)
await r.release()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
import aiohttp
import asyncio
async def main():
ws = await aiohttp.ws_connect('http://localhost:8080/ws')
while True:
msg = await ws.receive()
if msg.tp == aiohttp.MsgType.binary:
print(msg.data)
elif msg.tp == aiohttp.MsgType.closed:
break
elif msg.tp == aiohttp.MsgType.error:
break
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment