Celery Task runner
from celery.utils.log import get_logger
from django.conf import settings
# Module-level logger, created through Celery's get_logger helper.
logger = get_logger(__name__)
class TaskRunner:
    """Send a Celery task, applying a project-wide retry policy if configured.

    The runner builds a signature from ``func`` with the stored positional
    and keyword arguments and submits it with ``apply_async``. When the
    Django setting ``RETRIES_POLICY`` is defined and the caller did not
    supply a policy, the setting is passed as ``retry_policy`` together with
    ``retry=True``.
    """

    # Spelling accepted in ``task_kwargs`` for backward compatibility; the
    # option name Celery documents for ``apply_async`` is ``retry_policy``.
    _LEGACY_POLICY_KEY = 'retries_policy'

    def __init__(self, func_code, func, args=None, kwargs=None, task_kwargs=None):
        """Store the task and the arguments it will be sent with.

        Args:
            func_code: Identifier for the task; only stored on the instance.
            func: Task object exposing ``.s()`` and ``.OperationalError``.
            args: Positional arguments for the task (defaults to ``[]``).
            kwargs: Keyword arguments for the task (defaults to ``{}``).
            task_kwargs: Options for ``apply_async`` (defaults to ``{}``).
        """
        self.func_code = func_code  # This could be used to save to db
        self.func = func
        # ``None`` sentinels instead of mutable defaults: a literal ``[]`` or
        # ``{}`` default would be shared by every instance.
        self.args = [] if args is None else args
        self.kwargs = {} if kwargs is None else kwargs
        self.task_kwargs = {} if task_kwargs is None else task_kwargs

    def _task_kwargs(self):
        """Return the options to pass to ``apply_async``.

        The result is a fresh dict, so neither ``self.task_kwargs`` nor the
        dict originally supplied by the caller is modified.
        """
        options = dict(self.task_kwargs)

        # Accept the older ``retries_policy`` spelling from callers, but hand
        # Celery the ``retry_policy`` option it actually reads.
        if self._LEGACY_POLICY_KEY in options:
            legacy_policy = options.pop(self._LEGACY_POLICY_KEY)
            if legacy_policy and not options.get('retry_policy'):
                options['retry_policy'] = legacy_policy

        # The setting is optional: a missing RETRIES_POLICY means no default
        # policy rather than an AttributeError.
        default_policy = getattr(settings, "RETRIES_POLICY", None)
        if default_policy and not options.get('retry_policy'):
            options['retry_policy'] = default_policy
            options['retry'] = True
        return options

    def run(self):
        """Submit the task asynchronously.

        Returns:
            Whatever ``apply_async`` returns, or ``None`` when sending raised
            ``func.OperationalError`` (the error is logged, not re-raised).
        """
        signature = self.func.s(*self.args, **self.kwargs)
        options = self._task_kwargs()
        try:
            return signature.apply_async(**options)
        except self.func.OperationalError as exc:
            logger.exception('Sending task raised: %r', exc)
            return None
Add a RETRIES_POLICY dictionary to your Django settings file.

Running a task (send_email) using this util:

# Build a runner for send_email with one (empty-string) positional argument
# and a 'countdown' task option, then send it right away.
task = TaskRunner(
    func_code='send_email',
    func=send_email,
    args=[''],
    task_kwargs={'countdown': 3},
).run()

