Created July 1, 2020 17:14
-
-
Save palchicz/f13ddef9c4c58f3b17ea7e2192aa9104 to your computer and use it in GitHub Desktop.
Fork/join pattern using a dynamic fork step
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class ForkJoinTask(object):
    """A job whose work is split into setup, parallel process and join steps.

    The class must expose ``setup_step``, ``process_step`` and ``join_step``
    (read by :meth:`signature`). Module-level code elsewhere also reads
    ``ForkJoinTask.queue``. NOTE(review): neither is visible in this snippet;
    presumably they live in the elided section below — confirm.
    """

    # Other stuff (elided in this snippet)

    @classmethod
    def signature(cls, options):
        """Build the Celery signature describing the complete job-step sequence.

        Args:
            options (dict): Shared options for the job (auth, request, etc.),
                bound onto every step.

        Returns:
            A Celery signature (the chain built by ``dynamic_fork_join_task``).
        """
        return dynamic_fork_join_task(
            setup_step=cls.setup_step,
            process_step=cls.process_step,
            join_step=cls.join_step,
            bound_args=options,
        )
def dynamic_fork_join_task(setup_step, process_step, join_step, bound_args):
    """Build a setup -> dynamic fan-out -> join Celery job from three steps.

    Unlike a statically sized fork/join, the number of parallel "process"
    invocations is not fixed here: the intermediate task decides it at run time
    from whatever the setup step produces, so the fan-out scales with the work.

    Args:
        setup_step (celery task): Runs once, first, for the whole job.
        process_step (celery task): Runs in parallel, once per work item.
        join_step (celery task): Recombines the parallel results.
        bound_args (dict): Arguments bound onto all three steps.

    Returns:
        A Celery ``chain`` of the setup signature followed by the intermediate
        (chord-launching) task signature. In a chain, the setup step's return
        value is passed as the first argument (``it``) of that task.
    """
    # NOTE(review): with stock Celery, keyword arguments given to
    # ``Task.signature`` are stored as execution *options*, not task kwargs.
    # This code relies on the step objects' ``signature`` treating them as
    # bound task arguments (e.g. via the project's task wrapper) — confirm.
    setup_sig, process_sig, join_sig = (
        step.signature(**bound_args)
        for step in (setup_step, process_step, join_step)
    )

    # The process signature becomes the per-item chord header; the join
    # signature becomes the chord callback.
    fan_out_sig = intermediate_task.signature(header=process_sig, callback=join_sig)

    return chain(setup_sig, fan_out_sig)
def dynamic_chord(it, header, callback):
    """Create and launch a Celery chord whose header is sized by the workload.

    A chord is a group of header tasks plus a callback that runs once every
    header task has finished. Here the header group gets one clone of
    ``header`` per item of ``it``, so its width equals the amount of work.

    Args:
        it: Iterable of work items; each becomes the leading positional
            argument of one header task.
        header: Celery task signature (or something ``signature()`` accepts)
            to be cloned once per work item.
        callback: Celery task signature that consumes the combined results of
            the completed header tasks.

    Returns:
        Whatever calling the chord returns — a promise of the future result.
    """
    # Normalise the header via ``signature`` so it can be cloned. Per the
    # original note it may not arrive in a cloneable form; presumably it is a
    # plain dict after passing through the broker as a task argument.
    template = signature(header)

    # One header task per work item: the chord's size is decided here, at run
    # time, rather than when the job was defined.
    per_item_sigs = []
    for item in it:
        per_item_sigs.append(template.clone([item]))

    # Calling the chord launches it.
    return chord(per_item_sigs, callback)()
# Build the chord-launching task once, at import time, so that it is
# registered with Celery; a task created lazily inside
# dynamic_fork_join_task would not be registered.
# NOTE(review): ForkJoinTask.queue is not defined in the visible part of the
# class; presumably it is in its elided section — confirm.
intermediate_task = taskHelpers.new_zwork_lifecycle_task(
    dynamic_chord,
    queue=ForkJoinTask.queue,
)
Sign up for free to join this conversation on GitHub.
Already have an account?
Sign in to comment