Skip to content

Instantly share code, notes, and snippets.

/aiohttp.py Secret

Created July 28, 2015 07:19
Show Gist options
  • Save anonymous/2b0432082e0171683beb to your computer and use it in GitHub Desktop.
Save anonymous/2b0432082e0171683beb to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import asyncio
import aiohttp
from aiohttp import web
class WebsocketEchoHandler:
@asyncio.coroutine
def __call__(self, request):
ws = web.WebSocketResponse()
ws.start(request)
print('Connection opened')
try:
while True:
msg = yield from ws.receive()
ws.send_str(msg.data + '/answer')
except:
pass
finally:
print('Connection closed')
return ws
if __name__ == "__main__":
app = aiohttp.web.Application()
app.router.add_route('GET', '/ws', WebsocketEchoHandler())
loop = asyncio.get_event_loop()
handler = app.make_handler()
f = loop.create_server(
handler,
'127.0.0.1',
8080,
)
srv = loop.run_until_complete(f)
print("Server started at {sock[0]}:{sock[1]}".format(
sock=srv.sockets[0].getsockname()
))
try:
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
loop.run_until_complete(handler.finish_connections(1.0))
srv.close()
loop.run_until_complete(srv.wait_closed())
loop.run_until_complete(app.finish())
loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment