@alfredfrancis
Last active May 16, 2020 16:00
Flask service to fetch the latest Celery task status
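from flask import jsonify

# `app` (the Flask application) and `long_process_task` (the Celery task)
# are assumed to be defined elsewhere in the project.


@app.route('/status/<task_id>')
def taskstatus(task_id):
    # The route only supplies task_id, so the view takes exactly that argument.
    task = long_process_task.AsyncResult(task_id)
    if task.state == 'PENDING':
        # job did not start yet (Celery also reports unknown task ids as PENDING)
        response = {
            'state': task.state,
            'current': 0,
            'total': 1,
            'status': 'Pending...'
        }
    elif task.state != 'FAILURE':
        # task.info carries the progress metadata; fall back to an empty dict
        # if the task stored something that is not a dict (or nothing at all)
        info = task.info if isinstance(task.info, dict) else {}
        response = {
            'state': task.state,
            'current': info.get('current', 0),
            'total': info.get('total', 1),
            'status': info.get('status', '')
        }
        if 'result' in info:
            response['result'] = info['result']
    else:
        # something went wrong in the background job
        response = {
            'state': task.state,
            'current': 1,
            'total': 1,
            'status': str(task.info),  # this is the exception raised
        }
    return jsonify(response)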
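
The endpoint above reads `current`, `total`, `status` and optionally `result` from `task.info`, so the background job has to publish metadata in that shape. The gist does not show the task itself; the following is only a minimal sketch of what the worker side might look like. The function name `long_process`, the `PROGRESS` state name, and the message strings are assumptions for illustration. To keep it runnable without a broker, the progress callback is passed in as a parameter; in a real Celery task declared with `bind=True`, `self.update_state` accepts the same `state` and `meta` keyword arguments.

```python
def long_process(update_state, total=5):
    """Hypothetical worker body that reports progress step by step.

    `update_state` is any callable accepting `state` and `meta` keyword
    arguments (assumption: in Celery this would be `self.update_state`
    of a task declared with bind=True).
    """
    for i in range(total):
        # Real work for step i would go here.
        update_state(
            state='PROGRESS',
            meta={
                'current': i,
                'total': total,
                'status': 'Step %d of %d' % (i + 1, total),
            },
        )
    # Returning a dict with the same keys lets the endpoint report the
    # finished task through its non-FAILURE branch, including 'result'.
    return {
        'current': total,
        'total': total,
        'status': 'Task completed!',
        'result': 'done',
    }


if __name__ == '__main__':
    # Stand-in callback that just prints each progress update.
    long_process(lambda state, meta: print(state, meta), total=3)
```

Because the final return value uses the same keys as the intermediate updates, the status view does not need a separate branch for a successful task.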