Skip to content

Instantly share code, notes, and snippets.

@syedjafer
Created September 6, 2022 02:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save syedjafer/9dc46c5cf4248ff7e575643f8280e3fd to your computer and use it in GitHub Desktop.
Save syedjafer/9dc46c5cf4248ff7e575643f8280e3fd to your computer and use it in GitHub Desktop.
from flask import Flask,jsonify
from flask_sse import sse
import logging
from apscheduler.schedulers.background import BackgroundScheduler
from flask_cors import CORS
import datetime
from helper import get_data,get_schd_time
# --- Flask application -------------------------------------------------
app = Flask(__name__)
# Redis connection URL kept in the app config; per the Flask-SSE docs the
# `sse` blueprint uses it as its publish/subscribe backend.
app.config["REDIS_URL"] = "redis://localhost:6379/0"
# Enable cross-origin requests with flask_cors' default settings.
CORS(app)
# Mount the server-sent-events blueprint under /events.
app.register_blueprint(sse, url_prefix='/events')

# --- Scheduler logging -------------------------------------------------
# Attach a stream handler to APScheduler's default executor logger so job
# execution messages at INFO level and above are emitted.
h = logging.StreamHandler()
fmt = logging.Formatter('%(levelname)s:%(name)s:%(message)s')
h.setFormatter(fmt)
log = logging.getLogger('apscheduler.executors.default')
log.setLevel(logging.INFO)
log.addHandler(h)
def server_side_event():
    """Publish the current data as a ``dataUpdate`` server-sent event.

    This function is registered as an interval job on the scheduler in this
    module, so it runs outside of any HTTP request. The Flask application
    context is therefore pushed explicitly before publishing.

    The data is fetched exactly once per invocation so that the payload
    printed for debugging is the same payload that gets published.
    """
    with app.app_context():
        # Single fetch: avoids doing the work twice and guarantees the
        # printed value and the published value cannot diverge.
        payload = get_data()
        print(payload)
        sse.publish(payload, type='dataUpdate')
        print("Event Scheduled at ",datetime.datetime.now())
# Background scheduler that fires server_side_event at a fixed interval;
# the interval length in seconds is supplied by helper.get_schd_time().
sched = BackgroundScheduler(daemon=True)
sched.add_job(
    func=server_side_event,
    trigger='interval',
    seconds=get_schd_time(),
)
sched.start()
@app.route('/')
def index():
    """Serve whatever helper.get_data() currently returns, as JSON."""
    snapshot = get_data()
    return jsonify(snapshot)
if __name__ == '__main__':
    # The scheduler is started at import time. With debug=True the Werkzeug
    # reloader would execute this module in two processes (the watcher and
    # the reloaded child), starting two schedulers and publishing every
    # event twice. The reloader is therefore disabled; the debugger stays on.
    # NOTE(review): debug=True combined with host='0.0.0.0' exposes the
    # interactive debugger on all network interfaces - development use only.
    app.run(debug=True,host='0.0.0.0',port=5000,use_reloader=False)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment