Skip to content

Instantly share code, notes, and snippets.

@codemation
Created December 17, 2020 13:42
Show Gist options
  • Save codemation/185754db70deffac652bb9a3c2b46ed0 to your computer and use it in GitHub Desktop.
Save codemation/185754db70deffac652bb9a3c2b46ed0 to your computer and use it in GitHub Desktop.
# Cron Pattern
import asyncio
import logging

from fastapi import FastAPI
# Application instance; the cron queue and the startup hook in this file hang
# off it.
server = FastAPI()
# Unbounded queue of cron-job coroutine functions: filled by the @cron
# decorator (and by each job when it re-enqueues itself), drained by
# cron_worker().
server.cron_jobs = asyncio.Queue()
def cron(interval: int):
    """Register a coroutine function as a recurring job on ``server.cron_jobs``.

    Intended use is ``@cron(seconds)`` on a zero-argument ``async def``. The
    job is queued once at decoration time; each time it is run it awaits the
    decorated function, sleeps ``interval`` seconds and then queues itself
    again. The sleep happens inside the job, so whoever awaits the job stays
    busy for the whole interval.

    Args:
        interval: Seconds to wait after a run finishes before the job is put
            back on the queue.

    Returns:
        A decorator that enqueues the job and returns the decorated function
        unchanged.
    """
    def cron_setup(f):
        async def cron_job():
            try:
                await f()
            except Exception:
                # A failing run must not end the schedule: report it and fall
                # through so the job is still re-enqueued below. Cancellation
                # is not an Exception on Python 3.8+ and still propagates.
                logging.getLogger(__name__).exception(
                    "error in cron job %s", getattr(f, "__name__", f)
                )
            await asyncio.sleep(interval)
            server.cron_jobs.put_nowait(cron_job)

        # The queue is unbounded, so put_nowait() cannot raise QueueFull, and
        # unlike asyncio.create_task() it does not need a running event loop
        # at import/decoration time.
        server.cron_jobs.put_nowait(cron_job)
        return f

    return cron_setup
async def cron_worker():
    """Run jobs from ``server.cron_jobs`` one at a time, forever.

    Each item taken from the queue is called and awaited. An exception raised
    by a job is logged and the loop moves on to the next item; cancellation of
    the worker itself is propagated so the task ends as cancelled.
    """
    print("starting cron_worker")
    while True:
        try:
            job = await server.cron_jobs.get()
            await job()
        except asyncio.CancelledError:
            # Listed explicitly so cancellation is never mistaken for a job
            # failure on Python versions where it subclasses Exception.
            raise
        except Exception:
            logging.getLogger(__name__).exception("error with cron worker")
@cron(60)
async def important_recurring_task():
    """Example job registered with ``@cron(60)``.

    Prints a start message, awaits a 10-second sleep (presumably a placeholder
    for real work), then prints a finish message.
    """
    task_name = "important_recurring_task"
    print(f"starting {task_name}")
    await asyncio.sleep(10)
    print(f"finished {task_name}")
@server.on_event('startup')
async def startup_tasks():
    """Start the background cron worker when the application starts up."""
    # The event loop keeps only weak references to tasks; hold a strong one on
    # the app object so the worker cannot be garbage-collected mid-run.
    server.cron_worker_task = asyncio.create_task(cron_worker())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment