Created
August 18, 2015 08:09
-
-
Save yusuke0519/6ce6d133cf271b0a9e50 to your computer and use it in GitHub Desktop.
[Luigi]My Workflows
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import luigi | |
import luigi.scheduler | |
import luigi.worker | |
from components import * | |
class MyWorkflow(luigi.Task): | |
''' | |
An example luigi workflow task, which runs a whole workflow of other luigi | |
tasks when executed. | |
''' | |
# Here we define the whole workflow in the requires function | |
def requires(self): | |
task_a = TaskA() | |
task_b = TaskB(upstream_task=task_a) | |
task_c = TaskC(upstream_task=task_b) | |
return task_c | |
# Define a simple marker file to show that the workflow has completed | |
def output(self): | |
return luigi.LocalTarget(self.input().path + '.workflow_complete') | |
# Just write some text to the marker file | |
def run(self): | |
with self.output().open('w') as outfile: | |
outfile.write('Workflow complete!') | |
# If this file is executed as a script, let luigi take care of it: | |
if __name__ == '__main__': | |
luigi.run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Dynamic Taskを必要とするタスクを含む場合に便利そう