Skip to content

Instantly share code, notes, and snippets.

@shashfrankenstien
Last active December 4, 2021 18:33
Show Gist options
  • Save shashfrankenstien/5cfa8821d74c24fb0a01b979d434e5bb to your computer and use it in GitHub Desktop.
Save shashfrankenstien/5cfa8821d74c24fb0a01b979d434e5bb to your computer and use it in GitHub Desktop.
Example usage of flask_production task scheduler and monitor - https://github.com/shashfrankenstien/Flask_Production
'''
Example ussage of https://github.com/shashfrankenstien/Flask_Production
'''
from flask import Flask
from flask_production import CherryFlask, TaskScheduler
from flask_production.plugins import TaskMonitor
import time
import random
app = Flask(__name__)
sched = TaskScheduler()
TaskMonitor(app, sched=sched, display_name="Web Test")
@app.route("/", methods=['GET'])
def main():
return 'Main page'
def wash_car():
"""
This is a dummy job that is scheduled to wash my car
Note: objects in the mirror are closer than they appear
"""
if random.choice([0,1]):
count = 50
while count > 0:
time.sleep(0.1)
print("washing..")
count -= 1
print("The car was washed")
else:
time.sleep(1)
raise Exception("car wash failed!")
def another_task():
print("another_task")
def yet_another_task_but_it_will_fail():
print("another_task")
1/0
if __name__ == "__main__":
sched.every(20).do(wash_car, do_parallel=True)
sched.every(30).do(lambda: wash_car(), do_parallel=True)
sched.every("day").at("8:00").do(another_task)
sched.every(20).do(yet_another_task_but_it_will_fail, do_parallel=True)
CherryFlask(app, sched).run()
# Task monitor webpage: http://localhost:8080/@taskmonitor
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment