Skip to content

Instantly share code, notes, and snippets.

@harrywang
Created May 12, 2015 20:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save harrywang/26a2f0706d5a5eddef5c to your computer and use it in GitHub Desktop.
Save harrywang/26a2f0706d5a5eddef5c to your computer and use it in GitHub Desktop.
task.py
import random
import time
from flask import current_app, render_template
from flask.ext.mail import Message
from . import mail, celery
@celery.task
def send_celery_email(msg):
    """Background task to send an email with Flask-Mail.

    Args:
        msg: The message handed to ``mail.send``; presumably a
            ``flask_mail.Message`` -- confirm against the callers.
    """
    # This module never defines a name ``app`` (only ``current_app`` is
    # imported), so the context is obtained through the ``current_app``
    # proxy instead of a module-level application object.
    # NOTE(review): ``current_app`` only resolves while an application
    # context is active -- confirm the Celery task base class (or worker
    # setup) pushes one before tasks run.
    with current_app.app_context():
        mail.send(msg)
@celery.task(bind=True)
def long_task(self):
    """Simulate a slow job and publish its progress once per second.

    A random number of steps (10 to 50) is chosen. On each step the task
    reports a ``PROGRESS`` state whose meta holds the step index, the step
    count and a status phrase, then sleeps for one second. The phrase is
    generated on the first step and afterwards re-rolled with a 25% chance
    per step.

    Returns:
        A dict with ``current`` and ``total`` both set to 100, a completion
        ``status`` message and a fixed ``result`` of 42.
    """
    verbs = ('Starting up', 'Booting', 'Repairing', 'Loading', 'Checking')
    adjectives = ('master', 'radiant', 'silent', 'harmonic', 'fast')
    nouns = ('solar array', 'particle reshaper', 'cosmic ray', 'orbiter',
             'bit')

    status = ''
    step_count = random.randint(10, 50)

    for step in range(step_count):
        # Short-circuit matters: no random draw is made while the status
        # is still empty (i.e. on the very first step).
        if not status or random.random() < 0.25:
            picked = [random.choice(words)
                      for words in (verbs, adjectives, nouns)]
            status = '{0} {1} {2}...'.format(*picked)

        progress = {'current': step, 'total': step_count, 'status': status}
        self.update_state(state='PROGRESS', meta=progress)
        time.sleep(1)

    return {
        'current': 100,
        'total': 100,
        'status': 'Task completed!',
        'result': 42,
    }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment