Last active
December 3, 2021 08:53
-
-
Save linnil1/864c384fb246919800abfee2a3803a4c to your computer and use it in GitHub Desktop.
aiohttp + aiocron
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Example of running aiohttp and aiocron together | |
# | |
# Packages: aiohttp==3.8.1 aiocron==1.8 | |
# Tested python version: python:3.10.0 | |
# Issue: aircron not working once aiohttp start | |
# Solution Concept: Both share the same event loop | |
import logging | |
import asyncio | |
import aiocron | |
from aiohttp import web | |
logging.basicConfig(level=logging.DEBUG) | |
# Create a new event_loop | |
# because get_event_loop is deprecated in python3.10 | |
loop = asyncio.new_event_loop() | |
# A cronjob that run every minutes | |
# Assign the created event loop to aiocron | |
@aiocron.crontab("* * * * *", loop=loop) | |
async def task_run_every_min(): | |
logging.debug("A cronjob is running now") | |
await asyncio.sleep(3) | |
logging.debug("A cronjob is Done") | |
# A handler for '/' endpoint | |
async def say_hello(request): | |
logging.debug("Endpoint hello is triggered") | |
await asyncio.sleep(1) | |
return web.json_response({"status": "hello"}) | |
if __name__ == "__main__": | |
app = web.Application() | |
app.add_routes([web.get('/', say_hello)]) | |
# Though loop parameter is not documented yet, you can find it at | |
# https://github.com/aio-libs/aiohttp/blob/master/aiohttp/web.py | |
web.run_app(app, port=8081, loop=loop) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment