Skip to content

Instantly share code, notes, and snippets.

@ask
Created March 26, 2010 10:52
Show Gist options
  • Save ask/344764 to your computer and use it in GitHub Desktop.
Save ask/344764 to your computer and use it in GitHub Desktop.
from datetime import datetime, timedelta
from dateutil.parser import parse as iso8601
from celery.task.base import Task
from celery.utils import timedelta_seconds
class ExpireTask(Task):
"""Task with an expiration time.
If the task is expired, the task is not run after all.
The :meth:`apply_async` method supports an additional ``expires`` argument,
a :class:`datetime.timedelta` instance.
.. attribute:: default_expires
A :class:`datetime.timedelta` object to define the default expiration time.
The default is ``None`` (never expire.)
"""
default_expires = None
abstract = True
def __call__(self, *args, **kwargs):
expires = kwargs.pop("expires", None)
date_sent = kwargs.pop("date_sent", None)
if expires and date_sent:
if iso8601(date_sent) > timedelta(seconds=expires):
return
return self.run(*args, **kwargs)
@classmethod
def apply_async(self, args=None, kwargs=None, expires=None, **options):
kwargs = kwargs or {}
kwargs.setdefault("date_sent", datetime.now())
if expires is None:
expires = self.default_expires
kwargs["expires"] = expires
kwargs.setdefault("expires", self.default_expires)
return super(ExpireTask, self).apply_async(args, kwargs, **options)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment