Created
February 17, 2016 18:12
-
-
Save demoray/e1e3175f27bff33df2fa to your computer and use it in GitHub Desktop.
A method that lets a Luigi task run a set of Luigi tasks and their prerequisites serially.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import luigi | |
import types | |
class MyTasks(luigi.Task):
    """Luigi task that can execute other tasks in-process, one at a time."""

    @staticmethod
    def run_tasks(tasks):
        """Run ``tasks`` and their unmet prerequisites serially.

        The pending work is kept as a list. The task at the head of the list
        is run only once everything returned by its ``requires()`` reports
        ``complete()``; otherwise the incomplete requirements are pushed in
        front of it and handled first. Tasks that already report
        ``complete()`` are never run.

        When ``run()`` returns a generator (dynamic dependencies), every
        yielded value is treated as a further set of requirements. If any of
        them is incomplete, the run is abandoned, the missing tasks are
        queued ahead of the current task, and the task's ``run()`` is called
        again from the start later on. If all of them are complete, their
        outputs are sent back into the generator so that
        ``result = yield SomeTask()`` receives a usable value.

        Args:
            tasks: A task or a (possibly nested) collection of tasks, i.e.
                anything accepted by ``luigi.task.flatten``.

        Raises:
            RuntimeError: If a task's ``run()`` finishes but the task still
                reports itself as incomplete. Without this check the loop
                would re-run that task forever.
        """
        def _incomplete(candidates):
            # Normalise to a flat list and keep only work still to be done.
            return [
                task for task in luigi.task.flatten(candidates)
                if not task.complete()
            ]

        # The task whose run() most recently finished without interruption.
        finished = None

        while True:
            tasks = _incomplete(tasks)
            if not tasks:
                break

            current = tasks[0]
            if current is finished:
                # It ran to the end yet survived the completeness filter
                # above: running it again would loop indefinitely.
                raise RuntimeError(
                    "Task %r is still incomplete after run() finished"
                    % (current,)
                )
            finished = None

            required = _incomplete(current.requires())
            if required:
                # Prerequisites first; ``current`` stays queued behind them.
                tasks = required + tasks
                continue

            outcome = current.run()
            interrupted = False
            if isinstance(outcome, types.GeneratorType):
                reply = None
                while True:
                    try:
                        # send(None) on a fresh generator is the same as
                        # next(), so the first step needs no special case.
                        requested = outcome.send(reply)
                    except StopIteration:
                        break
                    required = _incomplete(requested)
                    if required:
                        # Dynamic requirements are missing: run them first,
                        # then restart ``current`` from scratch.
                        tasks = required + tasks
                        interrupted = True
                        break
                    # Everything yielded is complete; hand the matching
                    # outputs back to the task, as Luigi's worker does.
                    reply = luigi.task.getpaths(requested)

            if not interrupted:
                finished = current
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment