@dtheodor
Last active August 29, 2015 14:23
Task callbacks, dependencies, cached results

Discussion of a celery API and implementation that provides:

  1. Ability to dynamically add callbacks to already running tasks
  2. Tasks with dependencies
  3. Caching/invalidating task results
  4. Being able to stop/restart a task

Async result that allows adding callbacks

@app.task
def task1():
    time.sleep(10)
    return "task1"

@app.task
def task2():
    return "task2"

Chaining of tasks before their execution

This is already possible with celery.

With chain

task2_after_1 = chain(task1.si(), task2.si())

task2_result = task2_after_1.delay()
task1_result = task2_result.parent

Or with callback

task1_result = task1.apply_async((), link=task2.si())
# task2_result ?

Chaining of tasks after their execution

task1_result = task1.delay()
# through the result
task2_result = task1_result.add_callback(task2.si())

The chaining must guarantee task execution, even if at the time the callback is being added the parent task is finishing or has already finished.

Implementation

The add_callback will do two things:

  1. add a new celery event receiver that listens for the task's started/succeeded/failed events and launches the next task
  2. check the task backend to see whether the result is already available (this check needs to be atomic); if it is, schedule the next task immediately.
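
The "check and register atomically" requirement can be illustrated without celery at all. The sketch below is a toy in-memory model, not celery's AsyncResult: the class CallbackResult and its finish method are made up for illustration, and a lock stands in for whatever atomic operation the real backend would offer.

```python
import threading


class CallbackResult:
    """Toy stand-in for an async result; not celery's AsyncResult."""

    def __init__(self):
        self._lock = threading.Lock()
        self._done = False
        self._value = None
        self._callbacks = []

    def add_callback(self, callback):
        # Check-and-register happens under one lock, so a callback added
        # while the parent is finishing is neither lost nor run twice.
        with self._lock:
            if not self._done:
                self._callbacks.append(callback)
                return
            value = self._value
        callback(value)  # parent already finished: run immediately

    def finish(self, value):
        # Called once by the "worker" when the parent task completes.
        with self._lock:
            self._done = True
            self._value = value
            pending, self._callbacks = self._callbacks, []
        for callback in pending:
            callback(value)


calls = []
result = CallbackResult()
result.add_callback(lambda v: calls.append(("early", v)))  # parent still running
result.finish("task1")
result.add_callback(lambda v: calls.append(("late", v)))   # parent already done
print(calls)
```

Both callbacks run exactly once, regardless of whether they were added before or after the parent finished. A real implementation would have to get the same guarantee from the result backend rather than from an in-process lock.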

Task with dependencies

A task "depends" on another task if that other task always has to be executed first, and its result is then used by the dependent task.

API suggestion:

@app.task
def task1():
    time.sleep(10)
    return "task1"

@app.task(depends_on=(task1,), bind=True)
def task2(self):
    task1_result = self.dependency_input["task1"]
    return "task2"

Invoke task2

task2.delay()
# how would invoking in-process work?
task2()

Implementation

Tasks with dependencies invoke all of their dependencies in their __call__ (or should this be apply_async?) method and add themselves as a callback.

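from celery import Task, chord

class DependentTask(Task):

    def __call__(self, *args, **kwargs):
        # chord takes the header as a single iterable of signatures
        return chord(dep.si() for dep in self.depends)(self.si())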
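
For the open question of in-process invocation, one possible reading is that calling task2() simply runs its dependencies synchronously first. The sketch below shows that reading in plain Python. The task decorator here is hypothetical and unrelated to app.task, and passing dependency_input as the first argument (rather than via self) is an assumption made to keep the example small.

```python
import functools


def task(depends_on=()):
    """Hypothetical decorator; mimics the proposed depends_on option in-process."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            if not depends_on:
                return func(*args, **kwargs)
            # Run every dependency first and key its result by function name.
            dependency_input = {dep.__name__: dep() for dep in depends_on}
            return func(dependency_input, *args, **kwargs)
        return wrapper
    return decorator


@task()
def task1():
    return "task1"


@task(depends_on=(task1,))
def task2(dependency_input):
    return "task2 after " + dependency_input["task1"]


print(task2())
```

This only covers dependencies that take no arguments, which is the case described above.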

Cached results and invalidation

Task results are persisted and carry a stale flag. As long as the result is not stale, a task.delay() will immediately fetch the already available result. Otherwise the task body will be executed.

@app.task
def add(left, right):
    time.sleep(10)
    return left + right

res = add.delay(2,3) #executed..
res.get() == 5 #waiting for 10 sec..

res = add.delay(2,3) #not executed
res.get() == 5 #almost instant

res = add.delay(2,4) #different arguments, executed...
res.get() == 6 #waiting for 10 sec..

# check result status based on task signatures
add.si(2,3).available == True
add.si(2,3).stale == False
add.si(2,3).stale = True # mark it as stale!
add.si(2,3).available == True

res = add.apply_async((2,3), fetch_stale=True) # retrieve stale result immediately!
res = add.delay(2,3) #executed again...

There's also a global (or per task?) configuration setting on what happens with stale results:

RERUN_ON_STALE = False

Implementation

The concepts above are similar to dogpile.cache, which caches function results based on the passed arguments. Maybe an integration with that project would make sense.

Cached results do not depend on the add_callback and depends_on features, but work seamlessly with them.
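
To make the stale-flag semantics concrete, here is a toy in-memory version of the lookup logic. ResultCache, call and mark_stale are made-up names, not a celery backend or the dogpile.cache API. The rerun_on_stale argument models the RERUN_ON_STALE setting under the assumption that False means "keep serving the stale value".

```python
class ResultCache:
    """Toy result store keyed on (task name, args); not a celery backend."""

    def __init__(self, rerun_on_stale=True):
        self.rerun_on_stale = rerun_on_stale
        self._entries = {}  # key -> [value, stale]

    def call(self, func, *args, fetch_stale=False):
        key = (func.__name__, args)
        entry = self._entries.get(key)
        if entry is not None:
            value, stale = entry
            if not stale or fetch_stale or not self.rerun_on_stale:
                return value  # served from the cache, body not executed
        value = func(*args)
        self._entries[key] = [value, False]
        return value

    def mark_stale(self, func, *args):
        self._entries[(func.__name__, args)][1] = True


executions = []


def add(left, right):
    executions.append((left, right))
    return left + right


cache = ResultCache()
cache.call(add, 2, 3)                    # executed
cache.call(add, 2, 3)                    # cached, not executed
cache.call(add, 2, 4)                    # different arguments, executed
cache.mark_stale(add, 2, 3)
cache.call(add, 2, 3, fetch_stale=True)  # stale value returned, not executed
cache.call(add, 2, 3)                    # stale, so executed again
print(executions)
```

The key is built from the function name and positional arguments only; a real implementation would also need to handle keyword arguments and unhashable values.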

Stop/restart a task

A task may need to be restarted while it is running. This can mean:

  1. stop the task, and re-run
  2. schedule a task re-run immediately after the current one finishes

res = task1.delay()
# executing...
res.stop()
res.restart()

Implementation

We need a task backend with a field, separate from the traditional celery task state, that can be used to set the stop and restart values. The task implementation should periodically check this field and act accordingly. There's no guarantee that the task will stop: the task body has to honour this signal itself.

Also the signals task-stop-sent, task-stop-received, task-restart-sent, task-restart-received should be sent.

Can we do the periodic checking with a 2nd thread that sets a variable visible in the task body?

from threading import Timer

class StoppableTask(Task):

    def update_stop_restart(self):
        # magic that receives whether this task should stop
        self.should_stop, self.should_restart = self.check_backend(self.id)

    def handle_stop_restart(self):
        # called from the task body at points where it is safe to stop
        if self.should_restart:
            raise Retry()
        if self.should_stop:
            raise Abort()

    def __call__(self, *args, **kwargs):
        # Timer fires only once, so a real implementation has to re-arm it
        timer = Timer(5, self.update_stop_restart)
        timer.start()
        # call body...

Or subscribe to the task-stop-sent event instead of constantly polling with the timer.

And a task that uses it

@app.task(bind=True)
def add(self, left, right):
    self.handle_stop_restart()
    time.sleep(10)
    self.handle_stop_restart()
    return left + right
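
The cooperative part of this design (the body checks a flag at safe points and bails out) can be tried in isolation. The sketch below is plain Python with no celery involved: StopFlag is a made-up stand-in for the backend field, Abort is a hypothetical exception, and the steps loop replaces the sleep so the example finishes instantly.

```python
import threading


class Abort(Exception):
    """Hypothetical exception raised when a stop was requested."""


class StopFlag:
    """Toy replacement for the backend field holding the stop request."""

    def __init__(self):
        self._event = threading.Event()

    def stop(self):
        self._event.set()

    def check(self):
        if self._event.is_set():
            raise Abort()


def add(flag, left, right, steps=3):
    for _ in range(steps):
        flag.check()  # cooperative checkpoint between units of work
    return left + right


flag = StopFlag()
print(add(flag, 2, 3))  # no stop requested, runs to completion
flag.stop()
try:
    add(flag, 2, 3)
except Abort:
    print("aborted")
```

A threading.Event can be set from another thread, which fits the "2nd thread" idea above: the polling thread or event subscriber only sets the flag, and the body decides when it is safe to act on it.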

koenvo commented Jun 23, 2015

@task with dependencies

  • The dependencies should receive arguments, passed in the .delay of task2
  • When there is already a task running for the dependency, task2 must be attached to the running task instead of starting a new one.

@cached results and invalidation
res = add.delay(2,3) #not executed

This one should also not be executed if the res.get() on the line above it was never called. Two add.delay calls should return the same AsyncResult until it is marked as stale or removed.

@dtheodor

Hmm yes I didn't think of the fact that dependencies need arguments to be passed as well.

Something like this?

@app.task(depends_on=(task1,), bind=True)
def task2(self, depends_kwargs):
    task1_result = self.dependency_input["task1"]
    return "task2"

class DependentTask(Task):

    def __call__(self, depends_kwargs, *args, **kwargs):
        # look up each dependency's arguments by its task name
        return chord(d.si(depends_kwargs[d.name]) for d in self.depends)(self.si())
