Skip to content

Instantly share code, notes, and snippets.

Last active June 11, 2021 15:52
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save sirrobot01/091a9bd4d8441dc511497e8b2ce367ae to your computer and use it in GitHub Desktop.
Celery Task runner
from celery.utils.log import get_logger
from django.conf import settings
logger = get_logger(__name__)
class TaskRunner:
def __init__(self, func_code, func, args=[], kwargs={}, task_kwargs={}):
self.func_code = func_code # This could be used to save to db
self.func = func
self.args = args
self.kwargs = kwargs
self.task_kwargs = task_kwargs
def _task_kwargs(self):
# retries
retries_policy = getattr(settings, "RETRIES_POLICY")
if not self.task_kwargs.get('retries_policy') and retries_policy:
self.task_kwargs['retries_policy'] = retries_policy
self.task_kwargs['retry'] = True
return self.task_kwargs
def run(self):
return self.func.s(*self.args, **self.kwargs).apply_async(**self._task_kwargs)
except self.func.OperationalError as exc:
logger.exception('Sending task raised: %r', exc)
Copy link

Add RETRIES_POLICY dictionary to your file

Running task, send_email using this util

task = TaskRunner('send_email', send_email, args=[''], task_kwargs={'countdown': 3}).run()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment