Skip to content

Instantly share code, notes, and snippets.

@grudko
Created December 2, 2016 13:16
Show Gist options
  • Save grudko/4c1dcd2a2eb29bc3d98cd19bfd916cbd to your computer and use it in GitHub Desktop.
Save grudko/4c1dcd2a2eb29bc3d98cd19bfd916cbd to your computer and use it in GitHub Desktop.
Asyncio aiohttp example with PonyORM
#!/usr/bin/env python3
import asyncio
from asyncio_extras import threadpool
from aiohttp import web
from pony.orm import *
# Pony ORM database object. Entities are declared against `db.Entity`, and the
# object is bound to a concrete SQLite file further down in this script.
db = Database()
class User(db.Entity):
    """Pony ORM entity holding one named user per row."""

    # Integer primary key; auto=True asks the ORM/database to assign it.
    id = PrimaryKey(int, auto=True)
    # Mandatory display name, used by the HTTP handler in its greeting.
    name = Required(str)
# Attach the database object to an on-disk SQLite file (created on first run)
# and create the tables for every entity declared on `db`.
db.bind('sqlite', './test.sqlite', create_db=True)
db.generate_mapping(create_tables=True)

# Insert the demo rows only when the table is still empty. The SQLite file
# persists between runs, so unconditional inserts would add three duplicate
# users on every start of the script.
with db_session:
    if User.select().count() == 0:
        for seed_name in ('One', 'Two', 'Three'):
            User(name=seed_name)
async def sample_handler(request):
    """Greet the user whose primary key is given in the URL path.

    Args:
        request: The aiohttp request. The optional ``id`` path segment
            selects the user; when the route has no ``id`` segment, 0 is used.

    Returns:
        A plain-text ``web.Response`` reading ``Hello, <name>``. The name is
        ``Anonymous`` when no user with that id exists.

    Raises:
        web.HTTPNotFound: If the ``id`` path segment is not an integer
            (for example ``/favicon.ico``). Previously this surfaced as an
            unhandled ``ValueError`` and a 500 response.
    """
    raw_id = request.match_info.get('id', 0)
    try:
        user_id = int(raw_id)
    except ValueError:
        # The '/{id}' route matches any path segment, so non-numeric paths
        # land here; treat them as an unknown resource.
        raise web.HTTPNotFound() from None

    # The Pony query is blocking. asyncio_extras' threadpool() context is used
    # so the enclosed block does not run on the event loop thread.
    async with threadpool():
        with db_session:
            user = User.get(id=user_id)
            # Read the attribute while the db_session is still open.
            name = user.name if user else 'Anonymous'
    return web.Response(text="Hello, %s" % name)
if __name__ == '__main__':
    # web.run_app() creates and manages the event loop itself, so no loop is
    # fetched here or passed to Application: asyncio.get_event_loop() without
    # a running loop and the `loop=` argument are both deprecated.
    app = web.Application()
    # '/' greets the anonymous user; '/{id}' greets the user with that id.
    app.router.add_route('GET', '/', sample_handler)
    app.router.add_route('GET', '/{id}', sample_handler)
    web.run_app(app)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment