@demoray
Created February 17, 2016 18:12
A method that lets a Luigi task run a set of Luigi tasks and their prerequisites serially
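#!/usr/bin/env python
import types

import luigi


class MyTasks(luigi.Task):
    @staticmethod
    def run_tasks(tasks):
        """Run the given tasks and their prerequisites one at a time."""

        def _get(subtasks):
            # Flatten nested task structures and keep only incomplete tasks.
            subtasks = luigi.task.flatten(subtasks)
            return [x for x in subtasks if not x.complete()]

        while True:
            tasks = _get(tasks)
            if not tasks:
                break

            # Incomplete prerequisites go to the front of the queue.
            required = _get(tasks[0].requires())
            if required:
                tasks = required + tasks
                continue

            # run() returns a generator when the task yields dynamic
            # dependencies; queue any incomplete ones, then retry the task.
            generator = tasks[0].run()
            if isinstance(generator, types.GeneratorType):
                for subtasks in generator:
                    required = _get(subtasks)
                    if required:
                        tasks = required + tasks
                        break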
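The ordering this loop produces can be checked without installing Luigi. The sketch below is a simplified stand-in, not Luigi code: `FakeTask`, `pending`, `run_serially`, and `run_order` are hypothetical names invented for illustration. `pending` plays the role of `_get` but skips `luigi.task.flatten`, so it assumes plain lists of tasks. It shows static prerequisites running first, and a task that yields a dynamic dependency being abandoned and retried after that dependency completes.

```python
import types

run_order = []  # names of tasks, in the order they finished


class FakeTask:
    """Hypothetical stand-in for a Luigi task; not part of Luigi."""

    def __init__(self, name, deps=(), dynamic=()):
        self.name = name
        self.deps = list(deps)        # static prerequisites, as from requires()
        self.dynamic = list(dynamic)  # dependencies yielded from run()
        self.done = False

    def complete(self):
        return self.done

    def requires(self):
        return self.deps

    def run(self):
        # Return a generator only when there are dynamic dependencies.
        if self.dynamic:
            return self._run_dynamic()
        self._finish()

    def _run_dynamic(self):
        yield self.dynamic  # the work below is only reached once these are complete
        self._finish()

    def _finish(self):
        self.done = True
        run_order.append(self.name)


def pending(tasks):
    # Simplified _get: assumes a flat list, so no flatten() call is needed.
    return [t for t in tasks if not t.complete()]


def run_serially(tasks):
    # Same control flow as run_tasks in the gist.
    while True:
        tasks = pending(tasks)
        if not tasks:
            break
        required = pending(tasks[0].requires())
        if required:
            tasks = required + tasks
            continue
        generator = tasks[0].run()
        if isinstance(generator, types.GeneratorType):
            for subtasks in generator:
                required = pending(subtasks)
                if required:
                    tasks = required + tasks
                    break


a = FakeTask("a")
b = FakeTask("b", deps=[a])
c = FakeTask("c")
d = FakeTask("d", deps=[b], dynamic=[c])

run_serially([d])
print(run_order)  # ['a', 'b', 'c', 'd']
```

Note that `d.run()` is called twice here: the first call stops at the yield because `c` is incomplete, and the second runs to the end. Any work a task does before yielding a dynamic dependency is therefore repeated, so that part of `run()` should be safe to re-execute. A task whose `complete()` never becomes true after `run()` would keep the loop spinning.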