Skip to content

Instantly share code, notes, and snippets.

@linnil1
Last active December 3, 2021 08:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save linnil1/864c384fb246919800abfee2a3803a4c to your computer and use it in GitHub Desktop.
Save linnil1/864c384fb246919800abfee2a3803a4c to your computer and use it in GitHub Desktop.
aiohttp + aiocron
# Example of running aiohttp and aiocron together
#
# Packages: aiohttp==3.8.1 aiocron==1.8
# Tested python version: python:3.10.0
# Issue: aiocron jobs do not run once aiohttp starts
# Solution Concept: Both share the same event loop
import logging
import asyncio
import aiocron
from aiohttp import web
# Configure the root logger at DEBUG level so the logging.debug() calls made
# by the cronjob and the request handler in this file are actually emitted.
logging.basicConfig(level=logging.DEBUG)
# Create one event loop explicitly; it is passed to both aiocron and aiohttp
# so that they run on the same loop.
# asyncio.get_event_loop() is avoided because, as of Python 3.10, calling it
# when no loop is running emits a DeprecationWarning.
loop = asyncio.new_event_loop()
# A cronjob that runs every minute
# Assign the created event loop to aiocron
@aiocron.crontab("* * * * *", loop=loop)
async def task_run_every_min():
    """Log a start message, wait three seconds, then log completion.

    Registered with aiocron under the ``* * * * *`` schedule (every minute)
    and bound to the module-level ``loop`` rather than a default loop.
    """
    pause_seconds = 3  # placeholder delay between the two log messages
    logging.debug("A cronjob is running now")
    await asyncio.sleep(pause_seconds)
    logging.debug("A cronjob is Done")
# A handler for '/' endpoint
async def say_hello(request):
    """Answer a request with a small JSON greeting.

    Logs that the endpoint was hit, waits one second without blocking the
    event loop, and then replies.

    Args:
        request: The incoming aiohttp request; it is not inspected.

    Returns:
        A JSON response whose body is ``{"status": "hello"}``.
    """
    logging.debug("Endpoint hello is triggered")
    await asyncio.sleep(1)
    payload = {"status": "hello"}
    return web.json_response(payload)
if __name__ == "__main__":
    # Build the application and register the single GET route for '/'.
    routes = [web.get('/', say_hello)]
    app = web.Application()
    app.add_routes(routes)
    # Run the server on the same event loop that the cronjob was bound to,
    # so both are driven together. The author notes that the loop parameter
    # was not yet documented; it can be found in the source at
    # https://github.com/aio-libs/aiohttp/blob/master/aiohttp/web.py
    web.run_app(app, port=8081, loop=loop)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment