-
-
Save coffindragger/ace343ae5fbc4e2326c7696702ed63f3 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from mainsite.celeryapp import app | |
class SimpleTask(LockedTask):
    """Example locked task that uses one fixed lock key for every call."""

    # Static key: returned as-is by the inherited get_task_key(), so all
    # invocations of this task share a single lock.
    task_key = "simple_task"

    def run(self):
        """Return a constant greeting; takes no task arguments."""
        return "hello world"


# Wrap the class in a task function and register it on the imported `app`.
simple_task = SimpleTask.as_task(app)
class DetailedTask(LockedTask):
    """Example locked task whose lock key is derived from its first argument."""

    def get_task_key(self, object_pk, *args, **kwargs):
        """Build the lock key from ``object_pk``.

        Calls with different ``object_pk`` values get different keys; any
        further positional or keyword arguments are ignored here.
        """
        key_template = "detailed_task_{}"
        return key_template.format(object_pk)

    def run(self, object_pk, *args, **kwargs):
        """Placeholder body: accepts the task arguments and returns None."""
        return None


# Register on the imported `app`; the extra keyword options are forwarded
# unchanged to `app.task(...)` by as_task().
detailed_task = DetailedTask.as_task(app, max_retries=3, default_retry_delay=60)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class LockedTask(object):
    """Base class for tasks guarded by a cache-key lock.

    Subclasses implement :meth:`run` and either set the class attribute
    ``task_key`` or override :meth:`get_task_key` to derive a key from the
    call arguments.  :meth:`as_task` turns the class into a task function
    registered on a Celery-style application object.

    The module-level name ``cache`` must provide ``add``, ``get`` and
    ``delete``.  NOTE(review): presumably this is Django's
    ``django.core.cache.cache`` -- confirm against the module's imports.
    """

    # Static lock key.  Subclasses set this, or override get_task_key().
    task_key = None

    def __init__(self, task, *args, **kwargs):
        """Bind one invocation to its task object and call arguments.

        :param task: the task object; ``task.request.id`` is read to
            identify this invocation as the lock owner.
        :param args: positional arguments forwarded to :meth:`run`.
        :param kwargs: keyword arguments forwarded to :meth:`run`.
        :raises NotImplementedError: if no lock key can be determined.
        """
        self.task = task
        # Resolved once per invocation; shadows the class-level attribute.
        self.task_key = self.get_task_key(*args, **kwargs)
        self.args = args
        self.kwargs = kwargs

    def __call__(self):
        """Run the task body if the lock can be taken.

        :returns: the value returned by :meth:`run`; or ``{'error': exc}``
            if :meth:`run` raised an ``Exception``; or ``{'locked': True,
            'resume': <id>}`` if another invocation holds the lock, where
            ``<id>`` is the id stored by the lock holder (``None`` if the
            holder released the lock in the meantime).
        """
        if not self.acquire_lock():
            # Task already locked: report the id the *holder* stored in the
            # cache so the caller can wait on that invocation.  (This
            # invocation's own id is already known to whoever queued it.)
            return {
                'locked': True,
                'resume': cache.get(self.task_key),
            }

        # Lock acquired.
        try:
            return self.run(*self.args, **self.kwargs)
        except Exception as e:
            # Failures are reported through the return value rather than
            # propagated, so the exception never reaches the task runner.
            return {
                'error': e
            }
        finally:
            self.release_lock()

    def get_task_key(self, *args, **kwargs):
        """Return the lock key for this invocation.

        The default implementation returns the class-level ``task_key``.

        :raises NotImplementedError: if ``task_key`` is unset/empty and the
            subclass did not override this method.
        """
        if self.task_key:
            return self.task_key
        raise NotImplementedError()

    def run(self, *args, **kwargs):
        """Task body; must be implemented by subclasses."""
        raise NotImplementedError()

    @classmethod
    def as_task(cls, celeryapp, **initkwargs):
        """Create a task function for this class and register it.

        :param celeryapp: application object exposing ``task(**options)``
            which returns a decorator.
        :param initkwargs: options forwarded to ``celeryapp.task``.
            ``bind`` defaults to ``True`` so the task object is passed as
            the first argument of the generated function.
        :returns: whatever the ``celeryapp.task(...)`` decorator returns.
        """
        initkwargs.setdefault('bind', True)

        def _task(self, *args, **kwargs):
            # `self` here is the bound task object, not a LockedTask.
            return cls(self, *args, **kwargs)()

        # Copy the class's name/doc/module onto the function so the task is
        # registered under the class's name; updated=() skips merging the
        # class __dict__ into the function's __dict__.
        update_wrapper(_task, cls, updated=())
        return celeryapp.task(**initkwargs)(_task)

    def acquire_lock(self, expiration=3*60):
        """Try to take the lock by adding the key to the cache.

        Our task id is stored as the value so other invocations can find
        out which invocation holds the lock.

        :param expiration: timeout passed to ``cache.add`` (default 180).
        :returns: the result of ``cache.add``.  NOTE(review): relies on
            ``add`` succeeding only when the key is absent -- confirm for
            the configured cache backend.
        """
        return cache.add(self.task_key, self.task.request.id, expiration)

    def release_lock(self):
        """Release the lock, but only if this invocation still owns it.

        If our lock expired while :meth:`run` was still working, another
        invocation may have taken it; deleting the key unconditionally
        would remove *their* lock.  The get/delete pair is not atomic, so
        this narrows that window rather than closing it completely.

        :returns: the result of ``cache.delete`` when the lock was ours,
            otherwise ``False``.
        """
        if cache.get(self.task_key) != self.task.request.id:
            return False
        return cache.delete(self.task_key)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment