Skip to content

Instantly share code, notes, and snippets.

@revmischa
Last active November 19, 2021 01:31
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save revmischa/796ac8d99b90c1b3aa10f3e90ef67fd7 to your computer and use it in GitHub Desktop.
Save revmischa/796ac8d99b90c1b3aa10f3e90ef67fd7 to your computer and use it in GitHub Desktop.
Deferred jobs with sqlalchemy/flask/apscheduler
# sample usage:
class MyFlaskApp(Flask):
    """Flask application with helpers for scheduling jobs through APScheduler.

    This is sample usage: the rest of the application (including the
    ``env_host()`` method used below) is elided from this snippet.
    """

    ...  # remaining application code elided in this sample

    def start_heartbeat(self):
        """Schedule periodic heartbeat updates.

        Creates a scheduler via ``get_scheduler()``, stores it on
        ``self.heartbeat_sched``, starts it, and registers an interval job
        that runs ``jetbridge.app:scheduler_app_heartbeat`` every
        ``HEARTBEAT_INTERVAL_SECONDS`` seconds (10 if the setting is absent).

        Raises:
            RuntimeError: if ``heartbeat_sched`` is already set on this app.
        """
        if hasattr(self, 'heartbeat_sched'):
            # RuntimeError is a subclass of Exception, so callers that catch
            # Exception keep working.
            raise RuntimeError("Heartbeat already started")

        host = self.env_host()
        sched = self.get_scheduler()
        self.heartbeat_sched = sched
        sched.start()
        sched.add_job(
            func="jetbridge.app:scheduler_app_heartbeat",
            trigger='interval',
            seconds=self.config.get('HEARTBEAT_INTERVAL_SECONDS', 10),
            id='heartbeat_{}'.format(host),
            # `args` must be a positional sequence. A dict here would be
            # iterated by APScheduler and pass its *keys* (the literal string
            # 'env_host') to the job instead of the host value.
            args=(host,),
            replace_existing=True,
        )

    def startup_job(self):
        """Insert a deferred job to log server startup success.

        Registers ``jetbridge.app:scheduler_app_startup`` through
        ``add_deferred_job`` on a freshly created scheduler, under the id
        ``startup_<env_host>``; an existing job with that id is replaced.
        """
        host = self.env_host()
        sched = self.get_scheduler()
        sched.add_deferred_job(
            func="jetbridge.app:scheduler_app_startup",
            id='startup_{}'.format(host),
            # Positional sequence, not a dict -- see start_heartbeat().
            args=(host,),
            replace_existing=True,
        )

    def get_scheduler(self, scheduler_type=None):
        """Get a new APScheduler instance configured for our application.

        Args:
            scheduler_type: forwarded unchanged as the ``scheduler`` argument
                of ``MyJobScheduler``; ``None`` leaves the choice to it.

        Returns:
            A new ``MyJobScheduler`` bound to this app.
        """
        # Refresh the SCHEDULER_* settings before the scheduler reads them.
        self._init_scheduler_config()
        return MyJobScheduler(scheduler=scheduler_type, app=self)

    def _init_scheduler_config(self):
        """Load settings for Flask-APScheduler.

        Overwrites ``SCHEDULER_JOBSTORES`` and ``SCHEDULER_EXECUTORS`` in the
        app config on every call, so each scheduler gets its own job store
        and executor objects.
        """
        # Imported lazily so apscheduler is only loaded when a scheduler is
        # actually requested.
        from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
        from apscheduler.executors.pool import ProcessPoolExecutor

        config = self.config

        # Jobs are persisted in the same database the app itself uses.
        dsn = config.get('SQLALCHEMY_DATABASE_URI')
        config['SCHEDULER_JOBSTORES'] = {
            'default': SQLAlchemyJobStore(url=dsn),
        }

        # Pool size comes from SCHEDULER_WORKER_PROCS (1 if unset).
        config['SCHEDULER_EXECUTORS'] = {
            'default': ProcessPoolExecutor(config.get('SCHEDULER_WORKER_PROCS', 1)),
        }
class MyJobScheduler(APScheduler):
    """Add some methods to Flask-APScheduler."""

    def start_paused(self, paused=True):
        """Start the underlying scheduler, paused unless ``paused`` is false."""
        self.scheduler.start(paused=paused)

    def add_deferred_job(self, **kwargs):
        """Insert a job to be run outside of Flask-land.

        This is the main method for adding a job to be serialized and run on
        a "clock" worker instance.
        https://devcenter.heroku.com/articles/clock-processes-python

        It takes the same format of arguments as Flask-APScheduler's
        ``add_job``, such as func, trigger, seconds/minutes/hours, id, args.

        The job is inserted via a paused scheduler: if this scheduler has not
        been started yet it is started in paused mode first. If it is already
        running it is reused as-is, so calling this method more than once on
        the same instance no longer fails.

        Make sure to specify an ID that can be used to coalesce unique jobs
        to ensure it is only run once. Be sure to specify
        ``replace_existing=True`` if this ID may be non-unique and you wish
        to replace an older scheduled job.

        Returns:
            Whatever ``add_job`` returns for the newly added job.

        Raises:
            ValueError: if no ``id`` keyword argument is supplied.
        """
        if 'id' not in kwargs:
            raise ValueError("Please specify an ID that uniquely identifies this job.")

        # Starting an already-started scheduler raises, so only start it the
        # first time through.
        if not self.scheduler.running:
            self.start_paused(paused=True)

        # NOTE(review): the scheduler is left running (paused) afterwards; the
        # original author was unsure whether it can be shut down here --
        # probably unnecessary if using replace_existing=True. TODO confirm.
        return self.add_job(**kwargs)

    def add_job_safe(self, scheduler, **kwargs):
        """Add a job to ``scheduler`` unless one with the same id exists.

        The lookup and insert run inside a nested transaction on the session
        from ``get_db()``; the session is committed afterwards whether or not
        the insert succeeded. Failures are logged, not raised.

        Args:
            scheduler: object providing ``get_job(id)`` and ``add_job(**kw)``.
            **kwargs: forwarded to ``scheduler.add_job``; must contain ``id``.

        Returns:
            The newly added job, the already-existing job when the id was
            taken, or ``None`` when the lookup itself failed.

        Raises:
            KeyError: if ``id`` is missing from ``kwargs``.
        """
        session = get_db().session
        job = None
        jid = kwargs['id']
        try:
            with session.begin_nested():
                # NOTE(review): the original author observed get_job raising
                # when the job is missing; any such error is caught below.
                job = scheduler.get_job(jid)
                if job:
                    # Routed through the handler below so that duplicates are
                    # logged the same way as any other failure.
                    raise Exception("job already exists")
                job = scheduler.add_job(**kwargs)
        except Exception as e:
            log.error("Failed to add job {}, error: {}".format(jid, e))
        finally:
            session.commit()
        return job
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment