Skip to content

Instantly share code, notes, and snippets.

@ktmud
Created May 20, 2022 21:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ktmud/9d15f76484bec2588e3d3c10d5b2c8fd to your computer and use it in GitHub Desktop.
Save ktmud/9d15f76484bec2588e3d3c10d5b2c8fd to your computer and use it in GitHub Desktop.
aiohttp server with a file-system-based router
import importlib
import inspect
import pkgutil

from aiohttp import web
# HTTP verbs the router looks for. load_route_handlers() checks each route
# module for an attribute with one of these exact (upper-case) names and
# registers it through the aiohttp helper of the same lower-cased name
# (web.get / web.post).
ALLOWED_METHODS = ["GET", "POST"]
def _wrap_handler(handler):
    """Adapt a route module's *handler* into an aiohttp request handler.

    The wrapper is built in its own function so that each one gets a private
    binding of ``handler``. Defining the wrapper inline inside the loop of
    ``load_route_handlers`` makes every wrapper share the same loop variable,
    so all routes end up calling whichever handler was looked up last.

    Args:
        handler: Callable taking the request. It may be sync or async.

    Returns:
        An ``async`` function suitable for aiohttp route registration.
    """

    async def async_handler(request):
        response = handler(request)
        # Handlers may be plain functions or coroutines; await any awaitable.
        if inspect.isawaitable(response):
            response = await response
        # A ready-made aiohttp response is passed through untouched instead of
        # being (unsuccessfully) serialized as JSON.
        if isinstance(response, web.StreamResponse):
            return response
        if isinstance(response, str):
            return web.Response(text=response)
        return web.json_response(response)

    return async_handler


def load_route_handlers(path, mod):
    """Build aiohttp route definitions for the handlers exposed by *mod*.

    For every name in ``ALLOWED_METHODS`` that exists as an attribute of
    *mod*, the attribute is wrapped so that a ``str`` result becomes a text
    response and any other result becomes a JSON response, and the wrapper is
    registered for *path* under the matching HTTP method.

    Args:
        path: URL path the handlers are mounted on.
        mod: Module (or any object) carrying ``GET`` / ``POST`` callables.

    Returns:
        A list of aiohttp route definitions; empty when *mod* defines none of
        the allowed methods.
    """
    routes = []
    for method in ALLOWED_METHODS:
        if not hasattr(mod, method):
            continue
        async_handler = _wrap_handler(getattr(mod, method))
        logger.info("Loaded route: %s %s", method, path)
        # web.get / web.post are looked up by the lower-cased method name.
        route_factory = getattr(web, method.lower())
        routes.append(route_factory(path, async_handler))
    return routes
def create_app():
    """Create the aiohttp application with file-system-based routes.

    The ``of.cli_service.routes`` package itself serves ``/``. Every module
    and subpackage found beneath it serves the URL path that mirrors its
    dotted name relative to that package (``foo.bar`` -> ``/foo/bar``).

    Returns:
        A ``web.Application`` with all discovered routes registered.
    """
    app = web.Application()

    # NOTE(review): ``of`` is not imported in the visible part of this file;
    # confirm ``of.cli_service.routes`` is imported before this is called.
    routes_pkg = of.cli_service.routes
    prefix = f"{routes_pkg.__name__}."

    routes = load_route_handlers("/", routes_pkg)
    # Walk with the fully qualified prefix: walk_packages has to import each
    # subpackage by name to descend into it, which only works when the name
    # includes the parent package.
    for module_info in pkgutil.walk_packages(routes_pkg.__path__, prefix):
        # import_module replaces finder.find_module(...).load_module(), which
        # is deprecated (find_module was removed in Python 3.12) and loaded
        # modules under their bare, unqualified names.
        mod = importlib.import_module(module_info.name)
        # Strip the package prefix so URL paths stay relative to the routes
        # package, exactly as before.
        relative_name = module_info.name[len(prefix):]
        routes += load_route_handlers(f"/{relative_name.replace('.', '/')}", mod)

    app.add_routes(routes)
    return app
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment