Fork/join pattern using a dynamic fork step
from celery import chain, chord, signature


class ForkJoinTask(object):
    # Other stuff

    @classmethod
    def signature(cls, options):
        """Returns a Celery signature describing the complete job-step sequence.

        Args:
            options (dict): The shared options for the job (auth, request, etc.)

        Returns:
            A Celery signature.
        """
        return dynamic_fork_join_task(cls.setup_step, cls.process_step, cls.join_step, options)
def dynamic_fork_join_task(setup_step, process_step, join_step, bound_args):
    """Creates a parallel Celery fork/join task from the provided functions without relying on
    a statically defined number of process workers, so the degree of parallelism scales
    automatically and dynamically with the size of the task.

    Args:
        setup_step (celery task): A "setup" step for the whole job
        process_step (celery task): A "process" step that runs in parallel after setup
        join_step (celery task): A "join" step to recombine parallel results
        bound_args (dict): Any bound arguments that can be accessed by all steps

    Returns:
        A new Celery job that performs a setup/process/join work pattern.
        The returned job's steps are all partially applied over bound_args.
    """
    setup_sig = setup_step.signature(**bound_args)
    process_sig = process_step.signature(**bound_args)
    join_sig = join_step.signature(**bound_args)
    intermediate_task_sig = intermediate_task.signature(header=process_sig, callback=join_sig)
    return chain(setup_sig, intermediate_task_sig)
def dynamic_chord(it, header, callback):
    """Creates and executes a dynamically defined Celery chord.

    A chord consists of a header group and a callback, where the callback is a task that
    executes after all of the tasks in the header are complete. This function creates a task
    that dynamically scales the header group to the number of items in the iterable, then
    passes the combined result to the callback.

    Args:
        it: An iterable of work to perform
        header: A celery task signature that will be scaled to consume each item from the
            iterable of work
        callback: A celery task signature that will consume the combined results of the
            completed header tasks

    Returns:
        A promise of a future calculation.
    """
    # Convert the header into a cloneable signature object
    header = signature(header)
    # Define the dynamic step as a celery chord. The workload determines the size of the header.
    dynamically_defined_chord_step = chord([header.clone([arg]) for arg in it], callback)
    # Execute the dynamic step, which returns a promise of the future calculation
    return dynamically_defined_chord_step()
# We need to statically define this task so that it registers with celery
intermediate_task = taskHelpers.new_zwork_lifecycle_task(dynamic_chord, queue=ForkJoinTask.queue)
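The gist above needs a running Celery broker to execute, but the setup/process/join flow it builds can be sketched without one. The following is a minimal, broker-free analogy using the standard library's `concurrent.futures`: the list returned by the setup step plays the role of the iterable that sizes the chord header, the pool `map` plays the parallel header group, and the final call plays the callback. All names here are illustrative stand-ins, not functions from the gist.

```python
from concurrent.futures import ThreadPoolExecutor


def setup_step():
    # Produces the iterable of work; the fork width is derived
    # from this result at runtime, not fixed in advance.
    return [1, 2, 3, 4]


def process_step(item):
    # Runs once per work item, in parallel (the chord "header").
    return item * item


def join_step(results):
    # Recombines the parallel results (the chord "callback").
    return sum(results)


def run_fork_join():
    work = setup_step()
    with ThreadPoolExecutor() as pool:
        # One header "task" is spawned per item, mirroring
        # header.clone([arg]) for arg in it in dynamic_chord.
        results = list(pool.map(process_step, work))
    return join_step(results)


print(run_fork_join())  # sums the squares of the work items
```

The key property this preserves from the gist is that the fan-out width is decided by the setup step's output rather than declared statically, which is exactly what `dynamic_chord` achieves by cloning the header signature once per item.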