Skip to content

Instantly share code, notes, and snippets.

@countrymarmot
Created August 6, 2014 06:28
Show Gist options
  • Save countrymarmot/1e066203d46957be09ee to your computer and use it in GitHub Desktop.
Save countrymarmot/1e066203d46957be09ee to your computer and use it in GitHub Desktop.
test celery
"""tasks.py
"""
from __future__ import absolute_import
from celery import Celery
from celery import exceptions
# Celery application named 'tasks'.  Messages go through a SQLite file
# (msg.db) and task results are stored in a second SQLite file (result.db).
app = Celery(
    'tasks',
    broker='sqla+sqlite:///msg.db',
    backend='db+sqlite:///result.db',
)

# Use JSON for task messages and results, and accept only JSON content.
app.conf.update(
    CELERY_TASK_SERIALIZER='json',
    CELERY_ACCEPT_CONTENT=['json'],
    CELERY_RESULT_SERIALIZER='json',
)
@app.task
def add(x, y):
    """Task that returns the result of ``x + y``."""
    total = x + y
    return total
@app.task(max_retries=3)
def test_retry(msg):
    """Print ``msg``, fail on purpose, and ask Celery to retry the task.

    This task never succeeds: it exists to exercise the retry machinery.
    Each failure schedules another attempt 2 seconds later (``countdown=2``),
    with the retry limit set by ``max_retries=3`` on the decorator.

    :param msg: value printed on every attempt.
    :raises: whatever ``test_retry.retry(...)`` produces; the original
        exception is handed over as ``exc``.
    """
    try:
        # Single-argument print() behaves the same on Python 2 and Python 3.
        print(msg)
        # Deliberate failure so that every attempt ends in the retry path.
        raise Exception('error')
    except Exception as e:
        raise test_retry.retry(exc=e, countdown=2)
@app.task(max_retries=3)
def test_retry2(x):
    """Return ``x`` when it is at most 10; otherwise request a retry.

    A retry is requested with a 2 second delay (``countdown=2``) and no
    explicit ``exc``; the retry limit is set by ``max_retries=3`` on the
    decorator.  Because the argument does not change between attempts, an
    input greater than 10 takes the retry path on every attempt.

    :param x: value compared against 10.
    :returns: ``x`` unchanged when ``x <= 10``.
    :raises: whatever ``test_retry2.retry(...)`` produces when ``x > 10``.
    """
    if x > 10:
        raise test_retry2.retry(countdown=2)
    return x
@app.task(soft_time_limit=2)
def test_timeout():
    """Sleep for 3 seconds under a 2 second soft time limit.

    :returns: ``'OK.'`` if the sleep finishes, or ``'Timout.'`` if
        ``SoftTimeLimitExceeded`` is raised while the task is running.
    """
    try:
        from time import sleep
        # Sleeps longer than the soft_time_limit declared on the decorator.
        sleep(3)
        outcome = 'OK.'
    except exceptions.SoftTimeLimitExceeded:
        outcome = 'Timout.'
    return outcome
if __name__ == "__main__":
    # Executed as a script: hand control to the app's worker entry point.
    app.worker_main()
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
"""test_celery.py
"""
from tasks import test_timeout, test_retry
# Queue the retry demo task, then block on its result and print it.
t = test_retry.delay("hello")
print(t.get())

# Queue the soft-time-limit demo task, then block on its result and print it.
t = test_timeout.delay()
print(t.get())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment