Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Celery Task runner
from celery.utils.log import get_logger
from django.conf import settings
logger = get_logger(__name__)
class TaskRunner:
def __init__(self, func_code, func, args=[], kwargs={}, task_kwargs={}):
self.func_code = func_code # This could be used to save to db
self.func = func
self.args = args
self.kwargs = kwargs
self.task_kwargs = task_kwargs
@property
def _task_kwargs(self):
# retries
retries_policy = getattr(settings, "RETRIES_POLICY")
if not self.task_kwargs.get('retries_policy') and retries_policy:
self.task_kwargs['retries_policy'] = retries_policy
self.task_kwargs['retry'] = True
return self.task_kwargs
def run(self):
try:
return self.func.s(*self.args, **self.kwargs).apply_async(**self._task_kwargs)
except self.func.OperationalError as exc:
logger.exception('Sending task raised: %r', exc)
@sirrobot01

This comment has been minimized.

Copy link
Owner Author

@sirrobot01 sirrobot01 commented Jun 11, 2021

Add RETRIES_POLICY dictionary to your settings.py file

Running task, send_email using this util

task = TaskRunner('send_email', send_email, args=['myemail@email.com'], task_kwargs={'countdown': 3}).run()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment