Skip to content

Instantly share code, notes, and snippets.

@jettify
Last active August 13, 2018 13:15
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save jettify/d4ac4787da22f5870c24 to your computer and use it in GitHub Desktop.
Save jettify/d4ac4787da22f5870c24 to your computer and use it in GitHub Desktop.
aiohttp dependency injection example
import asyncio
import injections
import aiopg.sa
from aiohttp import web
@injections.has
class SiteHandler:
    """Request handlers for the site; dependencies are supplied by injection."""

    # Placeholder only: the real aiopg.sa.Engine is injected later,
    # after the handler has been instantiated.
    postgres = injections.depends(aiopg.sa.Engine)

    async def index(self, request):
        """Handle a request by running a trivial query against postgres.

        Acquires a connection from the injected engine, evaluates
        ``SELECT 1;`` as a scalar, prints the result and replies with a
        fixed plain-text body.
        """
        async with self.postgres.acquire() as connection:
            value = await connection.scalar('SELECT 1;')
            print(value)
        return web.Response(text='Site Handler')
async def init_postgres(conf, loop):
    """Create and return an aiopg.sa engine from the given settings.

    Args:
        conf: mapping that must provide 'database', 'user', 'password',
            'host', 'port', 'minsize' and 'maxsize'.
        loop: event loop handed to ``aiopg.sa.create_engine``.

    Returns:
        The engine produced by ``aiopg.sa.create_engine``.
    """
    # Pick out exactly the settings the engine needs; a missing key
    # raises KeyError just like a direct lookup would.
    option_names = (
        'database',
        'user',
        'password',
        'host',
        'port',
        'minsize',
        'maxsize',
    )
    options = {name: conf[name] for name in option_names}
    return await aiopg.sa.create_engine(loop=loop, **options)
async def create_container(loop, conf):
    """Build the dependency container holding the postgres engine.

    Args:
        loop: event loop forwarded to ``init_postgres``.
        conf: postgres settings mapping forwarded to ``init_postgres``.

    Returns:
        An ``injections.Container`` with the engine stored under the
        'postgres' key.
    """
    inj = injections.Container()
    # init_postgres is declared as (conf, loop); the loop must be passed
    # along, otherwise the call fails with a missing-argument TypeError.
    pg = await init_postgres(conf, loop)
    # NOTE(review): presumably the key is matched by name against the
    # `postgres` dependency declared on the handler -- confirm against
    # the injections library documentation.
    inj['postgres'] = pg
    return inj
async def init(loop):
    """Assemble the web application and its injected handler.

    Args:
        loop: event loop used for the application and the dependencies.

    Returns:
        A ``(app, host, port)`` tuple: the configured application plus
        the address it should be served on.
    """
    postgres_conf = {
        'database': 'admindemo_blog',
        'user': 'admindemo_user',
        'password': 'admindemo_user',
        'host': '127.0.0.1',
        'port': 5432,
        'minsize': 1,
        'maxsize': 5,
    }
    conf = {
        'host': '127.0.0.1',
        'port': 8080,
        'postgres': postgres_conf,
    }

    app = web.Application(loop=loop)

    # Build the container that owns the dependencies, then push them
    # into a freshly created handler instance.
    container = await create_container(loop, conf['postgres'])
    site = SiteHandler()
    container.inject(site)

    app.router.add_route('GET', '/', site.index)

    return app, conf['host'], conf['port']
def main():
    """Build the application on the current event loop and serve it."""
    event_loop = asyncio.get_event_loop()
    # init() is a coroutine, so drive it to completion before serving.
    application, address, tcp_port = event_loop.run_until_complete(
        init(event_loop)
    )
    web.run_app(application, host=address, port=tcp_port)
# Start the demo server only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment