Discussion of a Celery API and implementation that provides:
- Ability to dynamically add callbacks to already running tasks
- Tasks with dependencies
- Caching/invalidating task results
- Ability to stop/restart a task
```python
@app.task
def task1():
    time.sleep(10)
    return "task1"

@app.task
def task2():
    return "task2"
```
This is already possible with Celery.
With a chain:

```python
task2_after_1 = chain(task1.si(), task2.si())
task2_result = task2_after_1.delay()
task1_result = task2_result.parent
```
Or with a callback:

```python
task1_result = task1.apply_async((), link=task2.si())
# task2_result ?
```

Or, with the proposed `add_callback`, through the result:

```python
task1_result = task1.delay()
task2_result = task1_result.add_callback(task2.si())
```
The chaining must guarantee execution of the callback, even if the parent task is finishing or has already finished at the time the callback is added.
The `add_callback` method will do two things:
- add a new Celery event receiver that listens for task started/succeeded/failed events and launches the next task
- check the task backend to see if the result is done (this needs to be an atomic operation); if yes, schedule the callback task immediately
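The atomicity requirement can be sketched with an in-memory stand-in (the `ResultBackend` class and its method names here are hypothetical, not Celery API): a lock makes "check whether the parent finished, otherwise register the callback" a single step, so a callback added while the parent is finishing can never be lost.

```python
import threading

class ResultBackend:
    """Hypothetical in-memory backend illustrating the atomic
    check-or-register step behind add_callback."""

    def __init__(self):
        self._lock = threading.Lock()
        self._results = {}    # task_id -> result of finished tasks
        self._callbacks = {}  # task_id -> callbacks waiting on the task

    def add_callback(self, task_id, callback):
        with self._lock:
            if task_id in self._results:
                # Parent already finished: schedule the callback now.
                result = self._results[task_id]
            else:
                # Parent still running: the event receiver fires it later.
                self._callbacks.setdefault(task_id, []).append(callback)
                return "registered"
        callback(result)
        return "scheduled"

    def mark_done(self, task_id, result):
        # Called by the event receiver on task-succeeded.
        with self._lock:
            self._results[task_id] = result
            pending = self._callbacks.pop(task_id, [])
        for cb in pending:
            cb(result)
```

Either branch ends with the callback running exactly once; which branch is taken depends only on whether `mark_done` won the race for the lock.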
A task "depends" on another task if it must always execute that task before executing itself, and then use its result.
API suggestion:

```python
@app.task
def task1():
    time.sleep(10)
    return "task1"

@app.task(depends_on=(task1,), bind=True)
def task2(self):
    task1_result = self.dependency_input["task1"]
    return "task2"
```
Invoke task2:

```python
task2.delay()

# how would invoking in-process work?
task2()
```
Tasks with dependencies invoke all of their dependencies in their `__call__` (or should this be `apply_async`?) method and add themselves as a callback.
```python
class DependentTask(Task):
    def __call__(self, *args, **kwargs):
        # chord takes the header (the dependencies) and a body (this task)
        return chord(self.depends)(self.si())
```
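To make the in-process semantics concrete, here is a synchronous stand-in (plain Python, no Celery; `dependency_input` is from the proposed API above, everything else is hypothetical) that runs each dependency first and exposes its result to the task body:

```python
class DependentTask:
    """Synchronous sketch: run dependencies, collect their results
    into dependency_input, then run the task body."""

    def __init__(self, name, func, depends_on=()):
        self.name = name
        self.func = func
        self.depends_on = depends_on
        self.dependency_input = {}

    def __call__(self, *args, **kwargs):
        # In Celery this would be chord(dependencies)(self.si());
        # synchronously we just run each dependency in order.
        for dep in self.depends_on:
            self.dependency_input[dep.name] = dep()
        return self.func(self, *args, **kwargs)


task1 = DependentTask("task1", lambda self: "task1")
task2 = DependentTask(
    "task2",
    lambda self: self.dependency_input["task1"] + "+task2",
    depends_on=(task1,),
)
```

Calling `task2()` first runs `task1`, stores its result under `dependency_input["task1"]`, and only then runs the `task2` body.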
Task results are persisted and carry a `stale` flag. As long as the result is not stale, a `task.delay()` will immediately fetch the already available result. Otherwise the task body will be executed.
```python
@app.task
def add(left, right):
    time.sleep(10)
    return left + right

res = add.delay(2, 3)  # executed...
res.get() == 5         # waiting for 10 sec...

res = add.delay(2, 3)  # not executed
res.get() == 5         # almost instant

res = add.delay(2, 4)  # different signature, executed...
res.get() == 6         # waiting for 10 sec...
```
```python
# check result status based on task signatures
add.si(2, 3).available == True
add.si(2, 3).stale == False

add.si(2, 3).stale = True  # mark it as stale!
add.si(2, 3).available == True

res = add.apply_async((2, 3), fetch_stale=True)  # retrieve the stale result immediately!
res = add.delay(2, 3)  # executed again...
```
There's also a global (or per-task?) configuration setting for what happens with stale results:

```python
RERUN_ON_STALE = False
```
The concepts above are similar to `dogpile.cache`, which caches function results based on the passed arguments. Maybe an integration with that project would make sense.
Cached results do not depend on the `add_callback` and `depends` features, but work seamlessly with them.
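The caching behaviour can be sketched as follows (the `ResultCache` class is hypothetical; a real implementation would live in the result backend, key on the serialized task signature, and return `AsyncResult`s rather than plain values):

```python
import hashlib
import json

class ResultCache:
    """Hypothetical store mapping a task signature to (result, stale)."""

    def __init__(self):
        self._store = {}

    def key(self, name, args, kwargs):
        # Key the cache on the full signature, like add.si(2, 3)
        payload = json.dumps([name, list(args), sorted(kwargs.items())])
        return hashlib.sha256(payload.encode()).hexdigest()

    def delay(self, name, func, *args, **kwargs):
        k = self.key(name, args, kwargs)
        entry = self._store.get(k)
        if entry is not None and not entry["stale"]:
            return entry["result"]      # cache hit: skip the task body
        result = func(*args, **kwargs)  # cache miss or stale: run the body
        self._store[k] = {"result": result, "stale": False}
        return result

    def mark_stale(self, name, *args, **kwargs):
        self._store[self.key(name, args, kwargs)]["stale"] = True
```

With `RERUN_ON_STALE = False`, the stale branch above re-runs the body only when the caller asks; flipping that setting would simply change which branch `delay` takes for a stale entry.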
A task may need to be restarted while it is running. This can mean:
- stop the task, and re-run
- schedule a task re-run immediately after the current one finishes
```python
res = task1.delay()
# executing...
res.stop()
res.restart()
```
We need a task backend with a field, separate from the traditional Celery task state, that can be used to set the `stop` and `restart` values. The task implementation should periodically check this field and act accordingly. There's no guarantee that the task will stop; the task body implementation needs to honour this signal.
The signals `task-stop-sent`, `task-stop-received`, `task-restart-sent`, and `task-restart-received` should also be sent.
Can we do the periodic checking with a second thread that sets a variable visible in the task body?
```python
from threading import Timer

from celery import Task
from celery.exceptions import Retry


class StoppableTask(Task):
    def update_stop_restart(self):
        # magic that checks the backend for whether this task should stop
        self.should_stop, self.should_restart = self.check_backend(self.request.id)

    def handle_stop_restart(self):
        if self.should_restart:
            raise Retry()
        if self.should_stop:
            raise Abort()  # hypothetical new exception, not part of Celery

    def __call__(self, *args, **kwargs):
        # Note: threading.Timer fires only once; periodic checking needs
        # the timer re-armed, or a loop in a second thread.
        timer = Timer(5, self.update_stop_restart)
        timer.start()
        # call body...
```
Or subscribe to the `task-stop-sent` event instead of constantly polling with the timer.
And a task that uses `StoppableTask`:

```python
@app.task(bind=True)
def add(self, left, right):
    self.handle_stop_restart()
    time.sleep(10)
    self.handle_stop_restart()
    return left + right
```
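The event-subscription variant can be sketched with a `threading.Event` (all names here are hypothetical; a real receiver would consume Celery's event stream and filter on the task id): the receiver thread sets the flag, and the task body checks it at safe points, so no polling timer is needed.

```python
import threading

class StopController:
    """Hypothetical per-task stop flag set by an event receiver
    instead of a polling timer."""

    def __init__(self):
        self._stop = threading.Event()

    def on_task_stop_sent(self, event):
        # Invoked by the event receiver for a matching task-stop-sent event.
        self._stop.set()

    def handle_stop(self):
        # Called at safe points in the task body.
        if self._stop.is_set():
            raise RuntimeError("task aborted")  # stand-in for Abort()
```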
@task with dependencies: `task2` must be attached to the running task instead of starting a new one.

@cached results and invalidation:

```python
res = add.delay(2, 3)  # not executed
```

This one should not be executed even when `res.get()` on the line above was not called. Two `add.delay` calls should return the same `AsyncResult` until it is marked as stale / removed.