Skip to content

Instantly share code, notes, and snippets.

@mpcabd
Created October 11, 2012 19:37
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save mpcabd/3874979 to your computer and use it in GitHub Desktop.
Save mpcabd/3874979 to your computer and use it in GitHub Desktop.
# This work is licensed under the GNU General Public License (GPL).
# To view a copy of this license, visit http://www.gnu.org/copyleft/gpl.html
# For more information visit this blog post http://mpcabd.igeex.biz/python-celery-asynchronous-task-decorator/
# Written by Abd Allah Diab (mpcabd)
# Email: mpcabd ^at^ gmail ^dot^ com
# Website: http://mpcabd.igeex.biz
from django.utils.decorators import available_attrs
from django.utils.functional import wraps
def asynchronous_task(check_if_active=False):
    """Build a decorator that runs the decorated function through Celery.

    Calling the decorated function queues the generic ``execute`` task
    (imported from ``celery_tasks``) with the function's module name,
    function name and call arguments.  If queuing fails for any reason the
    function is called synchronously in the current process instead.

    Args:
        check_if_active: When true, ask Celery's ``inspect().active()``
            before queuing and fall back to a synchronous call if it
            returns an empty/falsy result.

    Returns:
        A decorator.  The wrapper it produces always returns ``None`` (the
        wrapped function's return value is discarded on the synchronous
        path as well) and exposes the original function as ``.func``.
    """
    def wrapper(f):
        try:
            @wraps(f, assigned=available_attrs(f))
            def _wrapper(*args, **kwargs):
                try:
                    if check_if_active:
                        # Imported lazily so Celery is only needed when the
                        # decorated function is actually called.
                        from celery.task.control import inspect
                        if not inspect().active():
                            # Caught just below: triggers the synchronous
                            # fallback.
                            raise Exception('no active Celery workers')
                    from celery_tasks import execute
                    # ``__name__`` works on Python 2 and 3.  The previous
                    # ``func_name`` exists only on Python 2; on Python 3 the
                    # AttributeError was swallowed by the handler below and
                    # every call silently ran synchronously.
                    execute.delay(f.__module__, f.__name__, *args, **kwargs)
                except Exception:
                    # Deliberate best effort: any failure to queue the task
                    # degrades to running it in-process.
                    f(*args, **kwargs)
            # Lets the ``execute`` task reach the undecorated function, so
            # the worker runs it instead of queuing it again.
            _wrapper.func = f
            return _wrapper
        except Exception:
            # If the wrapper cannot be built, leave the function undecorated.
            return f
    return wrapper
# This work is licensed under the GNU General Public License (GPL).
# To view a copy of this license, visit http://www.gnu.org/copyleft/gpl.html
# For more information visit this blog post http://mpcabd.igeex.biz/python-celery-asynchronous-task-decorator/
# Written by Abd Allah Diab (mpcabd)
# Email: mpcabd ^at^ gmail ^dot^ com
# Website: http://mpcabd.igeex.biz
import os, sys
from celery.task import task
# Put this file's directory and its parent directory at the front of the
# module search path.  The second insert lands ahead of the first, so the
# file's own directory ends up first and its parent second.
# NOTE(review): presumably this lets the worker import project modules by
# name when resolving tasks -- confirm against the deployment layout.
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
sys.path.insert(0, os.path.dirname(__file__))
##########################
# Uncomment the following code block to enable Django integration.
# Either put this file next to your settings.py file, or change the code block to import your settings module from the correct path.
##########################
#from django.core.management import setup_environ
#import settings
#setup_environ(settings)
@task(name='execute')
def execute(module_name, method_name, *args, **kwargs):
    """Import ``module_name`` and call its attribute ``method_name``.

    Args:
        module_name: Dotted module path, e.g. ``"package.module"``.
        method_name: Name of a callable defined at module level.
        *args: Positional arguments forwarded to the callable.
        **kwargs: Keyword arguments forwarded to the callable.

    Returns:
        Whatever the resolved callable returns.

    Raises:
        ImportError: If the module cannot be imported.
        AttributeError: If the module has no attribute ``method_name``.
        Exception: Anything raised by the callable propagates unchanged
            (the former ``except Exception, e: raise e`` added nothing and
            was Python 2-only syntax).
    """
    # SECURITY NOTE: the module and function names are taken from the call
    # arguments as-is, so whoever can submit this task can invoke any
    # importable module-level callable.  Restrict access accordingly.

    # Local import keeps this block self-contained.
    from importlib import import_module

    # import_module returns the named (sub)module itself, replacing the
    # manual ``__import__`` + ``getattr`` walk over the dotted path.
    module = import_module(module_name)
    target = getattr(module, method_name)
    # Wrappers that expose the original callable as ``.func`` (as the
    # ``asynchronous_task`` decorator does) are unwrapped so the real
    # function runs here rather than being queued again.
    # NOTE(review): any other object with a ``func`` attribute is unwrapped
    # the same way.
    target = getattr(target, 'func', target)
    return target(*args, **kwargs)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment