Skip to content

Instantly share code, notes, and snippets.

@amitripshtos
Created August 22, 2018 09:53
Show Gist options
  • Save amitripshtos/d4840406f7d6374ef8abcc2dca937583 to your computer and use it in GitHub Desktop.
Save amitripshtos/d4840406f7d6374ef8abcc2dca937583 to your computer and use it in GitHub Desktop.
Alternative to Celery beat for people who got burned hard by it.
import json
import logging
import time
from typing import Any, Dict, List, Optional, Union

from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.job import Job
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from celery import Celery
# Requirements: APScheduler and Celery must be installed to use this code.
# In addition, adapt the get_tasks, get_celery_application and
# update_periodic_tasks_from_database functions to your project.
logger = logging.getLogger(name=__name__)
class PeriodicTask:
    """
    Description of one periodic Celery task to be scheduled by ``run()``.

    ``run()`` decodes ``cron``, ``args`` and ``kwargs`` with ``json.loads`` when
    they are strings, so storing them as JSON text (as a file or database
    column would) is the portable choice, e.g. ``cron='{"minute": "*/5"}'``.

    Attributes:
        name: identifier of the task; ``run()`` uses it as the APScheduler job id.
        task: Celery task name; ``run()`` uses it as the APScheduler job name.
        args: positional arguments for the Celery task (JSON list text or list).
        kwargs: keyword arguments for the Celery task (JSON object text or dict).
        cron: ``CronTrigger`` keyword arguments (JSON object text or dict) or a trigger.
    """

    def __init__(
        self,
        name: str,
        task: str,
        args: "Optional[Union[list, str]]",
        kwargs: "Optional[Union[dict, str]]",
        cron: "Union[CronTrigger, dict, str]",
    ) -> None:
        self.name = name
        self.task = task
        self.args = args
        self.kwargs = kwargs
        self.cron = cron

    def __repr__(self) -> str:
        # Makes scheduler log lines and debugger output show what was configured.
        return '{}(name={!r}, task={!r}, args={!r}, kwargs={!r}, cron={!r})'.format(
            type(self).__name__, self.name, self.task, self.args, self.kwargs, self.cron
        )
def get_tasks() -> List[PeriodicTask]:
    """
    Return the periodic tasks to schedule.

    Adjust this function to load the tasks from a file/database/whatever.
    ``run()`` decodes ``cron``, ``args`` and ``kwargs`` with ``json.loads``, so
    this example stores them as JSON strings, the way they would typically come
    out of a text column or config file. ``cron`` holds keyword arguments for
    ``apscheduler.triggers.cron.CronTrigger``.

    :return: the list of PeriodicTask objects to schedule.
    """
    return [
        PeriodicTask(
            name='test task',
            task='tasks.test_task',
            args=json.dumps([]),
            kwargs=json.dumps({}),
            # Equivalent to CronTrigger(minute=1): fires at minute 1 of every hour.
            cron=json.dumps({'minute': 1}),
        ),
    ]
def get_celery_application() -> "Celery":
    """
    Return the Celery application used to dispatch scheduled tasks.

    Adjust this function to return the Celery object of your project.

    :return: the project's Celery application.
    :raises NotImplementedError: until the function is adapted. Raising here,
        rather than returning None, makes a missing override fail with a clear
        message instead of an AttributeError on ``send_task``.
    """
    raise NotImplementedError(
        'get_celery_application() must be adjusted to return your Celery application'
    )
def update_periodic_tasks_from_database(scheduler: BackgroundScheduler) -> None:
    """
    Hook for applying changed periodic tasks to a running scheduler.

    ``run()`` calls this repeatedly while the scheduler is running. Adjust it to
    re-read tasks from a file/database/whatever and apply the changes, for
    example by fetching a job with ``scheduler.get_job(job_id)`` and updating it.
    The default implementation does nothing.

    :param scheduler: the running scheduler whose jobs may be updated.
    :return: None
    """
    return None
class CeleryPoolExecutor(ThreadPoolExecutor):
    """
    APScheduler executor that dispatches each due job to Celery instead of running it.

    When a job is due, ``job.name`` is turned into a Celery task name via
    ``task_name_template`` and sent with ``send_task``, together with the job's
    ``args`` and ``kwargs``. The job's own ``func`` is never called. Dispatch
    happens synchronously inside ``_do_submit_job``; the inherited thread pool
    is not used.
    """

    #: Format string for the Celery task name; ``{task_name}`` is replaced by
    #: ``job.name``. The default keeps the ``api.tasks.`` prefix. If the names you
    #: schedule are already full dotted task paths (e.g. 'tasks.test_task'), set
    #: this to '{task_name}', otherwise the prefix is prepended a second time.
    task_name_template = 'api.tasks.{task_name}'

    def _do_submit_job(self, job: Job, run_times: list) -> None:
        """
        Send ``job`` to Celery and report the outcome back to APScheduler.

        :param job: the due job; its ``name``, ``args`` and ``kwargs`` describe
            the Celery task to send.
        :param run_times: the due run times supplied by APScheduler; unused,
            since exactly one Celery message is sent per submission.
        """
        logger.info('About to start task %s (Known next run time: %s)', job.name, job.next_run_time)
        try:
            task_name = self.task_name_template.format(task_name=job.name)
            get_celery_application().send_task(task_name, args=job.args, kwargs=job.kwargs)
        except Exception as e:
            logger.warning('Could not send task through celery. Exception: %r', e)
            # Report a failure (not a success) to APScheduler. The third argument
            # must be a traceback object (or None), not a list.
            self._run_job_error(job.id, e, e.__traceback__)
            return
        # No execution events are generated because the job never runs locally.
        self._run_job_success(job.id, [])
def dummy_func(*args, **kwargs) -> None:
    """
    Placeholder callable handed to APScheduler as each job's ``func``.

    APScheduler requires every job to have a callable. In this module the
    executor forwards jobs to Celery instead of running them, so this function
    just accepts and ignores whatever it is given.

    :param args: ignored positional arguments.
    :param kwargs: ignored keyword arguments.
    :return: None
    """
    return None
def _load_json_field(value: Any) -> Any:
    """Decode ``value`` as JSON if it is text/bytes; return any other value unchanged."""
    if isinstance(value, (str, bytes, bytearray)):
        return json.loads(value)
    return value


def _build_trigger(cron: Any) -> Any:
    """
    Turn a PeriodicTask ``cron`` value into an APScheduler trigger.

    A JSON string or a dict is treated as ``CronTrigger`` keyword arguments;
    anything else (e.g. an already-built ``CronTrigger``) is returned as-is.
    """
    cron = _load_json_field(cron)
    if isinstance(cron, dict):
        return CronTrigger(**cron)
    return cron


def run(poll_interval: float = 5) -> None:
    """
    Start the scheduler and block, dispatching periodic tasks to Celery.

    Jobs are loaded once from ``get_tasks()`` into an in-memory job store and
    handed to ``CeleryPoolExecutor``. After that,
    ``update_periodic_tasks_from_database`` is called every ``poll_interval``
    seconds. The loop ends on Ctrl+C or when the update hook raises (the error
    is logged with its traceback). In every case, including SystemExit, the
    scheduler is shut down before leaving.

    :param poll_interval: seconds to sleep between calls to the update hook.
    """
    scheduler = BackgroundScheduler()
    scheduler.add_jobstore(MemoryJobStore(), "default")
    scheduler.add_executor(CeleryPoolExecutor(), 'default')
    scheduler.remove_all_jobs()

    # Populate scheduler tasks from get_tasks(). Text fields are decoded as JSON;
    # already-decoded values (lists, dicts, trigger objects) are used directly.
    for periodic_task in get_tasks():
        scheduler.add_job(
            func=dummy_func,
            trigger=_build_trigger(periodic_task.cron),
            args=_load_json_field(periodic_task.args) if periodic_task.args else None,
            kwargs=_load_json_field(periodic_task.kwargs) if periodic_task.kwargs else None,
            name=periodic_task.task,
            coalesce=True,
            id=periodic_task.name,
        )

    # Start outside the try: if start() fails there is nothing to shut down, and
    # calling shutdown() would only mask the original error.
    scheduler.start()
    logger.info('Scheduler started')
    try:
        while True:
            update_periodic_tasks_from_database(scheduler)
            time.sleep(poll_interval)
    except KeyboardInterrupt:
        # KeyboardInterrupt is not an Exception subclass; handle it explicitly so
        # Ctrl+C still leads to a clean shutdown.
        logger.info('Scheduler interrupted')
    except Exception:
        logger.exception('Updating periodic tasks failed')
    finally:
        logger.info('Shutting down scheduler')
        scheduler.shutdown()
if __name__ == "__main__":
    # Only start the blocking scheduler loop when executed as a script, not on import.
    run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment