Skip to content

Instantly share code, notes, and snippets.

Created September 7, 2017 16:00
Show Gist options
  • Save anonymous/ab184e1f6f06510bb487cba8bd7ebc78 to your computer and use it in GitHub Desktop.
Save anonymous/ab184e1f6f06510bb487cba8bd7ebc78 to your computer and use it in GitHub Desktop.
@app.task(bind=True)
def celery_task_that_does_a_thing(self, first_argument, optional_argument='something'):
lock_key = "celery_task_that_does_a_thing_{}".format(first_argument)
if acquire_task_lock(lock_key, self.request.id):
# lock acquired
try:
return _do_the_thing(first_argument,optional_argument)
except Exception as e:
return {
'error': e
}
finally:
release_task_lock(lock_key)
else:
# task already locked
return {
'locked': True,
'resume': resume_task_code(lock_key)
}
def _do_the_thing(first_argument, optional_argument='something'):
return {
'result': "a thing"
}
@ottonomy
Copy link

ottonomy commented Sep 7, 2017

Maybe a decorator that takes a key generation function. There's probably bugs in this, but something like this:

def my_key_function(*args, **kwargs):
    return 'celery_task_that_does_a_thing_{}'.format(args[0])

def acquire_lock(key_function):
    def decorator(func):
        def wrapper(self, *args, **kwargs):
            lock_key = key_function(*args, **kwargs)
            if acquire_task_lock(lock_key, self.request.id):
                # lock acquired
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    return {
                        'error': e
                    }
                finally:
                    release_task_lock(lock_key)
        
            else:
                # task already locked
                return {
                    'locked': True,
                    'resume': resume_task_code(lock_key)
                }
        return wrapper
    return decorator


@app.task(bind=True)
@acquire_lock(key_function=my_key_function)
def celery_task_that_does_a_thing(self, first_argument, optional_argument='something')
    return {'result': 'a thing'}

@jfflbnntt
Copy link

I was thinking key_function could be replaced by something like key_name="celery_task_that_does_a_thing", as long as the key_function basically remains the same and just formats the function name together with the first argument.

@coffindragger
Copy link

someone suggested an abstract base class, it's still fairly boilerplatey but maybe the cleanest?

@app.task(bind=True)
def celery_task_that_does_a_thing(self, first_argument, optional_argument='something'):
    task = CeleryTaskThatDoesAThing(first_argument, optional_argument=optional_argument)
    return task()


class CeleryTaskThatDoesAThing(LockedTask):
    def get_task_key(self, first_argument, *args, **kwargs)
        return "{}_{}".format(self.__name__, first_argument)

    def run(self):
        return {
            'result': "a thing"
        }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment