Skip to content

Instantly share code, notes, and snippets.

@vulcan25
Last active December 27, 2019 15:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vulcan25/23cae415aafec35abad21f150015a7ef to your computer and use it in GitHub Desktop.
Save vulcan25/23cae415aafec35abad21f150015a7ef to your computer and use it in GitHub Desktop.
Flask: Example of periodically refreshing a page, pending rq job completion.
from flask import Flask, redirect, url_for, render_template_string
from time import sleep
from rq import Queue
from rq.job import Job
from redis import Redis
r = Redis(host='redisserver')
q = Queue(connection=r)
def slow_func(data):
sleep(5)
return 'Processed %s' % (data,)
app = Flask(__name__)
template_str='''<html>
<head>
{% if refresh %}
<meta http-equiv="refresh" content="5">
{% endif %}
</head>
<body>{{result}}</body>
</html>'''
def get_template(data, refresh=False):
return render_template_string(template_str, result=data, refresh=refresh)
@app.route('/result/<string:id>')
def result(id):
job = Job.fetch(id, connection=r)
status = job.get_status()
if status in ['queued', 'started', 'deferred', 'failed']:
return get_template(status, refresh=True)
elif status == 'finished':
result = job.result
# If this is a string, we can simply return it:
return get_template(result)
@app.route('/process/<string:data>')
def process(data):
job = q.enqueue(slow_func, data)
return redirect(url_for('result', id=job.id))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment